public RedisClusterClient newRedisClient(String hosts, String ports, String password) {
String[] hostArr = hosts.split(",");
int hostLen = hostArr.length;
String[] portArr = ports.split(",");
int portLen = portArr.length;
List nodeList = new ArrayList<>();
for (String host : hostArr) {
for (String port : portArr) {
int portNum = Integer.parseInt(port);
nodeList.add(RedisURI.builder().withHost(host).withPort(portNum)
//.withAuthentication("default", password)
.withPassword(password)
.build());
}
}
logger.info("redis host size:{},hosts:{},port size:{},ports{}", hostLen, hosts, portLen, ports);
RedisClusterClient clusterClient = RedisClusterClient.create(nodeList);
ClusterTopologyRefreshOptions clusterTopologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
.adaptiveRefreshTriggersTimeout(Duration.ofSeconds(60L))//设置自适应拓扑刷新超时,每次超时刷新一次,默认30s;
.closeStaleConnections(false)//刷新拓扑时是否关闭失效连接,默认true,isPeriodicRefreshEnabled()为true时生效;
.dynamicRefreshSources(
true)//从拓扑中发现新节点,并将新节点也作为拓扑的源节点,动态刷新可以发现全部节点并计算每个客户端的数量,设置false则只有初始节点为源和计算客户端数量;
.enableAllAdaptiveRefreshTriggers()//启用全部触发器自适应刷新拓扑,默认关闭;
.enablePeriodicRefresh(Duration.ofSeconds(60L))//开启定时拓扑刷新并设置周期;
.refreshTriggersReconnectAttempts(3)//长连接重新连接尝试n次才拓扑刷新
.build();
ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder()
.autoReconnect(true)//在连接丢失时开启或关闭自动重连,默认true;
.cancelCommandsOnReconnectFailure(true)//允许在重连失败取消排队命令,默认false;
// .decodeBufferPolicy(DecodeBufferPolicies.always())//设置丢弃解码缓冲区的策略,以回收内存;always:解码后丢弃,最大内存效率;alwaysSome:解码后丢弃一部分;ratio(n)基于比率丢弃,n/(1+n),通常用1-10对应50%-90%;
.disconnectedBehavior(
ClientOptions.DisconnectedBehavior.DEFAULT)//设置连接断开时命令的调用行为,默认启用重连;DEFAULT:启用时重连中接收命令,禁用时重连中拒绝命令;ACCEPT_COMMANDS:重连中接收命令;REJECT_COMMANDS:重连中拒绝命令;
// .maxRedirects(5)//当键从一个节点迁移到另一个节点,集群重定向次数,默认5;
// .nodeFilter(nodeFilter)//设置节点过滤器
// .pingBeforeActivateConnection(true)//激活连接前设置PING,默认true;
// .protocolVersion(ProtocolVersion.RESP3)//设置协议版本,默认RESP3;
// .publishOnScheduler(false)//使用专用的调度器发出响应信号,默认false,启用时数据信号将使用服务的多线程发出;
// .requestQueueSize(requestQueueSize)//设置每个连接请求队列大小;
// .scriptCharset(scriptCharset)//设置Lua脚本编码为byte[]的字符集,默认StandardCharsets.UTF_8;
// .socketOptions(SocketOptions.builder().connectTimeout(Duration.ofSeconds(10)).keepAlive(true).tcpNoDelay(true).build())//设置低级套接字的属性
// .sslOptions(SslOptions.builder().build())//设置ssl属性
// .suspendReconnectOnProtocolFailure(false)//当重新连接遇到协议失败时暂停重新连接(SSL验证,连接失败前PING),默认值为false;
// .timeoutOptions(TimeoutOptions.enabled(Duration.ofSeconds(10)))//设置超时来取消和终止命令;
.topologyRefreshOptions(clusterTopologyRefreshOptions)//设置拓扑更新设置
.validateClusterNodeMembership(true)//在允许连接到集群节点之前,验证集群节点成员关系,默认值为true;
.build();
clusterClient.setDefaultTimeout(Duration.ofSeconds(10L));
clusterClient.setOptions(clusterClientOptions);
return clusterClient;
}
io.lettuce
lettuce-core
6.1.8.RELEASE