• Kafka部署安装及简单使用


    一、环境准备

    1、jdk 8+

    2、zookeeper 

    3、kafka

    说明:在kafka较新版本中已经集成了zookeeper,所以不用单独安装zookeeper,只需要在kafka文件目录中启动zookeeper即可

    二、下载地址

    https://kafka.apache.org/downloads

     

     

    三、部署

    1、启动zookeeper

    -- 启动
    ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
    
    -- 查看是否启动成功
    ps -ef | grep zoo

    2、进入解压的kafka目录,修改/config/kafka-server的配置文件

    vi config/server.properties
    -- 重点配置节点说明
    
    listeners=PLAINTEXT://localhost:9092   -- 将localhost修改为主机ip
    
    log.dirs=/bigdata/kafka/logs-1  -- 默认不修改也可

     

     3、使用kafka-server-start.sh,启动kafka服务

    ./bin/kafka-server-start.sh config/server.properties

    四、使用客户端kafka tools连接kafka

    客户端下载地址:https://www.kafkatool.com/download.html

    关于客户端如何使用可查看:https://www.cnblogs.com/frankdeng/p/9452982.html

     

     

     

     

     

     

     

     

    五、kafka实战简单使用

     

    NuGet:Confluent.Kafka

     1、新建.Net Core控制台项目,代码如下:

     

     

    复制代码
    static void Main(string[] args)
            {
                // 发送消息
                var producerConfig = new ProducerConfig
                {
                    BootstrapServers = "192.168.140.131:9092",
                    MessageTimeoutMs = 50000
                };
                var builder = new ProducerBuilder<string, string>(producerConfig);
                using (var producer = builder.Build())
                {
                    var data = new { key = "1", value = "001" };
                    var json = JsonConvert.SerializeObject(data);
                    var dr = producer.ProduceAsync("order", new Message<string, string> { Key = "order", Value = json }).GetAwaiter().GetResult();
                    Console.WriteLine($"发送事件{dr.Value}到{dr.TopicPartitionOffset}成功");
                }
    
                // 消费消息
                var consumerConfig = new ConsumerConfig {
                    BootstrapServers = "192.168.140.131:9092",
                    AutoOffsetReset=AutoOffsetReset.Earliest,
                    GroupId="1111", // 自定义
                    EnableAutoCommit=true
                };
                var consumerBuilder = new ConsumerBuilder<string, string>(consumerConfig);
    
    
                using (var consumer = consumerBuilder.Build())
                {
                    // 1、订阅
                    consumer.Subscribe("order");
                    while (true)
                    {
                        try
                        {
                            // 2、消费(自动确认)
                            var result = consumer.Consume();
    
                            // 3、业务逻辑
                            string key = result.Key;
                            string value = result.Value;
    
                            Console.WriteLine($"创建商品:Key:{key}");
                            Console.WriteLine($"创建商品:Order:{value}");
                            consumer.Commit(result);
    
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine($"异常:Order:{e}");
                        }
                    }
                }
            }
    复制代码

     

    2、使用kafka客户端查看消息投递

     

  • 相关阅读:
    使用Python自动发送邮件
    【Qt】QMainWindow |QDialog对话框
    智能汽车能否真正实现无人驾驶,为什么?
    机器学习jupyter 鸢尾花决策树
    【iOS】push与present Controller的区别
    自动创建表分区存储
    云里雾里?云方案没有统一标准,业务结合实际情况才是应该着重考虑的
    【自然语言处理(NLP)】基于SQuAD的机器阅读理解
    linux 普通挂载/LVM挂载/LVM扩容
    HCIA-PPPOE原理与配置
  • 原文地址:https://www.cnblogs.com/sportsky/p/16560110.html