举个栗子之gorpc – 消息的编码和解码
2022年的第一个rpc,比以往来的更早一些…
留杭过年…写点东西
初始化项目gorpc
借助go module我们可以轻易创建一个新的项目
mkdir gorpc
go mod init github.com/taadis/gorpc
// output:
go: creating new go.mod: module github.com/taadis/gorpc
消息约定
rpc 的客户端和服务端之间通信需要传输数据消息,典型的消息结构一般有2部分组成
- header 消息头 – 用来承载约定内容,一般是相对固定的。
- body 消息体 – 用来承载用户数据,通常是不定长,也就是动态的
这里我们先定义更泛的消息体,因为消息体是动态的,所以我们可以直接用go中的 interface{} 来定义,就不用特别声明一个结构体了。
然后我们来定义一个header结构体
// codec.go
type Header struct {
Sequence uint64 // sequence number chosen by client
ServiceMethod string // format "Service.Method"
Error error
}
Sequence 序列号是客户端带过来的,每个请求总要有点不一样的地方,方便服务端根据序列号来区分不同的调用,可以理解是唯一ID。
ServiceMethod 是要远程调用的服务下的方法名词,这里对照为go语言中结构体的方法名,比如用户创建方法”User.Create”。
Error 是错误信息,用来放置一端发生的错误,以便另一种接收到消息时能根据错误进行处理,而不是直接丢失响应。
消息的编码解码
rpc 的客户端和服务端之间通信的消息有其特有的格式,因此都需要涉及编码和解码这一关键步骤,也就是我们熟称的序列化和反序列化。
以便抽象理解,我们定义一个统一的Codec接口
// codec.go
type Codec interface {
ReadHeader(*Header) error
ReadBody(interface{}) error
Write(*Header, interface{}) error
}
ReadHeader 读取信息头,如果有错误返回错误。
ReadBody 读取消息体,数据是动态的,所以使用interface{}作为参数,如果有错误返回错误。
Write 消息接收处理完成后,我们需要把结果告知给客户端,需要一个写入的操作。
这里我们借助标准库内置的encoding/gob来提高工作效率。
当然你也可以用encoding/json,encoding/xml或者其他编解码包,这里选择encoding/gob,仅仅是因为这是go所特有的。JUST GO。
接下来我们基于encoding/gob实现一个gobCodec
rpc 请求是一种网络请求,本质还是I/O,所以我们可以用io.ReadWriteCloser来定义网络链接conn.
通过gob.Decoder解码请求中的数据流至对应的结构体参数,
完成服务端调用之后,把返回结果再用gob.Encoder编码至数据流中,
最后通过bufio.Writer写入数据完成响应。
// codec.go
type gobCodec struct {
conn io.ReadWriteCloser
decoder *gob.Decoder
encoder *gob.Encoder
writeBuf *bufio.Writer
}
封装一个newGobCodec函数,方便后续调用。
func newGobCodec(conn io.ReadWriteCloser) Codec {
writeBuf := bufio.NewWriter(conn)
return &gobCodec{
conn: conn,
decoder: gob.NewDecoder(conn),
encoder: gob.NewEncoder(writeBuf),
writeBuf: writeBuf,
}
}
实现 Codec 接口中的 ReadHeader 方法
func (c *gobCodec) ReadHeader(header *Header) error {
return c.decoder.Decode(header)
}
实现 Codec 接口中的 ReadBody 方法
func (c *gobCodec) ReadBody(body interface{}) error {
return c.decoder.Decode(body)
}
实现 Codec 接口中的 Write 方法
func (c *gobCodec) Write(header *Header, body interface{}) error {
defer func() {
if c.writeBuf.Flush() != nil {
c.conn.Close()
}
}()
if err := c.encoder.Encode(header); err != nil {
return err
}
if err := c.encoder.Encode(body); err != nil {
return err
}
return nil
}
至此,rpc 中比较底层的数据编码和解码我们已经抽象出来了Codec接口,并借助encoding/gob实现了gobCodec.