正文
Pulsar部署和实践(一)
小程序:扫一扫查出行
【扫一扫了解最新限行尾号】
复制小程序
【扫一扫了解最新限行尾号】
复制小程序
前言
本地Docker部署Pulsar消息代理实现消息发布和消息订阅
介绍
相关概念,后面有时间再花时间整理下。
实践步骤
1.使用dokcer本地部署pulsar
docker run -it \
-p 6650:6650 \
-p 8080:8080 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
apachepulsar/pulsar:2.7.1 \
bin/pulsar standalone
2.docker ps -a 查看pulsar运行是否正常,可以看到下图已经部署成功
pulsar连接地址:http://localhost:8080
pulsar://localhost:6650
3.使用C#客户端Publish Message到pulsar broker中
(1)为了演示,我这里创建了一个C#控制台项目
(2)我们使用官网推荐的C# pulsar客户端包,添加安装DotPulsar nuget包
(3)创建client
//1。创建pulsar客户端
var client = PulsarClient.Builder()
.ServiceUrl(new Uri("pulsar://localhost:6650"))
.RetryInterval(new TimeSpan(3))
.Build();
(4)创建生产者,发送消息
//2、创建Pulsar Producer(生产者)
var producer = client.NewProducer()
.Topic("persistent://public/default/mytopic")
.Create();
var data = Encoding.UTF8.GetBytes("Hello Pulsar");
await producer.Send(data);
上图可见显示创建producer成功。
(5)下面再创建一个客户端来消费发送者发送的消息(“Hello Pulsar”)。
//2、创建Pulsar Producer(消费者)
var consumer = client.NewConsumer()
.SubscriptionName("MySubscription")
.Topic("persistent://public/default/mytopic")
.Create();
//3.消费消息
await foreach (var message in consumer.Messages())
{
Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
}
见上图,发布者发送消息成功被订阅者消费。
4.代码示例
//PublisherClient
static async Task Main(string[] args)
{
Console.WriteLine("Hello Pulsar");
//1。创建pulsar客户端
var client = PulsarClient.Builder()
.ServiceUrl(new Uri("pulsar://localhost:6650"))
.RetryInterval(new TimeSpan(3))
.Build();
//2、创建Pulsar Producer(生产者)
var producer = client.NewProducer()
.Topic("persistent://public/default/mytopic")
.Create();
for (int i = 0; i < 5; i++)
{
var data = Encoding.UTF8.GetBytes($"Hello Pulsar {i}");
await producer.Send(data);
Console.WriteLine($"发送消息成功");
} Console.ReadKey();
} //SubscriberClient
static async Task Main(string[] args)
{
//1。创建pulsar客户端
var client = PulsarClient.Builder()
.ServiceUrl(new Uri("pulsar://localhost:6650"))
.RetryInterval(new TimeSpan(3))
.Build();
//2、创建Pulsar Producer(消费者)
var consumer = client.NewConsumer()
.SubscriptionName("MySubscription")
.Topic("persistent://public/default/mytopic")
.Create();
//3.消费消息
await foreach (var message in consumer.Messages())
{
Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
}
Console.ReadKey();
}