admin 管理员组文章数量: 1086019
c#服务器后端
1:RabbitMQ是个啥?(专业术语参考自网络)
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。
RabbitMQ服务器是用Erlang语言编写的,Erlang是专门为高并发而生的语言,而集群和故障转移是构建在开发电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库
2:使用RabbitMQ有啥好处?
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
RabbitMQ的可靠性是非常好的,数据能够保证百分之百的不丢失。可以使用镜像队列,它的稳定性非常好。所以说在我们互联网的金融行业。
对数据的稳定性和可靠性要求都非常高的情况下,我们都会选择RabbitMQ。当然没有kafka性能好,但是要比AvtiveMQ性能要好很多。也可以自己做一些性能的优化。
RabbitMQ可以构建异地双活架构,包括每一个节点存储方式可以采用磁盘或者内存的方式,
3:RabbitMq的安装以及环境搭建等:
网络上有很多关于怎么搭建配置RabbitMq服务环境的详细文章,也比较简单,这里不再说明,本人是Docker上面的pull RabbitMq 镜像来安装的!
3.1:运行容器的命令如下:
docker run -d --hostname Log --restart=always --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=log_user -e RABBITMQ_DEFAULT_PASS=331QQFEG123 rabbitmq:3-management
4:RabbitMq的使用场景主要有哪些,啥时候用或者不用?
4.1什么时候使用MQ?
1)数据驱动的任务依赖
2)上游不关心多下游执行结果
3)异步返回执行时间长
4.2什么时候不使用MQ?
需要实时关注执行结果 (eg:同步调用)
5:具体C#怎么使用RabbitMq?下面直接上code和测试截图了(Demo环境是.NetCore3.1控制台+Docker上的RabbitMQ容器来进行的)
6:sample模式,就是简单地队列模式,一进一出的效果差不多,测试截图:
Code:
1 //简单生产端 ui调用者 2 3 using System; 4 namespace RabbitMqPublishDemo 5 { 6 using MyRabbitMqService; 7 using System.Runtime.CompilerServices; 8 9 class Program10 {11 static void Main(string[] args)12 {13 //就是简单的队列,生产者14 Console.WriteLine("====RabbitMqPublishDemo====");15 for (int i = 0; i < 500; i++)16 {17 ZrfRabbitMqHelper.PublishSampleMsg("smapleMsg", $"nihaifengge:{i}");18 }19 Console.WriteLine("生成完毕!");20 Console.ReadLine();21 }22 }23 }24 25 /// 26 /// 简单生产者 逻辑27 /// 28 /// 29 /// 30 public static void PublishSampleMsg(string queueName, string msg)31 {32 33 using (IConnection conn = connectionFactory.CreateConnection())34 {35 using (IModel channel = conn.CreateModel())36 {37 channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);38 var msgBody = Encoding.UTF8.GetBytes(msg);39 channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody);40 }41 }42 }43 44 45 //简单消费端46 using System;47 48 namespace RabbitMqConsumerDemo49 {50 using MyRabbitMqService;51 using System.Runtime.InteropServices;52 53 class Program54 {55 static void Main(string[] args)56 {57 Console.WriteLine("====RabbitMqConsumerDemo====");58 ZrfRabbitMqHelper.ConsumeSampleMsg("smapleMsg", isBasicNack: true, handleMsgStr: handleMsgStr =>59 {60 Console.WriteLine($"订阅到消息:{DateTime.Now}:{handleMsgStr}");61 });62 Console.ReadLine();63 }64 }65 }66 67 #region 简单生产者后端逻辑68 /// 69 /// 简单消费者70 /// 71 /// 队列名称72 /// 失败后是否自动放到队列73 /// 有就自己对字符串的处理,如果要存储到数据库请自行扩展74 public static void ConsumeSampleMsg(string queueName, bool isBasicNack = false, Action<string> handleMsgStr = null)// bool ifBasicReject = false, 75 {76 Console.WriteLine("ConsumeSampleMsg Waiting for messages....");77 IConnection conn = connectionFactory.CreateConnection();78 IModel channel = conn.CreateModel();79 channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);80 var consumer = new EventingBasicConsumer(channel);81 consumer.Received += (sender, ea) =>82 {83 byte[] bymsg = ea.Body.ToArray();84 string msg = Encoding.UTF8.GetString(bymsg);85 if (handleMsgStr != null)86 {87 handleMsgStr.Invoke(msg);88 }89 else90 {91 Console.WriteLine($"{DateTime.Now}->收到消息:{msg}");92 }93 };94 channel.BasicConsume(queueName, autoAck: true, consumer);95 }96 #endregion97 98
7:Work模式
1 //就如下的code, 多次生产,3个消费者都可以自动开始消费 2 3 //生产者 4 using System; 5 namespace RabbitMqPublishDemo 6 { 7 using MyRabbitMqService; 8 using System.Runtime.CompilerServices; 9 class Program10 {11 static void Main(string[] args)12 {13 for (int i = 0; i < 500; i++)14 {15 ZrfRabbitMqHelper.PublishWorkQueueModel("workqueue", $" :发布消息成功{i}");16 }17 Console.WriteLine("工作队列模式 生成完毕......!"); 18 Console.ReadLine();19 }20 }21 }22 23 //生产者后端逻辑24 public static void PublishWorkQueueModel(string queueName, string msg)25 {26 using (var connection = connectionFactory.CreateConnection())27 using (var channel = connection.CreateModel())28 {29 channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);30 var body = Encoding.UTF8.GetBytes(msg);31 var properties = channel.CreateBasicProperties();32 properties.Persistent = true;33 34 channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: properties, body: body);35 Console.WriteLine($"{DateTime.Now},SentMsg: {msg}");36 }37 }38 39 //work消费端40 using System;41 42 namespace RabbitMqConsumerDemo43 {44 using MyRabbitMqService;45 using System.Runtime.InteropServices;46 class Program47 {48 static void Main(string[] args)49 {50 Console.WriteLine("====Work模式开启了====");51 ZrfRabbitMqHelper.ConsumeWorkQueueModel("workqueue", handserMsg: msg =>52 {53 Console.WriteLine($"work模式获取到消息{msg}");54 });55 Console.ReadLine();56 }57 }58 }59 60 //work后端逻辑61 public static void ConsumeWorkQueueModel(string queueName, int sleepHmao = 90, Action<string> handserMsg = null)62 {63 var connection = connectionFactory.CreateConnection();64 var channel = connection.CreateModel();65 66 channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);67 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);68 69 var consumer = new EventingBasicConsumer(channel);70 Console.WriteLine(" ConsumeWorkQueueModel Waiting for messages....");71 72 consumer.Received += (sender, ea) =>73 {74 var body = ea.Body.ToArray();75 var message = Encoding.UTF8.GetString(body);76 if (handserMsg != null)77 {78 if (!string.IsNullOrEmpty(message))79 {80 handserMsg.Invoke(message);81 }82 }83 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);84 };85 channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);86 }
8:Fanout
Code:
1 //同一个消息会被多个订阅者消费 2 3 //发布者 4 using System; 5 6 namespace RabbitMqPublishDemo 7 { 8 using MyRabbitMqService; 9 using System.Runtime.CompilerServices;10 11 class Program12 {13 static void Main(string[] args)14 {15 16 #region 发布订阅模式,带上了exchange17 for (int i = 0; i < 500; i++)18 {19 ZrfRabbitMqHelper.PublishExchangeModel("exchangemodel", $"发布的消息是:{i}");20 }21 Console.WriteLine("发布ok!");22 #endregion23 Console.ReadLine();24 }25 }26 }27 //发布者的后端逻辑 我在这里选择了扇形: ExchangeType.Fanout28 public static void PublishExchangeModel(string exchangeName, string message)29 {30 using (var connection = connectionFactory.CreateConnection())31 using (var channel = connection.CreateModel())32 {33 channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);34 var body = Encoding.UTF8.GetBytes(message);35 channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body);36 Console.WriteLine($" Sent {message}");37 }38 }39 40 41 //订阅者42 using System;43 namespace RabbitMqConsumerDemo44 {45 using MyRabbitMqService;46 using System.Runtime.InteropServices;47 class Program48 {49 static void Main(string[] args)50 {51 52 #region 发布订阅模式 Exchange53 ZrfRabbitMqHelper.SubscriberExchangeModel("exchangemodel", msg =>54 {55 Console.WriteLine($"订阅到消息:{msg}");56 });57 #endregion58 Console.ReadLine();59 }60 }61 }62 63 //订阅者后端的逻辑64 public static void SubscriberExchangeModel(string exchangeName, Action<string> handlerMsg = null)65 {66 var connection = connectionFactory.CreateConnection();67 var channel = connection.CreateModel();68 69 channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);//Fanout 扇形分叉70 71 var queueName = channel.QueueDeclare().QueueName;72 channel.QueueBind(queue: queueName,73 exchange: exchangeName,74 routingKey: "");75 76 Console.WriteLine(" Waiting for msg....");77 78 var consumer = new EventingBasicConsumer(channel);79 consumer.Received += (model, ea) =>80 {81 var body = ea.Body.ToArray();82 var message = Encoding.UTF8.GetString(body);83 if (handlerMsg != null)84 {85 if (!string.IsNullOrEmpty(message))86 {87 handlerMsg.Invoke(message);88 }89 }90 else91 {92 Console.WriteLine($"订阅到消息:{message}");93 }94 };95 channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);96 }
9:Direct
Code:
1 //发布者 2 using System; 3 4 namespace RabbitMqPublishDemo 5 { 6 using MyRabbitMqService; 7 using System.Runtime.CompilerServices; 8 9 class Program 10 { 11 static void Main(string[] args) 12 { 13 #region 发布订阅 交换机路由模式 Direct 14 string routerKeyValue = args[0].Split("=")[1];//如 abc.exe --name='qq' 15 Console.WriteLine("开始发布中。。。"); 16 for (int i = 0; i < 20; i++) 17 { 18 string msg = $"小明有{i}只宝剑"; 19 ZrfRabbitMqHelper.ExchangeRoutersByDirectModelPublish(msg, routerKey: routerKeyValue); 20 21 //下面的为固定的写法 22 //ZrfRabbitMqHelper.ExchangeRoutersByDirectModelPublish(msg); 23 //ZrfRabbitMqHelper.ExchangeRoutersByDirectModelPublish($"你好我好大家好{i}", routerKey:"onlylog"); 24 } 25 Console.WriteLine("这次发布完毕。。。"); 26 #endregion 27 Console.ReadLine(); 28 } 29 } 30 } 31 32 //发布者后端逻辑 发布订阅的路由模式 Direct 33 /// 34 /// 发布 Direct 路由模式 Direct 35 /// 36 /// 37 /// 38 /// 39 public static void ExchangeRoutersByDirectModelPublish(string message, string exchangeName = "qqai", string routerKey = "insertToStudent") 40 { 41 using (IConnection connection = connectionFactory.CreateConnection()) 42 { 43 using (IModel channelmodel = connection.CreateModel()) 44 { 45 channelmodel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct); 46 byte[] bymsg = Encoding.UTF8.GetBytes(message); 47 channelmodel.BasicPublish(exchange: exchangeName, routingKey: routerKey, body: bymsg); 48 49 // byte[] bytemsg = Encoding.UTF8.GetBytes(message); 50 // channelmodel.BasicPublish(exchange: exchangeName,routingKey: routerKey,basicProperties: null,body: bytemsg); 51 } 52 } 53 } 54 55 //订阅者 Exchange Router路由 Director 56 using System; 57 58 namespace RabbitMqConsumerDemo 59 { 60 using MyRabbitMqService; 61 using System.Runtime.InteropServices; 62 63 class Program 64 { 65 static void Main(string[] args) 66 { 67 Console.WriteLine("开始消费中。。!"); 68 if (args.Length > 0) 69 { 70 string routerKeyValue = args[0].Split("=")[1]; 71 Console.WriteLine($"routerKey=>{routerKeyValue}"); 72 if (!string.IsNullOrEmpty(routerKeyValue)) 73 ZrfRabbitMqHelper.ExchangeRoutersByDirectModelConsumer(routerKey: routerKeyValue, handler: msg => 74 { 75 Console.WriteLine($"拿到消息:{msg}"); 76 }); 77 else 78 Console.WriteLine("没有获取到routerKey !"); 79 } 80 //else 81 //{ 82 // ZrfRabbitMqHelper.ExchangeRoutersByDirectModelConsumer(handler: msg => 83 // { 84 // Console.WriteLine($"拿到消息:{msg}"); 85 // }); 86 //} 87 Console.ReadLine(); 88 } 89 } 90 } 91 92 //订阅者 Exchange Router路由 Director 后端逻辑 93 public static void ExchangeRoutersByDirectModelConsumer(string exchangeName = "qqai", string routerKey = "insertToStudent", Action<string> handler = null) 94 { 95 var connection = connectionFactory.CreateConnection(); 96 var channel = connection.CreateModel(); 97 channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct); 98 var queueName = channel.QueueDeclare().QueueName; 99 channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routerKey);100 101 Console.WriteLine("wating for message...!");102 var consumer = new EventingBasicConsumer(channel);103 //(object sender, BasicDeliverEventArgs e)104 consumer.Received += (sender, e) =>105 {106 var bytedata = e.Body.ToArray();107 var getRoutekey = e.RoutingKey;108 string msg = Encoding.UTF8.GetString(bytedata);109 if (handler != null)110 handler.Invoke(msg);111 else112 Console.WriteLine($"路由{getRoutekey},订阅到消息{msg}!");113 };114 channel.BasicConsume(queue: queueName, autoAck: true, consumer);115 }116
需要完整的code,可以留言获取!
如有疑问或者错误的地方,请跟帖,本人会第一时间答复以及相互学习,谢谢!个人会不断的上传自己的学习心得!
我的博客园地址:
微信搜索“DotNet开发跳槽”或扫描文章底部二维码关注公众:回复“.NET5牛”领取海量学习资料。
版权申明:本文来源于网友收集或网友提供,如果有侵权,请转告版主或者留言,本公众号立即删除。
本文标签: c服务器后端
版权声明:本文标题:c#服务器后端 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.roclinux.cn/b/1693409368a220262.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论