admin 管理员组文章数量: 1087649
MQ消息队列中间件:
MQ消息队列中间件:
微服务间通讯有同步和异步两种方式:
同步通讯:就像打电话,需要实时响应。
异步通讯:就像发信息,不需要马上回复。
同步调用的优点:
- 时效性较强,可以立即得到结果
同步调用的问题:
- 耦合度高(新需求要改源码)
- 性能和吞吐能力下降(等待各服务链都完成才响应返回)
- 有额外的资源消耗(等待过程占用资源)
- 有级联失败问题(服务提供者出现问题,调用方也会出问题)
异步调用通过事件驱动Broker可以很好解决同步调用的缺点,broker相当于中间人,发送发和接收方不直接通信,发布者发布事件到Broker,不关心谁来订阅事件。订阅者从Broker订阅事件,不关心谁发来的消息。Broker 是一个像数据总线一样的东西,所有的服务要接收数据和发送数据都发到这个总线上,这个总线就像协议一样,让服务间的通讯变得标准和可控。
异步调用的优点:
-
吞吐量提升:无需等待订阅者处理完成,响应更快速
-
故障隔离:服务没有直接调用,不存在级联失败问题
-
调用间没有阻塞,不会造成无效的资源占用
-
耦合度极低,每个服务都可以灵活插拔,可替换
-
流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件
缺点:
- 架构复杂了,业务没有明显的流程线,不好管理,追踪困难
- 需要依赖于Broker的可靠、安全、性能
MQ(MessageQueue)消息队列,也就是事件驱动架构中的Broker。
比较常见的MQ实现:
- ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
入门:
安装Rabbit:在centos7虚拟机中使用docker安装
1、在线拉取:
docker pull rabbitmq:3-management
2、安装:
docker run \ -e RABBITMQ_DEFAULT_USER=root \ -e RABBITMQ_DEFAULT_PASS=123456 \ --name mq \ --hostname mq1 \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3-management
(报错:IPv4 forwarding is disabled. Networking will not work.网上一大堆解决方法)
访问(输入自己设置的用户密码):
RabbitMQ中的一些角色:
- publisher:生产者
- consumer:消费者
- exchange个:交换机,负责消息路由
- queue:队列,存储消息
- virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离
案例:
官方给出了六个入门案例:
简单实现案例一:
两个springboot的model
comsumer的Test中有:
``
package cn.itcast.mq.helloworld;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.58.101");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("root");factory.setPassword("123456");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.处理消息String message = new String(body);System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息。。。。");}
}
publisher的Test中有:
package cn.itcast.mq.helloworld;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class PublisherTest {@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.58.101");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("root");factory.setPassword("123456");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("发送消息成功:【" + message + "】");// 5.关闭通道和连接channel.close();connection.close();}
}
先执行publisher的测试类,结果如下:
它会先创建连接,出现在connection中,然后创建通道,出现在Channels中,最后出现在队列Queues中
再执行comsumer中的测试类,结果如下:
此过程编写太过于复杂,可以用SpringAMQP来简化
SpringAMQP
AMQP:Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
SpringAMQP:是基于AMQP协议定义的一套API规范,提供了模板来发送和接受消息。
用springAMP实现官方案例中的HelloWorld:
项目还是用的先前的那个mq-demo
在父工程中引入依赖:
<!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
在publisher服务的application.yml中添加配置:
``
spring:rabbitmq:host: 192.168.58.101 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: root # 用户名password: 123456 # 密码
然后在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test001(){String queueName="simple.queue";String message="hello,spring Amqp!!!!!!!";rabbitTemplate.convertAndSend(queueName,message);}
}
运行测试类,结果:
comsumer接受消息:
与publisher一样,先配置application.yml,在创建一个类:
``
@Component
public class listener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消费者接收到消息:【" + msg + "】");}
}
结果:
omponent
public class listener {
@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消费者接收到消息:【" + msg + "】");}
}
结果:
**跟着黑马学技术!!!**
本文标签: MQ消息队列中间件
版权声明:本文标题:MQ消息队列中间件: 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.roclinux.cn/b/1686651569a20529.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论