基本配置

后端依赖

1
2
3
4
5
6

<dependency>
    <groupId>com.corundumstudio.socketio</groupId>
    <artifactId>netty-socketio</artifactId>
    <version>2.0.9</version>
</dependency>

配置文件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#自定义socketio配置,你可以直接硬编码,看个人喜好
socketio:
  # socketio请求地址
  host: localhost
  # socketio端口
  port: 8099
  # 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器
  maxFramePayloadLength: 1048576
  # 设置http交互最大内容长度
  maxHttpContentLength: 1048576
  # socket连接数大小(如只监听一个端口boss线程组为1即可)
  bossCount: 1
  # 连接数大小
  workCount: 100
  # 允许客户请求
  allowCustomRequests: true
  # 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间
  upgradeTimeout: 1000000
  # Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
  pingTimeout: 6000000
  # Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
  pingInterval: 25000
  # 命名空间,多个以逗号分隔,
  namespaces: /test,/socketIO,/room,/push

配置类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import com.corundumstudio.socketio.SocketConfig;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.SpringAnnotationScanner;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;
import java.util.Optional;

@Configuration
public class SocketIOConfig {

    @Value("${socketio.host}")
    private String host;

    @Value("${socketio.port}")
    private Integer port;

    @Value("${socketio.maxFramePayloadLength}")
    private int maxFramePayloadLength;

    @Value("${socketio.maxHttpContentLength}")
    private int maxHttpContentLength;

    @Value("${socketio.bossCount}")
    private int bossCount;

    @Value("${socketio.workCount}")
    private int workCount;

    @Value("${socketio.allowCustomRequests}")
    private boolean allowCustomRequests;

    @Value("${socketio.upgradeTimeout}")
    private int upgradeTimeout;

    @Value("${socketio.pingTimeout}")
    private int pingTimeout;

    @Value("${socketio.pingInterval}")
    private int pingInterval;

    @Value("${socketio.namespaces}")
    private String[] namespaces;

    @Bean
    public SocketIOServer socketIOServer() {
        SocketConfig socketConfig = new SocketConfig();
        socketConfig.setTcpNoDelay(true);
        socketConfig.setSoLinger(0);
        com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
        config.setSocketConfig(socketConfig);
        config.setHostname(host);
        config.setPort(port);
        config.setMaxFramePayloadLength(maxFramePayloadLength);
        config.setMaxHttpContentLength(maxHttpContentLength);
        config.setBossThreads(bossCount);
        config.setWorkerThreads(workCount);
        config.setAllowCustomRequests(allowCustomRequests);
        config.setUpgradeTimeout(upgradeTimeout);
        config.setPingTimeout(pingTimeout);
        config.setPingInterval(pingInterval);
        config.setOrigin(":*:");

        //服务端
        final SocketIOServer server = new SocketIOServer(config);

        //添加命名空间(如果你不需要命名空间,下面的代码可以去掉)
        Optional.ofNullable(namespaces).ifPresent(nss ->
                Arrays.stream(nss).forEach(server::addNamespace));
        server.addNamespace("/test");//也可以这样定义
        server.addNamespace("/socketIO");
        return server;
    }

    //这个对象是用来扫描socketio的注解,比如 @OnConnect、@OnEvent
    @Bean
    public SpringAnnotationScanner springAnnotationScanner() {
        return new SpringAnnotationScanner(socketIOServer());
    }
}

服务启动/关闭

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import com.corundumstudio.socketio.SocketIOServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.stereotype.Component;

@SpringBootApplication
public class SocketioServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SocketioServerApplication.class, args);
    }

}

@Component
@Slf4j
class SocketIOServerRunner implements CommandLineRunner, DisposableBean {

    @Autowired
    private SocketIOServer socketIOServer;

    @Autowired
    private TestHandler testHandler;

    @Autowired
    private SocketioHandler socketioHandler;

    @Override
    public void run(String... args) throws Exception {
        //namespace分别交给各自的Handler监听,这样就可以隔离,只有客户端指定namespace,才能访问对应Handler。
        //比如:http://localhost:9999/test?userId=12345
        socketIOServer.getNamespace("/test").addListeners(testHandler);
        socketIOServer.getNamespace("/socketIO").addListeners(socketIOHandler);
        socketIOServer.start();
        log.info("SocketIOServer==============================启动成功");
    }


    @Override
    public void destroy() throws Exception {
        //如果用kill -9  这个监听是没用的,有可能会导致你服务kill掉了,但是socket服务没有kill掉
        socketIOServer.stop();
        log.info("SocketIOServer==============================关闭成功");
    }
}

用户信息缓存

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import com.corundumstudio.socketio.SocketIOClient;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 这是存储用户的缓存信息
 */
@Component
public class ClientCache {

    //用于存储用户的socket缓存信息
    private static ConcurrentHashMap<String, HashMap<UUID, SocketIOClient>> concurrentHashMap = new ConcurrentHashMap<>();


    //保存用户信息
    public void saveClient(String userId, UUID sessionId, SocketIOClient socketIOClient) {
        HashMap<UUID, SocketIOClient> sessionIdClientCache = concurrentHashMap.get(userId);
        if (sessionIdClientCache == null) {
            sessionIdClientCache = new HashMap<>();
        }
        sessionIdClientCache.put(sessionId, socketIOClient);
        concurrentHashMap.put(userId, sessionIdClientCache);
    }


    //获取用户信息
    public HashMap<UUID, SocketIOClient> getUserClient(String userId) {
        return concurrentHashMap.get(userId);
    }


    //根据用户id和session删除用户某个session信息
    public void deleteSessionClientByUserId(String userId, UUID sessionId) {
        concurrentHashMap.get(userId).remove(sessionId);
    }


    //删除用户缓存信息
    public void deleteUserCacheByUserId(String userId) {
        concurrentHashMap.remove(userId);
    }
}

监听客户端请求

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.OnConnect;
import com.corundumstudio.socketio.annotation.OnDisconnect;

import com.gzgs.socketio.common.cache.ClientCache;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;


@Slf4j
@Component
public class SocketIOServerHandler {

    @Autowired
    private ClientCache clientCache;

    /**
     * 建立连接
     * @param client 客户端的SocketIO
     */
    @OnConnect
    public void onConnect(SocketIOClient client) {

        //因为我定义用户的参数为userId,你也可以定义其他名称 客户端请求 http://localhost:9999?userId=12345

        //下面两种是加了命名空间的,他会请求对应命名空间的方法(就类似你进了不同的房间玩游戏)
        //因为我定义用户的参数为userId,你也可以定义其他名称 客户端请求 http://localhost:9999/test?userId=12345
        //因为我定义用户的参数为userId,你也可以定义其他名称 客户端请求 http://localhost:9999/SocketIO?userId=12345
        String userId = client.getHandshakeData().getSingleUrlParam("userId");

        //同一个页面sessionid一样的
        UUID sessionId = client.getSessionId();

        //保存用户的信息在缓存里面
        clientCache.saveClient(userId, sessionId, client);

        log.info("SocketIOServerHandler-用户id:{},sessionId:{},建立连接成功", userId, sessionId);


    }

    /**
     * 关闭连接
     * @param client 客户端的SocketIO
     */
    @OnDisconnect
    public void onDisconnect(SocketIOClient client) {

        //因为我定义用户的参数为userId,你也可以定义其他名称
        String userId = client.getHandshakeData().getSingleUrlParam("userId");

        //sessionId,页面唯一标识
        UUID sessionId = client.getSessionId();

        //clientCache.deleteUserCacheByUserId(userId);
        //只会删除用户某个页面会话的缓存,不会删除该用户不同会话的缓存,比如:用户同时打开了谷歌和QQ浏览器,当你关闭谷歌时候,只会删除该用户谷歌的缓存会话
        clientCache.deleteSessionClientByUserId(userId, sessionId);

        log.info("SocketIOServerHandler-用户id:{},sessionId:{},关闭连接成功", userId, sessionId);
    }
}

前端依赖

1
2
3
{
  "socket.io-client": "^4.7.2"
}

后端构建

处理器定义

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.annotation.OnEvent;
import com.fasterxml.jackson.core.JsonProcessingException;

import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Component;

@Slf4j
@Component
public class TestHandler {

    //测试使用
    @OnEvent("testHandler")
    public void testHandler(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException {

        log.info("MyTestHandler:{}", data);

        if (ackRequest.isAckRequested()) {
            //返回给客户端,说我接收到了
            ackRequest.sendAckData("MyTestHandler", data);
        }
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.SocketIOClient;

import com.corundumstudio.socketio.annotation.OnEvent;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Component;

@Slf4j
@Component
public class SocketIOHandler {

    //测试使用
    @OnEvent("socketIOHandler")
    public void testHandler(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException {

        log.info("SocketIOHandler:{}", data);

        if (ackRequest.isAckRequested()) {
            //返回给客户端,说我接收到了
            ackRequest.sendAckData("SocketIOHandler", data);
        }

    }

}

加入房间

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
  //加入房间
@OnEvent("joinRoom")
public void joinRooms(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException {

    client.joinRoom(data);

    if (ackRequest.isAckRequested()) {
        //返回给客户端,说我接收到了
        ackRequest.sendAckData("加入房间", "成功");
    }

}

离开房间

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
  //离开房间
@OnEvent("leaveRoom")
public void leaveRoom(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException {

    client.leaveRoom(data);

    if (ackRequest.isAckRequested()) {
        //返回给客户端,说我接收到了
        ackRequest.sendAckData("离开房间", "成功");
    }
}

获取用户所有房间

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
 //获取该用户所有房间
@OnEvent("getUserRooms")
public void getUserRooms(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException {

    String userId = client.getHandshakeData().getSingleUrlParam("userId");

    Set<String> allRooms = client.getAllRooms();

    for (String room : allRooms) {
        System.out.println("房间名称:" + room);
    }

    log.info("服务器收到消息,客户端用户id:{} | 客户发送的消息:{} | 是否需要返回给客户端内容:{} ", userId, data, ackRequest.isAckRequested());

    if (ackRequest.isAckRequested()) {
        //返回给客户端,说我接收到了
        ackRequest.sendAckData("你好", "哈哈哈");
    }
}

发送消息给指定的房间

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

@OnEvent("sendRoomMessage")
public void sendRoomMessage(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException {


    String userId = client.getHandshakeData().getSingleUrlParam("userId");


    Set<String> allRooms = client.getAllRooms();

    for (String room : allRooms) {
        log.info("房间:{}", room);
        //发送给指定空间名称以及房间的人,并且排除不发给自己
        socketIoServer.getNamespace("/socketIO").getRoomOperations(room).sendEvent("message", client, data);

        //发送给指定空间名称以及房间的人,包括自己
        //socketIoServer.getNamespace("/socketIO").getRoomOperations(room).sendEvent("message", data);;
    }


    if (ackRequest.isAckRequested()) {
        //返回给客户端,说我接收到了
        ackRequest.sendAckData("发送消息到指定的房间", "成功");
    }

}

广播消息给指定的Namespace下所有客户端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
//广播消息给指定的Namespace下所有客户端
@OnEvent("sendNamespaceMessage")
public void sendNamespaceMessage(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException {


    socketIoServer.getNamespace("/socketIO").getBroadcastOperations().sendEvent("message", client, data);
    ;


    if (ackRequest.isAckRequested()) {
        //返回给客户端,说我接收到了
        ackRequest.sendAckData("发送消息到指定的房间", "成功");
    }
}

点对点发送

1
2
3
4
5
6
7
8
9
    //点对点
public void sendMessageOne(String userId) throws JsonProcessingException {

    HashMap<UUID, SocketIOClient> userClient = clientCache.getUserClient(userId);

    for (UUID sessionId : userClient.keySet()) {
        socketIoServer.getNamespace("/socketIO").getClient(sessionId).sendEvent("message", "这是点对点发送");
    }
}

前端构建

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import {io} from "socket.io-client";

export default function SocketIOFrontend() {
    const [socketIO, setSocketIO] = useState(null)

    useEffect(() => {
        let socket = io()
        if (socketIO === null) {
            socket = io(socketIOUrl + 'room?key=' + localStorage.getItem('satoken')
                , {
                    transports: ['websocket'], upgrade: false, reconnection: true
                }
            )
            socket.on("connect", () => {
                console.log(socket.id);
                console.log('已连接')
            });
            socket.on("disconnect", () => {
                console.log(socket.id);
                console.log('已断开')
            });
            setSocketIO(socket)
            socket.on("onMessage", async (param) => {
            })//监听回调       
            socket.emit('emitMessage', param)//发送消息
        }
        return () => {
            socket.close()
            setSocketIO(null)
        };
    }, []);
}