admin 管理员组

文章数量: 1086019

c#服务器后端

1:RabbitMQ是个啥?(专业术语参考自网络)

     RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。

  RabbitMQ服务器是用Erlang语言编写的,Erlang是专门为高并发而生的语言,而集群和故障转移是构建在开发电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库

2:使用RabbitMQ有啥好处?

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
RabbitMQ的可靠性是非常好的,数据能够保证百分之百的不丢失。可以使用镜像队列,它的稳定性非常好。所以说在我们互联网的金融行业。

对数据的稳定性和可靠性要求都非常高的情况下,我们都会选择RabbitMQ。当然没有kafka性能好,但是要比AvtiveMQ性能要好很多。也可以自己做一些性能的优化。

RabbitMQ可以构建异地双活架构,包括每一个节点存储方式可以采用磁盘或者内存的方式,

3:RabbitMq的安装以及环境搭建等:

   网络上有很多关于怎么搭建配置RabbitMq服务环境的详细文章,也比较简单,这里不再说明,本人是Docker上面的pull RabbitMq 镜像来安装的!

3.1:运行容器的命令如下:

docker run -d --hostname Log --restart=always --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=log_user -e RABBITMQ_DEFAULT_PASS=331QQFEG123 rabbitmq:3-management

4:RabbitMq的使用场景主要有哪些,啥时候用或者不用?

4.1什么时候使用MQ?

1)数据驱动的任务依赖

2)上游不关心多下游执行结果

3)异步返回执行时间长

4.2什么时候不使用MQ?

需要实时关注执行结果 (eg:同步调用)

5:具体C#怎么使用RabbitMq?下面直接上code和测试截图了(Demo环境是.NetCore3.1控制台+Docker上的RabbitMQ容器来进行的)

6:sample模式,就是简单地队列模式,一进一出的效果差不多,测试截图:

 

Code:

 1 //简单生产端 ui调用者 2  3 using System; 4 namespace RabbitMqPublishDemo 5 { 6     using MyRabbitMqService; 7     using System.Runtime.CompilerServices; 8  9     class Program10     {11         static void Main(string[] args)12         {13                 //就是简单的队列,生产者14                 Console.WriteLine("====RabbitMqPublishDemo====");15                 for (int i = 0; i < 500; i++)16                 {17                     ZrfRabbitMqHelper.PublishSampleMsg("smapleMsg", $"nihaifengge:{i}");18                 }19                 Console.WriteLine("生成完毕!");20                 Console.ReadLine();21         }22     }23 }24 25 /// 26 /// 简单生产者 逻辑27 /// 28 /// 29 /// 30 public static void PublishSampleMsg(string queueName, string msg)31 {32 33     using (IConnection conn = connectionFactory.CreateConnection())34     {35         using (IModel channel = conn.CreateModel())36         {37             channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);38             var msgBody = Encoding.UTF8.GetBytes(msg);39             channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody);40         }41     }42 }43         44 45 //简单消费端46 using System;47 48 namespace RabbitMqConsumerDemo49 {50     using MyRabbitMqService;51     using System.Runtime.InteropServices;52 53     class Program54     {55         static void Main(string[] args)56         {57             Console.WriteLine("====RabbitMqConsumerDemo====");58             ZrfRabbitMqHelper.ConsumeSampleMsg("smapleMsg", isBasicNack: true, handleMsgStr: handleMsgStr =>59             {60                 Console.WriteLine($"订阅到消息:{DateTime.Now}:{handleMsgStr}");61             });62             Console.ReadLine();63         }64     }65 }66 67      #region 简单生产者后端逻辑68         /// 69         /// 简单消费者70         /// 71         /// 队列名称72         /// 失败后是否自动放到队列73         /// 有就自己对字符串的处理,如果要存储到数据库请自行扩展74         public static void ConsumeSampleMsg(string queueName, bool isBasicNack = false, Action<string> handleMsgStr = null)// bool ifBasicReject = false, 75         {76             Console.WriteLine("ConsumeSampleMsg Waiting for messages....");77             IConnection conn = connectionFactory.CreateConnection();78             IModel channel = conn.CreateModel();79             channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);80             var consumer = new EventingBasicConsumer(channel);81             consumer.Received += (sender, ea) =>82             {83                 byte[] bymsg = ea.Body.ToArray();84                 string msg = Encoding.UTF8.GetString(bymsg);85                 if (handleMsgStr != null)86                 {87                     handleMsgStr.Invoke(msg);88                 }89                 else90                 {91                     Console.WriteLine($"{DateTime.Now}->收到消息:{msg}");92                 }93             };94             channel.BasicConsume(queueName, autoAck: true, consumer);95         }96         #endregion97         98

