大家好,欢迎来到IT知识分享网。
文章目录
前言
最近apache pulsar出镜率挺高的,这里新开一篇文档记录一下pulsar的学习之路
前面有介绍Apache Pulsar基本理论,这里再开一篇介绍一个Pulsar的基本部署与可视化,还有一个小demo
1 Pulsar安装部署
1.1 Pulsar集群搭建
温馨提示
- 单集群的Pulsar实例可以满足绝大多数学习、开发、验证需求,如果没有特殊的需要,建议使用单集群Pulsar实例,多集群的安装部署参考多集群部署。
- 在部署Pulsar过程中,如需要使用所有内置的Pulsar IO 连接器,需要下载apache-pulsar-io-connectors(apache-pulsar-io-connectors-2.1.1-incubating-bin.tar.gz),并安装到Pulsar的connectors目录下。
Pulsar集群的部署步骤如下:
- 部署ZooKeeper集群(可选)
- 初始化集群元信息
- 部署BookKeeper集群
- 部署一个或多个Pulsar broker
1.2 安装前准备
如果你已经有一个ZooKeeper集群,并且愿意重用该集群,则无需准备安装ZooKeeper集群的资源,也无需部署ZooKeeper集群
- 至少6台Linux机器或虚拟机
- 三台机器用于部署ZooKeeper集群,Pulsar仅会定期使用ZooKeeper进行协调和配置任务,业务操作不依赖ZooKeeper集群,部署时可以使用性能规格较低的机器。
- 三台部署BookKeeper集群和Pulsar broker。Puslar集群实际承载业务,建议使用性能规格更高的机器,比如计算能力更强的CPU、10Gbps NIC、SSD硬盘或高性能存储。
- 覆盖所有节点的DNS名称,如果没有DNS服务器可以通过hosts文件实现。
- 所有的机器需要安装Java 8或更高版本
本文安装部署Pulsar集群的节点信息
节点 | 规格 | 部署组件 | 主机名地址 |
---|---|---|---|
Pulsar-zk-01 | 4vCPU,8G内存 | ZooKeeper集群 | host06 |
Pulsar-zk-02 | 4vCPU,8G内存 | ZooKeeper集群 | host03 |
Pulsar-zk-03 | 4vCPU,8G内存 | ZooKeeper集群 | host04 |
Pulsar-bk-01 | 8vCPU,16G内存 | BookKeeper集群 | host02 |
Pulsar-bk-02 | 8vCPU,16G内存 | BookKeeper集群 | host01 |
Pulsar-bk-03 | 8vCPU,16G内存 | BookKeeper集群 | host05 |
1.3 安装Pulsar
集群中的每个节点都需要安装Pulsar二进制包,包括ZooKeeper和BookKeeper节点。
- 获取Pulsar安装包(apache-pulsar-2.8.0-bin.tar.gz)
- 将软件包拷贝到/opt目录下解压,并将解压的目录重命名为pulsarCluster
tar -zxvf apache-pulsar-2.1.1-incubating-bin.tar.gz
mv apache-pulsar-2.1.1-incubating pulsarCluster·
*确保/opt目录有足够的磁盘空间,或使用其他目录安装
Pulsar的目录结构如下表:
目录 | 内容 |
---|---|
bin | Pulsar的命令行工具 |
conf | Pulsar的配置文件 |
data | 存储ZooKeeper和BookKeeper数据 |
lib | Pulsar使用的第三方库 |
logs | 日志存储路径 |
examples | Pulsar提供的样例 |
1.4 安装Pulsar 连接器(可选)
从2.1.0-inclubating版本开始,Pulsar单独发布了包含所有内置连接器的二进制包,如果想使用这些内置的连接器,可以参考下面的步骤安装,如果不需要可以直接跳过。
- 获取Pulsar IO Connectors软件包(apache-pulsar-io-connectors-2.1.1-incubating-bin.tar.gz)
- 解压软件包,并将connector目录拷贝到Pulsar安装目录(/opt/puslarCluster)
tar -zxvf apache-pulsar-io-connectors-2.1.1-incubating-bin.tar.gz
cd apache-pulsar-io-connectors-2.1.1-incubating
cp -r connectors/ /opt/pulsarCluster
1.5 部署ZooKeeper集群
- 获取ZooKeeper安装包
- 将安装包拷贝到三个节点的/opt目录解压
tar -zxvf zookeeper-3.4.12.tar.gz
- 修改 conf/zookeeper.conf配置,增加ZooKeeper集群节点信息
server.1=host06:2888:3888
server.2=host03:2888:3888
server.3=host04:2888:3888
cd /opt/pulsarCluster
echo “server.1=host06:2888:3888” >> conf/zookeeper.conf
echo “server.1=host03:2888:3888” >> conf/zookeeper.conf
echo “server.1=host04:2888:3888” >> conf/zookeeper.conf
- 配置ZooKeeper myid信息
cd /opt/pulsarCluster
mkdir -p data/zookeeper
echo 1 > data/zookeeper/myid
- 以守护进程启动ZooKeeper
cd /opt/pulsarCluster/bin
./pulsar-daemon start zookeeper
1.6 配置集群信息
部署完ZooKeeper集群后,需要将一些Pulsar集群的元信息写入ZooKeeper集群的每个节点,由于数据在ZooKeeper集群内部会互相同步,因此只需要将元信息写入ZooKeeper的一个节点。可以在ZooKeeper集群的任意节点通过pulsar工具的initialize-cluster-metadata方法配置数据,配置命令只需执行一次,否则ZooKeeper会报节点已经存在的错误。命令的一个简单样例如下:
$ ./pulsar initialize-cluster-metadata \
--cluster pulsar-cluster-zk-1 \
--zookeeper host06:2181 \
--configuration-store host06:2181 \
--web-service-url http://pulsar.cluster.com:8080 \
--web-service-url-tls https://pulsar.cluster.com:8443 \
--broker-service-url pulsar://pulsar.cluster.com:6650 \
--broker-service-url-tls pulsar+ssl://pulsar.cluster.com:6651
在本文的安装部署过程中,Pulsar集群的名称为pulsar-cluster,统一域名pulsar.cluster.com。命令参数的具体含义如下:
Flag | Description |
---|---|
–cluster | 集群名称 |
–zookeeper | ZooKeeper集群连接参数,仅需要包含集群中的一个节点即可 |
–configuration-store | Pulsar实例的配置存储集群(ZooKeeper),和-zookeeper参数一样只需要包含集群中的一个节点即可 |
–web-service-url | 集群Web服务的URL+端口,URL必须是一个i标准的DNS名称,默认端口8080,不建议修改。 |
–web-service-url-tls | 集群Web提供TLS服务的URL+端口,端口默认8443,不建议修改。 |
–broker-service-url | 集群brokers服务URL,URL中DNS的名称和Web服务保持一致,URL使用pulsar替代http/http,端口默认6650,不建议修改。 |
–broker-service-url-tls | 集群brokers提供TLS服务的URL,默认端口6551,不建议修改。 |
1.7 部署BookKeeper集群
- 配置BookKeeper集群
Pulsar集群中所有持久数据的存储都由BookKeeper负责,因此如果想使用Pulsar需要部署一个BookKeeper集群,建议部署一个包含3个bookie节点的BookKeeper集群。BookKeeper集群的配置使用conf/bookkeeper.conf文件,BookKeeper最终的配置是配置ZooKeeper集群的地址,一个具体的配置例子如下
zkServers=host06:2181,host03:2181,host04:2181
Pulsar从2.1.0版本开始引入了有状态函数,如果想使用该功能,还需要在conf/bookkeeper.conf文件中增加如下配置:
extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent
- 启动BooKeeper集群
- 后台进程启动
cd /opt/pulsarCluster/bin
./pulsar-daemon start bookie
- 前台进程启动
cd /opt/pulsarCluster/bin
./bookkeeper bookie
- 检查BookKeeper集群状态
cd /opt/pulsarCluster/bin
./bookkeeper shell bookiesanity
如果BookKeeper成功运行,输出的最后一行结果为
org.apache.bookkeeper.bookie.BookieShell - Bookie sanity test succeeded
2 Pulsar可视化
Apache Pulsar Manager 是一个基于网页的 GUI 管理和监控工具,帮助 Pulsar 管理员和用户管理和监控 Tenant、Namespace、Topic、Subscription、Broker 和 Cluster 等,并支持动态配置多种环境。
2.1 Pulsar Manager安装
最简单的方式是docker安装:
$ yum install -y docker
$ systemctl start docker
$ systemctl enable docker
- 下载镜像并安装Pulsar Manager
$ docker pull apachepulsar/pulsar-manager:v0.2.0
$ docker run -dit \
-p 9527:9527 -p 7750:7750 \
-e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties \
apachepulsar/pulsar-manager:v0.2.0
2.2 Pulsar Manager使用
2.2.1 初始化账号
需要使用curl命令添加一个账户
$ CSRF_TOKEN=$(curl http://localhost:7750/pulsar-manager/csrf-token)
$ curl \
-H 'X-XSRF-TOKEN: $CSRF_TOKEN' \
-H 'Cookie: XSRF-TOKEN=$CSRF_TOKEN;' \
-H "Content-Type: application/json" \
-X PUT http://localhost:7750/pulsar-manager/users/superuser \
-d '{"name": "admin", "password": "apachepulsar", "description": "test", "email": "username@test.org"}'
调用完成之后,可以使用用户名admin,密码apachepulsar登录pulsar manager
使用浏览器http://hostname:9527
2.2.2 添加environment
如需管理多集群,只需要配置 serviceURL,即能在多个环境间自由切换
- 管理 Tenant
支持对 Tenant 进行增加、修改和删除等操作
- 管理 Namespace
支持对 Namespace 进行增加、删除和修改 policy 等操作。
- 管理 Subscription
支持对 Subscription 进行 skip、expire、clear 和 reset 等操作。
- 管理 Cluster
支持对 Cluster 进行浏览和配置等操作。
- 管理 Broker
支持对 Broker 进行浏览、健康检查和配置查询等操作
- 监控 Topic 和 Subscription
3 使用Pulsar发布、订阅消息(Java)
3.1 安装Pulsar Java客户端
如果使用Maven,在工程的pom文件增加如下配置:
<!-- 在<properties> 块中增加版本号信息 -->
<pulsar.version>2.1.1-incubating</pulsar.version>
<!--增加依赖 -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${
pulsar.version}</version>
</dependency>
3.2 构造Client
- Pular连接 URLs
- 本地Pulsar
pulsar://localhost:6650
- 生成环境Pulsar集群(使用域名
pulsar://pulsar.cluster.com:6650
- 开启TLS的条件下,Pulsar集群连接URLs
pulsar+ssl://pulsar.cluster.com:6651
- 配置Pulsar客户端
客户端的配置主要包括:Pulsar集群信息配置、鉴权信息配置、TLS配置、线程数连接数配置等。具体的配置可以参考Pulsar Client配置。
基于本文搭建的集群,只保留最简单的配置信息,Pulsar Client的构造如下:
private static final String SERVER_URL = "pulsar://pulsar.cluster.com:6650";
PulsarClient client = PulsarClient.builder()
.serviceUrl(SERVER_URL)
.enableTcpNoDelay(true)
.build();
3.3 构造生产者
一个完整的生产者样例如下:
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class ProducerDemm {
private static final Logger log = LoggerFactory.getLogger(ProducerDemm.class);
private static final String SERVER_URL = "pulsar://pulsar.cluster.com:6650";
public static void main(String[] args) throws Exception {
// 构造Pulsar Client
PulsarClient client = PulsarClient.builder()
.serviceUrl(SERVER_URL)
.enableTcpNoDelay(true)
.build();
// 构造生产者
Producer<String> producer = client.newProducer(Schema.STRING)
.producerName("my-producer")
.topic("persistent://public/default/my-topic")
.batchingMaxMessages(1024)
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
.enableBatching(true)
.blockIfQueueFull(true)
.maxPendingMessages(512)
.sendTimeout(10, TimeUnit.SECONDS)
.blockIfQueueFull(true)
.create();
// 同步发送消息
MessageId messageId = producer.send("Hello World");
log.info("message id is {}",messageId);
CompletableFuture<MessageId> asyncMessageId = producer.sendAsync("This is a async message");
// 阻塞线程,直到返回结果
log.info("async message id is {}",asyncMessageId.get());
// 配置发送的消息元信息,同步发送
producer.newMessage()
.key("my-message-key")
.value("my-message")
.property("my-key", "my-value")
.property("my-other-key", "my-other-value")
.send();
producer.newMessage()
.key("my-async-message-key")
.value("my-async-message")
.property("my-async-key", "my-async-value")
.property("my-async-other-key", "my-async-other-value")
.sendAsync();
// 关闭producer的方式有两种:同步和异步
// producer.closeAsync();
producer.close();
// 关闭licent的方式有两种,同步和异步
// client.close();
client.closeAsync();
}
}
3.4 构造消费者
- 单订阅
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
public class MyConsumer {
private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
private static final String SERVER_URL = "pulsar://pulsar.cluster.com:6650";
public static void main(String[] args) throws Exception {
// 构造Pulsar Client
PulsarClient client = PulsarClient.builder()
.serviceUrl(SERVER_URL)
.enableTcpNoDelay(true)
.build();
Consumer consumer = client.newConsumer()
.consumerName("my-consumer")
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription")
.ackTimeout(10, TimeUnit.SECONDS)
.maxTotalReceiverQueueSizeAcrossPartitions(10)
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
do {
// 接收消息有两种方式:异步和同步
// CompletableFuture<Message<String>> message = consumer.receiveAsync();
Message message = consumer.receive();
log.info("get message from pulsar cluster,{}", message);
} while (true);
}
- 多订阅
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
public class MultiConsumer {
private static final Logger log = LoggerFactory.getLogger(MultiConsumer.class);
private static final String SERVER_URL = "pulsar://pulsar.cluster.com:6650";
private static final String DEFAULT_NS_TOPICS = "persistent://public/default/.*";
private static final String DEFATULT_NS_REG_TOPICS= "persistent://public/default/my.*";
private static void main(String[] args) throws Exception {
PulsarClient client = PulsarClient.builder()
.serviceUrl(SERVER_URL)
.enableTcpNoDelay(true)
.build();
ConsumerBuilder consumerBuilder = client.newConsumer()
.subscriptionName("multi-sub");
// 订阅namespace下所有的topic
Pattern allTopicsInNamespace = Pattern.compile(DEFAULT_NS_TOPICS);
consumerBuilder.topicsPattern("").subscribe();
// 订阅namespace下满足正则匹配的topic
Pattern someTopicsInNamespace = Pattern.compile(DEFATULT_NS_REG_TOPICS);
Consumer allTopicsConsumer = consumerBuilder
.topicsPattern(someTopicsInNamespace)
.subscribe();
List<String> topics = Arrays.asList(
"topic-1",
"topic-2",
"topic-3"
);
Consumer multiTopicConsumer = consumerBuilder
.topics(topics)
.subscribe();
}
}
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/22121.html