etcd-raft-网络分析

etcd raft transport模块

etcd raft继承了简约的设计理念,只实现了最核心的raft算法, 节点之间数据的传输需要用户自己来实现,etcd自己使用的网络层库为transport。

简介

transport模块是etcd提供的raft网络库,其golang地址为”go.etcd.io/etcd/etcdserver/api/rafthttp”。 etcd-raft网络传输模块主要用于在一个raft集群的节点之间进行raft协议的消息传输,这里的消息包括raft协议中定义的所有消息类型,例如选主时的投票消息、日志复制消息、snapshot拷贝消息、集群节点变更消息等等。

如何使用transport

etcd的kv案例中就是使用transport作为raft的网络层,对于用户来说transport的使用比较简单。

  1. 初始化一个transport实例,官方案例如下:
  • 需要传入本节点的id,clusterid等参数。
    rc.transport = &rafthttp.Transport{
          Logger:      zap.NewExample(),
          ID:          types.ID(rc.id),
          ClusterID:   0x1000,
          Raft:        rc,
          ServerStats: stats.NewServerStats("", ""),
          LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)),
          ErrorC:      make(chan error),
      }
    
  • 调用Start方法,并将raft集群中的其它节点的id和ip都加入到transport中 ``` rc.transport.Start()

for i := range rc.peers { if i+1 != rc.id { rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]}) } }


- 需要调用Handler函数,这个函数会启动几个http的监听端口,用于在raft节点之间传输数据。

err = (&http.Server{Handler: rc.transport.Handler()}).Serve(ln)


至此就完成了transport的所有的准备工作。

2. 发送数据

transport.Send(Messages)

直接调用Send方法发送raftnode Ready方法返回的Messages数据。

3. 添加、删除节点
如果集群中间有节点的新增和删除,用户可以调用AddPeer和RemovePeer方法完成节点的新增和删除。

## 原理
transport主要分为raft消息的传输部分和节点存活的探针模块(对接prometheus),接下来主要着重讲raft节点之间的数据传输部分。
transport主要通过Peer子模块来实现节点之间的数据传输,transport会维护一个节点ID到Peer的map结构(peers)

transport的结构体如下:

type Transport struct { Logger *zap.Logger

DialTimeout time.Duration // maximum duration before timing out dial of the request
// DialRetryFrequency defines the frequency of streamReader dial retrial attempts;
// a distinct rate limiter is created per every peer (default value: 10 events/sec)
DialRetryFrequency rate.Limit

TLSInfo transport.TLSInfo // TLS information used when creating connection

ID          types.ID   // local member ID
URLs        types.URLs // local peer URLs
ClusterID   types.ID   // raft cluster ID for request validation
Raft        Raft       // raft state machine, to which the Transport forwards received messages and reports status
Snapshotter *snap.Snapshotter
ServerStats *stats.ServerStats // used to record general transportation statistics
// used to record transportation statistics with followers when
// performing as leader in raft protocol
LeaderStats *stats.LeaderStats
// ErrorC is used to report detected critical errors, e.g.,
// the member has been permanently removed from the cluster
// When an error is received from ErrorC, user should stop raft state
// machine and thus stop the Transport.
ErrorC chan error

streamRt   http.RoundTripper // roundTripper used by streams
pipelineRt http.RoundTripper // roundTripper used by pipelines

mu      sync.RWMutex         // protect the remote and peer map
remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
peers   map[types.ID]Peer    // peers map, 维护集群节点的map结构

pipelineProber probing.Prober
streamProber   probing.Prober } ```

发送数据

transport调用Send方法来发送数据到其它节点

Send(m []raftpb.Message)

// raftpb.Message的proto 数据格式
message Message {
    optional MessageType type        = 1  [(gogoproto.nullable) = false];
    optional uint64      to          = 2  [(gogoproto.nullable) = false];
    optional uint64      from        = 3  [(gogoproto.nullable) = false];
    optional uint64      term        = 4  [(gogoproto.nullable) = false];
    optional uint64      logTerm     = 5  [(gogoproto.nullable) = false];
    optional uint64      index       = 6  [(gogoproto.nullable) = false];
    repeated Entry       entries     = 7  [(gogoproto.nullable) = false];
    optional uint64      commit      = 8  [(gogoproto.nullable) = false];
    optional Snapshot    snapshot    = 9  [(gogoproto.nullable) = false];
    optional bool        reject      = 10 [(gogoproto.nullable) = false];
    optional uint64      rejectHint  = 11 [(gogoproto.nullable) = false];
    optional bytes       context     = 12;
}

