golang简单实现发布订阅模式

发布/订阅(publish-subscribe)模型通常被简写为pub/sub模型。在这个模型中,消息生产者为发布者(publisher)。而消息消费者者为订阅者(subscriber),生产者和消费者是 M:N 的关系。在传统生产者/消费者模型中,是将消息发送到一个队列中,而发布订阅模型则是将消息发布给一个主题。

demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package pubsub

import (
"sync"
"time"
)

type (
subscriber chan interface{} // 订阅者为一个通道
topicFunc func(v interface{}) bool // 主题为一个过滤器
)

type Publisher struct {
m sync.RWMutex // 读写锁
buffer int // 订阅队列的缓存长度
timeout time.Duration // 生产者生产消息的超时时间
subscribers map[subscriber]topicFunc // 订阅者信息
}

// NewPublisher 构建发送者对象
func NewPublisher(buffer int, timeout time.Duration) *Publisher {
return &Publisher{
buffer: buffer,
timeout: timeout,
subscribers: make(map[subscriber]topicFunc),
}
}

// Subscribe 订阅全部主题
func (p *Publisher) Subscribe() chan interface{} {
return p.SubscribeTopic(nil)
}

// SubscribeTopic 订阅某一主题
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
ch := make(chan interface{}, p.buffer)
p.m.Lock()
defer p.m.Unlock()
p.subscribers[ch] = topic
return ch
}

// Evict 退出主题
func (p *Publisher) Evict(sub chan interface{}) {
p.m.Lock()
defer p.m.Unlock()
delete(p.subscribers, sub)
close(sub)
}

// Close close chan of all
func (p *Publisher) Close() {
p.m.Lock()
defer p.m.Unlock()
for sub := range p.subscribers {
delete(p.subscribers, sub)
close(sub)
}
}

// Publish 向所有满足条件的主题发送消息
func (p *Publisher) Publish(v interface{}) {
p.m.Lock()
defer p.m.Unlock()
var wg sync.WaitGroup
for sub, topic := range p.subscribers {
wg.Add(1)
go p.SendTopic(sub, topic, v, &wg)
}
wg.Wait()
}

// SendTopic 向某一主题发送消息
func (p *Publisher) SendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) {
defer wg.Done()
if topic != nil && !topic(v) {
return
}
select {
case sub <- v:
case <-time.After(p.timeout):
}
}

构建发送和消费示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
"fmt"
"os"
"os/signal"
"strings"
"study/tests/pub/pubsub"
"syscall"
"time"
)

func main() {
p := pubsub.NewPublisher(10, 5*time.Second)
defer p.Close()
// 订阅全部主题
all := p.Subscribe()
// 订阅包含golang的主题
golang := p.SubscribeTopic(func(v interface{}) bool {
if s, ok := v.(string); ok && strings.Contains(s, "golang") {
return true
}
return false
})
p.Publish("hello world")
p.Publish("hello golang")
go func() {
for v := range all {
fmt.Println("all subscribe: ", v)
}
}()
go func() {
for v := range golang {
fmt.Println("golang subscribe: ", v)
}
}()
sig := make(chan os.Signal, 1)
// syscall.SIGINT 用户发送INTR字符(Ctrl+C)触发
// syscall.SIGTERM 结束程序(可以被捕获、阻塞或忽略)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
fmt.Printf("quit (%v)\n", <-sig)
}

参考

  • 《Go语言高级编程》
Search by:GoogleBingBaidu