Java redis实现消息队列

Java redis实现消息队列文章目录一、单元测试Java多线程二、redis实现消息队列三、java多线程模拟生产者消费者一、单元测试Java多线程使用junit测试多线程代码,但是等到程序结束,输出结果不完整,或者是完全没结果,因此,可能是其他线程还没结束,主线程就结束了。原因:junit在运行时,在主线程结束后就关闭了进程,不会等待各个线程运行结束。==解决方法:==①要是要求不高,可以通过thread.sleep(),让主线程暂时休眠(TimeUnit.MILLISECONDS.sleep(2

大家好,欢迎来到IT知识分享网。

一、单元测试Java多线程

使用junit测试多线程代码,但是等到程序结束,输出结果不完整,或者是完全没结果,因此,可能是其他线程还没结束,主线程就结束了。

原因: junit在运行时,在主线程结束后就关闭了进程,不会等待各个线程运行结束。

==解决方法:==①要是要求不高,可以通过thread.sleep(),让主线程暂时休眠(TimeUnit.MILLISECONDS.sleep(20000);),其他线程运行完在结束;

​ ②比较严谨的做法,可以用 CountDownLatch ,具体使用在代码里有注释(参考:https://blog.csdn.net/goxingman/article/details/105663113);

让主线程休眠,等待其他线程执行完成

public class Learn_Thead { 
   
    /** * 使用主线程休眠的方式来解决:Junit单元测试无法测试多线程问题 * 参考:https://blog.csdn.net/goxingman/article/details/105663113 * */
    @Test
    public void runThread() throws InterruptedException { 
   
        MyThread m1  = new MyThread("t1");
        MyThread m2 = new MyThread("t2");
        m1.start();
        m2.start();
        TimeUnit.MILLISECONDS.sleep(8000);//等待时间

    }
}
/** * 创建新的线程类 */
class MyThread extends Thread{ 
   
    private String name;
    public MyThread(String name){ 
   
        this.name = name;
    }
    public void run(){ 
   
        for(int i = 0; i < 5; i++){ 
   
            System.out.println(name + " 运行, i = " + i);
            try { 
   
                Thread.sleep(1000);
            } catch (InterruptedException e) { 
   
                e.printStackTrace();
            }
        }
    }
}

二、redis实现消息队列

Redis的列表类型可以很容易实现消息队列,类似于MQ的队列模型,任何时候都可以消费消息,但是每一条消息只能允许消费一次。

命令 描述 用法
LPUSH (1)将一个或多个值value插入到列表key的表头
(2)如果有多个value值,那么各个value值按从左到右的顺序依次插入表头
(3)key不存在,一个空列表会被创建并执行LPUSH操作
(4)key存在但不是列表类型,返回错误
LPUSH key value [value …]
LPUSHX (1)将值value插入到列表key的表头,当且晋档key存在且为一个列表
(2)key不存在时,LPUSHX命令什么都不做
LPUSHX key value
LPOP (1)移除并返回列表key的头元素 LPOP key
LRANGE (1)返回列表key中指定区间内的元素,区间以偏移量start和stop指定
(2)start和stop都以0位底
(3)可使用负数下标,-1表示列表最后一个元素,-2表示列表倒数第二个元素,以此类推
(4)start大于列表最大下标,返回空列表
(5)stop大于列表最大下标,stop=列表最大下标
LRANGE key start stop
LREM (1)根据count的值,移除列表中与value相等的元素
(2)count>0表示从头到尾搜索,移除与value相等的元素,数量为count
(3)count<0表示从从尾到头搜索,移除与value相等的元素,数量为count
(4)count=0表示移除表中所有与value相等的元素
LREM key count value
LSET (1)将列表key下标为index的元素值设为value
(2)index参数超出范围,或对一个空列表进行LSET时,返回错误
LSET key index value
LINDEX (1)返回列表key中,下标为index的元素 LINDEX key index
LINSERT (1)将值value插入列表key中,位于pivot前面或者后面
(2)pivot不存在于列表key时,不执行任何操作(3)key不存在,不执行任何操作
LINSERT key BEFORE|AFTER pivot value
LLEN (1)返回列表key的长度(2)key不存在,返回0 LLEN key
LTRIM (1)对一个列表进行修剪,让列表只返回指定区间内的元素,不存在指定区间内的都将被移除 LTRIM key start stop
RPOP (1)移除并返回列表key的尾元素 RPOP key
RPOPLPUSH 在一个原子时间内,执行两个动作:(1)将列表source中最后一个元素弹出并返回给客户端
(2)将source弹出的元素插入到列表desination,作为destination列表的头元素
RPOPLPUSH source destination
RPUSH (1)将一个或多个值value插入到列表key的表尾 RPUSH key value [value …]
RPUSHX (1)将value插入到列表key的表尾,当且仅当key存在并且是一个列表
(2)key不存在,RPUSHX什么都不做
RPUSHX key value
  •   通过命令 lpush+lpop
  • 队列  命令 lpush+rpop
  • 有限集合  命令 lpush+ltrim
  • 消息队列  命令 lpush+rpop
127.0.0.1:6379> lpush list a b c d
(integer) 4
127.0.0.1:6379> lrange list 0 -1
1) "d"
2) "c"
3) "b"
4) "a"
127.0.0.1:6379> rpop list
"a"
127.0.0.1:6379> rpop list
"b"

三、java多线程模拟生产者消费者

RedisLearn.java

/** * 生产者类 * 生产者每隔600ms生成一条消息 * */
class MessageProducer extends Thread{ 
   
    public static final String MESSAGE_KEY = "message:queue";
    private volatile int count;

