NATS

NATS-Server 基于docker compose部署 1 2 3 4 5 6 7 8 services: nats-main: image: nats:latest ports: - "4222:4222" - "6222:6222" - "8222:8222" restart: unless-stopped Go-NATS 引入依赖 1 go get github.com/nats-io/nats.go 发布订阅模式 在此模式下,发送者发送消息后,所有在线订阅者都能接收到消息 sub.go 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 package main import "github.com/nats-io/nats.go" func main() { conn, err := nats.Connect("nats://ip:4222") if err != nil { return } for i := 1; i <= 2; i++ { dummy := i conn.Subscribe("hello", func(msg *nats.Msg) { fmt.Printf("消费者[%d]收到:%s\n", dummy, string(msg.Data)) }) } select {} } pub.go ...

2024-09-26 · 4 分钟 · Nebula

gRPC

gRPC简介 gRPC 是由google开源的一个高性能的RPC框架。Stubby Google内部的RPC,演化而来的,2015正式开源。云原生时代是一个RPC标准。 gRPC 核心的设计思路 网络通信 —> gRPC自己封装网络通信的部分 提供多种语言的 网络通信的封装 (C Java[Netty] GO) 协议 —> HTTP2 传输数据的时候 二进制数据内容。 支持双向流(双工)连接的多路复用。 序列化 —> 基本文本 JSON 基于二进制 Java原生序列化方式 Thrift二进制的序列化 压缩二级制序列化。 protobuf (Protocol Buffers) google开源一种序列化方式 时间效率和空间效率是JSON的3—5倍。 IDL语言 代理的创建 —>让调用者像调用本地方法那样 去调用远端的服务方法。 stub gRPC的好处 高效的进行进程间通信。 支持多种语言 原生支持 C Go Java实现。C语言版本上扩展 C++ C# NodeJS Python Ruby PHP.. 支持多平台运行 Linux Android IOS MacOS Windows。 gPRC序列化方式采用protobuf,效率高。 使用Http2协议 大厂的背书 Http2.0协议 回顾 Http1.x协议 Http1.0协议 请求响应的模式 短连接协议(无状态协议) 传输数据文本结构 单工 无法实现服务端推送 变相实现推动(客户端轮训的方式) Http1.1协议 请求响应的模式 有限的长连接 升级的方式WebSocket 双工 实现服务器向客户端推送。 总结Http1.x协议 共性 传输数据文本格式 可读性好的但是效率差。 本质上Http1.x协议无法实现双工通信。 资源的请求。需要发送多次请求,建立多个连接才可以完成。 HTTP2.0协议 Http2.0协议是一个二进制协议,效率高于Http1.x协议,可读性差。 可以实现双工通信。 一个请求 一个连接 可以请求多个数据。【多路复用】 Http2.0协议的三个概念 数据流 stream 消息 message 帧 frame 其他的相关概念 数据流的优先级,可以通过为不同的stream设置权重,来限制不同流的传输顺序。 流控 client发送的数据太快了,server处理不过来,通知client暂停数据的发送。 Protocol Buffers protobuf 是一种与编程语言无关【IDL】,与具体的平台无关【OS】。他定义的中间语言,可以方便的在client 于 server中进行RPC的数据传输。 protobuf 两种版本 proto2 proto3,但是目前主流应用的都是proto3。 protobuf主要安装protobuf的编译器,编译器目的,可以把protobuf的IDL语言,转换成具体某一种开发语言。 protobuf编译器的安装 https://github.com/protocolbuffers/protobuf/releases ...

2024-08-09 · 9 分钟 · Nebula

RabbitMQ

