// Close close chan of all func(p *Publisher)Close() { p.m.Lock() defer p.m.Unlock() for sub := range p.subscribers { delete(p.subscribers, sub) close(sub) } }
// Publish 向所有满足条件的主题发送消息 func(p *Publisher)Publish(v interface{}) { p.m.Lock() defer p.m.Unlock() var wg sync.WaitGroup for sub, topic := range p.subscribers { wg.Add(1) go p.SendTopic(sub, topic, v, &wg) } wg.Wait() }
// SendTopic 向某一主题发送消息 func(p *Publisher)SendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) { defer wg.Done() if topic != nil && !topic(v) { return } select { case sub <- v: case <-time.After(p.timeout): } }