TiDB Binlog 源码阅读系列文章(七)Drainer server 介绍 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
PingCAP
V2EX    数据库

TiDB Binlog 源码阅读系列文章(七)Drainer server 介绍

  •  
  •   PingCAP 2019-12-25 10:56:44 +08:00 1555 次点击
    这是一个创建于 2192 天前的主题,其中的信息可能已经有所发展或是发生改变。

    作者:黄佳豪

    前面文章介绍了 Pump server,接下来我们来介绍 Drainer server 的实现,Drainer server 的主要作用是从各个 Pump server 获取 binlog,按 commit timestamp 归并排序后析 binlog 同步到不同的目标系统,对应的源码主要集中在 TiDB Binlog 仓库的 drainer/ 目录下。

    启动 Drainer Server

    Drainer server 的启动逻辑主要实现在两个函数中:NewServer(*Server).Start()

    NewServer 根据传入的配置项创建 Server 实例,初始化 Server 运行所需的字段。其中重要字段的说明如下:

    1. metrics: MetricClient,用于定时向 Prometheus Pushgateway 推送 drainer 运行中的各项参数指标。

    2. cp: checkpoint,用于保存 drainer 已经成功输出到目标系统的 binlog 的 commit timestamp。drainer 在重启时会从 checkpoint 记录的 commit timestamp 开始同步 binlog。

    3. collector: collector,用于收集全部 binlog 数据并按照 commit timestamp 递增的顺序进行排序。同时 collector 也负责实时维护 pump 集群的状态信息。

    4. syncer: syncer,用于将排好序的 binlog 输出到目标系统 (MySQL,Kafka...) ,同时更新同步成功的 binlog 的 commit timestamp 到 checkpoint。

    Server 初始化以后,就可以用 (*Server).Start 启动服务,启动的逻辑包含:

    1. 初始化 heartbeat 协程定时上报心跳信息到 etcd (内嵌在 PD 中)。

    2. 调用 collector.Start() 驱动 Collector 处理单元。

    3. 调用 syncer.Start() 驱动 Syncer 处理单元。

      errc := s.heartbeat(s.ctx) go func() { for err := range errc { log.Error("send heart failed", zap.Error(err)) } }() s.tg.GoNoPanic("collect", func() { defer func() { go s.Close() }() s.collector.Start(s.ctx) }) if s.metrics != nil { s.tg.GoNoPanic("metrics", func() { 

    后续的章节中,我们会详细介绍 Checkpoint、Collector 与 Syncer。

    Checkpoint

    Checkpoint 代码在 /drainer/checkpoint 下。

    首先看下 接口定义

    // When syncer restarts, we should reload meta info to guarantee continuous transmission. type CheckPoint interface { // Load loads checkpoint information. Load() error // Save saves checkpoint information. Save(int64, int64) error // TS get the saved commit ts. TS() int64 // Close closes the CheckPoint and release resources after closed other methods should not be called again. Close() error } 

    drainer 支持把 checkpoint 保存到不同类型的存储介质中,目前支持 mysql 和 file 两种类型,例如 mysql 类型的实现代码在 mysql.go 。如果用户没有指定 checkpoit 的存储类型,drainer 会根据目标系统的类型自动选择对应的 checkpoint 存储类型。

    当目标系统是 mysql/tidb,drainer 默认会保存 checkpoint 到 tidb_binlog.checkpoint 表中:

    mysql> select * from tidb_binlog.checkpoint; +---------------------+---------------------------------------------+ | clusterID | checkPoint | +---------------------+---------------------------------------------+ | 6766844929645682862 | {"commitTS":413015447777050625,"ts-map":{}} | +---------------------+---------------------------------------------+ 1 row in set (0.00 sec) 

    commitTS 表示这个时间戳之前的数据都已经同步到目标系统了。ts-map 是用来做 TiDB 主从集群的数据校验 而保存的上下游 snapshot 对应关系的时间戳。

    下面看看 MysqlCheckpoint 主要方法的实现。

    // Load implements CheckPoint.Load interface func (sp *MysqlCheckPoint) Load() error { sp.Lock() defer sp.Unlock() if sp.closed { return errors.Trace(ErrCheckPointClosed) } defer func() { if sp.CommitTS == 0 { sp.CommitTS = sp.initialCommitTS } }() var str string selectSQL := genSelectSQL(sp) err := sp.db.QueryRow(selectSQL).Scan(&str) switch { case err == sql.ErrNoRows: sp.CommitTS = sp.initialCommitTS return nil case err != nil: return errors.Annotatef(err, "QueryRow failed, sql: %s", selectSQL) } if err := json.Unmarshal([]byte(str), sp); err != nil { return errors.Trace(err) } return nil } 

    Load 方法从数据库中读取 checkpoint 信息。需要注意的是,如果 drainer 读取不到对应的 checkpoint,会使用 drainer 配置的 initial-commit-ts 做为启动的开始同步点。

    // Save implements checkpoint.Save interface func (sp *MysqlCheckPoint) Save(ts, slaveTS int64) error { sp.Lock() defer sp.Unlock() if sp.closed { return errors.Trace(ErrCheckPointClosed) } sp.CommitTS = ts if slaveTS > 0 { sp.TsMap["master-ts"] = ts sp.TsMap["slave-ts"] = slaveTS } b, err := json.Marshal(sp) if err != nil { return errors.Annotate(err, "json marshal failed") } sql := genReplaceSQL(sp, string(b)) _, err = sp.db.Exec(sql) if err != nil { return errors.Annotatef(err, "query sql failed: %s", sql) } return nil } 

    Save 方法构造对应 SQL 将 checkpoint 写入到目标数据库中。

    Collector

    Collector 负责获取全部 binlog 信息后,按序传给 Syncer 处理单元。我们先看下 Start 方法:

    // Start run a loop of collecting binlog from pumps online func (c *Collector) Start(ctx context.Context) { var wg sync.WaitGroup wg.Add(1) go func() { c.publishBinlogs(ctx) wg.Done() }() c.keepUpdatingStatus(ctx, c.updateStatus) for _, p := range c.pumps { p.Close() } if err := c.reg.Close(); err != nil { log.Error(err.Error()) } c.merger.Close() wg.Wait() } 

    这里只需要关注 publishBinlogs 和 keepUpdatingStatus 两个方法。

    func (c *Collector) publishBinlogs(ctx context.Context) { defer log.Info("publishBinlogs quit") for { select { case <-ctx.Done(): return case mergeItem, ok := <-c.merger.Output(): if !ok { return } item := mergeItem.(*binlogItem) if err := c.syncBinlog(item); err != nil { c.reportErr(ctx, err) return } } } } 

    publishBinlogs 调用 merger 模块从所有 pump 读取 binlog,并且按照 binlog 的 commit timestamp 进行归并排序,最后通过调用 syncBinlog 输出 binlog 到 Syncer 处理单元。

    func (c *Collector) keepUpdatingStatus(ctx context.Context, fUpdate func(context.Context) error) { // add all the pump to merger c.merger.Stop() fUpdate(ctx) c.merger.Continue() // update status when had pump notify or reach wait time for { select { case <-ctx.Done(): return case nr := <-c.notifyChan: nr.err = fUpdate(ctx) nr.wg.Done() case <-time.After(c.interval): if err := fUpdate(ctx); err != nil { log.Error("Failed to update collector status", zap.Error(err)) } case err := <-c.errCh: log.Error("collector meets error", zap.Error(err)) return } } } 

    keepUpdatingStatus 通过下面两种方式从 etcd 获取 pump 集群的最新状态:

    1. 定时器定时触发。

    2. notifyChan 触发。这是一个必须要提一下的处理逻辑:当一个 pump 需要加入 pump c 集群的时候,该 pump 会在启动时通知所有在线的 drainer,只有全部 drainer 都被通知都成功后,pump 方可对外提供服务。 这个设计的目的是,防止对应的 pump 的 binlog 数据没有及时加入 drainer 的排序过程,从而导致 binlog 数据同步缺失。

    Syncer

    Syncer 代码位于 drainer/syncer.go,是用来处理数据同步的关键模块。

    type Syncer struct { schema *Schema cp checkpoint.CheckPoint cfg *SyncerConfig input chan *binlogItem filter *filter.Filter // last time we successfully sync binlog item to downstream lastSyncTime time.Time dsyncer dsync.Syncer shutdown chan struct{} closed chan struct{} } 

    在 Syncer 的结构定义中,我们关注下面三个对象:

    • dsyncer 是真正同步数据到不同目标系统的执行器实现,我们会在后续章节具体介绍,接口定义如下:

      // Syncer sync binlog item to downstream type Syncer interface { // Sync the binlog item to downstream Sync(item *Item) error // will be close if Close normally or meet error, call Error() to check it Successes() <-chan *Item // Return not nil if fail to sync data to downstream or nil if closed normally Error() <-chan error // Close the Syncer, no more item can be added by `Sync` Close() error } 
    • schema 维护了当前同步位置点的全部 schema 信息,可以根据 ddl binlog 变更对应的 schema 信息。

    • filter 负责对需要同步的 binlog 进行过滤。

    Syncer 运行入口在 run 方法,主要逻辑包含:

    1. 依次处理 Collector 处理单元推送过来的 binlog 数据。

    2. 如果是 DDL binlog,则更新维护的 schema 信息。

    3. 利用 filter 过滤不需要同步到下游的数据。

    4. 调用 drainer/sync/Syncer.Sync() 异步地将数据同步到目标系统。

    5. 处理数据同步结果返回。

      a. 通过 Succsses() 感知已经成功同步到下游的 binlog 数据,保存其对应 commit timestamp 信息到 checkpoint。

      b. 通过 Error() 感知同步过程出现的错误,drainer 清理环境退出进程。

    小结

    本文介绍了 Drainer server 的主体结构,后续文章会具体介绍其如何同步数据到不同下游。

    原文阅读https://pingcap.com/blog-cn/tidb-binlog-source-code-reading-7/

    目前尚无回复
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     4385 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 37ms UTC 01:05 PVG 09:05 LAX 17:05 JFK 20:05
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86