NATS-Server

基于docker compose部署

1
2
3
4
5
6
7
8
services:
  nats-main:
    image: nats:latest
    ports:
      - "4222:4222"
      - "6222:6222"
      - "8222:8222"
    restart: unless-stopped

Go-NATS

引入依赖

1
go get github.com/nats-io/nats.go

发布订阅模式

在此模式下,发送者发送消息后,所有在线订阅者都能接收到消息

sub.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
package main

import "github.com/nats-io/nats.go"

func main() {
	conn, err := nats.Connect("nats://ip:4222")
	if err != nil {
		return
	}
	for i := 1; i <= 2; i++ {
		dummy := i
		conn.Subscribe("hello", func(msg *nats.Msg) {
			fmt.Printf("消费者[%d]收到:%s\n", dummy, string(msg.Data))
		})
	}
	select {}
}

pub.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
package main

import "github.com/nats-io/nats.go"

func main() {
	conn, err := nats.Connect("nats://ip:4222")
	if err != nil {
		return
	}
	conn.Publish("hello", []byte("这是一个测试数据"))
	select {}
}

工作队列模式

在此模式下,发送者发送消息后,只有一个订阅者能接收到消息

sub.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
package main

import (
	"fmt"
	"github.com/nats-io/nats.go"
)

func main() {
	conn, err := nats.Connect("nats://ip:4222")
	if err != nil {
		return
	}
	for i := 1; i <= 2; i++ {
		dummy := i
		conn.QueueSubscribe("hello", "go_queue", func(msg *nats.Msg) {
			fmt.Printf("消费者[%d]收到:%s\n", dummy, string(msg.Data))
		})
	}
	select {}
}

pub.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
package main

import "github.com/nats-io/nats.go"

func main() {
	conn, err := nats.Connect("nats://ip:4222")
	if err != nil {
		return
	}
	conn.Publish("hello", []byte("这是一个测试数据"))
	select {}
}

请求响应模式

在此模式下,发送者发送消息后,只有订阅者能接收到消息,并且订阅者只能回复一次,否则会被丢弃

sub.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
package main

import (
	"fmt"
	"github.com/nats-io/nats.go"
)

func main() {
	conn, err := nats.Connect("nats://120.0.0.1:4222")
	if err != nil {
		return
	}

	conn.Subscribe("hello", func(msg *nats.Msg) {
		fmt.Printf("消费者主题[%s]收到:%s\n", "hello", string(msg.Data))
		conn.Publish(msg.Reply, []byte("消费者回复"))
	})

	select {}
}

路由匹配模式

* 单个匹配

> 单个匹配或多个匹配

sub.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
	"fmt"
	"github.com/nats-io/nats.go"
)

func main() {
	conn, err := nats.Connect("nats://ip:4222")
	if err != nil {
		return
	}

	subjs := []string{"user", "user.*", "user.*.>", "user.*.*"}
	for _, subj := range subjs {
		dummy := subj
		conn.Subscribe(dummy, func(msg *nats.Msg) {
			fmt.Printf("消费者主题[%s]收到:%s\n", dummy, string(msg.Data))
		})
	}

	select {}
}

pub.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
package main

import "github.com/nats-io/nats.go"

func main() {
	conn, err := nats.Connect("nats://120.0.0.1:4222")
	if err != nil {
		return
	}
	conn.Publish("user", []byte("user"))
	conn.Publish("user.hello", []byte("user.hello"))
	conn.Publish("user.hello.lufei", []byte("user.hello.lufei"))
	conn.Publish("user.hello.lufei.namei", []byte("user.hello.lufei.namei"))
	select {}
}

Kotlin-Nats

引入依赖

1
2
3
4
5
6

<dependency>
    <groupId>io.nats</groupId>
    <artifactId>jnats</artifactId>
    <version>2.20.2</version>
</dependency>

发布订阅

sub.kt

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package pubsub

import io.nats.client.Message
import io.nats.client.Nats
import java.nio.charset.StandardCharsets
import java.util.concurrent.CountDownLatch

fun main() {
    try {
        val nc = Nats.connect("nats://ip:4222")
        println("订阅连接成功")
        val latch = CountDownLatch(10)

        val d = nc.createDispatcher { msg: Message ->
            val str = String(msg.data, StandardCharsets.UTF_8)
            println("全部的消息:" + msg.javaClass.toString())
            println("订阅接收的消息:$str")
            latch.countDown()
        }

        d.subscribe("hello")
        println("订阅消息!")
        latch.await()

        nc.close()
        println("订阅 关闭连接")
    } catch (e: Exception) {
        e.printStackTrace()
    }
}

pub.kt

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
package pubsub

import io.nats.client.Nats
import java.time.Duration

fun main() {
    try {
        val nc = Nats.connect("nats://ip:4222")
        println("发布 连接成功")

        nc.publish("hello", "这是测试数据")

        nc.flush(Duration.ZERO)
        println("发布消息成功")
        nc.close()

        println("发布关闭连接")
    } catch (e: Exception) {
        e.printStackTrace()
    }
}

请求响应

sub.kt

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package requestReply

import io.nats.client.Message
import io.nats.client.Nats
import java.nio.charset.StandardCharsets
import java.util.concurrent.CountDownLatch

fun main() {
    try {
        val nc = Nats.connect("nats://ip:4222")
        println("订阅连接成功")
        val latch = CountDownLatch(10)

        val d = nc.createDispatcher { msg: Message ->
            val str = String(msg.data, StandardCharsets.UTF_8)
            println("订阅接收的消息:$str")
            nc.publish(msg.replyTo, "这是响应".toByteArray(StandardCharsets.UTF_8))
            latch.countDown()
        }

        d.subscribe("hello")
        println("订阅消息!")
        latch.await()

        nc.close()
        println("订阅 关闭连接")
    } catch (e: Exception) {
        e.printStackTrace()
    }
}