• php安装kafka 和在linux下php-rdkafka扩展安装


    我的开发环境是php7.3 ,先来部署两个php扩展,php7.3目录下放librdkafka.dll,ext/php_rdkafka.dll,php.ini增加,[rdkafka]
    extension = php_rdkafka.dll
    php7.3对应的扩展包链接:PECL :: Package :: rdkafka
    看自己php版本对应在这里找
    PECL :: Package :: rdkafka 6.0.0RC2 for Windows

    由于 php-rdkafka 依赖 librdkafka,linux 就需要先安装 librdkafka 后安装 php-rdkafka,而 windows 版本是如下几个文件,安装方法如下:

     

    (1). 将 librdkafka.dll 和 librdkafka.pdb 放入 PHP 安装的根目录下,而 php_rdkafka.dll 和 php_rdkafka.pdb 放入 PHP 安装目录的 ext 下。

    (2). php.ini 配置文件添加 extension=php_rdkafka.dll,最后重启 PHP。

    (3). php-m 或这 phpinfo (); 就可以查看到扩展了。

    使用:

    在linux下安装php-rdkafka扩展

    安装php-rdkafka 扩展 

        注意:git方式下载的话,需要先连接vpn
    一、先安装librdkafka
    >>>源码方式安装
      git clone https://github.com/edenhill/librdkafka
      librdkafka-master.zip
      unzip librdkafka-master.zip
      cd librdkafka-master
      ./configure make make install

    >>>yum方式安装[推荐这种安装]
      yum install librdkafka-devel
      yum install libtool


    二、下载扩展包:

      git clone https://github.com/arnaud-lb/php-rdkafka.git

    【网络不行,克隆不下来,直接下载zip包上传linux里】

    GitHub - arnaud-lb/php-rdkafka: Production-ready, stable Kafka client for PHP
      php-rdkafka-6.x.zip
      unzip php-rdkafka-6.x.zip
      cd php-rdkafka-6.x
      phpize
      ./configure   (若是不写with-php-config,则用默认安装的PHP配置版本;若系统安装了多个版本PHP,必须指定with-php-config)

           ./configure --with-php-config=./configure --with-php-config=/www/server/php/73/bin/php-config
      make && make install

    可能会报错,configure: error: Please reinstall the rdkafka distribution
    说明没安装librdkafka

    vim /etc/php.ini
    extension=rdkafka.so

    php -m | grep rdkafka 查看是否安装成功

    安装多个版本php,如何切换自己喜欢版本PHP为默认版本?



    ***使用kafka扩展

    生产者:
    $objRdKafka = new RdKafka\Producer();
    $objRdKafka->setLogLevel(LOG_DEBUG);
    $objRdKafka->addBrokers("localhost:9092");
    $oObjTopic = $objRdKafka->newTopic("test");
    // 从终端接收输入
    $oInputHandler = fopen('php://stdin', 'r');
    while (true) {
    echo "\nEnter messages:\n";
    $sMsg = trim(fgets($oInputHandler));
    // 空消息意味着退出
    if (empty($sMsg)) {
    break;
    }
    // 发送消息
    $oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, $sMsg);
    }

    echo "done\n";

    消费者:
    $objRdKafka = new RdKafka\Consumer();
    $objRdKafka->setLogLevel(LOG_DEBUG);
    $objRdKafka->addBrokers("localhost:9092");
    $oObjTopic = $objRdKafka->newTopic("test");
    /**
    * consumeStart
    * 第一个参数标识分区,生产者是往分区0发送的消息,这里也从分区0拉取消息
    * 第二个参数标识从什么位置开始拉取消息,可选值为
    * RD_KAFKA_OFFSET_BEGINNING : 从开始拉取消息
    * RD_KAFKA_OFFSET_END : 从当前位置开始拉取消息
    * RD_KAFKA_OFFSET_STORED : 猜测跟RD_KAFKA_OFFSET_END一样
    */
    $oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END);

    while (true) {
    // 第一个参数是分区,第二个参数是超时时间
    $oMsg = $oObjTopic->consume(0, 1000);

    // 没拉取到消息时,返回NULL
    if (!$oMsg) {
    usleep(10000);
    continue;
    }

    if ($oMsg->err) {
    echo $msg->errstr(), "\n";
    break;
    } else {
    echo $oMsg->payload, "\n";
    }
    }

    先启动生产,再启动消费会造成数据丢失,不在再队列中存着

  • 相关阅读:
    MIPI CSI-2笔记(22) -- CSI-2实现案例
    浅刷牛客链表题,逐步深入链表,理解链表
    nydusd 源码理解(一)
    SIP会话发起协议 - 先知道是什么(一)
    什么是鱼叉式网络钓鱼?
    osg给osg::Geometry(自己绘制的几何体)添加纹理(二)
    【机器学习】PyTorch中 tensor.detach() 和 tensor.data 的区别
    图解LeetCode——1700. 无法吃午餐的学生数量(难度:简单)
    Flex & Bison 开始
    CSS属性: 过度效果属性transition
  • 原文地址:https://blog.csdn.net/yuanzelin8/article/details/136539072