flink用的binlog监听source

发布 : 2019-04-24 分类 : 工具类 浏览 :

flink用的binlog监听source

前言

会有一系列的文章介绍common-*.jar的各种用法,这些工具类jar包都已上传在maven中央库。可以直接通过maven坐标引入使用。源码可以参见:https://gitee.com/rjzjh/common

实时流式分析场景分析及困惑

  Flink可以用来做到批量处理与实时流式分析,在实时流式分析这块,常规的做法是业务主动发kafka消息,然后flink监听kafka来完成的。但是在现实互联网项目中,发kafka这块由业务开发部门来实施,如业务中台。而用flink收kafka这块则一般由大数据部门来完成,这就需要跨部门沟通协调,大数据部门处理数据链的下游,它的开发进度会严重受制于业务开发部门的进度,往往一个简单的功能需要来回扯皮数天后还要等人家排期、发版。限入无穷尽的等待中。

  稍微好一些的方案是采用独立的binlog中间件来(如duckula)监听我关注的数据,直接配置实时推送进kafka,业务部门也不用开发代码,数据部门也无需等待直接完成需求,但独立中间件方案也是一种较重的方案,像duckula、阿里精卫等部署的时候就需要zookeeper、kafka等其它中间件资源,在申请监听的时候也需要运维部门或是基础架构部门来进行配置,配置之前还需要申请kafka的topic等资源。也是另一种有求于人的等待。而且在flink和mysql之间,中间隔了个duckula和kafka,出现问题的概率也更大,融错能力完全依赖于它们,况且 实时性能不能得到保障?这些都是一个问题。

  那有没有无需等待又实时性高又能完全自主控制的方案呢?当然有,那就是采用自定义binlog监听的source方案,直接由flink来监听mysql的binlog,想用哪个库就和哪个库,想用哪张表就用哪张表,这种当家做主的感觉还是很棒的。

使用binlog监听source

使用它非常简单,老规则,来一个maven坐标:

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/net.wicp.tams/common-binlog-alone -->
<dependency>
<groupId>net.wicp.tams</groupId>
<artifactId>common-flink-source-binlog</artifactId>
<version>最后版本</version>
</dependency>

这个包是依赖 轻量级的binlog监听方案。flink的main方法如下写:

1
2
3
Properties props = IOUtil.fileToProperties("/demo-tams-flink.properties", BinlogTestMain.class);
Conf.overProp(props);
DataStream<DuckulaEvent> data = env.setParallelism(1).addSource(new BinlogSource());

其中resources的根目录下的配置文件demo-tams-flink.properties内容如下:

1
2
3
4
5
6
7
8
#默认的监听服务配置,如果有多套,复制此配置并修改default为其它值
common.binlog.alone.conf.default.ip=192.168.137.100
common.binlog.alone.conf.default.port=3306
common.binlog.alone.conf.default.user=root
common.binlog.alone.conf.default.password=mysql
common.binlog.alone.conf.default.dbPattern=^binlog_test_db$
common.binlog.alone.conf.default.tbPattern=^user_info$
common.binlog.alone.conf.default.rds=false

参数的配置请参看轻量级的binlog监听方案

protobuf3的序列化

首先引用依赖包,maven坐标:

1
2
3
4
5
6
7
<!-- https://mvnrepository.com/artifact/com.twitter/chill-protobuf -->
<!-- 0.9.3 0.5.2缺少方法 -->
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill-protobuf</artifactId>
<version>0.8.3</version>
</dependency>

在env中注意ProtobufSerializer序列化处理:

1
env.getConfig().registerTypeWithKryoSerializer(DuckulaEvent.class, ProtobufSerializer.class);

使用 示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package net.wicp.tams.demo.flink.streaming.lab;

import java.util.Properties;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import com.twitter.chill.protobuf.ProtobufSerializer;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import net.wicp.tams.common.Conf;
import net.wicp.tams.common.apiext.IOUtil;
import net.wicp.tams.common.flink.source.binlog.BinlogSource;
import net.wicp.tams.duckula.client.Protobuf3.DuckulaEvent;
import net.wicp.tams.duckula.client.Protobuf3.OptType;

@Slf4j
public class BinlogTestMain {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("file:////alidata1/binlogTest"));
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

env.getCheckpointConfig().setCheckpointInterval(2000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(30);
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getConfig().registerTypeWithKryoSerializer(DuckulaEvent.class, ProtobufSerializer.class);

Properties props = IOUtil.fileToProperties("/demo-tams-flink.properties", BinlogTestMain.class);
Conf.overProp(props);
DataStream<DuckulaEvent> data = env.setParallelism(1).addSource(new BinlogSource());
env.setParallelism(1);
DataStream<OptCount> sum = data.flatMap(new FlatMapFunction<DuckulaEvent, OptCount>() {
private static final long serialVersionUID = 1L;

@Override
public void flatMap(DuckulaEvent value, Collector<OptCount> out) throws Exception {
OptType optType = value.getOptType();
OptCount optCount = new OptCount();
optCount.setOptType(optType.name());
optCount.setCount(1);
out.collect(optCount);
}
}).keyBy("optType").sum("count");
sum.writeAsText("/alidata1/sink/abc.txt", WriteMode.OVERWRITE);
// sum.print().setParallelism(1);
env.execute();
}

@Data
public static class OptCount {
private String optType;
private int count;
}
}

本文作者 : andy.zhou
原文链接 : https://rjzjh.gitee.io/2019/04/24/common/common-flinksource/
版权声明 : 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!

知识 & 情怀 | 二者兼得

微信扫一扫, 向我投食

微信扫一扫, 向我投食

支付宝扫一扫, 向我投食

支付宝扫一扫, 向我投食

留下足迹