Canal源码分析之server模块
server模块的核⼼接⼝是CanalServer,其有2个实现类CanalServerWithNetty、CanalServerWithEmbeded。关于CanalServer,官⽅⽂档中有有以下描述:
下图是笔者对官⽅⽂档的进⼀步描述:
左边的图
表⽰的是Canal独⽴部署。不同的应⽤通过canal client与canal server进⾏通信,所有的canal client的请求统⼀由CanalServerWithNetty接受,之后CanalServerWithNetty 会将客户端请求派给CanalServerWithEmbeded 进⾏真正的处理。CannalServerWithEmbeded内部维护了多个canal instance,每个canal instance伪装成不同的mysql实例的slave,⽽CanalServerWithEmbeded会根据客户端请求携带的destination参数确定要由哪⼀个canal instance为其提供服务。 右边的图
是直接在应⽤中嵌⼊CanalServerWithEmbeded,不需要独⽴部署canal。很明显,⽹络通信环节少了,同步binlog信息的效率肯定更⾼。但是对于使⽤者的技术要求⽐较⾼。在应⽤中,我们可以通过CanalServerWithEmbeded.instance()⽅法来获得CanalServerWithEmbeded实例,这⼀个单例。
整个server模块⽬录结构如下所⽰:
其中上⾯的红⾊框就是嵌⼊式实现,⽽下⾯的绿⾊框是基于Netty的实现。
看起来基于netty的实现代码虽然多⼀点,这其实只是幻觉,CanalServerWithNetty会将所有的请求委派给CanalServerWithEmbedded处理。 ⽽内嵌的⽅式只有CanalServerWithEmbedded⼀个类, 是因为CanalServerWithEmbedded⼜要根据destination选择某个具体的CanalInstance来处理客户端请求,⽽CanalInstance的实现位于instance模块,我们将在之后分析。因此从canal server的⾓度来说,CanalServerWithEmbedded才是server模块真正的核⼼。
CanalServerWithNetty和CanalServerWithEmbedded都是单例的,提供了⼀个静态⽅法instance()获取对应的实例。回顾前⼀节分析CanalController源码时,在CanalController构造⽅法中准备CanalServer的相关代码,就是通过这两个静态⽅法获取对应的实例的。
1. public CanalController(final Properties properties){
2. ....
3. // 准备canal server
4. ip = getProperty(properties, CanalConstants.CANAL_IP);
5. port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
6. embededCanalServer = CanalServerWithEmbedded.instance();
7. embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置⾃定义的instanceGenerator
8. canalServer = CanalServerWithNetty.instance();
9. canalServer.setIp(ip);
10. canalServer.setPort(port);
11. ....
12. }
CanalServer接⼝
CanalServer接⼝继承了CanalLifeCycle接⼝,主要是为了重新定义start和stop⽅法,抛出CanalServerException。
1. public interface CanalServer extends CanalLifeCycle {
2.
3. void start() throws CanalServerException;
4.
5. void stop() throws CanalServerException;
6. }
CanalServerWithNetty
CanalServerWithNetty主要⽤于接受客户端的请求,然后将其委派给CanalServerWithEmbeded处理。下⾯的源码显⽰了CanalServerWithNetty种定义的字段和构造⽅法
1. public class CanalServerWithNetty extends AbstractCanalLifeCycle implements CanalServer {
2. //监听的所有客户端请求都会为派给CanalServerWithEmbedded处理
3. private CanalServerWithEmbedded embeddedServer; // 嵌⼊式server
4.
5. //监听的ip和port,client通过此ip和port与服务端通信
6. private String ip;
7. private int port;
8.
9. //netty组件
10. private Channel serverChannel = null;
11. private ServerBootstrap bootstrap = null;
12. //....单例模式实现
13. private CanalServerWithNetty(){
14. //给embeddedServer赋值
15. beddedServer = CanalServerWithEmbedded.instance();
16. }
17. //... start and stop method
18. //...setters
19. }
字段说明:
embeddedServer:因为CanalServerWithNetty需要将请求委派给CanalServerWithEmbeded处理,因此其维护了embeddedServer对象。
ip、port:这是netty监听的⽹络ip和端⼝,client通过这个ip和端⼝与server通信
serverChannel、bootstrap:这是netty的API。其中ServerBootstrap⽤于启动服务端,通过调⽤其bind⽅法,返回⼀个类型为Channel的serverChannel对象,代表服务端通道。关于netty知识不是本教程重点,如果读者不熟悉,可以参考笔者的。
start⽅法
start⽅法中包含了netty启动的核⼼逻辑,如下所⽰:
CanalServerWithNetty#start
1. public void start() {
2. super.start();
账本网3. //优先启动内嵌的canal server,因为基于netty的实现需要将请求委派给其处理
4. if (!embeddedServer.isStart()) {
5. embeddedServer.start();
6. }
7.
8. /* 创建bootstrap实例,参数NioServerSocketChannelFactory也是Netty的API,其接受2个线程池参数
9. 其中第⼀个线程池是Accept线程池,第⼆个线程池是woker线程池,
10. Accept线程池接收到client连接请求后,会将代表client的对象转发给worker线程池处理。
11. 这⾥属于netty的知识,不熟悉的⽤户暂时不必深究,简单认为netty使⽤线程来处理客户端的⾼并发请求即可。*/
12. this.bootstrap = new ServerBootstrap(new wCachedThreadPool(),
13. wCachedThreadPool()));
14.
15. /*pipeline实际上就是netty对客户端请求的处理器链,
16. 可以类⽐JAVA EE编程中Filter的责任链模式,上⼀个filter处理完成之后交给下⼀个filter处理,
17. 只不过在netty中,不再是filter,⽽是ChannelHandler。*/
18. bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
19. public ChannelPipeline getPipeline() throws Exception {
20. ChannelPipeline pipelines = Channels.pipeline();
21. //主要是处理编码、解码。因为⽹路传输的传⼊的都是⼆进制流,FixedHeaderFrameDecoder的作⽤就是对其进⾏解析
22. pipelines.addLast(Name(), new FixedHeaderFrameDecoder());
23. //处理client与server握⼿
24. pipelines.addLast(Name(), new HandshakeInitializationHandler());
25. //client⾝份验证
26. pipelines.addLast(Name(),
27. new ClientAuthenticationHandler(embeddedServer));
28. //SessionHandler⽤于真正的处理客户端请求,是本⽂分析的重点
29. SessionHandler sessionHandler = new SessionHandler(embeddedServer);
30. pipelines.addLast(Name(), sessionHandler);
31. return pipelines;
32. }
33. });
34.
迷魂阵捕鱼35. // 启动,当bind⽅法被调⽤时,netty开始真正的监控某个端⼝,此时客户端对这个端⼝的请求可以被接受到
36. if (StringUtils.isNotEmpty(ip)) {
37. this.serverChannel = bootstrap.bind(new InetSocketAddress(this.ip, this.port));
38. } else {
39. this.serverChannel = bootstrap.bind(new InetSocketAddress(this.port));
40. }
41. }
关于stop⽅法⽆⾮是⼀些关闭操作,代码很简单,这⾥不做介绍。
SessionHandler
很明显的,canal处理client请求的核⼼逻辑都在SessionHandler这个处理器中。注意其在实例化时,传⼊了embeddedServer对象,前⾯我们提
过,CanalServerWithNetty要将请求委派给CanalServerWithEmbedded处理,显然SessionHandler也要维护embeddedServer实例。
这⾥我们主要分析SessionHandler的 messageReceived⽅法,这个⽅法表⽰接受到了⼀个客户端请求,我们主要看的是SessionHandler如何对客户端请求进⾏解析,然后委派给CanalServerWithEmbedded处理的。为了体现其转发请求处理的核⼼逻辑,以下代码省去了⼤量源码⽚段,如下
SessionHandler#messageReceived
1. public class SessionHandler extends SimpleChannelHandler {
2. ....
3. //messageReceived⽅法表⽰收到客户端请求
4. public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
5. ....
6. //根据客户端发送的⽹路通信包请求类型type,将请求委派embeddedServer处理
7. switch (Type()) {
8. case SUBSCRIPTION://订阅请求 9. ...
10. embeddedServer.subscribe(clientIdentity);
11. ...
12. break;
13. case UNSUBSCRIPTION://取消订阅请求
玻璃倒角机14. ...
15. embeddedServer.unsubscribe(clientIdentity);
16. ...
17. break;
18. case GET://获取binlog请求
19. ....
20. if (Timeout() == -1) {// 根据客户端是否指定了请求超时时间调⽤embeddedServer不同⽅法获取binlog
21. message = WithoutAck(clientIdentity, FetchSize());
22. } else {
23. ...
24. message = WithoutAck(clientIdentity,
25. FetchSize(),
26. Timeout(),
27. unit);
28. }
干燥炉29. ...
30. break;
31. case CLIENTACK://客户端消费成功ack请求
32. ...
33. embeddedServer.ack(clientIdentity, BatchId());
34. ...
35. break;
36. case CLIENTROLLBACK://客户端消费失败回滚请求
37. ...
38. if (BatchId() == 0L) {
39. llback(clientIdentity);// 回滚所有批次
40. } else {
41. llback(clientIdentity, BatchId()); // 只回滚单个批次
42. }
43. ...
44. break;
45. default://⽆法判断请求类型
46. (400, MessageFormatter.format("packet type={} is NOT supported!", Type())
47. .getMessage(), Channel(), null);
48. break;
49. }
50. ...
51. }
52. ...
53. }
可以看到,SessionHandler对client请求进⾏解析后,根据请求类型,委派给CanalServerWithEmbedded的相应⽅法进⾏处理。因此核⼼逻辑都在
CanalServerWithEmbedded中。
CannalServerWithEmbeded
CanalServerWithEmbedded实现了CanalServer和CanalService Can两个接⼝。其内部维护了⼀个Map,key为destination,value为对应的CanalInstance,根据客户端请求
携带的destination参数将其转发到对应的CanalInstance上去处理。
1. public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements CanalServer, CanalService {
2. ...
3. //key为destination,value为对应的CanalInstance。
4. private Map<String, CanalInstance> canalInstances;
5. ...
6. }
对于CanalServer接⼝中定义的start和stop这两个⽅法实现⽐较简单,这⾥不再赘述。
在上⾯的SessionHandler源码分析中,我们已经看到,会根据请求报⽂的类型,会调⽤CanalServerWithEmbedded的相应⽅法,这些⽅法都定义在CanalService接⼝
中,如下:
1. public interface CanalService {
2. //订阅
3. void subscribe(ClientIdentity clientIdentity) throws CanalServerException;
4. //取消订阅
5. void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException;
6. //⽐例获取数据,并⾃动⾃⾏ack
7. Message get(ClientIdentity clientIdentity, int batchSize) throws CanalServerException;
8. //超时时间内批量获取数据,并⾃动进⾏ack
9. Message get(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit) throws CanalServerException;
10. //批量获取数据,不进⾏ack
11. Message getWithoutAck(ClientIdentity clientIdentity, int batchSize) throws CanalServerException;
12. //超时时间内批量获取数据,不进⾏ack
13. Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit) throws CanalSer
14. //ack某个批次的数据
15. void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException;冷焊钳
16. //回滚所有没有ack的批次的数据
17. void rollback(ClientIdentity clientIdentity) throws CanalServerException;
18. //回滚某个批次的数据
19. void rollback(ClientIdentity clientIdentity, Long batchId) throws CanalServerException;
20. }
细⼼地的读者会发现,每个⽅法中都包含了⼀个ClientIdentity类型参数,这就是客户端⾝份的标识。
1. public class ClientIdentity implements Serializable {
2. private String destination;
3. private short clientId;
4. private String filter;
5. ...
6. }
CanalServerWithEmbedded就是根据ClientIdentity中的destination参数确定这个请求要交给哪个CanalInstance处理的。
下⾯⼀次分析每⼀个⽅法的作⽤:
subscribe⽅法:
subscribe主要⽤于处理客户端的订阅请求,⽬前情况下,⼀个CanalInstance只能由⼀个客户端订阅,不过可以重复订阅。订阅主要的处理步骤如下:
1、根据客户端要订阅的destination,到对应的CanalInstance
2、通过这个CanalInstance的CanalMetaManager组件记录下有客户端订阅。
3、获取客户端当前订阅位置(Position)。⾸先尝试从CanalMetaManager中获取,CanalMetaManager 中记录了某个client当前订阅binlog的位置信息。如果是第⼀次订阅,肯定⽆法获取到这个位置,则尝试从CanalEventStore中获取第⼀个binlog的位置。从CanalEventStore中获取binlog位置信息的逻辑是:CanalInstance⼀旦启动,就会⽴刻去拉取binlog,存储到CanalEventStore中,在第⼀次订阅的情况下,CanalEventStore中的第⼀条binlog的位置,就是当前客户端当前消费的开始位置。
4、通知CanalInstance订阅关系变化
1. /**
2. * 客户端订阅,重复订阅时会更新对应的filter信息
3. */
4. @Override
5. public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
6. Destination());
7. //1、根据客户端要订阅的destination,到对应的CanalInstance
8. CanalInstance canalInstance = (Destination());
9. if (!MetaManager().isStart()) {
10. MetaManager().start();
11. }
一叶荻12. //2、通过CanalInstance的CanalMetaManager组件进⾏元数据管理,记录⼀下当前这个CanalInstance有客户端在订阅
13. MetaManager().subscribe(clientIdentity); // 执⾏⼀下meta订阅
14. //3、获取客户端当前订阅的binlog位置(Position),⾸先尝试从CanalMetaManager中获取
15. Position position = MetaManager().getCursor(clientIdentity);
16. if (position == null) {
17. //3.1 如果是第⼀次订阅,尝试从CanalEventStore中获取第⼀个binlog的位置,作为客户端订阅开始的位置。
18. position = EventStore().getFirstPosition();// 获取⼀下store中的第⼀条
19. if (position != null) {
20. MetaManager().updateCursor(clientIdentity, position); // 更新⼀下cursor
21. }
22. logger.info("subscribe successfully, {} with first position:{} ", clientIdentity, position);
23. } else {
24. logger.info("subscribe successfully, use last cursor position:{} ", clientIdentity, position);
25. }
26. //4 通知下订阅关系变化
27. canalInstance.subscribeChange(clientIdentity);
28. }
unsubscribe⽅法:
unsubscribe⽅法主要⽤于取消订阅关系。在下⾯的代码中,我们可以看到,其实就是到CanalInstance对应的CanalMetaManager,调⽤其unsubscribe取消这个订阅记录。需要注意的是,取消订阅并不意味着停⽌CanalInstance。当某个客户端取消了订阅,还会有新的client来订阅这个Ca
nalInstance,所以不能停。
1. /**
2. * 取消订阅
3. */
4. @Override
5. public void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException {
6. CanalInstance canalInstance = (Destination());
7. MetaManager().unsubscribe(clientIdentity); // 执⾏⼀下meta订阅
8. logger.info("unsubscribe successfully, {}", clientIdentity);
9. }
listAllSubscribe⽅法: