1、jdk 8+
2、zookeeper
3、kafka
说明:在kafka较新版本中已经集成了zookeeper,所以不用单独安装zookeeper,只需要在kafka文件目录中启动zookeeper即可

-- 启动 ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties -- 查看是否启动成功 ps -ef | grep zoo
vi config/server.properties
-- 重点配置节点说明 listeners=PLAINTEXT://localhost:9092 -- 将localhost修改为主机ip log.dirs=/bigdata/kafka/logs-1 -- 默认不修改也可

./bin/kafka-server-start.sh config/server.properties
客户端下载地址:Offset Explorer
关于客户端如何使用可查看:kafka可视化客户端工具(Kafka Tool)的基本使用 - Frankdeng - 博客园


NuGet:Confluent.Kafka

static void Main(string[] args)
{
// 发送消息
var producerConfig = new ProducerConfig
{
BootstrapServers = "192.168.140.131:9092",
MessageTimeoutMs = 50000
};
var builder = new ProducerBuilder(producerConfig);
using (var producer = builder.Build())
{
var data = new { key = "1", value = "001" };
var json = JsonConvert.SerializeObject(data);
var dr = producer.ProduceAsync("order", new Message { Key = "order", Value = json }).GetAwaiter().GetResult();
Console.WriteLine($"发送事件{dr.Value}到{dr.TopicPartitionOffset}成功");
}
// 消费消息
var consumerConfig = new ConsumerConfig {
BootstrapServers = "192.168.140.131:9092",
AutoOffsetReset=AutoOffsetReset.Earliest,
GroupId="1111", // 自定义
EnableAutoCommit=true
};
var consumerBuilder = new ConsumerBuilder(consumerConfig);
using (var consumer = consumerBuilder.Build())
{
// 1、订阅
consumer.Subscribe("order");
while (true)
{
try
{
// 2、消费(自动确认)
var result = consumer.Consume();
// 3、业务逻辑
string key = result.Key;
string value = result.Value;
Console.WriteLine($"创建商品:Key:{key}");
Console.WriteLine($"创建商品:Order:{value}");
consumer.Commit(result);
}
catch (Exception e)
{
Console.WriteLine($"异常:Order:{e}");
}
}
}
}

