• Big Data Middleware: Kafka


    Installing and Configuring Kafka

    First, upload the Kafka installation package to the virtual machine:

    Extract it to the target directory and rename the extracted folder:
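    The extract-and-rename step might look like the following sketch. The package version (kafka_2.12-3.0.0), the upload location (/opt/software), and the install prefix (/opt/model) are assumptions, so substitute your own; the leading echo makes each line a dry run, and dropping it executes the command:

```shell
# Hypothetical paths: package uploaded to /opt/software, installed under /opt/model.
# Drop the leading "echo" on each line to actually run the commands.
echo tar -zxvf /opt/software/kafka_2.12-3.0.0.tgz -C /opt/model/
echo mv /opt/model/kafka_2.12-3.0.0 /opt/model/kafka
```

    Renaming to a short, version-free path keeps later scripts and config references stable across upgrades.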

    Start in Kafka's config directory. The first file to modify is server.properties; the changes are as follows:

    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements. See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License. You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    # see kafka.server.KafkaConfig for additional details and defaults
    ############################# Server Basics #############################
    # The id of the broker. This must be set to a unique integer for each broker.
    # The broker's identity within the cluster; the id must be unique across the cluster
    broker.id=0
    ############################# Socket Server Settings #############################
    # The address the socket server listens on. It will get the value returned from
    # java.net.InetAddress.getCanonicalHostName() if not configured.
    # FORMAT:
    # listeners = listener_name://host_name:port
    # EXAMPLE:
    # listeners = PLAINTEXT://your.host.name:9092
    #listeners=PLAINTEXT://:9092
    # Hostname and port the broker will advertise to producers and consumers. If not set,
    # it uses the value for "listeners" if configured. Otherwise, it will use the value
    # returned from java.net.InetAddress.getCanonicalHostName().
    #advertised.listeners=PLAINTEXT://your.host.name:9092
    # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
    #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    # The number of threads that the server uses for receiving requests from the network and sending responses to the network
    num.network.threads=3
    # The number of threads that the server uses for processing requests, which may include disk I/O
    num.io.threads=8
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    ############################# Log Basics #############################
    # A comma separated list of directories under which to store log files
    # Where Kafka stores its data; the default is a temporary directory, so change it to your own directory
    log.dirs=/opt/model/kafka/datas
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=1
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1
    ############################# Internal Topic Settings #############################
    # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
    # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    ############################# Log Flush Policy #############################
    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    # the OS cache lazily. The following configurations control the flush of data to disk.
    # There are a few important trade-offs here:
    # 1. Durability: Unflushed data may be lost if you are not using replication.
    # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.
    # The number of messages to accept before forcing a flush of data to disk
    #log.flush.interval.messages=10000
    # The maximum amount of time a message can sit in a log before we force a flush
    #log.flush.interval.ms=1000
    ############################# Log Retention Policy #############################
    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.
    # The minimum age of a log file to be eligible for deletion due to age
    log.retention.hours=168
    # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
    # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
    #log.retention.bytes=1073741824
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000
    ############################# Zookeeper #############################
    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    # The ZooKeeper cluster to connect to; list every node in the cluster on which ZooKeeper is deployed
    # ZooKeeper stores data as a directory tree. By default Kafka's znodes would live under the
    # ZooKeeper root, which makes it awkward to pick out Kafka's data later, so we keep them under a
    # dedicated branch; that is why the connect string ends with /kafka.
    # Listing multiple nodes means that if one ZooKeeper node is unreachable, the others can still be used
    zookeeper.connect=node1:2181,node2:2181,node3:2181/kafka
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000
    ############################# Group Coordinator Settings #############################
    # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
    # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
    # The default value for this is 3 seconds.
    # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
    # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
    group.initial.rebalance.delay.ms=0

    Three settings are the main ones to change: the unique broker id, the Kafka data storage path, and the ZooKeeper node addresses.
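    A quick way to double-check the three edits is to grep for the keys. The check below is rehearsed on a scratch copy so it can run anywhere; on a real node you would point grep at the actual file, assumed here to be /opt/model/kafka/config/server.properties:

```shell
# Rehearse the check on a scratch copy holding the three edited keys;
# on a real node, grep /opt/model/kafka/config/server.properties instead.
conf=$(mktemp)
cat > "$conf" <<'EOF'
broker.id=0
log.dirs=/opt/model/kafka/datas
zookeeper.connect=node1:2181,node2:2181,node3:2181/kafka
EOF
# Print only the three settings that this tutorial changes:
grep -E '^(broker\.id|log\.dirs|zookeeper\.connect)=' "$conf"
```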

    Next, distribute the Kafka installation directory to the other nodes.
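    The distribution step can be sketched with scp. The node names (node2, node3) and the /opt/model path are assumptions from this setup; the echo makes the loop a dry run that only prints the commands, and dropping it performs the copy:

```shell
# Dry run of distributing the configured Kafka directory to the other nodes.
# Drop the "echo" to actually copy (assumes passwordless SSH between nodes).
for node in node2 node3; do
    echo scp -r /opt/model/kafka "$node":/opt/model/
done
```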

    Note: after distributing, do not forget to change the unique broker id on each node.
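    The per-node edit is a one-line sed. It is rehearsed below on a scratch copy; on a real node you would run the sed against the actual config file (assumed /opt/model/kafka/config/server.properties), using id 1 on node2, id 2 on node3, and so on:

```shell
# Rehearse the broker.id edit on a scratch copy; on node2 you would target
# /opt/model/kafka/config/server.properties and substitute broker.id=1,
# on node3 broker.id=2, etc. (GNU sed's -i edits the file in place).
conf=$(mktemp)
echo 'broker.id=0' > "$conf"
sed -i 's/^broker\.id=.*/broker.id=1/' "$conf"
grep '^broker.id=' "$conf"
```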

    Now we can start the Kafka service. Note that the ZooKeeper service must be started before Kafka.
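    Starting ZooKeeper on every node first might look like this dry-run loop; the node names and the zkServer.sh path are assumptions from a typical install alongside this Kafka layout, and dropping the echo executes the commands:

```shell
# ZooKeeper must be up before Kafka. Dry run; the install path is an
# assumption -- drop the "echo" to execute on each node over SSH.
for node in node1 node2 node3; do
    echo ssh "$node" /opt/model/zookeeper/bin/zkServer.sh start
done
```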

    Like ZooKeeper, Kafka requires the startup script to be run on every node individually, and the script must be given the configuration file explicitly:

    ./kafka-server-start.sh -daemon ../config/server.properties

    Note that my Kafka path may differ from yours; just remember that the startup script lives in the bin directory and the configuration file in the config directory. Run the script on each of the three virtual machines:
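    Running the startup command on all three nodes can be sketched as a dry-run loop; the absolute paths are assumptions based on the /opt/model/kafka layout used above, and dropping the echo performs the actual start:

```shell
# Dry run of starting the broker on every node with its local config.
# Paths are assumptions; drop the "echo" to execute over SSH.
for node in node1 node2 node3; do
    echo ssh "$node" /opt/model/kafka/bin/kafka-server-start.sh -daemon /opt/model/kafka/config/server.properties
done
```

    The -daemon flag detaches the broker from the terminal, which is why each node must afterwards be checked for the Kafka process rather than watched in the foreground.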

    Once a Kafka process appears on every node in the cluster, the Kafka cluster has started successfully.

  • Original article: https://blog.csdn.net/hssjsh/article/details/133851362