NATS-Server#
基于docker compose部署
1
2
3
4
5
6
7
8
| services:
nats-main:
image: nats:latest
ports:
- "4222:4222"
- "6222:6222"
- "8222:8222"
restart: unless-stopped
|
Go-NATS#
引入依赖#
1
| go get github.com/nats-io/nats.go
|
发布订阅模式#
在此模式下,发送者发送消息后,所有在线订阅者都能接收到消息
sub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| package main
import "github.com/nats-io/nats.go"
func main() {
conn, err := nats.Connect("nats://ip:4222")
if err != nil {
return
}
for i := 1; i <= 2; i++ {
dummy := i
conn.Subscribe("hello", func(msg *nats.Msg) {
fmt.Printf("消费者[%d]收到:%s\n", dummy, string(msg.Data))
})
}
select {}
}
|
pub.go
1
2
3
4
5
6
7
8
9
10
11
12
| package main
import "github.com/nats-io/nats.go"
func main() {
conn, err := nats.Connect("nats://ip:4222")
if err != nil {
return
}
conn.Publish("hello", []byte("这是一个测试数据"))
select {}
}
|
工作队列模式#
在此模式下,发送者发送消息后,只有一个订阅者能接收到消息
sub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| package main
import (
"fmt"
"github.com/nats-io/nats.go"
)
func main() {
conn, err := nats.Connect("nats://ip:4222")
if err != nil {
return
}
for i := 1; i <= 2; i++ {
dummy := i
conn.QueueSubscribe("hello", "go_queue", func(msg *nats.Msg) {
fmt.Printf("消费者[%d]收到:%s\n", dummy, string(msg.Data))
})
}
select {}
}
|
pub.go
1
2
3
4
5
6
7
8
9
10
11
12
| package main
import "github.com/nats-io/nats.go"
func main() {
conn, err := nats.Connect("nats://ip:4222")
if err != nil {
return
}
conn.Publish("hello", []byte("这是一个测试数据"))
select {}
}
|
请求响应模式#
在此模式下,发送者发送消息后,只有订阅者能接收到消息,并且订阅者只能回复一次,否则会被丢弃
sub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| package main
import (
"fmt"
"github.com/nats-io/nats.go"
)
func main() {
conn, err := nats.Connect("nats://120.0.0.1:4222")
if err != nil {
return
}
conn.Subscribe("hello", func(msg *nats.Msg) {
fmt.Printf("消费者主题[%s]收到:%s\n", "hello", string(msg.Data))
conn.Publish(msg.Reply, []byte("消费者回复"))
})
select {}
}
|
路由匹配模式#
*
单个匹配
>
单个匹配或多个匹配
sub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| package main
import (
"fmt"
"github.com/nats-io/nats.go"
)
func main() {
conn, err := nats.Connect("nats://ip:4222")
if err != nil {
return
}
subjs := []string{"user", "user.*", "user.*.>", "user.*.*"}
for _, subj := range subjs {
dummy := subj
conn.Subscribe(dummy, func(msg *nats.Msg) {
fmt.Printf("消费者主题[%s]收到:%s\n", dummy, string(msg.Data))
})
}
select {}
}
|
pub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| package main
import "github.com/nats-io/nats.go"
func main() {
conn, err := nats.Connect("nats://120.0.0.1:4222")
if err != nil {
return
}
conn.Publish("user", []byte("user"))
conn.Publish("user.hello", []byte("user.hello"))
conn.Publish("user.hello.lufei", []byte("user.hello.lufei"))
conn.Publish("user.hello.lufei.namei", []byte("user.hello.lufei.namei"))
select {}
}
|
Kotlin-Nats#
引入依赖#
1
2
3
4
5
6
|
<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>2.20.2</version>
</dependency>
|
发布订阅#
sub.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| package pubsub
import io.nats.client.Message
import io.nats.client.Nats
import java.nio.charset.StandardCharsets
import java.util.concurrent.CountDownLatch
fun main() {
try {
val nc = Nats.connect("nats://ip:4222")
println("订阅连接成功")
val latch = CountDownLatch(10)
val d = nc.createDispatcher { msg: Message ->
val str = String(msg.data, StandardCharsets.UTF_8)
println("全部的消息:" + msg.javaClass.toString())
println("订阅接收的消息:$str")
latch.countDown()
}
d.subscribe("hello")
println("订阅消息!")
latch.await()
nc.close()
println("订阅 关闭连接")
} catch (e: Exception) {
e.printStackTrace()
}
}
|
pub.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| package pubsub
import io.nats.client.Nats
import java.time.Duration
fun main() {
try {
val nc = Nats.connect("nats://ip:4222")
println("发布 连接成功")
nc.publish("hello", "这是测试数据")
nc.flush(Duration.ZERO)
println("发布消息成功")
nc.close()
println("发布关闭连接")
} catch (e: Exception) {
e.printStackTrace()
}
}
|
请求响应#
sub.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| package requestReply
import io.nats.client.Message
import io.nats.client.Nats
import java.nio.charset.StandardCharsets
import java.util.concurrent.CountDownLatch
fun main() {
try {
val nc = Nats.connect("nats://ip:4222")
println("订阅连接成功")
val latch = CountDownLatch(10)
val d = nc.createDispatcher { msg: Message ->
val str = String(msg.data, StandardCharsets.UTF_8)
println("订阅接收的消息:$str")
nc.publish(msg.replyTo, "这是响应".toByteArray(StandardCharsets.UTF_8))
latch.countDown()
}
d.subscribe("hello")
println("订阅消息!")
latch.await()
nc.close()
println("订阅 关闭连接")
} catch (e: Exception) {
e.printStackTrace()
}
}
|