轻量级的binlog监听方案

发布 : 2019-04-24 分类 : 工具类 浏览 :

轻量级的binlog监听方案

前言

会有一系列的文章介绍common-*.jar的各种用法,这些工具类jar包都已上传在maven中央库。可以直接通过maven坐标引入使用。源码可以参见:https://gitee.com/rjzjh/common

数据实时推送

互联网体系架构具有可控性差、 数据量大、 架构复杂等特点,错综复杂的各业务模块需要解耦,各异构数据需要同步,双活/多活的容灾方案需要高实时性 等,在各种场合都需要一套可靠的数据实时推送方案。mysql已成为互联网项目存储的主力,围绕着它的各外围模块急需实时地获取它的数据,binlog监听是解决此实时同步问题的不二之选。

使用场景举例

在数据的实时推领域,由于mysql的广泛使用,使用binlog监听具体非常广泛的需求。随便都能说出重要场景:

  • mysql到ES数据同步
    大家知道mysql是关系型数据库,当数据量非常大时(经验值500W,这个没有准确数),它的查询已不是靠建个索引之类的可以提高它的查询效率的,一般来说互联网项目是会使用 sharding分库分表来解决查询问题,但是不管你使用服务端分库分表中间件(mycat为代表),还是使用客户端分库分表(tddl为代表),做join之类的关联查询会非常不方便,在非分库分表键上做查询有也限制。这时我们可能会想到用ES来做查询,mysql仍然做增删改。这时就会出现mysql与ES不同步的问题,我们可以采用双写的机制,但是要保证写mysql与写ES这个原子性, 事务也是头疼的问题,而且耦合度也高。采用binlog同步来解耦则是比较好的方案。
  • 两地数据复制
    为了提高系统的健壮性,重要一点的服务都会做双活、多活方案。采用binlog同步来两地复制也有必要
  • 大数据的实时计算
    像flink,spark这些实时计算引擎也需要源源不到的实时数据喂给它。

    binlog监听原理

    Mysql有很多日志,redo, undo 等二进制日志是各存储引擎产生的,binlog日志却属于逻辑日志,与引擎无关。binlog日志有三种格式:
     1、 Statement 记录每条SQL
     2、 ROW 记录具体的数据
     3、 MiXED 混合模式,一般的语句修使用statment格式,但对一些函数,statement无法完成主从复制的操作。
    我们使用的binlog监听都是采用ROW类型来做监听,MySQL replication协议,它是用于做主从复制的协议。一般的binlog监听都是利用这个协议,伪装为从服务器,诱骗Mysql的主服务器把binlog日志发送给它。
    mysql还有一个有必要说的概念是Gtid。它是MySQL的全局唯一标识,用来做主备切换的位点恢复。

启动一个binlog监听

需要要项目中引入一个jar包,maven坐标为:

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/net.wicp.tams/common-binlog-alone -->
<dependency>
<groupId>net.wicp.tams</groupId>
<artifactId>common-binlog-alone</artifactId>
<version>*****</version>
</dependency>

最简单启动方式像下面一样使用api启动监听:

1
2
3
4
5
6
7
8
9
10
11
@Test
public void testListener() {
ConnConf.Builder build = ConnConf.newBuilder();
build.setIp("192.168.137.100");
build.setPort(3306);
build.setClientId(77987);
build.setUser("root");
build.setPassword("mysql");
build.setListener("net.wicp.tams.common.binlog.alone.parser.test.TestListener");
BinlogStart.listening(build);
}

当然也支持配置的方式启动监听:

1
2
3
4
5
6
7
8
9
10
11
@Test
public void startListenerForConfig() throws IOException {
Properties props=new Properties();
props.put("common.binlog.alone.conf.default.ip", "192.168.137.100");
props.put("common.binlog.alone.conf.default.user", "root");
props.put("common.binlog.alone.conf.default.password", "mysql");
props.put("common.binlog.alone.conf.default.listener", "net.wicp.tams.common.binlog.alone.parser.test.TestListener");
Conf.overProp(props);
BusiAssit.startListenerForConfig();
System.in.read();
}

