SocketIO

基本配置 后端依赖 1 2 3 4 5 6 <dependency> <groupId>com.corundumstudio.socketio</groupId> <artifactId>netty-socketio</artifactId> <version>2.0.9</version> </dependency> 配置文件 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 #自定义socketio配置,你可以直接硬编码,看个人喜好 socketio: # socketio请求地址 host: localhost # socketio端口 port: 8099 # 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器 maxFramePayloadLength: 1048576 # 设置http交互最大内容长度 maxHttpContentLength: 1048576 # socket连接数大小(如只监听一个端口boss线程组为1即可) bossCount: 1 # 连接数大小 workCount: 100 # 允许客户请求 allowCustomRequests: true # 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间 upgradeTimeout: 1000000 # Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件 pingTimeout: 6000000 # Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔 pingInterval: 25000 # 命名空间,多个以逗号分隔, namespaces: /test,/socketIO,/room,/push 配置类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 import com.corundumstudio.socketio.SocketConfig; import com.corundumstudio.socketio.SocketIOServer; import com.corundumstudio.socketio.annotation.SpringAnnotationScanner; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.Arrays; import java.util.Optional; @Configuration public class SocketIOConfig { @Value("${socketio.host}") private String host; @Value("${socketio.port}") private Integer port; @Value("${socketio.maxFramePayloadLength}") private int maxFramePayloadLength; @Value("${socketio.maxHttpContentLength}") private int maxHttpContentLength; @Value("${socketio.bossCount}") private int bossCount; @Value("${socketio.workCount}") private int workCount; @Value("${socketio.allowCustomRequests}") private boolean allowCustomRequests; @Value("${socketio.upgradeTimeout}") private int upgradeTimeout; @Value("${socketio.pingTimeout}") private int pingTimeout; @Value("${socketio.pingInterval}") private int pingInterval; @Value("${socketio.namespaces}") private String[] namespaces; @Bean public SocketIOServer socketIOServer() { SocketConfig socketConfig = new SocketConfig(); socketConfig.setTcpNoDelay(true); socketConfig.setSoLinger(0); com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration(); config.setSocketConfig(socketConfig); config.setHostname(host); config.setPort(port); config.setMaxFramePayloadLength(maxFramePayloadLength); config.setMaxHttpContentLength(maxHttpContentLength); config.setBossThreads(bossCount); config.setWorkerThreads(workCount); config.setAllowCustomRequests(allowCustomRequests); config.setUpgradeTimeout(upgradeTimeout); config.setPingTimeout(pingTimeout); config.setPingInterval(pingInterval); config.setOrigin(":*:"); //服务端 final SocketIOServer server = new SocketIOServer(config); //添加命名空间(如果你不需要命名空间,下面的代码可以去掉) Optional.ofNullable(namespaces).ifPresent(nss -> Arrays.stream(nss).forEach(server::addNamespace)); server.addNamespace("/test");//也可以这样定义 server.addNamespace("/socketIO"); return server; } //这个对象是用来扫描socketio的注解,比如 @OnConnect、@OnEvent @Bean public SpringAnnotationScanner springAnnotationScanner() { return new SpringAnnotationScanner(socketIOServer()); } } 服务启动/关闭 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 import com.corundumstudio.socketio.SocketIOServer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.stereotype.Component; @SpringBootApplication public class SocketioServerApplication { public static void main(String[] args) { SpringApplication.run(SocketioServerApplication.class, args); } } @Component @Slf4j class SocketIOServerRunner implements CommandLineRunner, DisposableBean { @Autowired private SocketIOServer socketIOServer; @Autowired private TestHandler testHandler; @Autowired private SocketioHandler socketioHandler; @Override public void run(String... args) throws Exception { //namespace分别交给各自的Handler监听,这样就可以隔离,只有客户端指定namespace,才能访问对应Handler。 //比如:http://localhost:9999/test?userId=12345 socketIOServer.getNamespace("/test").addListeners(testHandler); socketIOServer.getNamespace("/socketIO").addListeners(socketIOHandler); socketIOServer.start(); log.info("SocketIOServer==============================启动成功"); } @Override public void destroy() throws Exception { //如果用kill -9 这个监听是没用的,有可能会导致你服务kill掉了,但是socket服务没有kill掉 socketIOServer.stop(); log.info("SocketIOServer==============================关闭成功"); } } 用户信息缓存 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 import com.corundumstudio.socketio.SocketIOClient; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; /** * 这是存储用户的缓存信息 */ @Component public class ClientCache { //用于存储用户的socket缓存信息 private static ConcurrentHashMap<String, HashMap<UUID, SocketIOClient>> concurrentHashMap = new ConcurrentHashMap<>(); //保存用户信息 public void saveClient(String userId, UUID sessionId, SocketIOClient socketIOClient) { HashMap<UUID, SocketIOClient> sessionIdClientCache = concurrentHashMap.get(userId); if (sessionIdClientCache == null) { sessionIdClientCache = new HashMap<>(); } sessionIdClientCache.put(sessionId, socketIOClient); concurrentHashMap.put(userId, sessionIdClientCache); } //获取用户信息 public HashMap<UUID, SocketIOClient> getUserClient(String userId) { return concurrentHashMap.get(userId); } //根据用户id和session删除用户某个session信息 public void deleteSessionClientByUserId(String userId, UUID sessionId) { concurrentHashMap.get(userId).remove(sessionId); } //删除用户缓存信息 public void deleteUserCacheByUserId(String userId) { concurrentHashMap.remove(userId); } } 监听客户端请求 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 import com.corundumstudio.socketio.SocketIOClient; import com.corundumstudio.socketio.SocketIOServer; import com.corundumstudio.socketio.annotation.OnConnect; import com.corundumstudio.socketio.annotation.OnDisconnect; import com.gzgs.socketio.common.cache.ClientCache; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.UUID; @Slf4j @Component public class SocketIOServerHandler { @Autowired private ClientCache clientCache; /** * 建立连接 * @param client 客户端的SocketIO */ @OnConnect public void onConnect(SocketIOClient client) { //因为我定义用户的参数为userId,你也可以定义其他名称 客户端请求 http://localhost:9999?userId=12345 //下面两种是加了命名空间的,他会请求对应命名空间的方法(就类似你进了不同的房间玩游戏) //因为我定义用户的参数为userId,你也可以定义其他名称 客户端请求 http://localhost:9999/test?userId=12345 //因为我定义用户的参数为userId,你也可以定义其他名称 客户端请求 http://localhost:9999/SocketIO?userId=12345 String userId = client.getHandshakeData().getSingleUrlParam("userId"); //同一个页面sessionid一样的 UUID sessionId = client.getSessionId(); //保存用户的信息在缓存里面 clientCache.saveClient(userId, sessionId, client); log.info("SocketIOServerHandler-用户id:{},sessionId:{},建立连接成功", userId, sessionId); } /** * 关闭连接 * @param client 客户端的SocketIO */ @OnDisconnect public void onDisconnect(SocketIOClient client) { //因为我定义用户的参数为userId,你也可以定义其他名称 String userId = client.getHandshakeData().getSingleUrlParam("userId"); //sessionId,页面唯一标识 UUID sessionId = client.getSessionId(); //clientCache.deleteUserCacheByUserId(userId); //只会删除用户某个页面会话的缓存,不会删除该用户不同会话的缓存,比如:用户同时打开了谷歌和QQ浏览器,当你关闭谷歌时候,只会删除该用户谷歌的缓存会话 clientCache.deleteSessionClientByUserId(userId, sessionId); log.info("SocketIOServerHandler-用户id:{},sessionId:{},关闭连接成功", userId, sessionId); } } 前端依赖 1 2 3 { "socket.io-client": "^4.7.2" } 后端构建 处理器定义 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import com.corundumstudio.socketio.AckRequest; import com.corundumstudio.socketio.SocketIOClient; import com.corundumstudio.socketio.annotation.OnEvent; import com.fasterxml.jackson.core.JsonProcessingException; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @Slf4j @Component public class TestHandler { //测试使用 @OnEvent("testHandler") public void testHandler(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException { log.info("MyTestHandler:{}", data); if (ackRequest.isAckRequested()) { //返回给客户端,说我接收到了 ackRequest.sendAckData("MyTestHandler", data); } } } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import com.corundumstudio.socketio.AckRequest; import com.corundumstudio.socketio.SocketIOClient; import com.corundumstudio.socketio.annotation.OnEvent; import com.fasterxml.jackson.core.JsonProcessingException; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @Slf4j @Component public class SocketIOHandler { //测试使用 @OnEvent("socketIOHandler") public void testHandler(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException { log.info("SocketIOHandler:{}", data); if (ackRequest.isAckRequested()) { //返回给客户端,说我接收到了 ackRequest.sendAckData("SocketIOHandler", data); } } } 加入房间 1 2 3 4 5 6 7 8 9 10 11 12 //加入房间 @OnEvent("joinRoom") public void joinRooms(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException { client.joinRoom(data); if (ackRequest.isAckRequested()) { //返回给客户端,说我接收到了 ackRequest.sendAckData("加入房间", "成功"); } } 离开房间 1 2 3 4 5 6 7 8 9 10 11 //离开房间 @OnEvent("leaveRoom") public void leaveRoom(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException { client.leaveRoom(data); if (ackRequest.isAckRequested()) { //返回给客户端,说我接收到了 ackRequest.sendAckData("离开房间", "成功"); } } 获取用户所有房间 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 //获取该用户所有房间 @OnEvent("getUserRooms") public void getUserRooms(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException { String userId = client.getHandshakeData().getSingleUrlParam("userId"); Set<String> allRooms = client.getAllRooms(); for (String room : allRooms) { System.out.println("房间名称:" + room); } log.info("服务器收到消息,客户端用户id:{} | 客户发送的消息:{} | 是否需要返回给客户端内容:{} ", userId, data, ackRequest.isAckRequested()); if (ackRequest.isAckRequested()) { //返回给客户端,说我接收到了 ackRequest.sendAckData("你好", "哈哈哈"); } } 发送消息给指定的房间 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @OnEvent("sendRoomMessage") public void sendRoomMessage(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException { String userId = client.getHandshakeData().getSingleUrlParam("userId"); Set<String> allRooms = client.getAllRooms(); for (String room : allRooms) { log.info("房间:{}", room); //发送给指定空间名称以及房间的人,并且排除不发给自己 socketIoServer.getNamespace("/socketIO").getRoomOperations(room).sendEvent("message", client, data); //发送给指定空间名称以及房间的人,包括自己 //socketIoServer.getNamespace("/socketIO").getRoomOperations(room).sendEvent("message", data);; } if (ackRequest.isAckRequested()) { //返回给客户端,说我接收到了 ackRequest.sendAckData("发送消息到指定的房间", "成功"); } } 广播消息给指定的Namespace下所有客户端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 //广播消息给指定的Namespace下所有客户端 @OnEvent("sendNamespaceMessage") public void sendNamespaceMessage(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException { socketIoServer.getNamespace("/socketIO").getBroadcastOperations().sendEvent("message", client, data); ; if (ackRequest.isAckRequested()) { //返回给客户端,说我接收到了 ackRequest.sendAckData("发送消息到指定的房间", "成功"); } } 点对点发送 1 2 3 4 5 6 7 8 9 //点对点 public void sendMessageOne(String userId) throws JsonProcessingException { HashMap<UUID, SocketIOClient> userClient = clientCache.getUserClient(userId); for (UUID sessionId : userClient.keySet()) { socketIoServer.getNamespace("/socketIO").getClient(sessionId).sendEvent("message", "这是点对点发送"); } } 前端构建 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import {io} from "socket.io-client"; export default function SocketIOFrontend() { const [socketIO, setSocketIO] = useState(null) useEffect(() => { let socket = io() if (socketIO === null) { socket = io(socketIOUrl + 'room?key=' + localStorage.getItem('satoken') , { transports: ['websocket'], upgrade: false, reconnection: true } ) socket.on("connect", () => { console.log(socket.id); console.log('已连接') }); socket.on("disconnect", () => { console.log(socket.id); console.log('已断开') }); setSocketIO(socket) socket.on("onMessage", async (param) => { })//监听回调 socket.emit('emitMessage', param)//发送消息 } return () => { socket.close() setSocketIO(null) }; }, []); }

2024-08-09 · 6 分钟 · Nebula