    public void putMessage(String mess){ 
   
        Jedis jedis = new Jedis("IP", 6379);
        jedis.auth("123456");
        Long size = jedis.lpush(MESSAGE_KEY, mess);
        System.out.println("Put " + Thread.currentThread().getName() + " put message " + count);
        count++;
    }
    @Override
    public synchronized  void run() { 
   
        for(int i = 0 ; i < 5; i++){ 
   
            putMessage("message" + count);
            try { 
   
                Thread.sleep(600);
            } catch (InterruptedException e) { 
   
                e.printStackTrace();
            }
        }
    }
}


/** * 消费者类 * 消费者每隔1000ms消费一条消息 */
class MessageConsumer extends Thread{ 
   
    public static final String MESSAGE_KEY = "message:queue";
    private volatile int count;

    public void consumerMessage(){ 
   
        Jedis jedis = new Jedis("IP", 6379);
        jedis.auth("123456");
        String message = jedis.rpop(MESSAGE_KEY);
        System.out.println("Pop " + Thread.currentThread().getName() + "comsumer message = " + message);
        count++;
    }
    @Override
    public synchronized  void run() { 
   
        for(int i = 0; i < 9; i++){ 
   
            consumerMessage();
            try { 
   
                Thread.sleep(1000);
            } catch (InterruptedException e) { 
   
                e.printStackTrace();
            }
        }
    }
}

public class RedisLearn { 
   
    /** * 使用单元测试,测试多线程下模拟生产者消费者 * 使用Redis List : lpush rpop 模拟消息队列 * @throws InterruptedException */
    @Test
    public void runMessageProducter() throws InterruptedException { 
   
        MessageProducer producer = new MessageProducer();
        MessageConsumer comsumer = new MessageConsumer();
        Thread t1 = new Thread(producer, "thread1");
        Thread t2 = new Thread(producer, "thread2");
        Thread t3 = new Thread(producer, "thread3");
        Thread t4 = new Thread(comsumer, "thread4");
        Thread t5 = new Thread(comsumer, "thread5");
        t1.start();
        t2.start();
        t3.start();
        t4.start();
        t5.start();
        TimeUnit.MILLISECONDS.sleep(20000);
    }
}

result

Put thread1 put message 0
Pop thread5comsumer message = message0
Put thread1 put message 1
Pop thread5comsumer message = message1
Put thread1 put message 2
Pop thread5comsumer message = message2
Put thread1 put message 3
Put thread1 put message 4
Pop thread5comsumer message = message3
Put thread2 put message 5
Put thread2 put message 6
Pop thread5comsumer message = message4
Put thread2 put message 7
Pop thread5comsumer message = message5
Put thread2 put message 8
Put thread2 put message 9
Pop thread5comsumer message = message6
Put thread3 put message 10
Pop thread5comsumer message = message7
Put thread3 put message 11
Put thread3 put message 12
Pop thread5comsumer message = message8
Put thread3 put message 13
Pop thread4comsumer message = message9
Put thread3 put message 14
Pop thread4comsumer message = message10
Pop thread4comsumer message = message11
Pop thread4comsumer message = message12
Pop thread4comsumer message = message13
Pop thread4comsumer message = message14
Pop thread4comsumer message = null
Pop thread4comsumer message = null
Pop thread4comsumer message = null

四、阻塞读

Redis Brpop 命令移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。

/** * 消费者类 */
class MessageConsumer extends Thread{ 
   
    public static final String MESSAGE_KEY = "message:queue";

    public void consumerMessage(){ 
   
        Jedis jedis = new Jedis("nuptpisa.top", 6379);
        jedis.auth("123456");
// String message = jedis.rpop(MESSAGE_KEY);
// System.out.println("Pop " + Thread.currentThread().getName() + "comsumer message = " + message);
        List<String> message = jedis.brpop(0, MESSAGE_KEY);
        System.out.println("Pop " + Thread.currentThread().getName() + "comsumer message = " + message);
    }
    @Override
    public synchronized  void run() { 
   
        for(int i = 0; i < 8; i++){ 
   
            consumerMessage();
            try { 
   
                Thread.sleep(1000);
            } catch (InterruptedException e) { 
   
                e.printStackTrace();
            }
        }
    }
}

result

Put thread1 put message 0
Pop thread4comsumer message = [message:queue, message0]
Put thread1 put message 1
Pop thread4comsumer message = [message:queue, message1]
Put thread1 put message 2
Put thread1 put message 3
Pop thread4comsumer message = [message:queue, message2]
Put thread1 put message 4
Pop thread4comsumer message = [message:queue, message3]
Put thread3 put message 5
Put thread3 put message 6
Pop thread4comsumer message = [message:queue, message4]
Put thread3 put message 7
Pop thread4comsumer message = [message:queue, message5]
Put thread3 put message 8
Put thread3 put message 9
Pop thread4comsumer message = [message:queue, message6]
Put thread2 put message 10
Pop thread4comsumer message = [message:queue, message7]
Put thread2 put message 11
Put thread2 put message 12
Pop thread5comsumer message = [message:queue, message8]
Put thread2 put message 13
Pop thread5comsumer message = [message:queue, message9]
Put thread2 put message 14
Pop thread5comsumer message = [message:queue, message10]
Pop thread5comsumer message = [message:queue, message11]
Pop thread5comsumer message = [message:queue, message12]
Pop thread5comsumer message = [message:queue, message13]
Pop thread5comsumer message = [message:queue, message14]

Process finished with exit code 0

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/24094.html

(0)
上一篇 2023-08-30 14:33
下一篇 2023-08-30 20:45

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信