• 记一次kafka消息积压的排查


    kafka消息积压报警,首先进行了自查,这个现象频频出现,之前每次都是先重新分配分区或者回溯(消息可丢弃防止大量积压消费跟不上)。
    根据手册首先排查下消息拉取是否正常,看到了消息拉取线程是waiting状态,然后看到kafka这块逻辑是消费线程阻塞了拉取线程。

    对比了其他消费者,消费线程都是在runing和waiting中切换,但是当前消费者的消费状态一直处于runing,阻塞了消息拉取线程。

    问题定位成功,然后去看了线程的栈信息,发现是里面的逻辑卡在了socket.read,当即想到了socket的超时,去看了代码逻辑,是httpclinet,果然没有设置超时时间。

    按照定义解释为如果sockettimeout设置为0的话,应该是等待无限长的时间(直到进程重启),这里有个老哥用个更详细的排查https://cloud.tencent.com/developer/news/698654
    所以解决方案就是在请求是设置一下:
    使用的是fluent api

    import org.apache.http.client.fluent.Request;
     Request request = Request.Post(uri).connectTimeout(1000).socketTimeout(1000);
                String response = request.execute().returnContent().asString();
    
    • 1
    • 2
    • 3

    后面考虑到这个请求量比较大,可能会影响交易流程(这次的问题查询是一个同步信息接口),因此决定不使用公共连接池,写法如下:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import javax.net.ssl.SSLContext;
    import java.io.IOException;
    import java.security.KeyManagementException;
    import java.security.NoSuchAlgorithmException;
    
    /**
     * @version 1.0
     */
    public class HttpFluentUtil {
        private Logger logger = LoggerFactory.getLogger(HttpFluentUtil.class);
        private final static int MaxPerRoute = 100;
        private final static int MaxTotal = 200;
        final static PoolingHttpClientConnectionManager CONNMGR;
        final static HttpClient CLIENT;
        final static Executor executor;
    
        static {
            LayeredConnectionSocketFactory ssl = null;
            try {
                ssl = SSLConnectionSocketFactory.getSystemSocketFactory();
            } catch (final SSLInitializationException ex) {
                final SSLContext sslcontext;
                try {
                    sslcontext = SSLContext.getInstance(SSLConnectionSocketFactory.TLS);
                    sslcontext.init(null, null, null);
                    ssl = new SSLConnectionSocketFactory(sslcontext);
                } catch (final SecurityException ignore) {
                } catch (final KeyManagementException ignore) {
                } catch (final NoSuchAlgorithmException ignore) {
                }
            }
    
            final Registry<ConnectionSocketFactory> sfr = RegistryBuilder.<ConnectionSocketFactory>create()
                    .register("http", PlainConnectionSocketFactory.getSocketFactory())
                    .register("https", ssl != null ? ssl : SSLConnectionSocketFactory.getSocketFactory()).build();
    
            CONNMGR = new PoolingHttpClientConnectionManager(sfr);
            CONNMGR.setDefaultMaxPerRoute(MaxPerRoute);
            CONNMGR.setMaxTotal(MaxTotal);
            CLIENT = HttpClientBuilder.create().setConnectionManager(CONNMGR).build();
            executor = Executor.newInstance(CLIENT);
        }
    
        public static String Get(String uri, int connectTimeout, int socketTimeout) throws IOException {
            return executor.execute(Request.Get(uri).connectTimeout(connectTimeout).socketTimeout(socketTimeout))
                    .returnContent().asString();
        }
    
        public static String Post(String uri, int connectTimeout, int socketTimeout)
                throws IOException {
            return executor.execute(Request.Post(uri).socketTimeout(socketTimeout)
                    ).returnContent().asString();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
  • 相关阅读:
    EAV模型(实体-属性-值)的设计和低代码的处理方案(2)--数据的查询处理
    【Python-Django】基于TF-IDF算法的医疗推荐系统复现过程
    Linux下如何配置普通用户的sudo命令权限?
    刷爆力扣之最长连续递增序列
    分账管理有哪些功能?
    公司新来了个拿25K的测试,一介绍,原来是测试天花板级别的···
    【leetcode热题】比较版本号
    单向链表(c/c++)
    SpringBoot+Netty+Vue+Websocket实现在线推送/聊天系统
    AP51656 PWM和线性调光 LED车灯电源驱动IC 兼容替代PT4115 PT4205
  • 原文地址:https://blog.csdn.net/m0_56033865/article/details/136483749