7:Work模式

 1 //就如下的code, 多次生产,3个消费者都可以自动开始消费 2  3 //生产者 4 using System; 5 namespace RabbitMqPublishDemo 6 { 7     using MyRabbitMqService; 8     using System.Runtime.CompilerServices; 9     class Program10     {11         static void Main(string[] args)12         {13             for (int i = 0; i < 500; i++)14             {15                 ZrfRabbitMqHelper.PublishWorkQueueModel("workqueue", $" :发布消息成功{i}");16             }17             Console.WriteLine("工作队列模式 生成完毕......!");          18             Console.ReadLine();19         }20     }21 }22 23 //生产者后端逻辑24 public static void PublishWorkQueueModel(string queueName, string msg)25         {26             using (var connection = connectionFactory.CreateConnection())27             using (var channel = connection.CreateModel())28             {29                 channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);30                 var body = Encoding.UTF8.GetBytes(msg);31                 var properties = channel.CreateBasicProperties();32                 properties.Persistent = true;33 34                 channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: properties, body: body);35                 Console.WriteLine($"{DateTime.Now},SentMsg: {msg}");36             }37         }38 39 //work消费端40 using System;41 42 namespace RabbitMqConsumerDemo43 {44     using MyRabbitMqService;45     using System.Runtime.InteropServices;46     class Program47     {48         static void Main(string[] args)49         {50             Console.WriteLine("====Work模式开启了====");51             ZrfRabbitMqHelper.ConsumeWorkQueueModel("workqueue", handserMsg: msg =>52             {53                 Console.WriteLine($"work模式获取到消息{msg}");54             });55             Console.ReadLine();56         }57     }58 }59 60 //work后端逻辑61        public static void ConsumeWorkQueueModel(string queueName, int sleepHmao = 90, Action<string> handserMsg = null)62         {63             var connection = connectionFactory.CreateConnection();64             var channel = connection.CreateModel();65 66             channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);67             channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);68 69             var consumer = new EventingBasicConsumer(channel);70             Console.WriteLine(" ConsumeWorkQueueModel Waiting for messages....");71 72             consumer.Received += (sender, ea) =>73             {74                 var body = ea.Body.ToArray();75                 var message = Encoding.UTF8.GetString(body);76                 if (handserMsg != null)77                 {78                     if (!string.IsNullOrEmpty(message))79                     {80                         handserMsg.Invoke(message);81                     }82                 }83                 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);84             };85             channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);86         }

8:Fanout

 Code:

 1 //同一个消息会被多个订阅者消费 2  3 //发布者 4 using System; 5  6 namespace RabbitMqPublishDemo 7 { 8     using MyRabbitMqService; 9     using System.Runtime.CompilerServices;10 11     class Program12     {13         static void Main(string[] args)14         {15 16             #region 发布订阅模式,带上了exchange17             for (int i = 0; i < 500; i++)18             {19                 ZrfRabbitMqHelper.PublishExchangeModel("exchangemodel", $"发布的消息是:{i}");20             }21             Console.WriteLine("发布ok!");22             #endregion23             Console.ReadLine();24         }25     }26 }27 //发布者的后端逻辑 我在这里选择了扇形: ExchangeType.Fanout28    public static void PublishExchangeModel(string exchangeName, string message)29         {30             using (var connection = connectionFactory.CreateConnection())31             using (var channel = connection.CreateModel())32             {33                 channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);34                 var body = Encoding.UTF8.GetBytes(message);35                 channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body);36                 Console.WriteLine($" Sent {message}");37             }38         }39 40 41 //订阅者42 using System;43 namespace RabbitMqConsumerDemo44 {45     using MyRabbitMqService;46     using System.Runtime.InteropServices;47     class Program48     {49         static void Main(string[] args)50         {51 52             #region 发布订阅模式 Exchange53             ZrfRabbitMqHelper.SubscriberExchangeModel("exchangemodel", msg =>54             {55                 Console.WriteLine($"订阅到消息:{msg}");56             });57             #endregion58             Console.ReadLine();59         }60     }61 }62 63 //订阅者后端的逻辑64  public static void SubscriberExchangeModel(string exchangeName, Action<string> handlerMsg = null)65         {66             var connection = connectionFactory.CreateConnection();67             var channel = connection.CreateModel();68 69             channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);//Fanout 扇形分叉70 71             var queueName = channel.QueueDeclare().QueueName;72             channel.QueueBind(queue: queueName,73                               exchange: exchangeName,74                               routingKey: "");75 76             Console.WriteLine(" Waiting for msg....");77 78             var consumer = new EventingBasicConsumer(channel);79             consumer.Received += (model, ea) =>80             {81                 var body = ea.Body.ToArray();82                 var message = Encoding.UTF8.GetString(body);83                 if (handlerMsg != null)84                 {85                     if (!string.IsNullOrEmpty(message))86                     {87                         handlerMsg.Invoke(message);88                     }89                 }90                 else91                 {92                     Console.WriteLine($"订阅到消息:{message}");93                 }94             };95             channel.BasicConsume(queue: queueName,  autoAck: true, consumer: consumer);96         }

9:Direct

 

