admin 管理员组文章数量: 1086019
2024年3月13日发(作者:vs2010ultimate密钥)
用redis实现支持优先级的消息队列
系统中引入消息队列机制是对系统一个非常大的改善。例如一个web系统中,用户做了某
项操作后需要发送邮件通知到用户邮箱中。你可以使用同步方式让用户等待邮件发送完成后
反馈给用户,但是这样可能会因为网络的不确定性造成用户长时间的等待从而影响用户体
验。
有些场景下是不可能使用同步方式等待完成的,那些需要后台花费大量时间的操作。例如极
端例子,一个在线编译系统任务,后台编译完成需要30分钟。这种场景的设计不可能同步
等待后在回馈,必须是先反馈用户随后异步处理完成,再等待处理完成后根据情况再此反馈
用户与否。
另外适用消息队列的情况是那些系统处理能力有限的情况下,先使用队列机制把任务暂时存
放起来,系统再一个个轮流处理掉排队的任务。这样在系统吞吐量不足的情况下也能稳定的
处理掉高并发的任务。
消息队列可以用来做排队机制,只要系统需要用到排队机制的地方就可以使用消息队列来
作。
目前成熟的消息队列产品有很多,著名的例如rabbitmq。它使用起来相对还是比较简单的,
功能也相对比较丰富,一般场合下是完全够用的。但是有个很烦人的就是它不支持优先级。
例如一个发邮件的任务,某些特权用户希望它的邮件能够更加及时的发送出去,至少比普通
用户要优先对待。默认情况下rabbitmq是无法处理掉的,扔给rabbitmq的任务都是FIFO
先进先出。但是我们可以使用一些变通的技巧来支持这些优先级。创建多个队列,并为
rabbitmq的消费者设置相应的路由规则。
例如默认情况下有这样一个队列,我们拿list来模拟 [task1, task2, task3],消费者轮流
按照FIFO的原则一个个拿出task来处理掉。如果有高优先级的任务进来,它也只能跟在
最后被处理[task1, task2, task3, higitask1]. 但是如果使用两个队列,一个高优先级队
列,一个普通优先级队列。 普通优先级[task1, task2, task3], 高优先级[hightask1 ] 然
后我们设置消费者的路由让消费者随机从任意队列中取数据即可。
并且我们可以定义一个专门处理高优先级队列的消费者,它空闲的时候也不处理低优先级队
列的数据。这类似银行的VIP柜台,普通客户在银行取号排队,一个VIP来了他虽然没有
从取号机里拿出一个排在普通会员前面的票,但是他还是可以更快地直接走VIP通道。
使用rabbitmq来做支持优先级的消息队列的话,就像是上面所述同银行VIP会员一样,
走不同的通道。但是这种方式只是相对的优先级,做不到绝对的优先级控制,例如我希望某
一个优先级高的任务在绝对意义上要比其他普通任务优先处理掉,这样上面的方案是行不通
的。因为rabbitmq的消费者只知道再自己空闲的情况下从自己关心的队列中“随机”取某一
个队列里面的第一个数据来处理,它没法控制优先取找哪一个队列。或者更加细粒度的优先
级控制。或者你系统里面设置的优先级有10多种。这样使用rabbitmq也是很难实现的。
但是如果使用redis来做队列的话上面的需求都可以实现。
使用redis怎么做消息队列
首先redis它的设计是用来做缓存的,但是由于它自身的某种特性使得他可以用来做消息队
列。它有几个阻塞式的API可以使用,正是这些阻塞式的API让他有做消息队列的能力。
试想一下在”数据库解决所有问题“的思路下,不使用消息队列也是可以完成你的需求的。我
们把任务全部存放在数据库然后通过不断的轮询方式来取任务处理。这种做法虽然可以完成
你的任务但是做法很粗劣。但是如果你的数据库接口提供一个阻塞的方法那么就可以避免轮
询操作了,你的数据库也可以用来做消息队列,只不过目前的数据库还没有这样的接口。
另外做消息队列的其他特性例如FIFO也很容易实现,只需要一个List对象从头取数据,
从尾部塞数据即可实现。
redis能做消息队列得益于他list对象blpop brpop接口以及Pub/Sub(发布/订阅)的某些
接口。他们都是阻塞版的,所以可以用来做消息队列。
redis消息队列优先级的实现
一些基础redis基础知识的说明
redis> blpop tasklist 0"im task 01"
这个例子使用blpop命令会阻塞方式地从tasklist列表中取头一个数据,最后一个参数就
是等待超时的时间。如果设置为0则表示无限等待。另外redis存放的数据都只能是string
类型,所以在任务传递的时候只能是传递字符串。我们只需要简单的将负责数据序列化成
json格式的字符串,然后消费者那边再转换一下即可。
这里我们的示例语言使用python,链接redis的库使用redis-py. 如果你有些编程基础把
它切换成自己喜欢的语言应该是没问题的。
1.简单的FIFO队列
import redis, time
def handle(task):
print task
(4)
def main():
pool = tionPool(host='localhost', port=6379, db=0)
r = (connection_pool=pool)
while 1:
result = ('tasklist', 0)
handle(result[1])
if __name__ == "__main__":
main()
上例子即使一个最简单的消费者,我们通过一个无限循环不断地从redis的队列中取数据。
如果队列中没有数据则没有超时的阻塞在那里,有数据则取出往下执行。
一般情况取出来是个复杂的字符串,我们可能需要将其格式化后作为再传给处理函数,但是
为了简单我们的例子就是一个普通字符串。另外例子中的处理函数不做任何处理,仅仅
sleep 用来模拟耗时的操作。
我们另开一个redis的客户端来模拟生产者,自带的客户端就可以。多往tasklist 队列里
面塞上一些数据。
redis> lpush tasklist 'im task 01'
redis> lpush tasklist 'im task 02'
redis> lpush tasklist 'im task 03'
redis> lpush tasklist 'im task 04'
redis> lpush tasklist 'im task 05'
随后在消费者端便会看到这些模拟出来的任务被挨个消费掉。
2.简单优先级的队列
假设一种简单的需求,只需要高优先级的比低优先级的任务率先处理掉。其他任务之间的顺
序一概不管,这种我们只需要在在遇到高优先级任务的时候将它塞到队列的前头,而不是
push到最后面即可。
因为我们的队列是使用的redis的 list,所以很容易实现。遇到高优先级的使用rpush 遇到
低优先级的使用lpush
redis> lpush tasklist 'im task 01'
redis> lpush tasklist 'im task 02'
redis> rpush tasklist 'im high task 01'
redis> rpush tasklist 'im high task 01'
redis> lpush tasklist 'im task 03'
redis> rpush tasklist 'im high task 03'
随后会看到,高优先级的总是比低优先级的率先执行。但是这个方案的缺点是高优先级的任
务之间的执行顺序是先进后出的。
3.较为完善的队列
例子2中只是简单的将高优先级的任务塞到队列最前面,低优先级的塞到最后面。这样保
证不了高优先级任务之间的顺序。
假设当所有的任务都是高优先级的话,那么他们的执行顺序将是相反的。这样明显违背了队
列的FIFO原则。
不过只要稍加改进就可以完善我们的队列。
跟使用rabbitmq一样,我们设置两个队列,一个高优先级一个低优先级的队列。高优先级
任务放到高队列中,低的放在低优先队列中。redis和rabbitmq不同的是它可以要求队列
消费者从哪个队列里面先读。
def main():
pool = tionPool(host='localhost', port=6379, db=0)
r = (connection_pool=pool)
while 1:
result = (['high_task_queue', 'low_task_queue'], 0)
handle(result[1])
上面的代码,会阻塞地从'high_task_queue', 'low_task_queue'这两个队列里面取数据,
如果第一个没有再从第二个里面取。
所以只需要将队列消费者做这样的改进便可以达到目的。
redis> lpush low_task_queue low001
redis> lpush low_task_queue low002
redis> lpush low_task_queue low003
redis> lpush low_task_queue low004
redis> lpush high_task_queue low001
redis> lpush high_task_queue low002
redis> lpush high_task_queue low003
redis> lpush high_task_queue low004
通过上面的测试看到,高优先级的会被率先执行,并且高优先级之间也是保证了FIFO的原
则。
这种方案我们可以支持不同阶段的优先级队列,例如高中低三个级别或者更多的级别都可
以。
4.优先级级别很多的情况
假设有个这样的需求,优先级不是简单的高中低或者0-10这些固定的级别。而是类似
0-99999这么多级别。那么我们第三种方案将不太合适了。
虽然redis有sorted set这样的可以排序的数据类型,看是很可惜它没有阻塞版的接口。
于是我们还是只能使用list类型通过其他方式来完成目的。
有个简单的做法我们可以只设置一个队列,并保证它是按照优先级排序号的。然后通过二分
查找法查找一个任务合适的位置,并通过 lset 命令插入到相应的位置。
例如队列里面包含着写优先级的任务[1, 3, 6, 8, 9, 14],当有个优先级为7的任务过来,
我们通过自己的二分算法一个个从队列里面取数据出来反和目标数据比对,计算出相应的位
置然后插入到指定地点即可。
因为二分查找是比较快的,并且redis本身也都在内存中,理论上速度是可以保证的。但是
如果说数据量确实很大的话我们也可以通过一些方式来调优。
回想我们第三种方案,把第三种方案结合起来就会很大程度上减少开销。例如数据量十万的
队列,它们的优先级也是随机0-十万的区间。我们可以设置10个或者100个不同的队列,
0-一万的优先级任务投放到1号队列,一万-二万的任务投放到2号队列。这样将一个队列
按不同等级拆分后它单个队列的数据就减少许多,这样二分查找匹配的效率也会高一点。但
是数据所占的资源基本是不变的,十万数据该占多少内存还是多少。只是系统里面多了一些
队列而已。
消息队列的产品很多,什么RabbitMQ、ActiveMQ、收钱的Websphere MQ等等。因为项
目的缓存使用的是redis,所以公司的高手直接使用redis list实现了消息队列,网上也有文
章写了。这里顺便把整个事件服务框架都整理下。
事件服务本身就是一个thrift服务,服务端使用的语言是java。先看看消息队列的总体结构:
topic1(事件topic)
topic2
topic3
event1、
event1、
event1、
topic是事件的标题如:用户注册,event就是事件触发时提交的内容,我们这边使用的都是json格
式的对象。
有事件发布就有事件处理。
topic
当一个topic被触发时,事件的生产者发布这个topic的event( 存放在redis的list里面),事件
处理采用用定时轮询策略,定时取topic下的event有的话就顺序调用 eventHandle处理这个
event,因为这是个异步处理的过程,事件处理失败的话需要记录下来,以便后面处理。
首先看event发布的代码:
1 public void send(String topic, Object event) {
2 Topic t = ic(topic);
3 if (t != null && led()) {
4 List().leftPush(topic, event);
5 }
6 }
redis使用的是spring提供的RedisTemplate,这里就是向对应的事件topic list里面push
一个新的event。
取注册在这个topic下的事件处理程序,然后顺序执行。
1 private void poolRedisEvent() {
2 while (poolRunning) {
3 try {
4 e(new RedisCallback() {
5 @Override
6 public Object doInRedis(RedisConnection con) throws
7 DataAccessException {
8 if (y(rawKeys)) {
9 return null;
10 }
11 List
12 iveConnection().blpop(redisPoolPeriod, rawKeys);
13 if (y(list)) {
14 return null;
15 }
16 final String topic =
17 Serializer().deserialize((0));
18 final Object event =
19 ueSerializer().deserialize((1));
20 e(new Runnable() {
21 @Override
22 public void run() {
23 process(topic, null, event);
24 }
25 });
26 return null;
27 }
28 });
29 } catch (Throwable e) {
30 ("Event queue pool error," + sage());
31 try {
32 (5000);
33 } catch (InterruptedException ignored) {
}
}
}
}
看这段:
1 iveConnection().blpop(redisPoolPeriod, rawKeys);
redisPoolPeriod默认为30秒。rawKeys 为byte[][]类型存放所有topic。然后取topic和
event,异步调用eventHandle执行。
1 final String topic = Serializer().deserialize((0));
2 final Object event =
3 ueSerializer().deserialize((1));
4 e(new Runnable() {
5 @Override
6 public void run() {
7 process(topic, null, event);
8 }
});
进入process里面,处理事件时设置了超时事件,防止阻塞。
1 (new Runnable() {
2 @Override
3 public void run() {
4 process(topicId, handleId, context);
5 }
6
}).get(timeout, S); // timeout默认是10秒
现在看看真正的处理代码:
1 private void process(String topicId, String handleId, ContextImpl
2 context) {
3
Map
4
取所有注册的eventHandle
5 ......
6
if (handleId != null) {//指定了处理程序id,就单个调用
7 processHandle(handleId, (handleId),
8 ey(topicId).getHandleConfig(handleId), context);
9 } else {
10 for (
11
{//顺序执行
12 handleId = ();
13 HandleConfig config =
ey(topicId).getHandleConfig(handleId);
processHandle(handleId, ue(), config,
context);
}
} }
应用场景
最近在公司做项目,需要对聊天内容进行存储,考虑到数据库查询的IO连接数高、连
接频繁的因素,决定利用缓存做。
从网上了解到redis可以对所有的内容进行二进制的存储,而java是可以对所有对象
进行序列化的,序列化的方法会在下面的代码中提供实现。
序列化
这里我编写了一个java序列化的工具,主要是对对象转换成byte[],和根据byte[]
数组反序列化成java对象;
主要是用到了ByteArrayOutputStream和ByteArrayInputStream;
需要注意的是每个自定义的需要序列化的对象都要实现Serializable接口;
其代码如下:
1 package ;
2
3 import rayInputStream;
4 import rayOutputStream;
5 import ption;
6 import InputStream;
7 import OutputStream;
8 public class ObjectUtil {
9
/**对象转byte[]
10 * @param obj
11 * @return
12 * @throws IOException
13 */
14 public static byte[] objectToBytes(Object obj) throws Exception{
15 ByteArrayOutputStream bo = new ByteArrayOutputStream();
16 ObjectOutputStream oo = new ObjectOutputStream(bo);
17 bject(obj);
18 byte[] bytes = Array();
19 ();
20 ();
21 return bytes;
22 }
23
/**byte[]转对象
24 * @param bytes
25 * @return
26 * @throws Exception
27 */
28 public static Object bytesToObject(byte[] bytes) throws Exception{
29 ByteArrayInputStream in = new ByteArrayInputStream(bytes);
30 ObjectInputStream sIn = new ObjectInputStream(in);
31 return ject();
32 }
33 }
定义一个消息类,主要用于接收消息内容和消息下表的设置。
1 package ;
2
3 import izable;
4
5
/**定义消息类接收消息内容和设置消息的下标
6 * @author lenovo
7 *
8 */
9 public class Message implements Serializable{
10 private static final long serialVersionUID = 7792729L;
11 private int id;
12 private String content;
13 public int getId() {
14 return id;
15 }
16 public void setId(int id) {
17 = id;
18 }
19 public String getContent() {
20 return content;
21 }
22 public void setContent(String content) {
23 t = content;
24 }
25 }
利用redis做队列,我们采用的是redis中list的push和pop操作;
结合队列的特点:
只允许在一端插入新元素只能在队列的尾部FIFO:先进先出原则
redis中lpush(rpop)或rpush(lpop)可以满足要求,而redis中list 里要push或
pop的对象仅需要转换成byte[]即可
java采用Jedis进行redis的存储和redis的连接池设置
1 package ;
2
3 import ;
4 import ;
5 import ;
6
7 import ;
8 import ool;
9 import oolConfig;
1
0 public class JedisUtil {
1
1 private static String JEDIS_IP;
1 private static int JEDIS_PORT;
2 private static String JEDIS_PASSWORD;
1 //private static String JEDIS_SLAVE;
3
1 private static JedisPool jedisPool;
4
1 static {
5 Configuration conf = tance();
1 JEDIS_IP = ing("", "127.0.0.1");
6 JEDIS_PORT = ("", 6379);
1 JEDIS_PASSWORD = ing("rd", null);
7 JedisPoolConfig config = new JedisPoolConfig();
1 Active(5000);
8 Idle(256);//20
1 Wait(5000L);
9 tOnBorrow(true);
2 tOnReturn(true);
0 tWhileIdle(true);
2 EvictableIdleTimeMillis(60000l);
1 eBetweenEvictionRunsMillis(3000l);
2 TestsPerEvictionRun(-1);
2 jedisPool = new JedisPool(config, JEDIS_IP, JEDIS_PORT, 60000);
2 }
3
2 /**
4
* 获取数据
2 * @param key
5 * @return
2 */
6 public static String get(String key) {
2
7 String value = null;
2 Jedis jedis = null;
8 try {
2 jedis = ource();
9 value = (key);
3 } catch (Exception e) {
0
//释放redis对象
3 BrokenResource(jedis);
1 tackTrace();
3 } finally {
2
//返还到连接池
3 close(jedis);
3 }
3
4 return value;
3 }
5
3 public static void close(Jedis jedis) {
6 try {
3 Resource(jedis);
7
3 } catch (Exception e) {
8 if (ected()) {
3 ();
9 nect();
4 }
0 }
4 }
1
4 /**
2
* 获取数据
4 *
3 * @param key
4 * @return
4 */
4 public static byte[] get(byte[] key) {
5
4 byte[] value = null;
6 Jedis jedis = null;
4 try {
7 jedis = ource();
4 value = (key);
8 } catch (Exception e) {
4
//释放redis对象
9 BrokenResource(jedis);
5 tackTrace();
0 } finally {
5
//返还到连接池
1 close(jedis);
5 }
2
5 return value;
3 }
5
4 public static void set(byte[] key, byte[] value) {
5
5 Jedis jedis = null;
5 try {
6 jedis = ource();
5 (key, value);
7 } catch (Exception e) {
5
//释放redis对象
8 BrokenResource(jedis);
5 tackTrace();
9 } finally {
6
//返还到连接池
0 close(jedis);
6 }
1 }
6
2 public static void set(byte[] key, byte[] value, int time) {
6
3 Jedis jedis = null;
6 try {
4 jedis = ource();
6 (key, value);
5 (key, time);
6 } catch (Exception e) {
6
//释放redis对象
6 BrokenResource(jedis);
7 tackTrace();
6 } finally {
8
//返还到连接池
6 close(jedis);
9 }
7 }
0
7 public static void hset(byte[] key, byte[] field, byte[] value) {
1 Jedis jedis = null;
7 try {
2 jedis = ource();
7 (key, field, value);
3 } catch (Exception e) {
7
//释放redis对象
4 BrokenResource(jedis);
7 tackTrace();
5 } finally {
7
//返还到连接池
6 close(jedis);
7 }
7 }
7
8 public static void hset(String key, String field, String value) {
7 Jedis jedis = null;
9 try {
8 jedis = ource();
0 (key, field, value);
8 } catch (Exception e) {
1
//释放redis对象
8 BrokenResource(jedis);
2 tackTrace();
8 } finally {
3
//返还到连接池
8 close(jedis);
4 }
8 }
5
8 /**
6
* 获取数据
8 *
7 * @param key
8 * @return
8 */
8 public static String hget(String key, String field) {
9
9 String value = null;
0 Jedis jedis = null;
9 try {
1 jedis = ource();
9 value = (key, field);
2 } catch (Exception e) {
9
//释放redis对象
3 BrokenResource(jedis);
9 tackTrace();
4 } finally {
9
//返还到连接池
5 close(jedis);
9 }
6
9 return value;
7 }
9
8 /**
9
* 获取数据
9 *
1 * @param key
0 * @return
0 */
1 public static byte[] hget(byte[] key, byte[] field) {
0
1 byte[] value = null;
1 Jedis jedis = null;
0 try {
2 jedis = ource();
1 value = (key, field);
0 } catch (Exception e) {
3
//释放redis对象
1 BrokenResource(jedis);
0 tackTrace();
4 } finally {
1
//返还到连接池
0 close(jedis);
5 }
1
0 return value;
6 }
1
0 public static void hdel(byte[] key, byte[] field) {
7
1 Jedis jedis = null;
0 try {
8 jedis = ource();
1 (key, field);
0 } catch (Exception e) {
9
//释放redis对象
1 BrokenResource(jedis);
1 tackTrace();
0 } finally {
1
//返还到连接池
1 close(jedis);
1 }
1 }
1
2 /**
1
* 存储REDIS队列 顺序存储
1
* @param byte[] key reids键名
3
* @param byte[] value 键值
1 */
1 public static void lpush(byte[] key, byte[] value) {
4
1 Jedis jedis = null;
1 try {
5
1 jedis = ource();
1 (key, value);
6
1 } catch (Exception e) {
1
7
//释放redis对象
1 BrokenResource(jedis);
1 tackTrace();
8
1 } finally {
1
9
//返还到连接池
1 close(jedis);
2
0 }
1 }
2
1 /**
1
* 存储REDIS队列 反向存储
2
* @param byte[] key reids键名
2
* @param byte[] value 键值
1 */
2 public static void rpush(byte[] key, byte[] value) {
3
1 Jedis jedis = null;
2 try {
4
1 jedis = ource();
2 (key, value);
5
1 } catch (Exception e) {
2
6
//释放redis对象
1 BrokenResource(jedis);
2 tackTrace();
7
1 } finally {
2
8
//返还到连接池
1 close(jedis);
2
9 }
1 }
3
0 /**
1
* 将列表 source 中的最后一个元素(尾元素)弹出,并返回给客户端
3
* @param byte[] key reids键名
1
* @param byte[] value 键值
1 */
3 public static void rpoplpush(byte[] key, byte[] destination) {
2
1 Jedis jedis = null;
3 try {
3
1 jedis = ource();
3 ush(key, destination);
4
1 } catch (Exception e) {
3
5
//释放redis对象
1 BrokenResource(jedis);
3 tackTrace();
6
1 } finally {
3
7
//返还到连接池
1 close(jedis);
3
8 }
1 }
3
9 /**
1
* 获取队列数据
4
* @param byte[] key 键名
0 * @return
1 */
4 public static List
1
1 List
4 Jedis jedis = null;
2 try {
1
4 jedis = ource();
3 list = (key, 0, -1);
1
4 } catch (Exception e) {
4
1
//释放redis对象
4 BrokenResource(jedis);
5 tackTrace();
1
4 } finally {
6
1
//返还到连接池
4 close(jedis);
7
1 }
4 return list;
8 }
1
4 /**
9
* 获取队列数据
1
* @param byte[] key 键名
5 * @return
0 */
1 public static byte[] rpop(byte[] key) {
5
1 byte[] bytes = null;
1 Jedis jedis = null;
5 try {
2
1 jedis = ource();
5 bytes = (key);
3
1 } catch (Exception e) {
5
4
//释放redis对象
1 BrokenResource(jedis);
5 tackTrace();
5
1 } finally {
5
6
//返还到连接池
1 close(jedis);
5
7 }
1 return bytes;
5 }
8
1 public static void hmset(Object key, Map
5 Jedis jedis = null;
9 try {
1 jedis = ource();
6 (ng(), hash);
0 } catch (Exception e) {
1
//释放redis对象
6 BrokenResource(jedis);
1 tackTrace();
1
6 } finally {
2
//返还到连接池
1 close(jedis);
6
3 }
1 }
6
4 public static void hmset(Object key, Map
1time) {
6 Jedis jedis = null;
5 try {
1
6 jedis = ource();
6 (ng(), hash);
1 (ng(), time);
6 } catch (Exception e) {
7
//释放redis对象
1 BrokenResource(jedis);
6 tackTrace();
8
1 } finally {
6
//返还到连接池
9 close(jedis);
1
7 }
0 }
1
7 public static List
1 List
1 Jedis jedis = null;
7 try {
2
1 jedis = ource();
7 result = (ng(), fields);
3
1 } catch (Exception e) {
7
//释放redis对象
4 BrokenResource(jedis);
1 tackTrace();
7
5 } finally {
1
//返还到连接池
7 close(jedis);
6
1 }
7 return result;
7 }
1
7 public static Set
8 Set
1 Jedis jedis = null;
7 try {
9 jedis = ource();
1 result = (key);
8
0 } catch (Exception e) {
1
//释放redis对象
8 BrokenResource(jedis);
1 tackTrace();
1
8 } finally {
2
//返还到连接池
1 close(jedis);
8
3 }
1 return result;
8 }
4
1 public static List
8 List
5 Jedis jedis = null;
1 try {
8 jedis = ource();
6 result = (key, from, to);
1
8 } catch (Exception e) {
7
//释放redis对象
1 BrokenResource(jedis);
8 tackTrace();
8
1 } finally {
8
//返还到连接池
9 close(jedis);
1
9 }
0 return result;
1 }
9
1 public static Map
1 Map
9 Jedis jedis = null;
2 try {
1 jedis = ource();
9 result = l(key);
3 } catch (Exception e) {
1
//释放redis对象
9 BrokenResource(jedis);
4 tackTrace();
1
9 } finally {
5
//返还到连接池
1 close(jedis);
9 }
6 return result;
1 }
9
7 public static void del(byte[] key) {
1
9 Jedis jedis = null;
8 try {
1 jedis = ource();
9 (key);
9 } catch (Exception e) {
2
//释放redis对象
0 BrokenResource(jedis);
0 tackTrace();
2 } finally {
0
//返还到连接池
1 close(jedis);
2 }
0 }
2
2 public static long llen(byte[] key) {
0
3 long len = 0;
2 Jedis jedis = null;
0 try {
4 jedis = ource();
2 (key);
0 } catch (Exception e) {
5
//释放redis对象
2 BrokenResource(jedis);
0 tackTrace();
6 } finally {
2
//返还到连接池
0 close(jedis);
7 }
2 return len;
0 }
8
2}
0
9 ing>
2
1
0
2
1
1
2
1
2
2
1
3
2
1
4
2
1
5
2
1
6
2
1
7
2
1
8
2
1
9
2
2
0
2
2
1
2
2
2
2
2
3
2
2
4
2
2
5
2
2
6
2
2
7
2
2
8
2
2
9
2
3
0
2
3
1
2
3
2
2
3
3
2
3
4
2
3
5
2
3
6
2
3
7
2
3
8
2
3
9
2
4
0
2
4
1
2
4
2
2
4
3
2
4
4
2
4
5
2
4
6
2
4
7
2
4
8
2
4
9
2
5
0
2
5
1
2
5
2
2
5
3
2
5
4
2
5
5
2
5
6
2
5
7
2
5
8
2
5
9
2
6
0
2
6
1
2
6
2
2
6
3
2
6
4
2
6
5
2
6
6
2
6
7
2
6
8
2
6
9
2
7
0
2
7
1
2
7
2
2
7
3
2
7
4
2
7
5
2
7
6
2
7
7
2
7
8
2
7
9
2
8
0
2
8
1
2
8
2
2
8
3
2
8
4
2
8
5
2
8
6
2
8
7
2
8
8
2
8
9
2
9
0
2
9
1
2
9
2
2
9
3
2
9
4
2
9
5
2
9
6
2
9
7
2
9
8
2
9
9
3
0
0
3
0
1
3
0
2
3
0
3
3
0
4
3
0
5
3
0
6
3
0
7
3
0
8
3
0
9
3
1
0
3
1
1
3
1
2
3
1
3
3
1
4
3
1
5
3
1
6
3
1
7
3
1
8
3
1
9
3
2
0
3
2
1
3
2
2
3
2
3
3
2
4
3
2
5
3
2
6
3
2
7
3
2
8
3
2
9
3
3
0
3
3
1
3
3
2
3
3
3
3
3
4
3
3
5
3
3
6
3
3
7
3
3
8
3
3
9
3
4
0
3
4
1
3
4
2
3
4
3
3
4
4
3
4
5
3
4
6
3
4
7
3
4
8
3
4
9
3
5
0
3
5
1
3
5
2
3
5
3
3
5
4
3
5
5
3
5
6
3
5
7
3
5
8
3
5
9
3
6
0
3
6
1
3
6
2
3
6
3
3
6
4
3
6
5
3
6
6
3
6
7
3
6
8
3
6
9
3
7
0
3
7
1
3
7
2
3
7
3
3
7
4
3
7
5
3
7
6
3
7
7
3
7
8
3
7
9
3
8
0
3
8
1
3
8
2
3
8
3
3
8
4
3
8
5
3
8
6
3
8
7
3
8
8
3
8
9
3
9
0
3
9
1
3
9
2
3
9
3
3
9
4
3
9
5
3
9
6
3
9
7
3
9
8
3
9
9
4
0
0
4
0
1
4
0
2
4
0
3
4
0
4
4
0
5
4
0
6
4
0
7
4
0
8
4
0
9
4
1
0
4
1
1
4
1
2
4
1
3
4
1
4
4
1
5
4
1
6
4
1
7
4
1
8
4
1
9
4
2
0
4
2
1
4
2
2
4
2
3
4
2
4
4
2
5
4
2
6
4
2
7
4
2
8
4
2
9
4
3
0
4
3
1
4
3
2
4
3
3
4
3
4
4
3
5
4
3
6
4
3
7
4
3
8
4
3
9
4
4
0
4
4
1
4
4
2
4
4
3
4
4
4
4
4
5
4
4
6
4
4
7
4
4
8
4
4
9
4
5
0
4
5
1
4
5
2
4
5
3
4
5
4
4
5
5
4
5
6
4
5
7
4
5
8
4
5
9
4
6
0
4
6
1
4
6
2
4
6
3
4
6
4
4
6
5
4
6
6
4
6
7
4
6
8
4
6
9
4
7
0
4
7
1
4
7
2
4
7
3
4
7
4
4
7
5
4
7
6
4
7
7
4
7
8
4
7
9
4
8
0
4
8
1
4
8
2
4
8
3
4
8
4
4
8
5
4
8
6
4
8
7
4
8
8
4
8
9
4
9
0
4
9
1
4
9
2
4
9
3
4
9
4
4
9
5
4
9
6
4
9
7
4
9
8
4
9
9
5
0
0
5
0
1
5
0
2
5
0
3
5
0
4
5
0
5
5
0
6
5
0
7
5
0
8
5
0
9
5
1
0
5
1
1
5
1
2
5
1
3
5
1
4
5
1
5
5
1
6
5
1
7
Configuration主要用于读取redis配置信息
1 package ;
2
3 import ption;
4 import tream;
5 import ties;
6
7 public class Configuration extends Properties {
8
9 private static final long serialVersionUID = 53222L;
10
11 private static Configuration instance = null;
12
13 public static synchronized Configuration getInstance() {
14 if (instance == null) {
15 instance = new Configuration();
16 }
17 return instance;
18 }
19
20 public String getProperty(String key, String defaultValue) {
21 String val = getProperty(key);
22 return (val == null || y()) ? defaultValue : val;
23 }
24
25 public String getString(String name, String defaultValue) {
26 return perty(name, defaultValue);
27 }
28
29 public int getInt(String name, int defaultValue) {
30 String val = perty(name);
31 return (val == null || y()) ? defaultValue :
32 nt(val);
33 }
34
35 public long getLong(String name, long defaultValue) {
36 String val = perty(name);
37 return (val == null || y()) ? defaultValue :
38 nt(val);
39 }
40
41 public float getFloat(String name, float defaultValue) {
42 String val = perty(name);
43 return (val == null || y()) ? defaultValue :
44 loat(val);
45 }
46
47 public double getDouble(String name, double defaultValue) {
48 String val = perty(name);
49 return (val == null || y()) ? defaultValue :
50 ouble(val);
51 }
52
53 public byte getByte(String name, byte defaultValue) {
54 String val = perty(name);
55 return (val == null || y()) ? defaultValue :
56 yte(val);
57 }
58
59 public Configuration() {
60 InputStream in =
61 temClassLoader().getResourceAsStream("");
62 try {
omXML(in);
();
} catch (IOException e) {
}
}
}
测试redis队列
1 package ;
2
3 import e;
4 import Util;
5 import til;
6
7 public class TestRedisQuene {
8 public static byte[] redisKey = "key".getBytes();
9 static{
10 init();
11 }
12 public static void main(String[] args) {
13 pop();
14 }
15
16 private static void pop() {
17 byte[] bytes = (redisKey);
18 Message msg = (Message) oObject(bytes);
19 if(msg != null){
20 n(()+" "+tent());
21 }
22 }
23
24 private static void init() {
25
Message msg1 = new Message(1, "内容1");
26 (redisKey, ToBytes(msg1));
27
Message msg2 = new Message(2, "内容2");
28 (redisKey, ToBytes(msg2));
29
Message msg3 = new Message(3, "内容3");
30 (redisKey, ToBytes(msg3));
31 }
32
33 }
34
测试结果如下:
1
1 内容1
1
2 内容2
1
3 内容3
版权声明:本文标题:用redis实现支持优先级的消息队列 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.roclinux.cn/b/1710279976a565864.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论