这个依赖包支持哪些配置项呢?看下面的默认配置项:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#h2数据库文件存放地点,环境变量H2_DIR可以覆盖这个值
common.binlog.alone.h2.dir=.tams
#默认的监听服务配置,如果有多套,复制此配置并修改default为其它值
common.binlog.alone.conf.default.ip=localhost
common.binlog.alone.conf.default.port=3306
common.binlog.alone.conf.default.user=root
common.binlog.alone.conf.default.password=null
common.binlog.alone.conf.default.listener=null
common.binlog.alone.conf.default.dbPattern=null
common.binlog.alone.conf.default.tbPattern=null
common.binlog.alone.conf.default.rds=false
#设置了groupId表示需要做分布式锁,同一个groupId+ip就是一个集群分布式锁
common.binlog.alone.conf.default.groupId=null
#默认使用CheckPointH2db,还提供了CheckPointMemory的chk实现,CheckPointMysql多进程的HA机制
common.binlog.alone.conf.default.chk=net.wicp.tams.common.binlog.alone.checkpoint.CheckPointH2db
#这是一个int类型,可以不填,系统会自动生成一个,为了做HA,必须填一个,否则不能区分lastPo
common.binlog.alone.conf.default.clientId=10000
common.binlog.alone.conf.default.pos.gtids=null
#cur表示从当前最新位点启动,last表示从记录的最后位点启动,pos表示从上面设置的gtids启动
common.binlog.alone.conf.default.haType=last
#这是mysql需要的数据库连接,只有在默认的chk为CheckPointMysql才能起作用
common.binlog.alone.chk.mysql.ip=null
common.binlog.alone.chk.mysql.port=3306
common.binlog.alone.chk.mysql.defaultdb=tams
common.binlog.alone.chk.mysql.user=null
common.binlog.alone.chk.mysql.password=null

common.binlog.alone.conf. 是监听配置的前缀
default 是监听配置的key,此jar支持监听多套mysql,特别是分库分表的情况
default后就是就真正的配置项了,它们除了ip,端口,用户名,密码外还有:
listener 监听类,这个必须,它必须实现下面接口:

1
2
3
4
5
6
7
8
public interface IBinlogListener {
//初始化,在启动监听前会触发
public void init();
//关闭,在关闭监听后会触发
public void close();
//真正处理的业务逻辑
public void doBui(DuckulaEvent duckulaEvent);
}

dbPattern 库名过滤,正则表达式
tbPattern 表名过滤,正则表达式
rds 你监听的库是不是阿里云的rds,rds会对无主键的表自动创建一个rowkey字段做为主键
groupId 分布式锁配置,同一个groupId监听同一个数据库实例的多个监听任务,
只有一个会成功启动监听。如是为”null” 表示不启用分布式锁
chk 元数据存储实现,建议不填,后面介绍
clientId 监听时需要注册的clientId,mysql要求clientId不能重复,建议不填
pos.gtids 开始监听的gtid号,不填表示从当前位点开始监听。
haType 表示HA类型,有三种,见上面注解
common.binlog.alone.chk.mysql .** 这一系列的配置只有在checkpoint为CheckPointMysql才起作用

下面就上面示例使用的监听者net.wicp.tams.common.binlog.alone.parser.test.TestListener:

1
2
3
4
5
6
7
8
9
10
11
package net.wicp.tams.common.binlog.alone.parser.test;

import net.wicp.tams.common.binlog.alone.normalize.AbsBinlogListener;
import net.wicp.tams.duckula.client.Protobuf3.DuckulaEvent;

public class TestListener extends AbsBinlogListener {
@Override
public void doBui(DuckulaEvent duckulaEvent) {
System.out.println("aaaaa");
}
}

轻量级的监听方案

