• Flink: Kafka broker scale-down causes tasks to restart continuously


    Background

    Flink version: 1.12.2
    Kafka client: 2.4.1
    A streaming job that reads from Kafka and computes DAU runs on the company's Flink platform. A Kafka scale-down (4 of the 12 brokers configured for the Flink job were decommissioned) caused the job to restart continuously; after an hour of restarts it still had not recovered. The situation at the time was as follows:

    The JobManager log:
    2023-10-07 10:02:52.975 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: TableSourceScan(table=[[default_catalog, default_database, ubt_start, watermark=[-(LOCALTIMESTAMP, 1000:INTERVAL SECOND)]]]) (34/64) (e33d9ad0196a71e8eb551c181eb779b5) switched from RUNNING to FAILED on container_e08_1690538387235_2599_01_000010 @ task-xxxx-shanghai.emr.aliyuncs.com (dataPort=xxxx).
    org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException: null
            at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)
            at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)
            at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945)
            at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.lambda$createAndStartDiscoveryLoop$2(FlinkKafkaConsumerBase.java:913)
            at java.lang.Thread.run(Thread.java:750)
    
    
    The log on the corresponding TaskManager (task-xxxx-shanghai.emr.aliyuncs.com):
    
    2023-10-07 10:02:24.604 WARN  org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxxx] Connection to node 46129 (sh-bs-b1-303-i14-kafka-129-46.ximalaya.local/192.168.129.46:9092) could not be established. Broker may not be available.
    
    
    2023-10-07 10:02:52.939 WARN  org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(t) (34/64)#0 (e33d9ad0196a71e8eb551c181eb779b5) switched from RUNNING to FAILED.
    org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException: null
            at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)
            at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)
            at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945)
            at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.lambda$createAndStartDiscoveryLoop$2(FlinkKafkaConsumerBase.java:913)
            at java.lang.Thread.run(Thread.java:750)
    
    2023-10-07 10:04:58.205 WARN  org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxx] Connection to node -4 (xxxx:909) could not be established. Broker may not be available.
    2023-10-07 10:04:58.205 WARN  org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxx] Bootstrap broker sxxxx:909 (id: -4 rack: null) disconnected
    2023-10-07 10:04:58.206 WARN  org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxxu] Connection to node -5 (xxxx:9092) could not be established. Broker may not be available.
    2023-10-07 10:04:58.206 WARN  org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=xxxxu] Bootstrap broker xxxx:9092 (id: -5 rack: null) disconnected
    
    
    2023-10-07 10:08:15.541 WARN  org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(xxx) switched from RUNNING to FAILED.
    org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
    

    The Kafka source configuration of the Flink job at the time:

    scan.topic-partition-discovery.interval  300000
    restart-strategy.type fixed-delay
    restart-strategy.fixed-delay.attempts 50000000
    jobmanager.execution.failover-strategy region
    
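    The restart-strategy part of this configuration can also be expressed programmatically; a minimal sketch (the 10-second delay is an assumed value, it is not shown in the configuration above):

    import java.util.concurrent.TimeUnit;

    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RestartStrategyExample {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Equivalent to restart-strategy.type=fixed-delay with
            // restart-strategy.fixed-delay.attempts=50000000; the delay below is assumed.
            env.setRestartStrategy(
                    RestartStrategies.fixedDelayRestart(50_000_000, Time.of(10, TimeUnit.SECONDS)));
        }
    }

    With 50,000,000 allowed attempts the job effectively retries forever, which is why it kept restarting for over an hour instead of failing permanently.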

    Conclusion and fix

    On the Kafka consumer side there are two parameters, default.api.timeout.ms (default 60000) and request.timeout.ms (default 30000), which control how long the Kafka client waits for responses from the server. Each request times out after 30s (this is not the timeout of a single connection attempt to a broker, see "Other notes" below) and can then be retried; if no response is received within 60s overall, a TimeoutException is thrown. The detailed analysis follows below.
    We solved the problem by setting the following parameters on the Flink Kafka connector:

    `properties.default.api.timeout.ms` = '600000',
    `properties.request.timeout.ms` = '5000',
    // max.block.ms sets a Kafka producer timeout
    `properties.max.block.ms` = '600000',
    
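    When the consumer is built directly with the DataStream API instead of SQL, the same client options (without the `properties.` prefix) go into the consumer Properties; a minimal sketch with placeholder broker/topic/group names:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class KafkaSourceTimeouts {
        public static FlinkKafkaConsumer<String> buildConsumer() {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder
            props.setProperty("group.id", "dau-job");                            // placeholder
            // Allow the metadata request (partition discovery) up to 10 minutes in total ...
            props.setProperty("default.api.timeout.ms", "600000");
            // ... but give up on an unreachable broker after 5s and try another one.
            props.setProperty("request.timeout.ms", "5000");
            return new FlinkKafkaConsumer<>("ubt_start", new SimpleStringSchema(), props);
        }
    }

    The idea is the same in both forms: shrink request.timeout.ms so a dead broker is abandoned quickly, and enlarge default.api.timeout.ms so the metadata call as a whole survives until a live broker is found.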

    Analysis

    For the Kafka connector in Flink, the DynamicTableSourceFactory is KafkaDynamicTableFactory (here we only discuss Kafka as a source).
    The factory's createDynamicTableSource method is eventually invoked; for the concrete call chain see "Apache Hudi初探(四)(与flink的结合)–Flink Sql中hudi的createDynamicTableSource/createDynamicTableSink是怎么被调用" (just replace Sink with Source). The chain ends up in the KafkaDynamicSource class:

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        final DeserializationSchema<RowData> keyDeserialization =
                createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);

        final DeserializationSchema<RowData> valueDeserialization =
                createDeserialization(context, valueDecodingFormat, valueProjection, null);

        final TypeInformation<RowData> producedTypeInfo =
                context.createTypeInformation(producedDataType);

        final FlinkKafkaConsumer<RowData> kafkaConsumer =
                createKafkaConsumer(keyDeserialization, valueDeserialization, producedTypeInfo);

        return SourceFunctionProvider.of(kafkaConsumer, false);
    }
    

    The getScanRuntimeProvider method of this class is invoked, and all Kafka-related operations can be traced back to the FlinkKafkaConsumer class (which extends FlinkKafkaConsumerBase). Its important methods are:

        @Override
        public final void initializeState(FunctionInitializationContext context) throws Exception {
    
            OperatorStateStore stateStore = context.getOperatorStateStore();
    
            this.unionOffsetStates =
                    stateStore.getUnionListState(
                            new ListStateDescriptor<>(
                                    OFFSETS_STATE_NAME,
                                    createStateSerializer(getRuntimeContext().getExecutionConfig())));
    
           ... 
        }
    
       @Override
        public void open(Configuration configuration) throws Exception {
            // determine the offset commit mode
            this.offsetCommitMode =
                    OffsetCommitModes.fromConfiguration(
                            getIsAutoCommitEnabled(),
                            enableCommitOnCheckpoints,
                            ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
    
            // create the partition discoverer
            this.partitionDiscoverer =
                    createPartitionDiscoverer(
                            topicsDescriptor,
                            getRuntimeContext().getIndexOfThisSubtask(),
                            getRuntimeContext().getNumberOfParallelSubtasks());
            this.partitionDiscoverer.open();
    
            subscribedPartitionsToStartOffsets = new HashMap<>();
            final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
            if (restoredState != null) {
                ...
            } else {
                // use the partition discoverer to fetch the initial seed partitions,
                // and set their initial offsets depending on the startup mode.
                // for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
                // for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily
                // determined
                // when the partition is actually read.
                switch (startupMode) {
                    ...
                    default:
                        for (KafkaTopicPartition seedPartition : allPartitions) {
                            subscribedPartitionsToStartOffsets.put(
                                    seedPartition, startupMode.getStateSentinel());
                        }
                }
    
                if (!subscribedPartitionsToStartOffsets.isEmpty()) {
                    switch (startupMode) {
                        ...
                        case GROUP_OFFSETS:
                            LOG.info(
                                    "Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
                                    getRuntimeContext().getIndexOfThisSubtask(),
                                    subscribedPartitionsToStartOffsets.size(),
                                    subscribedPartitionsToStartOffsets.keySet());
                    }
                } else {
                    LOG.info(
                            "Consumer subtask {} initially has no partitions to read from.",
                            getRuntimeContext().getIndexOfThisSubtask());
                }
            }
    
            this.deserializer.open(
                    RuntimeContextInitializationContextAdapters.deserializationAdapter(
                            getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
        }
    
        @Override
        public void run(SourceContext<T> sourceContext) throws Exception {
            if (subscribedPartitionsToStartOffsets == null) {
                throw new Exception("The partitions were not set for the consumer");
            }
    
            // initialize commit metrics and default offset callback method
            this.successfulCommits =
                    this.getRuntimeContext()
                            .getMetricGroup()
                            .counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
            this.failedCommits =
                    this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
            final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();
    
            this.offsetCommitCallback =
                    new KafkaCommitCallback() {
                        @Override
                        public void onSuccess() {
                            successfulCommits.inc();
                        }
    
                        @Override
                        public void onException(Throwable cause) {
                            LOG.warn(
                                    String.format(
                                            "Consumer subtask %d failed async Kafka commit.",
                                            subtaskIndex),
                                    cause);
                            failedCommits.inc();
                        }
                    };
    
            // mark the subtask as temporarily idle if there are no initial seed partitions;
            // once this subtask discovers some partitions and starts collecting records, the subtask's
            // status will automatically be triggered back to be active.
            if (subscribedPartitionsToStartOffsets.isEmpty()) {
                sourceContext.markAsTemporarilyIdle();
            }
    
            LOG.info(
                    "Consumer subtask {} creating fetcher with offsets {}.",
                    getRuntimeContext().getIndexOfThisSubtask(),
                    subscribedPartitionsToStartOffsets);
            // from this point forward:
            //   - 'snapshotState' will draw offsets from the fetcher,
            //     instead of being built from `subscribedPartitionsToStartOffsets`
            //   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
            //     Kafka through the fetcher, if configured to do so)
            this.kafkaFetcher =
                    createFetcher(
                            sourceContext,
                            subscribedPartitionsToStartOffsets,
                            watermarkStrategy,
                            (StreamingRuntimeContext) getRuntimeContext(),
                            offsetCommitMode,
                            getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
                            useMetrics);
    
            if (!running) {
                return;
            }
    
            if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
                kafkaFetcher.runFetchLoop();
            } else {
                runWithPartitionDiscovery();
            }
        }
    
        @Override
        public final void snapshotState(FunctionSnapshotContext context) throws Exception {
            ...
                HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
    
                    if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                        // the map cannot be asynchronously updated, because only one checkpoint call
                        // can happen
                        // on this function at a time: either snapshotState() or
                        // notifyCheckpointComplete()
                        pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
                    }
    
                for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry :
                            currentOffsets.entrySet()) {
                        unionOffsetStates.add(
                                Tuple2.of(
                                        kafkaTopicPartitionLongEntry.getKey(),
                                        kafkaTopicPartitionLongEntry.getValue()));
                    }
              ... 
            }
        }
    
        @Override
        public final void notifyCheckpointComplete(long checkpointId) throws Exception {
                ...
                fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
                ...
        }
    

    The key methods are initializeState, open, run, snapshotState and notifyCheckpointComplete; below we go through them one by one.
    Note: for the order of initializeState and open, see the StreamTask class and the following call chain:

    invoke()
     ||
     \/
    beforeInvoke()
     ||
     \/
    operatorChain.initializeStateAndOpenOperators
     ||
     \/
    FlinkKafkaConsumerBase.initializeState
     ||
     \/
    FlinkKafkaConsumerBase.open
    

    which shows that initializeState is called before open.

    The initializeState method

    This restores the Kafka topic offsets from persisted state; here we assume the job is starting for the first time.

    The open method

    • offsetCommitMode
      offsetCommitMode = OffsetCommitModes.fromConfiguration determines the Kafka offset commit mode. It combines enable.auto.commit (default true), enableCommitOnCheckpoints (default true) and whether checkpointing is enabled (true here, default false); with these values the result is OffsetCommitMode.ON_CHECKPOINTS.
    • partitionDiscoverer
      This performs Kafka topic partition discovery. The main path is partitionDiscoverer.discoverPartitions, which involves the following call chain:
      AbstractPartitionDiscoverer.discoverPartitions
        ||
        \/
      AbstractPartitionDiscoverer.getAllPartitionsForTopics 
        ||
        \/
      KafkaPartitionDiscoverer.kafkaConsumer.partitionsFor
        ||
        \/
      KafkaConsumer.partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs)) // defaultApiTimeoutMs comes from *default.api.timeout.ms*
        ||
        \/
      Fetcher.getTopicMetadata // this is where new TimeoutException("Timeout expired while fetching topic metadata") is ultimately thrown
        ||
        \/
      Fetcher.sendMetadataRequest => NetworkClient.leastLoadedNode // chooses one of the configured broker nodes according to a strategy
        ||
        \/
      client.poll(future, timer) => NetworkClient.poll => selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs)); // *defaultRequestTimeoutMs* comes from *request.timeout.ms*
      
      
      To sum up, discoverPartitions chooses one of the configured broker nodes according to a strategy and sends it a request; when that request hits request.timeout.ms, another broker is chosen, and this repeats until the total time reaches the configured default.api.timeout.ms. By default default.api.timeout.ms is 60 seconds and request.timeout.ms is 30 seconds (a standalone sketch of this call is shown right after this list).
    • subscribedPartitionsToStartOffsets
      Depending on startupMode (default StartupMode.GROUP_OFFSETS, i.e. resume from the last committed offsets), the starting Kafka offsets are recorded; they are used later by the kafkaFetcher.
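    Outside of Flink, the call that the partition discoverer ends up making can be reproduced with a bare KafkaConsumer; a minimal sketch (broker address and topic are placeholders, not the original setup):

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class PartitionDiscoveryTimeoutSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker1:9092"); // placeholder
            props.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
            props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
            props.setProperty("request.timeout.ms", "30000");       // per-request budget
            props.setProperty("default.api.timeout.ms", "60000");   // overall metadata budget

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                // Same API the partition discoverer calls; the Duration here mirrors
                // default.api.timeout.ms, which bounds the whole metadata fetch.
                List<PartitionInfo> partitions =
                        consumer.partitionsFor("ubt_start", Duration.ofMillis(60_000));
                System.out.println("Discovered partitions: " + partitions);
            }
        }
    }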

    The run method

    • Register the successfulCommits/failedCommits metrics.
    • KafkaFetcher
      This fetches records from Kafka and, if partition discovery is enabled, periodically re-runs topic partition discovery. The interval comes from scan.topic-partition-discovery.interval (default 0, set to 300000 here, i.e. 5 minutes). The main flow is in runWithPartitionDiscovery:
        private void runWithPartitionDiscovery() throws Exception {
        final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();
            createAndStartDiscoveryLoop(discoveryLoopErrorRef);
      
            kafkaFetcher.runFetchLoop();
      
            // make sure that the partition discoverer is waked up so that
            // the discoveryLoopThread exits
            partitionDiscoverer.wakeup();
            joinDiscoveryLoopThread();
      
            // rethrow any fetcher errors
            final Exception discoveryLoopError = discoveryLoopErrorRef.get();
            if (discoveryLoopError != null) {
                throw new RuntimeException(discoveryLoopError);
            }
        }
      
      
      • createAndStartDiscoveryLoop starts a single thread that runs Kafka partition discovery in a while/sleep loop with scan.topic-partition-discovery.interval as the interval. Note that it swallows the exceptions here (WakeupException/ClosedException) instead of rethrowing them:

         private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) {
           discoveryLoopThread =
                   new Thread(
                           ...
                           while (running) {
                             ...
                                       try {
                                           discoveredPartitions =
                                                   partitionDiscoverer.discoverPartitions();
                                       } catch (AbstractPartitionDiscoverer.WakeupException
                                               | AbstractPartitionDiscoverer.ClosedException e) {
                                         
                                           break;
                                       }
                                       if (running && !discoveredPartitions.isEmpty()) {
                                           kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
                                       }
        
                                       if (running && discoveryIntervalMillis != 0) {
                                           try {
                                               Thread.sleep(discoveryIntervalMillis);
                                           } catch (InterruptedException iex) {
                                               break;
                                           }
                                       }
                                   }
                               } catch (Exception e) {
                                   discoveryLoopErrorRef.set(e);
                               } finally {
                                   // calling cancel will also let the fetcher loop escape
                                   // (if not running, cancel() was already called)
                                   if (running) {
                                       cancel();
                                   }
                               }
                           },
                           "Kafka Partition Discovery for "
                                   + getRuntimeContext().getTaskNameWithSubtasks());
        
           discoveryLoopThread.start();
        }
        

        Here kafkaFetcher.addDiscoveredPartitions(discoveredPartitions) stores the discovered partition information in the subscribedPartitionStates variable; kafkaFetcher.runFetchLoop records the offsets of the consumed records there, and it is also used in snapshotState.

      • kafkaFetcher.runFetchLoop pulls records from Kafka and updates the Kafka offsets; the flow is as follows:

         runFetchLoop
            ||
            \/
          subscribedPartitionStates // reads the *subscribedPartitionStates* variable
            ||
            \/
          partitionConsumerRecordsHandler
            ||
            \/
          emitRecordsWithTimestamps
            ||
            \/
          partitionState.setOffset(offset);
        

        The offset here comes from the Kafka records that were consumed.

    The snapshotState method

    This processes the information in subscribedPartitionStates, mainly adding it to the pendingOffsetsToCommit variable.

    • offsetCommitMode
      As discussed above, this is OffsetCommitMode.ON_CHECKPOINTS. In that case the subscribedPartitionStates are obtained from fetcher.snapshotCurrentState, added to pendingOffsetsToCommit and persisted into unionOffsetStates. The actual Kafka offset commit happens in notifyCheckpointComplete.

    The notifyCheckpointComplete method

    This takes the Kafka offsets to be committed and commits them to Kafka.
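    Putting snapshotState and notifyCheckpointComplete together, the ON_CHECKPOINTS commit flow looks roughly as follows. This is a simplified sketch of the pattern, not the actual Flink source (which keeps the pending offsets in a LinkedMap keyed by checkpoint id):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;

    // Simplified illustration: offsets are staged per checkpoint in snapshotState()
    // and only committed to Kafka once that checkpoint is confirmed.
    public class OnCheckpointsCommitSketch {
        // checkpointId -> (partition -> offset); field and type names are illustrative
        private final TreeMap<Long, Map<String, Long>> pendingOffsetsToCommit = new TreeMap<>();

        public void snapshotState(long checkpointId, Map<String, Long> currentOffsets) {
            pendingOffsetsToCommit.put(checkpointId, new HashMap<>(currentOffsets));
        }

        public void notifyCheckpointComplete(long checkpointId) {
            Map<String, Long> offsets = pendingOffsetsToCommit.remove(checkpointId);
            if (offsets != null) {
                // In the real connector this is fetcher.commitInternalOffsetsToKafka(offsets, callback)
                commitToKafka(offsets);
            }
        }

        private void commitToKafka(Map<String, Long> offsets) {
            offsets.forEach((tp, off) -> System.out.println("commit " + tp + " -> " + off));
        }
    }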

    Other notes

    The request.timeout.ms mentioned above does not bound a single connection attempt: judging from the current implementation, the Kafka client keeps retrying a broker that cannot be reached until the request times out (30 seconds by default):

    [2023-10-13 13:42:18] 7673 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
    [2023-10-13 13:42:18] 7723 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
    [2023-10-13 13:42:18] 7773 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
    [2023-10-13 13:42:18] 7823 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
    [2023-10-13 13:42:18] 7874 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
    [2023-10-13 13:42:18] 7924 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
    [2023-10-13 13:42:18] 7974 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
    [2023-10-13 13:42:18] 8025 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
    [2023-10-13 13:42:18] 8075 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
    [2023-10-13 13:42:18] 8125 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
    [2023-10-13 13:42:19] 8175 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
    [2023-10-13 13:42:19] 8226 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
    [2023-10-13 13:42:19] 8276 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
    [2023-10-13 13:42:19] 8326 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
    [2023-10-13 13:42:19] 8376 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
    [2023-10-13 13:42:19] 8426 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
    [2023-10-13 13:42:19] 8477 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
    [2023-10-13 13:42:19] 8527 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
    [2023-10-13 13:42:19] 8577 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
    [2023-10-13 13:42:19] 8627 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
    [2023-10-13 13:42:19] 8678 [main] TRACE o.a.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Found least loaded connecting node xxxx:9092 (id: -11 rack: null)
    
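    This behaviour can be reproduced outside Flink by pointing a consumer at an unreachable address and timing the metadata call; a minimal sketch (the address and timeouts are illustrative):

    import java.time.Duration;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class MetadataTimeoutRepro {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "10.255.255.1:9092"); // unreachable on purpose
            props.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
            props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
            props.setProperty("request.timeout.ms", "30000");
            props.setProperty("default.api.timeout.ms", "60000");

            long start = System.currentTimeMillis();
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                // Expected to fail with org.apache.kafka.common.errors.TimeoutException
                // ("Timeout expired while fetching topic metadata") after roughly the
                // configured timeout, while NetworkClient keeps retrying the node.
                consumer.partitionsFor("any-topic", Duration.ofMillis(60_000));
            } catch (Exception e) {
                System.out.println("Failed after " + (System.currentTimeMillis() - start) + " ms: " + e);
            }
        }
    }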

  • Original post: https://blog.csdn.net/monkeyboy_tech/article/details/133786992