博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
5.Redis消息订阅/发布
阅读量:4048 次
发布时间:2019-05-25

本文共 4848 字,大约阅读时间需要 16 分钟。

Redis可以很容的实现消息订阅/发布功能

 

一.JedisPubSub

需要实现一个JedisPubSub,相当于Redis消息的Listener

package com.gqshao.redis.channels;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import redis.clients.jedis.JedisPubSub;public class MyJedisPubSub extends JedisPubSub {    protected static Logger logger = LoggerFactory.getLogger(MyJedisPubSub.class);    // 取得订阅的消息后的处理      public void onMessage(String channel, String message) {        logger.info("取得订阅的消息后的处理 : " + channel + "=" + message);    }    // 初始化订阅时候的处理      public void onSubscribe(String channel, int subscribedChannels) {        logger.info("初始化订阅时候的处理 : " + channel + "=" + subscribedChannels);    }    // 取消订阅时候的处理      public void onUnsubscribe(String channel, int subscribedChannels) {        logger.info("取消订阅时候的处理 : " + channel + "=" + subscribedChannels);    }    // 初始化按表达式的方式订阅时候的处理      public void onPSubscribe(String pattern, int subscribedChannels) {        logger.info("初始化按表达式的方式订阅时候的处理 : " + pattern + "=" + subscribedChannels);    }    // 取消按表达式的方式订阅时候的处理      public void onPUnsubscribe(String pattern, int subscribedChannels) {        logger.info(" 取消按表达式的方式订阅时候的处理 : " + pattern + "=" + subscribedChannels);    }    // 取得按表达式的方式订阅的消息后的处理      public void onPMessage(String pattern, String channel, String message) {        logger.info("取得按表达式的方式订阅的消息后的处理 :" + pattern + "=" + channel + "=" + message);    }}

 

 

二.消息订阅/发布

1.消息的订阅需要一个Redis连接始终保持连接,Jedis中停止订阅的unsubscribe是在JedisPubSub中

2.程序中因为需要Jedis始终保持连接,又有可能需要停止订阅,所以用到了ExecutorService

package com.gqshao.redis.channels;import com.gqshao.redis.JedisTest;import org.junit.Test;import redis.clients.jedis.Jedis;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;/** * 发布/订阅 */public class MessageTest extends JedisTest {    /**     * SUBSCRIBE [channel...] 订阅一个匹配的通道     * PSUBSCRIBE [pattern...] 订阅匹配的通道     * PUBLISH [channel] [message] 将value推送到channelone通道中     * UNSUBSCRIBE [channel...] 取消订阅消息     * PUNSUBSCRIBE [pattern ...] 取消匹配的消息订阅     * web环境中可以编写一个JedisPubSub 继承 @see redis.clients.jedis.JedisPubSub来实现监听     * Jedis中通过使用 JedisPubSub.UNSUBSCRIBE/PUNSUBSCRIBE 来取消订阅     */    @Test    public void testSubscribe() {        final MyJedisPubSub listener = new MyJedisPubSub();        Thread thread = new Thread(new Runnable() {            @Override            public void run() {                logger.info("subscribe channelA.test channelB.send_message");                jedis.subscribe(listener, "channelA.test", "channelB.send_message");            }        });        ExecutorService executor = Executors.newSingleThreadExecutor();        executor.execute(thread);        // 测试发送        Jedis pubJedis = pool.getResource();        logger.info("publish channelA.test OK : " + pubJedis.publish("channelA.test", "OK"));        logger.info("publish channelB.send_message \"Hello World!\" : " + pubJedis.publish("channelB.send_message", "Hello World!"));        listener.unsubscribe("channelA.test", "channelB.send_message");        try {            executor.shutdownNow();            logger.info("executor.shutdownNow");            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {                logger.warn("Pool did not terminated");            }        } catch (InterruptedException ie) {            Thread.currentThread().interrupt();        }        logger.info("完成subscribe测试");    }    /**     * SUBSCRIBE channelone 订阅一个通道     * PSUBSCRIBE channel* 订阅一批通道     * PUBLISH channelone value 将value推送到channelone通道中     * web环境中可以编写一个Listener 继承 @see redis.clients.jedis.JedisPubSub来实现监听     */    @Test    public void testPsubscribe() {        final MyJedisPubSub listener = new MyJedisPubSub();        Thread thread = new Thread(new Runnable() {            @Override            public void run() {                logger.info("psubscribe channel*");                jedis.psubscribe(listener, "channel*");            }        });        ExecutorService executor = Executors.newSingleThreadExecutor();        executor.execute(thread);        // 测试发送        Jedis pubJedis = pool.getResource();        logger.info("publish channelA.test OK: " + pubJedis.publish("channelA.test", "OK"));        logger.info("publish channelB.send_message \"Hello World!\"" + pubJedis.publish("channelB.send_message", "Hello World!"));        pool.returnResource(pubJedis);        listener.punsubscribe();        try {            executor.shutdownNow();            logger.info("executor.shutdownNow");            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {                logger.warn("Pool did not terminated");            }        } catch (InterruptedException ie) {            Thread.currentThread().interrupt();        }        logger.info("完成psubscribe测试");        logger.info("publish channelA.test OK: " + pubJedis.publish("channelA.test", "OK"));    }}

 

 

 

转载地址:http://fqyci.baihongyu.com/

你可能感兴趣的文章
vc学习之键盘事件OnKeyDown
查看>>
近期工作
查看>>
春寒料峭,原来春天这样走近
查看>>
电脑坏了--关于联想笔记本声卡驱动
查看>>
C#中枚举类型在switch语句中值对照问题
查看>>
GridView中根据特殊标记设置不可编辑
查看>>
PowerDesinger中生成数据库时将域的内容转化为实际的字段
查看>>
通过模板将GridView导出为Excel
查看>>
迭代开发过程及一些原则
查看>>
Windows IIS服务器CA认证安装
查看>>
Asp.net 中Excel通过模板导出中发布问题
查看>>
64位机器配置CA认证一些小问题
查看>>
SVN自动完全备份
查看>>
开发中的Warning原来也很有用
查看>>
Silverlight Toolkit例子代码中缺少System.Windows.Controls.Samples.Common.dll的解决办法
查看>>
项目经理培训后的一点感概
查看>>
vc学习之窗口大小发生变更时使控件自动摆放到合适的位置
查看>>
Oracle 获取系统日期时间,导出数据库
查看>>
黄巢的菊花,非常喜欢,贴出来共享一下
查看>>
在光纤环网中的b/s与c/s的比对
查看>>