由于duckula中间件(binlog监听中间件)依赖较多的其它中间件,如:zookeeper、kafka 等,需要考虑的场景也较多,对于某些场景确实觉得较重了点。比如:属地化部署、flink的source。
针对这些场景,需要一套轻量级的binlog监听方案,可以不需要duckula里面分布式锁等功能。只要求能收到binlog事件就行了。那么binlog监听独立jar包(common-binlog-alone)就是为了解决这一痛点而开发。

  • 它具有如下特点:
    1、表的列名colName的控制,特别是在有字段名变更时
    2、位点的控制,与checkpoint的完美结合
    3、序列化,为应用做为远程传送提供方案。
    4、duckuka是进程方式监听,由于要集成到flink和与业务代码共用一个JVM,所以监听只能改为线程方式。
    5、能从历史位点精准恢复监听。
    6、通用性,任何有监听需求的项目只要引入此jar包,传入一些必填参数后就可以操作监听数据。
    7、checkpoint与colName存储的可替换性,jar包默认是存储在h2db内嵌数据库中的,它不是分布式的中央存储,如果应用有需求,只需要实现相应的接口,它们也可以存储在 zookeeper、mysql等集中式中间件中。

  • 引入的业务应用需要考虑的问题:
    1、如果过了监听时效性(也就是binlog日志被删除了,无法从checkpoint中的位点进行监听),如何补救。
    2、监听程序发生不可恢复异常要如何处理
    3、监听是顺序给到应用注册的“监听者”的,但不排除在异常情况下(主备切换等),某些binlog事件不是顺序给到, 如果应用需要顺序性的要求场景,需要考虑这种情况。
    4、分布式锁,目前实现分布式锁有三种 zookeeper,redis,数据库,由于它是独立jar包,设计之初就是为了“轻量”,所以不会依赖任何第三方系统,这与分布式锁必须借助第三方系统来实现有冲突。需要业务应用跟据不同情况完成分布式锁。

    设计原理图

    img
    chk元数据存储:此监听方案会内置一个默认的元数据存储实现,它是用H2DB这个内嵌数据库来实现的,存储数据文件在 ~/.tams/目录 下面,文件名为 “监听ip_监听端口”的形式。如果有需求业务可以实现下面接口,然后修改配置项“chk”为实现类的全类名就可以了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public interface ISaveCheckPoint {
/***
* 初始化,如启动服务,或是创建连接等。
*/
public void init(ConnConf.Builder connConfBuilder);

/***
* 在JVM退出时要做的清理工作
*/
public void shutdown();

public void savePoint(Position pos);

/****
* db/tb/timestamp联合唯一索引
*
* @param colHis
*/
public void saveColName(ColHis colHis);

/***
* 找到指定时间前最近的一个位点
*
* @param time
* @return
*/
public Position findPoint(long time);

/***
* 找到指定时间后的所有字段名
*
* @param db
* @param tb
* @return
*/
public List<ColHis> findColsList(String db, String tb);

/****
* 查所以的列
*
* @return
*/
public List<ColHis> findColsAll();
/***
* 获得锁
* @return
*/
public YesOrNo acquireLock();
/***
* 释放锁
*/
public void releaseLock();
}

内置的checkpoint实现及适应场景

看上面的设计原理图,此jar包需要一个H2DB来存储位点、colName历史等binlog监听需要的元数据信息,这个H2DB存储方案是可以替换的,可以换成zookeeper、redis、mysql 等,甚至是一个文件也可以,只要实现上面的ISaveCheckPoint接口就可以了。独立jar包也跟据不同场景内置了三个checkpoint实现。

1、内存实现(net.wicp.tams.common.binlog.alone.checkpoint.CheckPointMemory)
位点与colname保存在内存,只要JVM退出,所有的位点与colname都将丢失,不支持用来做HA,它的优点是速度快,不依赖其它东西(连文件系统也不依赖),要做HA必需要求业务应用自己做好了checkpoint,如flink,现阶段 自定义的binlog监听的 flink source 使用了此实现

2、H2DB内嵌数据库实现(net.wicp.tams.common.binlog.alone.checkpoint.CheckPointH2db)
位点与colname保存在内嵌数据库H2DB,JVM重启后可以从上次停止的位点启动监听,只要在同一台服务器上,保存的元数据就不会丢失,它的优点是不依赖其它中间件,但依赖文件系统(H2DB可以使用内存模式,也就退化为内存实现了),缺点是它保留的元数据只能在运行服务器上,不能跨服务器做HA。它适用于Demo及单实例的应用。

