Pulsar的落地实践

摘要

Pulsar部署和实践(一) 前言本地Docker部署Pulsar消息代理实现消息发布和消息订阅 介绍 相关概念,后面有时间再花时间整理下。 实践步骤1.使用dokcer本地部署pulsardocker run -it \ -p 6650:6650 \ -p 8080:8080 \ …

正文

Pulsar部署和实践(一)

前言

本地Docker部署Pulsar消息代理实现消息发布和消息订阅

 

介绍

Apache Pulsar 介绍

 

相关概念,后面有时间再花时间整理下。

 

实践步骤

1.使用dokcer本地部署pulsar

docker run -it \
  -p 6650:6650 \
  -p 8080:8080 \
  --mount source=pulsardata,target=/pulsar/data \
  --mount source=pulsarconf,target=/pulsar/conf \
  apachepulsar/pulsar:2.7.1 \
  bin/pulsar standalone

  

2.docker ps -a 查看pulsar运行是否正常,可以看到下图已经部署成功

pulsar连接地址:http://localhost:8080

         pulsar://localhost:6650

3.使用C#客户端Publish Message到pulsar broker中

(1)为了演示,我这里创建了一个C#控制台项目

 

(2)我们使用官网推荐的C# pulsar客户端包,添加安装DotPulsar nuget包

(3)创建client

  //1。创建pulsar客户端
           var client = PulsarClient.Builder()
                        .ServiceUrl(new Uri("pulsar://localhost:6650"))
                        .RetryInterval(new TimeSpan(3))
                        .Build();

 

(4)创建生产者,发送消息

            //2、创建Pulsar Producer(生产者)
            var producer = client.NewProducer()
                     .Topic("persistent://public/default/mytopic")
                     .Create();
            var data = Encoding.UTF8.GetBytes("Hello Pulsar");
            await producer.Send(data);

上图可见显示创建producer成功。

(5)下面再创建一个客户端来消费发送者发送的消息(“Hello Pulsar”)。

            //2、创建Pulsar Producer(消费者)
            var consumer = client.NewConsumer()
                     .SubscriptionName("MySubscription")
                     .Topic("persistent://public/default/mytopic")
                     .Create();
​
            //3.消费消息
            await foreach (var message in consumer.Messages())
            {
                Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
            }

见上图,发布者发送消息成功被订阅者消费。

4.代码示例

//PublisherClient
  static async Task Main(string[] args)
        {
            Console.WriteLine("Hello Pulsar");
​
            //1。创建pulsar客户端
            var client = PulsarClient.Builder()
                        .ServiceUrl(new Uri("pulsar://localhost:6650"))
                        .RetryInterval(new TimeSpan(3))
                        .Build();
​
            //2、创建Pulsar Producer(生产者)
            var producer = client.NewProducer()
                     .Topic("persistent://public/default/mytopic")
                     .Create();
​
            for (int i = 0; i < 5; i++)
            {
                var data = Encoding.UTF8.GetBytes($"Hello Pulsar {i}");
                await producer.Send(data);
                Console.WriteLine($"发送消息成功");
            }
            
            Console.ReadKey();
        }
        
        
  //SubscriberClient
  static async Task Main(string[] args)
        {
            //1。创建pulsar客户端
            var client = PulsarClient.Builder()
                        .ServiceUrl(new Uri("pulsar://localhost:6650"))
                        .RetryInterval(new TimeSpan(3))
                        .Build();
​
            //2、创建Pulsar Producer(消费者)
            var consumer = client.NewConsumer()
                     .SubscriptionName("MySubscription")
                     .Topic("persistent://public/default/mytopic")
                     .Create();
​
            //3.消费消息
            await foreach (var message in consumer.Messages())
            {
                Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
            }
​
​
            Console.ReadKey();
        }

关注不迷路

扫码下方二维码,关注宇凡盒子公众号,免费获取最新技术内幕!

温馨提示:如果您访问和下载本站资源,表示您已同意只将下载文件用于研究、学习而非其他用途。
文章版权声明 1、本网站名称:宇凡盒子
2、本站文章未经许可,禁止转载!
3、如果文章内容介绍中无特别注明,本网站压缩包解压需要密码统一是:yufanbox.com
4、本站仅供资源信息交流学习,不保证资源的可用及完整性,不提供安装使用及技术服务。点此了解
5、如果您发现本站分享的资源侵犯了您的权益,请及时通知我们,我们会在接到通知后及时处理!提交入口
0

评论0

请先

站点公告

🚀 【宇凡盒子】全网资源库转储中心

👉 注册即送VIP权限👈

👻 全站资源免费下载✅,欢迎注册!

记得 【收藏】+【关注】 谢谢!~~~

立即注册
没有账号?注册  忘记密码?

社交账号快速登录