前置 基本概念 Publisher:生产者,可以向队列或者交换机发送消息 Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,递交给某个特别队列、递交给所有队列、或是将消息丢弃。取决于Exchange的类型 Fanout:广播,将消息交给所有绑定到交换机的队列 Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列 Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符 Headers:头匹配,基于MQ的消息头匹配 Queue:队列,接收消息、缓存消息 Consumer:消费者,订阅队列并消费消息 安装 1 2 3 4 5 6 7 8 9 10 11 docker run \ -e RABBITMQ_DEFAULT_USER=nebula \ -e RABBITMQ_DEFAULT_PASS=123456 \ -v mq-plugins:/plugins \ --name mq \ --hostname mq \ -p 15672:15672 \ -p 5672:5672 \ --network mq-net\ -d \ rabbitmq 15672:RabbitMQ提供的管理控制台的端口 5672:RabbitMQ的消息发送处理接口 spring使用 pom依赖 1 2 3 4 5 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> yaml配置 1 2 3 4 5 6 7 spring: rabbitmq: host: 127.0.0.1 # 你的虚拟机IP port: 5672 # 端口 virtual-host: /nebula_vh # 虚拟主机 username: nebula # 用户名 password: 123456 # 密码 WorkQueue模型配置 1 2 3 4 5 spring: rabbitmq: listener: simple: prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息 队列收发 publisher 1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; public void testSimpleQueue() { // 队列名称 String queueName = "simple.queue"; // 消息 String message = "hello, spring amqp!"; // 发送消息 rabbitTemplate.convertAndSend(queueName, message); } } consumer 1 2 3 4 5 6 7 8 9 10 11 @Component public class SpringRabbitListener { // 利用RabbitListener来声明要监听的队列信息 // 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。 // 可以看到方法体中接收的就是消息体的内容 @RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) { System.out.println("spring 消费者接收到消息:【" + msg + "】"); } } 交换机收发 Fanout交换机 publisher 1 2 3 4 5 6 7 public void testFanoutExchange() { // 交换机名称 String exchangeName = "nebula.fanout"; // 消息 String message = "hello, everyone!"; rabbitTemplate.convertAndSend(exchangeName, "", message); } consumer 1 2 3 4 5 6 7 8 9 @RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) { System.out.println("消费者1接收到Fanout消息:【" + msg + "】"); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) { System.out.println("消费者2接收到Fanout消息:【" + msg + "】"); } Direct交换机 publisher 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public void testSendDirectExchange() { // 交换机名称 String exchangeName = "nebula.direct"; // 消息 String message = "redMessage"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "red", message); } public void testSendDirectExchange() { // 交换机名称 String exchangeName = "nebula.direct"; // 消息 String message = "blueMessage"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "blue", message); } consumer 1 2 3 4 5 6 7 8 9 @RabbitListener(queues = "direct.queue1") public void listenDirectQueue1(String msg) { System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】"); } @RabbitListener(queues = "direct.queue2") public void listenDirectQueue2(String msg) { System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】"); } Topic交换机 #:匹配一个或多个词 *:匹配一个词 publisher 1 2 3 4 5 6 7 8 public void testSendTopicExchange() { // 交换机名称 String exchangeName = "nebula.topic"; // 消息 String message = "message"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "nebula.news", message); } consumer 1 2 3 4 5 6 7 8 9 @RabbitListener(queues = "topic.queue1") public void listenTopicQueue1(String msg){ System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】"); } @RabbitListener(queues = "topic.queue2") public void listenTopicQueue2(String msg){ System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】"); } 配置类声明交换机 Fanout交换机 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Configuration public class FanoutConfig { //声明交换机 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("nebula.fanout"); } //声明队列 @Bean public Queue fanoutQueue1() { return new Queue("fanout.queue1"); } //绑定队列和交换机 @Bean public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } } Direct交换机 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Configuration public class DirectConfig { //声明交换机 @Bean public DirectExchange directExchange() { return ExchangeBuilder.directExchange("nebula.direct").build(); } //声明队列 @Bean public Queue directQueue1() { return new Queue("direct.queue1"); } //绑定队列和交换机 @Bean public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange) { return BindingBuilder.bind(directQueue1).to(directExchange).with("red"); } } Topic交换机 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Configuration public class TopicConfig { //声明交换机 @Bean public TopicExchange topicExchange() { return ExchangeBuilder.topicExchange("nebula.topic").build(); } //声明队列 @Bean public Queue topicQueue1() { return new Queue("topic.queue1"); } //绑定队列和交换机 @Bean public Binding bindingQueue1WithRed(Queue topicQueue1, TopicExchange topicExchange) { return BindingBuilder.bind(topicQueue1).to(topicExchange).with("red.#"); } } 注解声明交换机 Fanout交换机 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "fanout.queue1"), exchange = @Exchange(name = "nebula.fanout", type = ExchangeTypes.FANOUT) )) public void listenFanoutQueue1(String msg){ System.out.println("消费者1接收到fanout.queue1的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "fanout.queue2"), exchange = @Exchange(name = "nebula.fanout", type = ExchangeTypes.FANOUT) )) public void listenFanoutQueue2(String msg){ System.out.println("消费者2接收到fanout.queue2的消息:【" + msg + "】"); } Direct交换机 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "nebula.direct", type = ExchangeTypes.DIRECT), key = {"red", "blue"} )) public void listenDirectQueue1(String msg){ System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "nebula.direct", type = ExchangeTypes.DIRECT), key = {"red", "yellow"} )) public void listenDirectQueue2(String msg){ System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】"); } Topic交换机 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC), key = "china.#" )) public void listenTopicQueue1(String msg){ System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC), key = "#.news" )) public void listenTopicQueue2(String msg){ System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】"); } 消息转换器 使用jackson转换器 1 2 3 4 <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> </dependency> 添加Bean 1 2 3 4 5 6 7 8 9 //在启动类中添加Bean @Bean public MessageConverter messageConverter(){ // 1.定义消息转换器 Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息 jackson2JsonMessageConverter.setCreateMessageIds(true); return jackson2JsonMessageConverter; }

