搭建Kafka,并使用C#发布和订阅消息
安装Java
1.安装JDK https://www.oracle.com/java/technologies/downloads/#jdk20-windows
2.新建环境变量JAVA_HOME
3.添加JDK路径到Path环境变量里
安装ZooKeeper
1.下载ZooKeeper,用于集群管理kafka,https://zookeeper.apache.org/releases.html#download
2.解压ZooKeeper,然后进入该目录下的conf,复制zoo_sample.cfg一份并改名zoo.cfg
3.打开编辑zoo.cfg,找到dataDir并修改dataDir=E:\\apache-zookeeper-3.8.1-bin\\tmp,注意必须双斜杠
4.添加环境变量ZOOKEEPER_HOME
5.编辑Path环境变量,添加%ZOOKEEPER_HOME%\bin
6.打开PowerShell,输入zkServer回车,然后等待启动成功,然后不要关闭,让最小化一直运行即可
安装Kafka
1.下载Kafka最新版编译好的二进制文件 https://kafka.apache.org/downloads
2解压Kafka,进入config目录打开server.properties
3.找到log.dirs修改为log.dirs=E:\\kafka\\kafka-logs,注意必须双斜杠
4.打开PowerShell,输入E:\kafka\bin\windows\kafka-server-start.bat E:\kafka\config\server.properties,回车执行,注意路径要改成你解压的后的kafka路径,必须使用PowerShell,否则Windows会报错
使用C# 订阅和发布消息
1.VS创建C#控制台项目,实现订阅功能
2.NuGet添加Confluent.Kafka库,添加以下代码实现订阅主题,并且保持运行,注意不要强制结束,否则需要等待心跳包监测导致延迟几十秒后才能被其他消费者接收数据
var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "localhost:9092", AutoOffsetReset = AutoOffsetReset.Earliest }; using (var c = new ConsumerBuilder<Ignore, string>(conf).Build()) { c.Subscribe("test-topic"); CancellationTokenSource cts = new CancellationTokenSource(); //按下Ctrl+C结束,强制结束的话,需要等待心跳包检查才能断开,会导致延迟几十秒后其他消费者才能收到消息 Console.CancelKeyPress += (_, e) => { e.Cancel = true; // prevent the process from terminating. cts.Cancel(); }; try { while (true) { try { var cr = c.Consume(cts.Token); Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'."); } catch (ConsumeException e) { Console.WriteLine($"Error occured: {e.Error.Reason}"); } } } catch (OperationCanceledException) { // Ensure the consumer leaves the group cleanly and final offsets are committed. c.Close(); } }
3.VS另外再创建一个控制台项目,实现发布功能
4.NuGet添加Confluent.Kafka库,添加以下代码实现发布功能
var config = new ProducerConfig { BootstrapServers = "localhost:9092" }; using (var p = new ProducerBuilder<Null, string>(config).Build()) { try { var dr = p.ProduceAsync("test-topic", new Message<Null, string> { Value = "test" }).Result; Console.WriteLine($"发布数据: '{dr.Value}' to '{dr.TopicPartitionOffset}'"); } catch (ProduceException<Null, string> e) { Console.WriteLine($"发生错误: {e.Error.Reason}"); } } Console.ReadLine();
至此结束,下面效果: