admin 管理员组

文章数量: 1087649

MQ消息队列中间件:

MQ消息队列中间件:

微服务间通讯有同步和异步两种方式:

同步通讯:就像打电话,需要实时响应。

异步通讯:就像发信息,不需要马上回复。

同步调用的优点:
  • 时效性较强,可以立即得到结果

同步调用的问题:

  • 耦合度高(新需求要改源码)
  • 性能和吞吐能力下降(等待各服务链都完成才响应返回)
  • 有额外的资源消耗(等待过程占用资源)
  • 有级联失败问题(服务提供者出现问题,调用方也会出问题)

异步调用通过事件驱动Broker可以很好解决同步调用的缺点,broker相当于中间人,发送发和接收方不直接通信,发布者发布事件到Broker,不关心谁来订阅事件。订阅者从Broker订阅事件,不关心谁发来的消息。Broker 是一个像数据总线一样的东西,所有的服务要接收数据和发送数据都发到这个总线上,这个总线就像协议一样,让服务间的通讯变得标准和可控。

异步调用的优点:

  • 吞吐量提升:无需等待订阅者处理完成,响应更快速

  • 故障隔离:服务没有直接调用,不存在级联失败问题

  • 调用间没有阻塞,不会造成无效的资源占用

  • 耦合度极低,每个服务都可以灵活插拔,可替换

  • 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件

缺点:

  • 架构复杂了,业务没有明显的流程线,不好管理,追踪困难
  • 需要依赖于Broker的可靠、安全、性能

MQ(MessageQueue)消息队列,也就是事件驱动架构中的Broker。

比较常见的MQ实现:

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka

入门:

安装Rabbit:在centos7虚拟机中使用docker安装

1、在线拉取:

docker pull rabbitmq:3-management

2、安装:

docker run \ -e RABBITMQ_DEFAULT_USER=root \ -e RABBITMQ_DEFAULT_PASS=123456 \ --name mq \ --hostname mq1 \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3-management

(报错:IPv4 forwarding is disabled. Networking will not work.网上一大堆解决方法)

访问(输入自己设置的用户密码):

RabbitMQ中的一些角色:

  • publisher:生产者
  • consumer:消费者
  • exchange个:交换机,负责消息路由
  • queue:队列,存储消息
  • virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离

案例:

官方给出了六个入门案例:

简单实现案例一:

两个springboot的model

comsumer的Test中有:
``

package cn.itcast.mq.helloworld;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.58.101");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("root");factory.setPassword("123456");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.处理消息String message = new String(body);System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息。。。。");}
}

publisher的Test中有:

package cn.itcast.mq.helloworld;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class PublisherTest {@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.58.101");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("root");factory.setPassword("123456");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("发送消息成功:【" + message + "】");// 5.关闭通道和连接channel.close();connection.close();}
}

先执行publisher的测试类,结果如下:

它会先创建连接,出现在connection中,然后创建通道,出现在Channels中,最后出现在队列Queues中

再执行comsumer中的测试类,结果如下:

此过程编写太过于复杂,可以用SpringAMQP来简化

SpringAMQP

AMQP:Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。

SpringAMQP:是基于AMQP协议定义的一套API规范,提供了模板来发送和接受消息。

用springAMP实现官方案例中的HelloWorld:

项目还是用的先前的那个mq-demo

在父工程中引入依赖:

<!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

在publisher服务的application.yml中添加配置:

``

spring:rabbitmq:host: 192.168.58.101 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: root # 用户名password: 123456 # 密码

然后在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test001(){String queueName="simple.queue";String message="hello,spring Amqp!!!!!!!";rabbitTemplate.convertAndSend(queueName,message);}
}

运行测试类,结果:

comsumer接受消息:

与publisher一样,先配置application.yml,在创建一个类:

``

@Component
public class listener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消费者接收到消息:【" + msg + "】");}
}

结果:

omponent
public class listener {

    @RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消费者接收到消息:【" + msg + "】");}

}


结果:
![在这里插入图片描述](.png)**跟着黑马学技术!!!**

本文标签: MQ消息队列中间件