2024-08-09 · 5 分钟 · Nebula

SocketIO

基本配置 后端依赖 1 2 3 4 5 6 <dependency> <groupId>com.corundumstudio.socketio</groupId> <artifactId>netty-socketio</artifactId> <version>2.0.9</version> </dependency> 配置文件 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 #自定义socketio配置,你可以直接硬编码,看个人喜好 socketio: # socketio请求地址 host: localhost # socketio端口 port: 8099 # 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器 maxFramePayloadLength: 1048576 # 设置http交互最大内容长度 maxHttpContentLength: 1048576 # socket连接数大小(如只监听一个端口boss线程组为1即可) bossCount: 1 # 连接数大小 workCount: 100 # 允许客户请求 allowCustomRequests: true # 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间 upgradeTimeout: 1000000 # Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件 pingTimeout: 6000000 # Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔 pingInterval: 25000 # 命名空间,多个以逗号分隔, namespaces: /test,/socketIO,/room,/push 配置类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 import com.corundumstudio.socketio.SocketConfig; import com.corundumstudio.socketio.SocketIOServer; import com.corundumstudio.socketio.annotation.SpringAnnotationScanner; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.Arrays; import java.util.Optional; @Configuration public class SocketIOConfig { @Value("${socketio.host}") private String host; @Value("${socketio.port}") private Integer port; @Value("${socketio.maxFramePayloadLength}") private int maxFramePayloadLength; @Value("${socketio.maxHttpContentLength}") private int maxHttpContentLength; @Value("${socketio.bossCount}") private int bossCount; @Value("${socketio.workCount}") private int workCount; @Value("${socketio.allowCustomRequests}") private boolean allowCustomRequests; @Value("${socketio.upgradeTimeout}") private int upgradeTimeout; @Value("${socketio.pingTimeout}") private int pingTimeout; @Value("${socketio.pingInterval}") private int pingInterval; @Value("${socketio.namespaces}") private String[] namespaces; @Bean public SocketIOServer socketIOServer() { SocketConfig socketConfig = new SocketConfig(); socketConfig.setTcpNoDelay(true); socketConfig.setSoLinger(0); com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration(); config.setSocketConfig(socketConfig); config.setHostname(host); config.setPort(port); config.setMaxFramePayloadLength(maxFramePayloadLength); config.setMaxHttpContentLength(maxHttpContentLength); config.setBossThreads(bossCount); config.setWorkerThreads(workCount); config.setAllowCustomRequests(allowCustomRequests); config.setUpgradeTimeout(upgradeTimeout); config.setPingTimeout(pingTimeout); config.setPingInterval(pingInterval); config.setOrigin(":*:"); //服务端 final SocketIOServer server = new SocketIOServer(config); //添加命名空间(如果你不需要命名空间,下面的代码可以去掉) Optional.ofNullable(namespaces).ifPresent(nss -> Arrays.stream(nss).forEach(server::addNamespace)); server.addNamespace("/test");//也可以这样定义 server.addNamespace("/socketIO"); return server; } //这个对象是用来扫描socketio的注解,比如 @OnConnect、@OnEvent @Bean public SpringAnnotationScanner springAnnotationScanner() { return new SpringAnnotationScanner(socketIOServer()); } } 服务启动/关闭 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 import com.corundumstudio.socketio.SocketIOServer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.stereotype.Component; @SpringBootApplication public class SocketioServerApplication { public static void main(String[] args) { SpringApplication.run(SocketioServerApplication.class, args); } } @Component @Slf4j class SocketIOServerRunner implements CommandLineRunner, DisposableBean { @Autowired private SocketIOServer socketIOServer; @Autowired private TestHandler testHandler; @Autowired private SocketioHandler socketioHandler; @Override public void run(String... args) throws Exception { //namespace分别交给各自的Handler监听,这样就可以隔离,只有客户端指定namespace,才能访问对应Handler。 //比如:http://localhost:9999/test?userId=12345 socketIOServer.getNamespace("/test").addListeners(testHandler); socketIOServer.getNamespace("/socketIO").addListeners(socketIOHandler); socketIOServer.start(); log.info("SocketIOServer==============================启动成功"); } @Override public void destroy() throws Exception { //如果用kill -9 这个监听是没用的,有可能会导致你服务kill掉了,但是socket服务没有kill掉 socketIOServer.stop(); log.info("SocketIOServer==============================关闭成功"); } } 用户信息缓存 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 import com.corundumstudio.socketio.SocketIOClient; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; /** * 这是存储用户的缓存信息 */ @Component public class ClientCache { //用于存储用户的socket缓存信息 private static ConcurrentHashMap<String, HashMap<UUID, SocketIOClient>> concurrentHashMap = new ConcurrentHashMap<>(); //保存用户信息 public void saveClient(String userId, UUID sessionId, SocketIOClient socketIOClient) { HashMap<UUID, SocketIOClient> sessionIdClientCache = concurrentHashMap.get(userId); if (sessionIdClientCache == null) { sessionIdClientCache = new HashMap<>(); } sessionIdClientCache.put(sessionId, socketIOClient); concurrentHashMap.put(userId, sessionIdClientCache); } //获取用户信息 public HashMap<UUID, SocketIOClient> getUserClient(String userId) { return concurrentHashMap.get(userId); } //根据用户id和session删除用户某个session信息 public void deleteSessionClientByUserId(String userId, UUID sessionId) { concurrentHashMap.get(userId).remove(sessionId); } //删除用户缓存信息 public void deleteUserCacheByUserId(String userId) { concurrentHashMap.remove(userId); } } 监听客户端请求 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 import com.corundumstudio.socketio.SocketIOClient; import com.corundumstudio.socketio.SocketIOServer; import com.corundumstudio.socketio.annotation.OnConnect; import com.corundumstudio.socketio.annotation.OnDisconnect; import com.gzgs.socketio.common.cache.ClientCache; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.UUID; @Slf4j @Component public class SocketIOServerHandler { @Autowired private ClientCache clientCache; /** * 建立连接 * @param client 客户端的SocketIO */ @OnConnect public void onConnect(SocketIOClient client) { //因为我定义用户的参数为userId,你也可以定义其他名称 客户端请求 http://localhost:9999?userId=12345 //下面两种是加了命名空间的,他会请求对应命名空间的方法(就类似你进了不同的房间玩游戏) //因为我定义用户的参数为userId,你也可以定义其他名称 客户端请求 http://localhost:9999/test?userId=12345 //因为我定义用户的参数为userId,你也可以定义其他名称 客户端请求 http://localhost:9999/SocketIO?userId=12345 String userId = client.getHandshakeData().getSingleUrlParam("userId"); //同一个页面sessionid一样的 UUID sessionId = client.getSessionId(); //保存用户的信息在缓存里面 clientCache.saveClient(userId, sessionId, client); log.info("SocketIOServerHandler-用户id:{},sessionId:{},建立连接成功", userId, sessionId); } /** * 关闭连接 * @param client 客户端的SocketIO */ @OnDisconnect public void onDisconnect(SocketIOClient client) { //因为我定义用户的参数为userId,你也可以定义其他名称 String userId = client.getHandshakeData().getSingleUrlParam("userId"); //sessionId,页面唯一标识 UUID sessionId = client.getSessionId(); //clientCache.deleteUserCacheByUserId(userId); //只会删除用户某个页面会话的缓存,不会删除该用户不同会话的缓存,比如:用户同时打开了谷歌和QQ浏览器,当你关闭谷歌时候,只会删除该用户谷歌的缓存会话 clientCache.deleteSessionClientByUserId(userId, sessionId); log.info("SocketIOServerHandler-用户id:{},sessionId:{},关闭连接成功", userId, sessionId); } } 前端依赖 1 2 3 { "socket.io-client": "^4.7.2" } 后端构建 处理器定义 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import com.corundumstudio.socketio.AckRequest; import com.corundumstudio.socketio.SocketIOClient; import com.corundumstudio.socketio.annotation.OnEvent; import com.fasterxml.jackson.core.JsonProcessingException; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @Slf4j @Component public class TestHandler { //测试使用 @OnEvent("testHandler") public void testHandler(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException { log.info("MyTestHandler:{}", data); if (ackRequest.isAckRequested()) { //返回给客户端,说我接收到了 ackRequest.sendAckData("MyTestHandler", data); } } } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import com.corundumstudio.socketio.AckRequest; import com.corundumstudio.socketio.SocketIOClient; import com.corundumstudio.socketio.annotation.OnEvent; import com.fasterxml.jackson.core.JsonProcessingException; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @Slf4j @Component public class SocketIOHandler { //测试使用 @OnEvent("socketIOHandler") public void testHandler(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException { log.info("SocketIOHandler:{}", data); if (ackRequest.isAckRequested()) { //返回给客户端,说我接收到了 ackRequest.sendAckData("SocketIOHandler", data); } } } 加入房间 1 2 3 4 5 6 7 8 9 10 11 12 //加入房间 @OnEvent("joinRoom") public void joinRooms(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException { client.joinRoom(data); if (ackRequest.isAckRequested()) { //返回给客户端,说我接收到了 ackRequest.sendAckData("加入房间", "成功"); } } 离开房间 1 2 3 4 5 6 7 8 9 10 11 //离开房间 @OnEvent("leaveRoom") public void leaveRoom(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException { client.leaveRoom(data); if (ackRequest.isAckRequested()) { //返回给客户端,说我接收到了 ackRequest.sendAckData("离开房间", "成功"); } } 获取用户所有房间 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 //获取该用户所有房间 @OnEvent("getUserRooms") public void getUserRooms(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException { String userId = client.getHandshakeData().getSingleUrlParam("userId"); Set<String> allRooms = client.getAllRooms(); for (String room : allRooms) { System.out.println("房间名称:" + room); } log.info("服务器收到消息,客户端用户id:{} | 客户发送的消息:{} | 是否需要返回给客户端内容:{} ", userId, data, ackRequest.isAckRequested()); if (ackRequest.isAckRequested()) { //返回给客户端,说我接收到了 ackRequest.sendAckData("你好", "哈哈哈"); } } 发送消息给指定的房间 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @OnEvent("sendRoomMessage") public void sendRoomMessage(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException { String userId = client.getHandshakeData().getSingleUrlParam("userId"); Set<String> allRooms = client.getAllRooms(); for (String room : allRooms) { log.info("房间:{}", room); //发送给指定空间名称以及房间的人,并且排除不发给自己 socketIoServer.getNamespace("/socketIO").getRoomOperations(room).sendEvent("message", client, data); //发送给指定空间名称以及房间的人,包括自己 //socketIoServer.getNamespace("/socketIO").getRoomOperations(room).sendEvent("message", data);; } if (ackRequest.isAckRequested()) { //返回给客户端,说我接收到了 ackRequest.sendAckData("发送消息到指定的房间", "成功"); } } 广播消息给指定的Namespace下所有客户端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 //广播消息给指定的Namespace下所有客户端 @OnEvent("sendNamespaceMessage") public void sendNamespaceMessage(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException { socketIoServer.getNamespace("/socketIO").getBroadcastOperations().sendEvent("message", client, data); ; if (ackRequest.isAckRequested()) { //返回给客户端,说我接收到了 ackRequest.sendAckData("发送消息到指定的房间", "成功"); } } 点对点发送 1 2 3 4 5 6 7 8 9 //点对点 public void sendMessageOne(String userId) throws JsonProcessingException { HashMap<UUID, SocketIOClient> userClient = clientCache.getUserClient(userId); for (UUID sessionId : userClient.keySet()) { socketIoServer.getNamespace("/socketIO").getClient(sessionId).sendEvent("message", "这是点对点发送"); } } 前端构建 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import {io} from "socket.io-client"; export default function SocketIOFrontend() { const [socketIO, setSocketIO] = useState(null) useEffect(() => { let socket = io() if (socketIO === null) { socket = io(socketIOUrl + 'room?key=' + localStorage.getItem('satoken') , { transports: ['websocket'], upgrade: false, reconnection: true } ) socket.on("connect", () => { console.log(socket.id); console.log('已连接') }); socket.on("disconnect", () => { console.log(socket.id); console.log('已断开') }); setSocketIO(socket) socket.on("onMessage", async (param) => { })//监听回调 socket.emit('emitMessage', param)//发送消息 } return () => { socket.close() setSocketIO(null) }; }, []); }

2024-08-09 · 6 分钟 · Nebula