• Kafka部署安装及简单使用


    一、环境准备

    1、jdk 8+

    2、zookeeper 

    3、kafka

    说明:在kafka较新版本中已经集成了zookeeper,所以不用单独安装zookeeper,只需要在kafka文件目录中启动zookeeper即可

    二、下载地址

    Apache Kafka

    三、部署

    1、启动zookeeper

    -- 启动
    ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
    
    -- 查看是否启动成功
    ps -ef | grep zoo

    2、进入解压的kafka目录,修改/config/kafka-server的配置文件

    vi config/server.properties
    -- 重点配置节点说明
    
    listeners=PLAINTEXT://localhost:9092   -- 将localhost修改为主机ip
    
    log.dirs=/bigdata/kafka/logs-1  -- 默认不修改也可

     3、使用kafka-server-start.sh,启动kafka服务

    ./bin/kafka-server-start.sh config/server.properties

    四、使用客户端kafka tools连接kafka

    客户端下载地址:Offset Explorer

    关于客户端如何使用可查看:kafka可视化客户端工具(Kafka Tool)的基本使用 - Frankdeng - 博客园

    五、kafka实战简单使用

    NuGet:Confluent.Kafka

     1、新建.Net Core控制台项目,代码如下:

    static void Main(string[] args)
            {
                // 发送消息
                var producerConfig = new ProducerConfig
                {
                    BootstrapServers = "192.168.140.131:9092",
                    MessageTimeoutMs = 50000
                };
                var builder = new ProducerBuilder(producerConfig);
                using (var producer = builder.Build())
                {
                    var data = new { key = "1", value = "001" };
                    var json = JsonConvert.SerializeObject(data);
                    var dr = producer.ProduceAsync("order", new Message { Key = "order", Value = json }).GetAwaiter().GetResult();
                    Console.WriteLine($"发送事件{dr.Value}到{dr.TopicPartitionOffset}成功");
                }
    
                // 消费消息
                var consumerConfig = new ConsumerConfig {
                    BootstrapServers = "192.168.140.131:9092",
                    AutoOffsetReset=AutoOffsetReset.Earliest,
                    GroupId="1111", // 自定义
                    EnableAutoCommit=true
                };
                var consumerBuilder = new ConsumerBuilder(consumerConfig);
    
    
                using (var consumer = consumerBuilder.Build())
                {
                    // 1、订阅
                    consumer.Subscribe("order");
                    while (true)
                    {
                        try
                        {
                            // 2、消费(自动确认)
                            var result = consumer.Consume();
    
                            // 3、业务逻辑
                            string key = result.Key;
                            string value = result.Value;
    
                            Console.WriteLine($"创建商品:Key:{key}");
                            Console.WriteLine($"创建商品:Order:{value}");
                            consumer.Commit(result);
    
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine($"异常:Order:{e}");
                        }
                    }
                }
            }

    2、使用kafka客户端查看消息投递

  • 相关阅读:
    Google Earth Engine(GEE)——用reducers来获取某一个区域得响应值并转化为列
    ReentrantLock原理(未完待续)
    第三十四章 使用 CSP 进行基于标签的开发 - Hyperevent例子
    SkyWalking 入门教程
    隐马尔科夫模型(HMM)学习笔记
    OpenCV 学习文章记录
    ABB MPRC086444-005数字输入模块
    逻辑漏洞之越权漏洞
    leetcode做题笔记2342. 数位和相等数对的最大和
    非比较排序------计数排序
  • 原文地址:https://blog.csdn.net/jh035/article/details/128062117