SpringBoot整合WebSocket实现客户端和服务端基本交互
服务端(Server)
依赖(基本SpringBoot不表明了)
<!--websocket-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
Socket配置
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
Socket实现
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
/*
* @Author GODV
* @Description WebSocket启动
* @Date 18:01 2021/6/21
**/
@Component
@ServerEndpoint(value = "/{ip}")
public class WebSocketServer {
private final Logger log = LoggerFactory.getLogger(WebSocketServer.class);
// 在多线程访问的时候,使用线程安全的ConCurrentHashMap对象
private static ConcurrentHashMap<String, Session> connections = new ConcurrentHashMap<>();
private static ApplicationContext applicationContext;
public static void setApplicationContext(ApplicationContext applicationContext) {
WebSocketServer.applicationContext = applicationContext;
}
/**
* 打开连接
*
* @param session
* @param ip
*/
@OnOpen
public void onOpen(Session session, @PathParam("ip") String ip) {
log.info("[WebSocketServer] find {}, this client is connected to server",ip);
// 接收到客户端的请求,可以做一些其他业务逻辑处理,比如可以把该IP存储到数据库
// 避免当前服务断开后,与客户端服务失去连接
// 这时就可以使用到预加载处理,项目当中自定义的MyApplicationRunner类
connections.put(ip, session);
send(ip,"******** Please get me your pc uid now ********");
}
/**
* 接收消息
*
* @param text
*/
@OnMessage
public void onMessage(String text) {
}
/**
* 异常处理
*
* @param throwable
*/
@OnError
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
/**
* 关闭连接
*
* @param ip
*/
@OnClose
public void onClosing(@PathParam("ip") String ip) throws IOException {
connections.remove(ip);
}
/**
* 根据IP发送消息
*
* @param ip
* @param text
*/
public void send(String ip, String text) {
try {
Session session = connections.get(ip);
if (session != null && session.isOpen()) {
session.getAsyncRemote().sendText(text);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 遍历群发消息
*
* @param text
*/
public void send(String text) {
for (ConcurrentHashMap.Entry<String, Session> entry : connections.entrySet()) {
send(entry.getKey(), text);
}
}
}
客户端
Client实现
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.websocket.*;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
@Component
@ClientEndpoint
public class SocketClient {
private final Logger log = LoggerFactory.getLogger(SocketClient.class);
// 服务端的IP和端口号
@Value("${websocket.server.url}")
private String serverUri = "127.0.0.1:8082";
private Session session;
@PostConstruct
void init() {
try {
// 本机地址
log.info("[init] Client start connection to server...");
String hostAddress = InetAddress.getLocalHost().getHostAddress();
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
String wsUrl = "ws://" + serverUri + "/" + hostAddress;
URI uri = URI.create(wsUrl);
session = container.connectToServer(SocketClient.class, uri);
} catch (DeploymentException | IOException e) {
log.warn("[init] Client connection to {}, {}",serverUri,e.getMessage());
}
}
/**
* 打开连接
* @param session
*/
@OnOpen
public void onOpen(Session session) {
this.session = session;
}
/**
* 接收消息
* @param text
*/
@OnMessage
public void onMessage(String text) {
log.info("[onMessage] accept msg form server:{}",text);
}
/**
* 异常处理
* @param throwable
*/
@OnError
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
/**
* 关闭连接
*/
@OnClose
public void onClosing() throws IOException {
session.close();
}
/**
* 主动发送消息
*/
public void send(String text) {
session.getAsyncRemote().sendText(text);
}
}
打完收工