最近研究分布式消息队列,分享下!

首先zookeeper  和 kafka 压缩包 解压 并配置好!

我本机zookeeper环境配置如下:

D:\Worksoftware\ApacheZookeeper3\conf\zoo.cfg

以下是kafka的配置

D:\Worksoftware\Apachekafka2.11\config\server.properties

我已经加了path环境变量,没加的话需要到zookeeper对应bin目录下执行zkServer

然后执行cmd命令:

结果:

然后打开第二个dos窗口,我没加环境变量path,执行kafka命令如下:

重头戏来了,开始kafka C#客户端处理:

首先引用kafka-net.dll,可以用vs2013的nuget下载,

以下是Prorame.cs:

  1. class Program
  2. {
  3. static void Main(string[] args)
  4. {
  5. const string topicName = "test";
  6. var options = new KafkaOptions(new Uri("http://localhost:9092"))
  7. {
  8. Log = new ConsoleLog()
  9. };
  10. Task.Run(() =>
  11. {
  12. var consumer = new Consumer(new ConsumerOptions(topicName, new BrokerRouter(options)) { Log = new ConsoleLog() });
  13. foreach (var data in consumer.Consume())
  14. {
  15. Console.WriteLine("Response: PartitionId={0},Offset={1} :Value={2}", data.Meta.PartitionId, data.Meta.Offset, data.Value.ToUtf8String());
  16. }
  17. });
  18. //创建一个生产者发消息
  19. var producer = new Producer(new BrokerRouter(options))
  20. {
  21. BatchSize = 100,
  22. BatchDelayTime = TimeSpan.FromMilliseconds(2000)
  23. };
  24. Console.WriteLine("打出一条消息按 enter...");
  25. while (true)
  26. {
  27. var message = Console.ReadLine();
  28. if (message == "quit") break;
  29. if (string.IsNullOrEmpty(message))
  30. {
  31. //
  32. SendRandomBatch(producer, topicName, 200);
  33. }
  34. else
  35. {
  36. producer.SendMessageAsync(topicName, new[] { new Message(message) });
  37. }
  38. }
  39. //释放资源
  40. using (producer)
  41. {
  42. }
  43. }
  44. private static async void SendRandomBatch(Producer producer, string topicName, int count)
  45. {
  46. //发送多个消息
  47. var sendTask = producer.SendMessageAsync(topicName, Enumerable.Range(0, count).Select(x => new Message(x.ToString())));
  48. Console.WriteLine("传送了 #{0} messages.  Buffered:{1} AsyncCount:{2}", count, producer.BufferCount, producer.AsyncCount);
  49. var response = await sendTask;
  50. Console.WriteLine("已完成批量发送: {0}. Buffered:{1} AsyncCount:{2}", count, producer.BufferCount, producer.AsyncCount);
  51. foreach (var result in response.OrderBy(x => x.PartitionId))
  52. {
  53. Console.WriteLine("主题:{0} PartitionId:{1} Offset:{2}", result.Topic, result.PartitionId, result.Offset);
  54. }
  55. }
  56. }

结果:

闲的蛋疼,随便研究一些好东西,.net环境太封闭,每个.net程序员都要扩展视野,技术交流,本人QQ827937686

最新文章

  1. 玩转JavaScript OOP[1]——复杂类型
  2. (Array)121. Best Time to Buy and Sell Stock
  3. hadoop中MapReduce多种join实现实例分析
  4. JSON 问题
  5. atitit。自定义uml MOF EMF体系eclipse emf 教程o7t
  6. treap 1296 营业额统计
  7. 史上最佳 Mac+PhpStorm+XAMPP+Xdebug 集成开发和断点调试环境的配置
  8. POJ 2976 Dropping tests 01分数规划
  9. 10 Super Useful Tools for Web Designers
  10. ios学习之路四(新建Sprite Kit 项目的时候出现apple LLVM 5.0 error)
  11. React-Native 开发(二) 在react-native 中 运用 redux
  12. 在form里面,放了四个UEditor,怎么在后台分别获取它们值
  13. 利用jieba,word2vec,LR进行搜狐新闻文本分类
  14. 20171023xlVBA递归统计WORD字数
  15. linux代码笔记
  16. SolrJ查询条件组合查询实现——(十六)
  17. pandas(四)唯一值、值计数以及成员资格
  18. C#知识点<4>
  19. POJ - 3037 Skiing SPFA
  20. 九度OJ 1127:简单密码 (翻译)

热门文章

  1. aar的使用(module或者library)
  2. arcgis 属性表字段值计算
  3. 虹软人脸识别SDK的接入方法
  4. js下数据库 nedb lokijs
  5. java基础题集
  6. Spring Boot的@SpringBootApplication无法引入的问题
  7. vue 脚手架搭建新项目以及element-ui等vue组件的使用
  8. 图片方向 image orientation Exif
  9. [转] @JoinColumn 详解 (javax.persistence.JoinColumn)
  10. datatabe 与string