Kafka.net使用编程入门(一)
最近研究分布式消息队列,分享下!
首先zookeeper 和 kafka 压缩包 解压 并配置好!
我本机zookeeper环境配置如下:
D:\Worksoftware\ApacheZookeeper3\conf\zoo.cfg
以下是kafka的配置
D:\Worksoftware\Apachekafka2.11\config\server.properties
我已经加了path环境变量,没加的话需要到zookeeper对应bin目录下执行zkServer
然后执行cmd命令:
结果:
然后打开第二个dos窗口,我没加环境变量path,执行kafka命令如下:
重头戏来了,开始kafka C#客户端处理:
首先引用kafka-net.dll,可以用vs2013的nuget下载,
以下是Prorame.cs:
- class Program
- {
- static void Main(string[] args)
- {
- const string topicName = "test";
- var options = new KafkaOptions(new Uri("http://localhost:9092"))
- {
- Log = new ConsoleLog()
- };
- Task.Run(() =>
- {
- var consumer = new Consumer(new ConsumerOptions(topicName, new BrokerRouter(options)) { Log = new ConsoleLog() });
- foreach (var data in consumer.Consume())
- {
- Console.WriteLine("Response: PartitionId={0},Offset={1} :Value={2}", data.Meta.PartitionId, data.Meta.Offset, data.Value.ToUtf8String());
- }
- });
- //创建一个生产者发消息
- var producer = new Producer(new BrokerRouter(options))
- {
- BatchSize = 100,
- BatchDelayTime = TimeSpan.FromMilliseconds(2000)
- };
- Console.WriteLine("打出一条消息按 enter...");
- while (true)
- {
- var message = Console.ReadLine();
- if (message == "quit") break;
- if (string.IsNullOrEmpty(message))
- {
- //
- SendRandomBatch(producer, topicName, 200);
- }
- else
- {
- producer.SendMessageAsync(topicName, new[] { new Message(message) });
- }
- }
- //释放资源
- using (producer)
- {
- }
- }
- private static async void SendRandomBatch(Producer producer, string topicName, int count)
- {
- //发送多个消息
- var sendTask = producer.SendMessageAsync(topicName, Enumerable.Range(0, count).Select(x => new Message(x.ToString())));
- Console.WriteLine("传送了 #{0} messages. Buffered:{1} AsyncCount:{2}", count, producer.BufferCount, producer.AsyncCount);
- var response = await sendTask;
- Console.WriteLine("已完成批量发送: {0}. Buffered:{1} AsyncCount:{2}", count, producer.BufferCount, producer.AsyncCount);
- foreach (var result in response.OrderBy(x => x.PartitionId))
- {
- Console.WriteLine("主题:{0} PartitionId:{1} Offset:{2}", result.Topic, result.PartitionId, result.Offset);
- }
- }
- }
结果:
闲的蛋疼,随便研究一些好东西,.net环境太封闭,每个.net程序员都要扩展视野,技术交流,本人QQ827937686
最新文章
- 玩转JavaScript OOP[1]——复杂类型
- (Array)121. Best Time to Buy and Sell Stock
- hadoop中MapReduce多种join实现实例分析
- JSON 问题
- atitit。自定义uml MOF EMF体系eclipse emf 教程o7t
- treap 1296 营业额统计
- 史上最佳 Mac+PhpStorm+XAMPP+Xdebug 集成开发和断点调试环境的配置
- POJ 2976 Dropping tests 01分数规划
- 10 Super Useful Tools for Web Designers
- ios学习之路四(新建Sprite Kit 项目的时候出现apple LLVM 5.0 error)
- React-Native 开发(二) 在react-native 中 运用 redux
- 在form里面,放了四个UEditor,怎么在后台分别获取它们值
- 利用jieba,word2vec,LR进行搜狐新闻文本分类
- 20171023xlVBA递归统计WORD字数
- linux代码笔记
- SolrJ查询条件组合查询实现——(十六)
- pandas(四)唯一值、值计数以及成员资格
- C#知识点<;4>;
- POJ - 3037 Skiing SPFA
- 九度OJ 1127:简单密码 (翻译)