go-micro中的发布订阅Broker分析
最近手上有点时间,打算继续了解下go-micro的发布订阅(消息),看了micro的examples后,有个疑问,go-micro在提供发布订阅的插件Broker(以及几种实现)的同时,go-micro本身还实现了Publish(Client)以及Subscribe(Server)功能,于是翻了下源码,做个记录。
Broker
Broker是go-micro定义的一个异步消息的接口,同时使用插件的形式,可随意在不同的实现(http,nats,rabbitmq)之间无缝切换。
// Broker is an interface used for asynchronous messaging.
type Broker interface {
Init(...Option) error
Options() Options
Address() string
Connect() error
Disconnect() error
Publish(topic string, m *Message, opts ...PublishOption) error
Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
String() string
}
从上面的接口可以看出,使用Broker来完成发布订阅只需要以下几步:
- 初始化一个Broker(
Init
) - 连接Broker(
Connect
) - 使用准备好的Broker发布/订阅(
Publish/Subscribe
) - 关闭Broker(
Disconnect
)
go-micro中默认的broker实现
go-micro默认有基于http的Broker实现,可以直接使用。micro有给出具体的example,具体看下source code中的实现。
下面是go-micro中broer.go中对DefaultBroker的相关code:
var (
DefaultBroker Broker = NewBroker()
)
func Init(opts ...Option) error {
return DefaultBroker.Init(opts...)
}
func Connect() error {
return DefaultBroker.Connect()
}
func Disconnect() error {
return DefaultBroker.Disconnect()
}
func Publish(topic string, msg *Message, opts ...PublishOption) error {
return DefaultBroker.Publish(topic, msg, opts...)
}
func Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
return DefaultBroker.Subscribe(topic, handler, opts...)
}
func String() string {
return DefaultBroker.String()
}
可以看到都是基于NewBroker()
返回的broker实例来做的公用方法封装,我们进一步看看。
// NewBroker returns a new http broker
func NewBroker(opts ...Option) Broker {
return newHttpBroker(opts...)
}
这里是直接返回了一个http实现的broker(和上面提到的默认是基于http实现的匹配),继续跟newHttpBroker
。
这里这列出部分code,详细的可直接参考go-micro下的http.go
h := &httpBroker{
id: uuid.New().String(),
address: addr,
opts: options,
r: options.Registry,
c: &http.Client{Transport: newTransport(options.TLSConfig)},
subscribers: make(map[string][]*httpSubscriber),
exit: make(chan chan error),
mux: http.NewServeMux(),
inbox: make(map[string][][]byte),
}
这里的核心是new了一个httpBroker,做为Broker接口的实现,在具体的实现就不在这里说了,下来我们看看上面提到接口的实现。
Init
func (h *httpBroker) Init(opts ...Option) error {
h.RLock()
if h.running {
h.RUnlock()
return errors.New("cannot init while connected")
}
h.RUnlock()
h.Lock()
defer h.Unlock()
for _, o := range opts {
o(&h.opts)
}
if len(h.opts.Addrs) > 0 && len(h.opts.Addrs[0]) > 0 {
h.address = h.opts.Addrs[0]
}
if len(h.id) == 0 {
h.id = "go.micro.http.broker-" + uuid.New().String()
}
// get registry
reg := h.opts.Registry
if reg == nil {
reg = registry.DefaultRegistry
}
// get cache
if rc, ok := h.r.(cache.Cache); ok {
rc.Stop()
}
// set registry
h.r = cache.New(reg)
// reconfigure tls config
if c := h.opts.TLSConfig; c != nil {
h.c = &http.Client{
Transport: newTransport(c),
}
}
return nil
}
从上面的code中可以看到,Init的作用就是初始化各种配置,如果Option参数有提供,就是用参数提供的,如果没有就在这里设置一个,这里有2个点我们需要额外关注下:
-
Registry
Registry是注册中心,如果option中没有提供registry,就会使用go-micro默认实现的(msdn)
-
TLSConfig
TLSConfig是针对https的配置,默认是http
Connect
func (h *httpBroker) Connect() error {
h.RLock()
if h.running {
h.RUnlock()
return nil
}
h.RUnlock()
h.Lock()
defer h.Unlock()
var l net.Listener
var err error
if h.opts.Secure || h.opts.TLSConfig != nil {
config := h.opts.TLSConfig
fn := func(addr string) (net.Listener, error) {
if config == nil {
hosts := []string{addr}
// check if its a valid host:port
if host, _, err := net.SplitHostPort(addr); err == nil {
if len(host) == 0 {
hosts = maddr.IPs()
} else {
hosts = []string{host}
}
}
// generate a certificate
cert, err := mls.Certificate(hosts...)
if err != nil {
return nil, err
}
config = &tls.Config{Certificates: []tls.Certificate{cert}}
}
return tls.Listen("tcp", addr, config)
}
l, err = mnet.Listen(h.address, fn)
} else {
fn := func(addr string) (net.Listener, error) {
return net.Listen("tcp", addr)
}
l, err = mnet.Listen(h.address, fn)
}
if err != nil {
return err
}
addr := h.address
h.address = l.Addr().String()
go http.Serve(l, h.mux)
go func() {
h.run(l)
h.Lock()
h.opts.Addrs = []string{addr}
h.address = addr
h.Unlock()
}()
// get registry
reg := h.opts.Registry
if reg == nil {
reg = registry.DefaultRegistry
}
// set cache
h.r = cache.New(reg)
// set running
h.running = true
return nil
}
Connect方法的主要作用是创建一个Htto Server用来接收Publish时发送的消息
Subscribe
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
var err error
var host, port string
options := NewSubscribeOptions(opts...)
// parse address for host, port
host, port, err = net.SplitHostPort(h.Address())
if err != nil {
return nil, err
}
addr, err := maddr.Extract(host)
if err != nil {
return nil, err
}
var secure bool
if h.opts.Secure || h.opts.TLSConfig != nil {
secure = true
}
// register service
node := ®istry.Node{
Id: topic + "-" + h.id,
Address: mnet.HostPort(addr, port),
Metadata: map[string]string{
"secure": fmt.Sprintf("%t", secure),
"broker": "http",
"topic": topic,
},
}
// check for queue group or broadcast queue
version := options.Queue
if len(version) == 0 {
version = broadcastVersion
}
service := ®istry.Service{
Name: serviceName,
Version: version,
Nodes: []*registry.Node{node},
}
// generate subscriber
subscriber := &httpSubscriber{
opts: options,
hb: h,
id: node.Id,
topic: topic,
fn: handler,
svc: service,
}
// subscribe now
if err := h.subscribe(subscriber); err != nil {
return nil, err
}
// return the subscriber
return subscriber, nil
}
这部分代码的核心功能就是创建用于订阅的server,一个topic创建一个server并收集(注册)到httpSubscriber的svc列表中(发布消息时使用topic在subscriber的svc列表中查询到对应的server给他发送消息)。
Publish
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
// create the message first
m := &Message{
Header: make(map[string]string),
Body: msg.Body,
}
for k, v := range msg.Header {
m.Header[k] = v
}
m.Header["Micro-Topic"] = topic
// encode the message
b, err := h.opts.Codec.Marshal(m)
if err != nil {
return err
}
// save the message
h.saveMessage(topic, b)
// now attempt to get the service
h.RLock()
s, err := h.r.GetService(serviceName)
if err != nil {
h.RUnlock()
return err
}
h.RUnlock()
pub := func(node *registry.Node, t string, b []byte) error {
scheme := "http"
// check if secure is added in metadata
if node.Metadata["secure"] == "true" {
scheme = "https"
}
vals := url.Values{}
vals.Add("id", node.Id)
uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultPath, vals.Encode ())
r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))
if err != nil {
return err
}
// discard response body
io.Copy(ioutil.Discard, r.Body)
r.Body.Close()
return nil
}
srv := func(s []*registry.Service, b []byte) {
for _, service := range s {
var nodes []*registry.Node
for _, node := range service.Nodes {
// only use nodes tagged with broker http
if node.Metadata["broker"] != "http" {
continue
}
// look for nodes for the topic
if node.Metadata["topic"] != topic {
continue
}
nodes = append(nodes, node)
}
// only process if we have nodes
if len(nodes) == 0 {
continue
}
switch service.Version {
// broadcast version means broadcast to all nodes
case broadcastVersion:
var success bool
// publish to all nodes
for _, node := range nodes {
// publish async
if err := pub(node, topic, b); err == nil {
success = true
}
}
// save if it failed to publish at least once
if !success {
h.saveMessage(topic, b)
}
default:
// select node to publish to
node := nodes[rand.Int()%len(nodes)]
// publish async to one node
if err := pub(node, topic, b); err != nil {
// if failed save it
h.saveMessage(topic, b)
}
}
}
}
// do the rest async
go func() {
// get a third of the backlog
messages := h.getMessage(topic, 8)
delay := (len(messages) > 1)
// publish all the messages
for _, msg := range messages {
// serialize here
srv(s, msg)
// sending a backlog of messages
if delay {
time.Sleep(time.Millisecond * 100)
}
}
}()
return nil
看过了上面的Subscribe
实现,这里的Publish
就比较简单
- 创建消息体并存储在inbox
- 根据topic以及broker的标签(这里是固定http)来查找订阅的server(在上面订阅模块创建的)
上面有可能会查找出多个node(订阅server),所以里面还有一个版本的机制,如果指定了版本就会给所有的匹配节点发送(默认是随机发送一个)
- 使用http post的方式(异步)把消息发送出去
Disconnect
func (h *httpBroker) Disconnect() error {
h.RLock()
if !h.running {
h.RUnlock()
return nil
}
h.RUnlock()
h.Lock()
defer h.Unlock()
// stop cache
rc, ok := h.r.(cache.Cache)
if ok {
rc.Stop()
}
// exit and return err
ch := make(chan error)
h.exit <- ch
err := <-ch
// set not running
h.running = false
return err
}
这部分功能很简单,清空缓存并发送退出的消息,同时停止服务
以上就是go-micro中默认基于http的broker实现。
go-micro中对于broker的包装
在看完broker的http默认实现后,我们对于broker有了一个大体了解,接下来我们在看下go-micro对于broker做的包装部分,应该是为了简化使用(确实只需要一步就可以)。
订阅RegisterSubscriber
:
func main() {
// create a service
service := micro.NewService(
micro.Name("go.micro.srv.pubsub"),
)
// parse command line
service.Init()
// register subscriber
micro.RegisterSubscriber("example.topic.pubsub.1", service.Server(), new(Sub))
// register subscriber with queue, each message is delivered to a unique subscriber
micro.RegisterSubscriber("example.topic.pubsub.2", service.Server(), subEv, server.SubscriberQueue("queue.pubsub"))
if err := service.Run(); err != nil {
log.Fatal(err)
}
}
发布NewPublisher, Publish
:
func main() {
// create a service
service := micro.NewService(
micro.Name("go.micro.cli.pubsub"),
)
// parse command line
service.Init()
// create publisher
pub1 := micro.NewPublisher("example.topic.pubsub.1", service.Client())
pub2 := micro.NewPublisher("example.topic.pubsub.2", service.Client())
// pub to topic 1
go sendEv("example.topic.pubsub.1", pub1)
// pub to topic 2
go sendEv("example.topic.pubsub.2", pub2)
// block forever
select {}
}
以上只是代码节选,具体使用方法可以参考example中的pubsub。
Subscriber
订阅对比直接用Broker只需要一步RegisterSubscriber
,我们看看里面实现
//go-micro/micro.go
// RegisterSubscriber is syntactic sugar for registering a subscriber
func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error {
return s.Subscribe(s.NewSubscriber(topic, h, opts...))
}
//go-micro/server/rpc_server.go
func (s *rpcServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber {
return s.router.NewSubscriber(topic, sb, opts...)
}
func (s *rpcServer) Subscribe(sb Subscriber) error {
s.Lock()
defer s.Unlock()
if err := s.router.Subscribe(sb); err != nil {
return err
}
s.subscribers[sb] = nil
return nil
}
//go-micro/server/rpc_router.go
// router represents an RPC router.
type router struct {
.......
subscribers map[string][]*subscriber
}
//go-micro/server/subscriber.go
type subscriber struct {
topic string
rcvr reflect.Value
typ reflect.Type
subscriber interface{}
handlers []*handler
endpoints []*registry.Endpoint
opts SubscriberOptions
}
上面的节选code可以看出,在默认server(rpcServer)中的router中定义了个map类型的变量subscribers
用来存储订阅的topic和对应处理的subscriber
,server在接收到消息后,只需要根据topic去map中找到subscriber,去处理即可。
subscriber中具体的处理,可以从定义中看出来,里面存储对应路由和响应的handler(server本身的功能),有兴趣可以在go-micro/server/subscriber.go看看具体代码实现。
Publisher
发布是在go-micro的默认client实现(rpc_client)里面定义了一个默认的broker(上面有分析过的http实现)
//go-micro/micro.go
// Deprecated: NewPublisher returns a new Publisher
func NewPublisher(topic string, c client.Client) Event {
return NewEvent(topic, c)
}
// NewEvent creates a new event publisher
func NewEvent(topic string, c client.Client) Event {
if c == nil {
c = client.NewClient()
}
return &event{c, topic}
}
//go-micro/event.go
type event struct {
c client.Client
topic string
}
func (e *event) Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error {
return e.c.Publish(ctx, e.c.NewMessage(e.topic, msg), opts...)
}
这里可以看到实际上是使用传递进来的client来初始化一个event,并用来发送消息,如果传递的是空,默认创建一个client(rpcClient
)。
总结
经过以上过程的追踪,最终总结下来就几点:
- broker定义了接口,micro提供的插件的形式可无缝替换实现
- go-micro提供了一个默认的broker实现,是基于http
- go-micro的基于默认的server、client以及brkoer包装了一套更简单的pub和sub方法