Code:

  1 //发布者  2 using System;  3   4 namespace RabbitMqPublishDemo  5 {  6     using MyRabbitMqService;  7     using System.Runtime.CompilerServices;  8   9     class Program 10     { 11         static void Main(string[] args) 12         { 13             #region 发布订阅 交换机路由模式 Direct 14             string routerKeyValue = args[0].Split("=")[1];//如 abc.exe --name='qq' 15             Console.WriteLine("开始发布中。。。"); 16             for (int i = 0; i < 20; i++) 17             { 18                 string msg = $"小明有{i}只宝剑"; 19                 ZrfRabbitMqHelper.ExchangeRoutersByDirectModelPublish(msg, routerKey: routerKeyValue); 20  21                 //下面的为固定的写法 22                 //ZrfRabbitMqHelper.ExchangeRoutersByDirectModelPublish(msg); 23                 //ZrfRabbitMqHelper.ExchangeRoutersByDirectModelPublish($"你好我好大家好{i}", routerKey:"onlylog"); 24             } 25             Console.WriteLine("这次发布完毕。。。"); 26             #endregion 27             Console.ReadLine(); 28         } 29     } 30 } 31  32 //发布者后端逻辑 发布订阅的路由模式 Direct 33         ///  34         /// 发布 Direct 路由模式 Direct 35         ///  36         ///  37         ///  38         ///  39         public static void ExchangeRoutersByDirectModelPublish(string message, string exchangeName = "qqai", string routerKey = "insertToStudent") 40         { 41             using (IConnection connection = connectionFactory.CreateConnection()) 42             { 43                 using (IModel channelmodel = connection.CreateModel()) 44                 { 45                     channelmodel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct); 46                     byte[] bymsg = Encoding.UTF8.GetBytes(message); 47                     channelmodel.BasicPublish(exchange: exchangeName, routingKey: routerKey, body: bymsg); 48  49                     // byte[] bytemsg = Encoding.UTF8.GetBytes(message); 50                     //  channelmodel.BasicPublish(exchange: exchangeName,routingKey: routerKey,basicProperties: null,body: bytemsg); 51                 } 52             } 53         } 54          55 //订阅者 Exchange Router路由 Director 56 using System; 57  58 namespace RabbitMqConsumerDemo 59 { 60     using MyRabbitMqService; 61     using System.Runtime.InteropServices; 62  63     class Program 64     { 65         static void Main(string[] args) 66         { 67             Console.WriteLine("开始消费中。。!"); 68             if (args.Length > 0) 69             { 70                 string routerKeyValue = args[0].Split("=")[1]; 71                 Console.WriteLine($"routerKey=>{routerKeyValue}"); 72                 if (!string.IsNullOrEmpty(routerKeyValue)) 73                     ZrfRabbitMqHelper.ExchangeRoutersByDirectModelConsumer(routerKey: routerKeyValue, handler: msg => 74                     { 75                         Console.WriteLine($"拿到消息:{msg}"); 76                     }); 77                 else 78                     Console.WriteLine("没有获取到routerKey !"); 79             } 80             //else 81             //{ 82             //    ZrfRabbitMqHelper.ExchangeRoutersByDirectModelConsumer(handler: msg => 83             //    { 84             //        Console.WriteLine($"拿到消息:{msg}"); 85             //    }); 86             //} 87             Console.ReadLine(); 88         } 89     } 90 } 91  92 //订阅者 Exchange Router路由 Director 后端逻辑 93        public static void ExchangeRoutersByDirectModelConsumer(string exchangeName = "qqai", string routerKey = "insertToStudent", Action<string> handler = null) 94         { 95             var connection = connectionFactory.CreateConnection(); 96             var channel = connection.CreateModel(); 97             channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct); 98             var queueName = channel.QueueDeclare().QueueName; 99             channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routerKey);100 101             Console.WriteLine("wating for message...!");102             var consumer = new EventingBasicConsumer(channel);103             //(object sender, BasicDeliverEventArgs e)104             consumer.Received += (sender, e) =>105             {106                 var bytedata = e.Body.ToArray();107                 var getRoutekey = e.RoutingKey;108                 string msg = Encoding.UTF8.GetString(bytedata);109                 if (handler != null)110                     handler.Invoke(msg);111                 else112                     Console.WriteLine($"路由{getRoutekey},订阅到消息{msg}!");113             };114             channel.BasicConsume(queue: queueName, autoAck: true, consumer);115         }116 

 需要完整的code,可以留言获取!

如有疑问或者错误的地方,请跟帖,本人会第一时间答复以及相互学习,谢谢!个人会不断的上传自己的学习心得!

我的博客园地址:

微信搜索“DotNet开发跳槽”或扫描文章底部二维码关注公众:回复“.NET5牛”领取海量学习资料。

版权申明:本文来源于网友收集或网友提供,如果有侵权,请转告版主或者留言,本公众号立即删除。

本文标签: c服务器后端