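WebSocket is commonly used to keep a long-lived connection between a Java backend and a web front end, and in that setup the Java backend usually acts as the server. Simple chat rooms, for example, relay messages through a Java backend acting as the server.
Sometimes, however, the Java backend needs to act as the client. This article uses the Java-WebSocket library to show how to build a WebSocket client in a Spring Boot application.
It implements the following:
1. A WebSocket client
2. Periodic heartbeats sent to the server
3. Reconnection after a disconnect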
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.79</version>
</dependency>
<dependency>
    <groupId>org.java-websocket</groupId>
    <artifactId>Java-WebSocket</artifactId>
    <version>1.5.2</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
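Whether the code runs as a standalone project or is packaged as a dependency, the configuration file plays an important role in Spring Boot. For now, the following properties are defined:
wsUrl: address of the WebSocket server
heartbeatInterval: heartbeat interval in milliseconds; a WebSocket client generally needs a heartbeat to keep the connection from being dropped
enableHeartbeat: whether to send heartbeats
enableReconnection: whether to reconnect after a disconnect
application.yml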
cancan:
  websocket:
    client:
      config:
        wsUrl: ws://localhost:8082
        enableHeartbeat: true
        heartbeatInterval: 20000
        enableReconnection: true
WebsocketClientConfiguration
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * @program: cancan-java-share
 * @description: WebSocket client configuration
 * @author: czchen
 * @date: 2022-06-29 10:43:04
 */
@Configuration
@ConfigurationProperties(prefix = "cancan.websocket.client.config")
public class WebsocketClientConfiguration {

    /**
     * WebSocket server address, e.g. ws://ip:port
     */
    private String wsUrl;

    /**
     * Whether heartbeats are enabled; enabled by default
     */
    private Boolean enableHeartbeat = true;

    /**
     * Heartbeat interval in milliseconds; 20000 by default
     */
    private Integer heartbeatInterval = 20000;

    /**
     * Whether reconnection is enabled; enabled by default
     */
    private Boolean enableReconnection = true;

    public String getWsUrl() {
        return wsUrl;
    }

    public void setWsUrl(String wsUrl) {
        this.wsUrl = wsUrl;
    }

    public Boolean getEnableHeartbeat() {
        return enableHeartbeat;
    }

    public void setEnableHeartbeat(Boolean enableHeartbeat) {
        this.enableHeartbeat = enableHeartbeat;
    }

    public Integer getHeartbeatInterval() {
        return heartbeatInterval;
    }

    public void setHeartbeatInterval(Integer heartbeatInterval) {
        this.heartbeatInterval = heartbeatInterval;
    }

    public Boolean getEnableReconnection() {
        return enableReconnection;
    }

    public void setEnableReconnection(Boolean enableReconnection) {
        this.enableReconnection = enableReconnection;
    }
}
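Since wsUrl is a free-form string, it can help to check it before the client is created. The following is only a sketch using the JDK alone; `WsUrlValidator` is a hypothetical helper that is not part of the original project, and the rules it enforces (a ws or wss scheme and a non-empty host) are an assumption about what counts as a usable address.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Hypothetical helper (not in the original project): validates a WebSocket URL. */
final class WsUrlValidator {

    private WsUrlValidator() {
    }

    /** Parses the URL and rejects anything that is not ws:// or wss:// with a host. */
    static URI parse(String wsUrl) {
        if (wsUrl == null || wsUrl.trim().isEmpty()) {
            throw new IllegalArgumentException("wsUrl must not be empty");
        }
        URI uri;
        try {
            uri = new URI(wsUrl.trim());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("wsUrl is not a valid URI: " + wsUrl, e);
        }
        String scheme = uri.getScheme();
        if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
            throw new IllegalArgumentException("wsUrl must use ws:// or wss://, got: " + wsUrl);
        }
        if (uri.getHost() == null) {
            throw new IllegalArgumentException("wsUrl has no host: " + wsUrl);
        }
        return uri;
    }
}
```

Calling `WsUrlValidator.parse(config.getWsUrl())` at startup would turn a typo in application.yml into an immediate, descriptive failure instead of a connection that never opens.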
The client defines callbacks for connection open, incoming messages, connection close, and errors.
WebsocketRunClient.java
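import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;

import java.net.URI;

/**
 * @program: cancan-java-share
 * @description: WebSocket client
 * @author: czchen
 * @date: 2022-06-29 13:40:59
 */
@Slf4j
public class WebsocketRunClient extends WebSocketClient {

    public WebsocketRunClient(URI serverUri) {
        super(serverUri);
    }

    // Called once the handshake with the server has completed.
    @Override
    public void onOpen(ServerHandshake serverHandshake) {
        log.info("[websocket] WebSocket client connected");
    }

    // Called for every text message received from the server.
    @Override
    public void onMessage(String msg) {
        log.info("[websocket] received message: {}", msg);
    }

    // Called when the connection is closed, by either side.
    @Override
    public void onClose(int code, String reason, boolean remote) {
        log.info("[websocket] WebSocket client closed by {}, code: {}, reason: {}",
                remote ? "remote peer" : "us", code, reason);
    }

    // Called on connection or protocol errors; logged at error level with the stack trace.
    @Override
    public void onError(Exception e) {
        log.error("[websocket] WebSocket client error: {}", e.getMessage(), e);
    }
}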
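The `code` argument passed to onClose is a WebSocket close status code as defined in RFC 6455, section 7.4.1. As an illustration only, the sketch below maps the common codes to short descriptions so the close log is easier to read. `CloseCodes` and its methods are hypothetical names, and the "do not reconnect after a normal closure" rule is an assumption, not something the original project does.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not in the original project): describes RFC 6455 close codes. */
final class CloseCodes {

    private static final Map<Integer, String> NAMES = new HashMap<>();

    static {
        NAMES.put(1000, "normal closure");
        NAMES.put(1001, "going away");
        NAMES.put(1002, "protocol error");
        NAMES.put(1003, "unsupported data");
        NAMES.put(1005, "no status received");
        NAMES.put(1006, "abnormal closure");
        NAMES.put(1007, "invalid payload data");
        NAMES.put(1008, "policy violation");
        NAMES.put(1009, "message too big");
        NAMES.put(1010, "missing extension");
        NAMES.put(1011, "internal error");
        NAMES.put(1015, "TLS handshake failure");
    }

    private CloseCodes() {
    }

    /** Builds a one-line description suitable for the onClose log message. */
    static String describe(int code, boolean remote) {
        String name = NAMES.getOrDefault(code, "unknown");
        return "closed by " + (remote ? "remote peer" : "us") + ", code " + code + " (" + name + ")";
    }

    /** Assumed policy: reconnect after anything except a normal closure. */
    static boolean shouldReconnect(int code) {
        return code != 1000;
    }
}
```

Inside onClose, `log.info("[websocket] {}", CloseCodes.describe(code, remote))` would then report, for instance, an abnormal closure (1006) when the server disappears without a close frame.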
Register the WebSocket client as a bean so that Spring manages it
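import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.URI;
import java.net.URISyntaxException;

/**
 * @program: cancan-java-share
 * @description: Creates the WebSocket client bean
 * @author: czchen
 * @date: 2022-06-29 13:52:37
 */
@Slf4j
@Configuration
public class WebsocketClientBeanConfig {

    @Bean
    public WebsocketRunClient websocketRunClient(WebsocketClientConfiguration websocketClientConfiguration) {
        String wsUrl = websocketClientConfiguration.getWsUrl();
        // Fall back to the documented defaults when a property is not set.
        boolean enableHeartbeat = !Boolean.FALSE.equals(websocketClientConfiguration.getEnableHeartbeat());
        long heartbeatInterval = websocketClientConfiguration.getHeartbeatInterval() == null
                ? 20000L : websocketClientConfiguration.getHeartbeatInterval();
        boolean enableReconnection = !Boolean.FALSE.equals(websocketClientConfiguration.getEnableReconnection());

        URI serverUri;
        try {
            serverUri = new URI(wsUrl);
        } catch (URISyntaxException ex) {
            // Fail fast instead of returning null, which would only break injection later.
            throw new IllegalStateException("[websocket] invalid wsUrl: " + wsUrl, ex);
        }

        WebsocketRunClient websocketRunClient = new WebsocketRunClient(serverUri);
        // connect() does not block; the handshake completes on the client's own thread.
        websocketRunClient.connect();
        // 0 disables the library's built-in lost-connection check; the heartbeat below is used instead.
        websocketRunClient.setConnectionLostTimeout(0);

        Thread heartbeatThread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(heartbeatInterval);
                    if (enableHeartbeat) {
                        // send() throws if the connection is not open, which triggers the reconnect below.
                        websocketRunClient.send("[websocket] heartbeat");
                        log.info("[websocket] heartbeat");
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the loop ends.
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    log.error("[websocket] error: {}", e.getMessage());
                    try {
                        if (enableReconnection) {
                            log.info("[websocket] reconnecting");
                            websocketRunClient.reconnect();
                            websocketRunClient.setConnectionLostTimeout(0);
                        }
                    } catch (Exception ex) {
                        log.error("[websocket] reconnect failed: {}", ex.getMessage());
                    }
                }
            }
        });
        // Daemon thread, so it does not keep the JVM alive on shutdown.
        heartbeatThread.setDaemon(true);
        heartbeatThread.start();
        return websocketRunClient;
    }
}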
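The heartbeat loop above runs on a hand-written thread. As an alternative sketch (not the original author's approach), the same send-or-reconnect cycle can be driven by a `ScheduledExecutorService` from the JDK. `HeartbeatConnection` and `HeartbeatScheduler` are hypothetical names introduced for this illustration; the interface only mirrors the `isOpen()`, `send(String)` and `reconnect()` methods that `WebSocketClient` provides, so the sketch compiles without the library.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical abstraction; WebSocketClient offers isOpen(), send(String) and reconnect(). */
interface HeartbeatConnection {
    boolean isOpen();

    void send(String text);

    void reconnect();
}

/** Hypothetical scheduler: sends a heartbeat while connected, otherwise tries to reconnect. */
final class HeartbeatScheduler implements AutoCloseable {

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "websocket-heartbeat");
                thread.setDaemon(true); // do not keep the JVM alive on shutdown
                return thread;
            });
    private final HeartbeatConnection connection;
    private final boolean enableReconnection;

    HeartbeatScheduler(HeartbeatConnection connection, boolean enableReconnection) {
        this.connection = connection;
        this.enableReconnection = enableReconnection;
    }

    /** Runs tick() every intervalMillis, measured from the end of the previous run. */
    void start(long intervalMillis) {
        executor.scheduleWithFixedDelay(this::tick, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /** One heartbeat cycle; it never throws, because an exception would cancel the schedule. */
    void tick() {
        try {
            if (connection.isOpen()) {
                connection.send("[websocket] heartbeat");
            } else if (enableReconnection) {
                connection.reconnect();
            }
        } catch (RuntimeException e) {
            System.err.println("[websocket] heartbeat failed: " + e.getMessage());
        }
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

Checking `isOpen()` before sending means a dropped connection is noticed even when heartbeats are the only traffic, and `close()` gives Spring a hook (for example a bean destroy method) to stop the scheduler on shutdown. The reconnect runs on the scheduler thread, which matters because Java-WebSocket does not allow `reconnect()` to be called from the client's own WebSocket thread.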
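import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @program: cancan-java-share
 * @description: Controller for the WebSocket client
 * @author: czchen
 * @date: 2022-06-29 14:24:07
 */
@RestController
public class WebsocketClientController {

    @Autowired
    private WebsocketRunClient websocketRunClient;

    // Sends a test message to the WebSocket server through the shared client bean.
    @RequestMapping("/send")
    public String send() {
        websocketRunClient.send("I am the client, sending data...");
        return "Sent successfully";
    }
}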
2022-06-29 16:42:29.434 INFO 19984 --- [tReadThread-130] c.c.w.websocket.WebsocketRunClient : [websocket] WebSocket client connected
2022-06-29 16:42:29.470 INFO 19984 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2022-06-29 16:42:29.620 INFO 19984 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2022-06-29 16:42:29.628 INFO 19984 --- [ main] c.c.wsclient.WebsocketClientApplication : Started WebsocketClientApplication in 1.877 seconds (JVM running for 3.195)
2022-06-29 16:42:49.374 INFO 19984 --- [ Thread-110] c.c.w.config.WebsocketClientBeanConfig : [websocket] heartbeat
2022-06-29 16:43:09.384 INFO 19984 --- [ Thread-110] c.c.w.config.WebsocketClientBeanConfig : [websocket] heartbeat
Meanwhile, the server receives a heartbeat every 20 seconds.

If the steps above do not work for you, refer to the source code at the address below.