在raftpb.Message中的To字段会标识出这条信息需要发送到节点的ID,具体的实现还是通过调用Peer子模块中的Send方法。 数据的发送有两个方法:

  • streamWriter 顾名思义,这个方法是流式的向特定节点发送数据。streamWriter分成了msgAppV2Writer和writer,但是两个实例的处理流程都一样,只不过处理的数据类型不一样,msgAppV2Writer专门处理MsgApp类型,writer处理其它类型。不清楚为什么要分为两种情况,感觉一个流式实例就能处理完成了。

  • pipeline 每次请求都单独发送一个http数据包,主要用于用户发送快照数据,并在streamWriter无效的时候作为备选方案用来发送数据。

下图为选择的流程图: 发送方式选择流程图

返回的其实是一个通道,peer将数据传入到指定的通道中,streamWriter和pipeline会从通道中获取并发送数据。

pipeline详解

pipeline的运行模式如下图:

pipeline运行模式图

pipeline会启动四个协程,竞争的从msgc队列中获取数据发送(post发送),通道的buffer size为64。

streamWriter详解

streamWrite跟pipeline一样,也是从通道中获取数据发送,只是steamWriter会保持与目标机器的一个流式链接。 streamWrite会从四个channel中获取数据:

  1. heartbeatc: 获取心跳数据并发送。

  2. msgc: 普通需要发送的数据,包括:快照数据、日志数据等。

  3. connc: 链接通道,这个通道会通知steamWriter有新的流式链接可以使用,这个时候streamWriter会替换新的io.Writer, 以后的数据就通过这个新的流式链接发送。

  4. stopc: 通知关闭这个流式链接

streamWrite发送每条数据时都会在数据的最前端加上这段数据size,这样接收端就能通过这个size获取指定大小的数据,不会造成数据的误取。 如何获取得到这个streamWriter在后续的章节中会介绍

接收数据

每个raft节点都会启动一个http的server端用于接收数据,在使用的时候需要用户手动的去启动这个http server。 暴露了如下四个接口:

  1. pipeline Handler: 用于接收pipeline方式发送过来的数据,path为/raft。

  2. stream handler: 用于接收stream发送过来的数据,路径为: /raft/stream

  3. snap handler: 用于接收pipeline发送快照时的数据,/raft/snapshot

  4. probing handler: 用于接收探活请求,/raft/probing

streamReader

程序会循环的从流式连接中读取数据,每次读取都先获取这段数据的size,然后才根据size获取指定大小的数据解析。 只有在获取数据出错时程序才会尝试去获取新的streamReader,这样能保证一条连接的数据一定是可以读取完成的。解析完成后的数据会塞入到一个channel中,peer会开启一个协程不断的从这个channel中获取数据,并把数据交给raft node处理。

    go func() {
        for {
            select {
            case mm := <-p.recvc:
                if err := r.Process(ctx, mm); err != nil {
                    if t.Logger != nil {
                        t.Logger.Warn("failed to process Raft message", zap.Error(err))
                    } else {
                        plog.Warningf("failed to process raft message (%v)", err)
                    }
                }
            case <-p.stopc:
                return
            }
        }
    }()

    // r.Process might block for processing proposal when there is no leader.
    // Thus propc must be put into a separate routine with recvc to avoid blocking
    // processing other raft messages.
    go func() {
        for {
            select {
            case mm := <-p.propc:
                if err := r.Process(ctx, mm); err != nil {
                    plog.Warningf("failed to process raft message (%v)", err)
                }
            case <-p.stopc:
                return
            }
        }
    }()

streamWrite和streamReader的获取

可能大家一定会好奇,transport中的streamWrite和streamReader怎么来的。上面讲到过,用户在使用transport的时候需要启动一个http server来处理请求。transport就使用这条链接的reader和writer来做流式写入和读取。例如A节点请求B节点, 这个时候就会把A节点的io.reader作为A节点的streamReader, 把B节点的response Writer作为streamWriter, 双方都不会主动的去关闭这条链接。所以etcd raft节点在启动时,节点之间就会主动发起请求来建立一条流式的连接。

监控

transport使用一个探针不断的去探测节点是否可达,探测的方式就是简单的调用被检测节点的get方法,如果有正常返回就认为节点可达,并把数据同步给监控模块。

尾记

转载请注明原地址,冯杰的博客:http://Lfengjie.github.io 谢谢!

打赏一个呗

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