3、mysql实现(net.wicp.tams.common.binlog.alone.checkpoint.CheckPointMysql)
位点与colname保存在mysql数据库,提供了最完备的HA支持,可以跨服务器重启监听而不丢数据。它但依赖mysql数据库,并且要创建一个用于储存元数据的专用库,且用户有建表权限。生产环境推荐使用此方案,一般来说生产环境的应用会启3个及以上的实例来进行负载,这样监听也会启用多个,它执行执行相同的监听器,对于做过幂等的监听器也许没什么问题,但如果没做幂等那就会出大问题。依赖包内置mysql实现的分布式锁方案,如果业务感觉不完美否是已在使用zookeeper或是redis等更完美的分布式锁方案,可以使用自己熟悉的分布式锁方案,只需把groupId设置为”null”,在启动监听前自行获取自己的分布式锁,成功后再启动监听就OK了。

jar包默认的是H2DB内嵌数据库实现,如有其它需求,需要覆盖配置项“common.binlog.alone.conf.default.chk”

内部结构图

img

总结

此轻量级监听方案虽没有duckula这一监听中间件功能来的强大与完备,全量ES导入,位点监控、离线解析等功能,但能满足业务数据实时推送的大部分的需求,如mysql到redis的实时同步。 同步写kafka等功能。通过不同的checkpoint实现,完成大部分的场景需求。如果只是监听那么1、2张表,跟本不需要引用duckula中间件,毕竞,为了1、2张表安装三个中间件(duckula依赖zookeeper和kafka)确实太重了,引用独立的监听包不失为一种更优的方案。它现在已支持的特性有(只有在mysql实现的checkpoint时全部支持):

1、分布式锁: 同一个groupId只有一个任务能成功,其它的监听一直等待
2、断网重连: 如果出现网络闪断或网络波动情况,网络恢复后自动从上次位点重连,不需人工干预
3、高可用:如果一个监听的JVM停止后,可以由其它监听得到分布式锁,从死了的监听位点启动


使用此轻量级监听方案,业务方需要关注的点有:

业务注册的监听器异常处理:

此方案并未对异常进行处理,如果出现异常直接打印log日志,重新监听此位点,如果业务方没有做异常处理会导致监听程序不断的在此位点重新监听并重启,这是因为binlog监听需要保证不丢数据,只能采用此策略,依赖业务方自己做好异常处理,一般来说有2种手段处理异常,一、出异常时保存好出异常的数据并跳过此异常。二、一直重试并等待人工干预,它适用于不能丢失的重要数据监听(这是此方案的默认策略)

业务注册的监听器处理长时间阻塞

由于要保证顺序,此监听方案采用单线程来把监听数据给到业务注册的监听器,如果业务注册的监听器由于各种原因处理一直阻塞会导致binlog的监听一直等待。需要业务注册的监听器有超时机制,不能无限等待。

性能问题

此方案的监听主体部分代码做过压测,只做监听不做业务逻辑,在网络条件非常好的情况下(包延时在1ms以下),可以达到3.5W 条记录/秒(每条记录60个字段左右)。所以binlog监听这块并不会出现性能问题。如果有位点更新延时那肯定是业务注册的监听器处理业务时用的时候较长,为了保证顺序性binlog的监听是并线程的,为了保证性能,业务在监听器里可以考虑使用多线程处理。如何使用多线程?主要有下面三种方式:

一、 不保证顺序:

启一个带阻塞功能的线程池,只要有监听数据就往线程池里丢,处理逻辑写在线程池的线程里,监听器只管生产数据给线程池就可以了。

二、保证有条件顺序:

启动多个只有一个线程的线程池,并给这些线程按 1/2/3/4/5/6/7/8 标好序号,只要有数据来了,监听器打开数据取出其中的id字段,如果是整形直接取模拿到序号,然后往对应的序号线程池里抛,如果是字符型就需要加一个hash再取模拿到序号。

三、保证全局有序:

可以采用disruptor的ringbuff的环状缓冲池。业务注册的监听器做为生产者而消费者可以是多线程,注意它也不能保证完全的全局有序,它是分批拿给消费者的,比如有10个消费者,那么它10个一批拿给消费者,只有等10个消费者全部处理完了,再拿10给消费者,第二批可以比第一批晚招待,但是同一批的10条记录并不能保证先后顺序。

本文作者 : andy.zhou
原文链接 : https://rjzjh.gitee.io/2019/04/24/common/common-binlog/
版权声明 : 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!

知识 & 情怀 | 二者兼得

微信扫一扫, 向我投食

微信扫一扫, 向我投食

支付宝扫一扫, 向我投食

支付宝扫一扫, 向我投食

留下足迹