RabbitMQ 之发布订阅模式[亲测有效]

RabbitMQ 之发布订阅模式[亲测有效]publish/subscribe发布订阅模式中,生产者不再直接与队列绑定,而是将数据发送至交换机Exchange交换机Exchange用于将数据按某种规则送入与之绑定的队列,进而供消费者使用。发布订阅模式中,交换机将无差别的将所有消息送入与之绑定的队列,所有消费者拿到的消息完全相同,交换机的类型为fanoutexchange的类型有directtopicheadersfanout生产者packagemainimport( “log” “os” “rabbit/uti

大家好,欢迎来到IT知识分享网。

publish/subscribe

发布订阅模式中,生产者不再直接与队列绑定,而是将数据发送至交换机Exchange

交换机Exchange用于将数据按某种规则送入与之绑定的队列,进而供消费者使用。

发布订阅模式中,交换机将无差别的将所有消息送入与之绑定的队列,所有消费者拿到的消息完全相同,交换机的类型为fanout

pubsub

exchange的类型有

  • direct
  • topic
  • headers
  • fanout

生产者

package main

import (
	"log"
	"os"
	"rabbit/utils"
	"strings"

	"github.com/streadway/amqp"
)

func main() { 
   
	ch, _ := utils.GetRabbitMQChannel()

	_ = ch.ExchangeDeclare(
		"logs",   // name
		"fanout", // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)

	body := bodyFrom(os.Args)
	_ = ch.Publish(
		"logs", // exchange
		"",     // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{ 
   
			ContentType: "text/plain",
			Body:        []byte(body),
		})

	log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string { 
   
	var s string
	if (len(args) < 2) || os.Args[1] == "" { 
   
		s = "hello"
	} else { 
   
		s = strings.Join(args[1:], " ")
	}
	return s
}

消费者

package main

import (
	"log"
	"rabbit/utils"
)

func main() { 
   
	ch, _ := utils.GetRabbitMQChannel()

	_ = ch.ExchangeDeclare(
		"logs",   // name
		"fanout", // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	q, _ := ch.QueueDeclare(
		"",    // name
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // no-wait
		nil,   // arguments
	)

	_ = ch.QueueBind(
		q.Name, // queue name
		"",     // routing key
		"logs", // exchange
		false,
		nil,
	)

	msgs, _ := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)

	forever := make(chan bool)

	go func() { 
   
		for d := range msgs { 
   
			log.Printf(" [x] %s", d.Body)
		}
	}()

	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
	<-forever
}

开启的两个消费者收到了相同的消息

image-20220419173218748

image-20220419173142866

image-20220419171900953

临时队列

队列实例包含RabbitMQ生成的随机队列名称, 当声明它的连接关闭时,队列将被删除,因为它被声明为exclusive

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/24273.html

(0)
上一篇 2023-08-15 09:33
下一篇 2024-06-29 22:26

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信