Canal源码分析之server模块

阅读: 评论:0

Canal源码分析之server模块
server模块的核⼼接⼝是CanalServer,其有2个实现类CanalServerWithNetty、CanalServerWithEmbeded。关于CanalServer,官⽅⽂档中有有以下描述:
下图是笔者对官⽅⽂档的进⼀步描述:
左边的图
表⽰的是Canal独⽴部署。不同的应⽤通过canal client与canal server进⾏通信,所有的canal client的请求统⼀由CanalServerWithNetty接受,之后CanalServerWithNetty 会将客户端请求派给CanalServerWithEmbeded 进⾏真正的处理。CannalServerWithEmbeded内部维护了多个canal instance,每个canal instance伪装成不同的mysql实例的slave,⽽CanalServerWithEmbeded会根据客户端请求携带的destination参数确定要由哪⼀个canal instance为其提供服务。
右边的图
是直接在应⽤中嵌⼊CanalServerWithEmbeded,不需要独⽴部署canal。很明显,⽹络通信环节少了,同步binlog信息的效率肯定更⾼。但是对于使⽤者的技术要求⽐较⾼。在应⽤中,我们可以通过CanalServerWithEmbeded.instance()⽅法来获得CanalServerWithEmbeded实例,这⼀个单例。
整个server模块⽬录结构如下所⽰:
其中上⾯的红⾊框就是嵌⼊式实现,⽽下⾯的绿⾊框是基于Netty的实现。
看起来基于netty的实现代码虽然多⼀点,这其实只是幻觉,CanalServerWithNetty会将所有的请求委派给CanalServerWithEmbedded处理。
⽽内嵌的⽅式只有CanalServerWithEmbedded⼀个类, 是因为CanalServerWithEmbedded⼜要根据destination选择某个具体的CanalInstance来处理客户端请求,⽽CanalInstance的实现位于instance模块,我们将在之后分析。因此从canal server的⾓度来说,CanalServerWithEmbedded才是server模块真正的核⼼。
CanalServerWithNetty和CanalServerWithEmbedded都是单例的,提供了⼀个静态⽅法instance()获取对应的实例。回顾前⼀节分析CanalController源码时,在CanalController构造⽅法中准备CanalServer的相关代码,就是通过这两个静态⽅法获取对应的实例的。
1.  public CanalController(final Properties properties){
2.        ....
3.      // 准备canal server
4.        ip = getProperty(properties, CanalConstants.CANAL_IP);
5.        port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
6.        embededCanalServer = CanalServerWithEmbedded.instance();
7.        embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置⾃定义的instanceGenerator
8.        canalServer = CanalServerWithNetty.instance();
9.        canalServer.setIp(ip);
10.        canalServer.setPort(port);
11.        ....
12. }
CanalServer接⼝
CanalServer接⼝继承了CanalLifeCycle接⼝,主要是为了重新定义start和stop⽅法,抛出CanalServerException。
1. public interface CanalServer extends CanalLifeCycle {
2.
3.    void start() throws CanalServerException;
4.
5.    void stop() throws CanalServerException;
6. }
CanalServerWithNetty
CanalServerWithNetty主要⽤于接受客户端的请求,然后将其委派给CanalServerWithEmbeded处理。下⾯的源码显⽰了CanalServerWithNetty种定义的字段和构造⽅法
1. public class CanalServerWithNetty extends AbstractCanalLifeCycle implements CanalServer {
2.    //监听的所有客户端请求都会为派给CanalServerWithEmbedded处理
3.    private CanalServerWithEmbedded embeddedServer;      // 嵌⼊式server
4.
5.    //监听的ip和port,client通过此ip和port与服务端通信
6.    private String                  ip;
7.    private int                    port;
8.
9.    //netty组件
10.    private Channel                serverChannel = null;
11.    private ServerBootstrap        bootstrap    = null;
12.    //....单例模式实现
13.    private CanalServerWithNetty(){
14.        //给embeddedServer赋值
15.        beddedServer = CanalServerWithEmbedded.instance();
16.    }
17.    //... start and stop method
18.    //...setters
19. }
字段说明:
embeddedServer:因为CanalServerWithNetty需要将请求委派给CanalServerWithEmbeded处理,因此其维护了embeddedServer对象。
ip、port:这是netty监听的⽹络ip和端⼝,client通过这个ip和端⼝与server通信
serverChannel、bootstrap:这是netty的API。其中ServerBootstrap⽤于启动服务端,通过调⽤其bind⽅法,返回⼀个类型为Channel的serverChannel对象,代表服务端通道。关于netty知识不是本教程重点,如果读者不熟悉,可以参考笔者的。
start⽅法
start⽅法中包含了netty启动的核⼼逻辑,如下所⽰:
CanalServerWithNetty#start
1. public void start() {
2.        super.start();
账本网3.        //优先启动内嵌的canal server,因为基于netty的实现需要将请求委派给其处理
4.        if (!embeddedServer.isStart()) {
5.            embeddedServer.start();
6.        }
7.
8.          /* 创建bootstrap实例,参数NioServerSocketChannelFactory也是Netty的API,其接受2个线程池参数
9.          其中第⼀个线程池是Accept线程池,第⼆个线程池是woker线程池,
10.          Accept线程池接收到client连接请求后,会将代表client的对象转发给worker线程池处理。
11.          这⾥属于netty的知识,不熟悉的⽤户暂时不必深究,简单认为netty使⽤线程来处理客户端的⾼并发请求即可。*/
12.        this.bootstrap = new ServerBootstrap(new wCachedThreadPool(),
13.            wCachedThreadPool()));
14.
15.        /*pipeline实际上就是netty对客户端请求的处理器链,
16.        可以类⽐JAVA EE编程中Filter的责任链模式,上⼀个filter处理完成之后交给下⼀个filter处理,
17.        只不过在netty中,不再是filter,⽽是ChannelHandler。*/
18.        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
19.            public ChannelPipeline getPipeline() throws Exception {
20.                ChannelPipeline pipelines = Channels.pipeline();
21.                //主要是处理编码、解码。因为⽹路传输的传⼊的都是⼆进制流,FixedHeaderFrameDecoder的作⽤就是对其进⾏解析
22.                pipelines.addLast(Name(), new FixedHeaderFrameDecoder());
23.                //处理client与server握⼿
24.                pipelines.addLast(Name(), new HandshakeInitializationHandler());
25.                //client⾝份验证
26.                pipelines.addLast(Name(),
27.                    new ClientAuthenticationHandler(embeddedServer));
28.                //SessionHandler⽤于真正的处理客户端请求,是本⽂分析的重点
29.                SessionHandler sessionHandler = new SessionHandler(embeddedServer);
30.                pipelines.addLast(Name(), sessionHandler);
31.                return pipelines;
32.            }
33.        });
34.
迷魂阵捕鱼35.        // 启动,当bind⽅法被调⽤时,netty开始真正的监控某个端⼝,此时客户端对这个端⼝的请求可以被接受到
36.        if (StringUtils.isNotEmpty(ip)) {
37.            this.serverChannel = bootstrap.bind(new InetSocketAddress(this.ip, this.port));
38.        } else {
39.            this.serverChannel = bootstrap.bind(new InetSocketAddress(this.port));
40.        }
41.    }
关于stop⽅法⽆⾮是⼀些关闭操作,代码很简单,这⾥不做介绍。
SessionHandler
很明显的,canal处理client请求的核⼼逻辑都在SessionHandler这个处理器中。注意其在实例化时,传⼊了embeddedServer对象,前⾯我们提
过,CanalServerWithNetty要将请求委派给CanalServerWithEmbedded处理,显然SessionHandler也要维护embeddedServer实例。
这⾥我们主要分析SessionHandler的 messageReceived⽅法,这个⽅法表⽰接受到了⼀个客户端请求,我们主要看的是SessionHandler如何对客户端请求进⾏解析,然后委派给CanalServerWithEmbedded处理的。为了体现其转发请求处理的核⼼逻辑,以下代码省去了⼤量源码⽚段,如下
SessionHandler#messageReceived
1. public class SessionHandler extends SimpleChannelHandler {
2. ....
3. //messageReceived⽅法表⽰收到客户端请求
4. public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
5.    ....
6.      //根据客户端发送的⽹路通信包请求类型type,将请求委派embeddedServer处理
7.        switch (Type()) {
8.            case SUBSCRIPTION://订阅请求
9.                ...
10.                embeddedServer.subscribe(clientIdentity);
11.                          ...
12.                break;
13.            case UNSUBSCRIPTION://取消订阅请求
玻璃倒角机14.                ...
15.                embeddedServer.unsubscribe(clientIdentity);
16.                ...
17.                break;
18.            case GET://获取binlog请求
19.                ....
20.                    if (Timeout() == -1) {// 根据客户端是否指定了请求超时时间调⽤embeddedServer不同⽅法获取binlog
21.                        message = WithoutAck(clientIdentity, FetchSize());
22.                    } else {
23.                        ...
24.                        message = WithoutAck(clientIdentity,
25.                            FetchSize(),
26.                            Timeout(),
27.                            unit);
28.                    }
干燥炉29.                ...
30.                break;
31.            case CLIENTACK://客户端消费成功ack请求
32.                ...
33.                  embeddedServer.ack(clientIdentity, BatchId());
34.                ...
35.                break;
36.            case CLIENTROLLBACK://客户端消费失败回滚请求
37.                ...
38.                    if (BatchId() == 0L) {
39.                        llback(clientIdentity);// 回滚所有批次
40.                    } else {
41.                        llback(clientIdentity, BatchId()); // 只回滚单个批次
42.                    }
43.                ...
44.                break;
45.            default://⽆法判断请求类型
46.                (400, MessageFormatter.format("packet type={} is NOT supported!", Type())
47.                    .getMessage(), Channel(), null);
48.                break;
49.        }
50.    ...
51. }
52. ...
53. }
可以看到,SessionHandler对client请求进⾏解析后,根据请求类型,委派给CanalServerWithEmbedded的相应⽅法进⾏处理。因此核⼼逻辑都在
CanalServerWithEmbedded中。
CannalServerWithEmbeded
CanalServerWithEmbedded实现了CanalServer和CanalService Can两个接⼝。其内部维护了⼀个Map,key为destination,value为对应的CanalInstance,根据客户端请求
携带的destination参数将其转发到对应的CanalInstance上去处理。
1. public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements CanalServer, CanalService {
2.    ...
3.    //key为destination,value为对应的CanalInstance。
4.    private Map<String, CanalInstance> canalInstances;
5.    ...
6. }
对于CanalServer接⼝中定义的start和stop这两个⽅法实现⽐较简单,这⾥不再赘述。
在上⾯的SessionHandler源码分析中,我们已经看到,会根据请求报⽂的类型,会调⽤CanalServerWithEmbedded的相应⽅法,这些⽅法都定义在CanalService接⼝
中,如下:
1. public interface CanalService {
2.    //订阅
3.    void subscribe(ClientIdentity clientIdentity) throws CanalServerException;
4.    //取消订阅
5.    void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException;
6.    //⽐例获取数据,并⾃动⾃⾏ack
7.    Message get(ClientIdentity clientIdentity, int batchSize) throws CanalServerException;
8.    //超时时间内批量获取数据,并⾃动进⾏ack
9.    Message get(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit) throws CanalServerException;
10.    //批量获取数据,不进⾏ack
11.    Message getWithoutAck(ClientIdentity clientIdentity, int batchSize) throws CanalServerException;
12.    //超时时间内批量获取数据,不进⾏ack
13.    Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)                                                                                              throws CanalSer
14.    //ack某个批次的数据
15.    void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException;冷焊钳
16.    //回滚所有没有ack的批次的数据
17.    void rollback(ClientIdentity clientIdentity) throws CanalServerException;
18.    //回滚某个批次的数据
19.    void rollback(ClientIdentity clientIdentity, Long batchId) throws CanalServerException;
20. }
细⼼地的读者会发现,每个⽅法中都包含了⼀个ClientIdentity类型参数,这就是客户端⾝份的标识。
1. public class ClientIdentity implements Serializable {
2.    private String destination;
3.    private short  clientId;
4.    private String filter;
5.  ...
6. }
CanalServerWithEmbedded就是根据ClientIdentity中的destination参数确定这个请求要交给哪个CanalInstance处理的。
下⾯⼀次分析每⼀个⽅法的作⽤:
subscribe⽅法:
subscribe主要⽤于处理客户端的订阅请求,⽬前情况下,⼀个CanalInstance只能由⼀个客户端订阅,不过可以重复订阅。订阅主要的处理步骤如下:
1、根据客户端要订阅的destination,到对应的CanalInstance
2、通过这个CanalInstance的CanalMetaManager组件记录下有客户端订阅。
3、获取客户端当前订阅位置(Position)。⾸先尝试从CanalMetaManager中获取,CanalMetaManager 中记录了某个client当前订阅binlog的位置信息。如果是第⼀次订阅,肯定⽆法获取到这个位置,则尝试从CanalEventStore中获取第⼀个binlog的位置。从CanalEventStore中获取binlog位置信息的逻辑是:CanalInstance⼀旦启动,就会⽴刻去拉取binlog,存储到CanalEventStore中,在第⼀次订阅的情况下,CanalEventStore中的第⼀条binlog的位置,就是当前客户端当前消费的开始位置。
4、通知CanalInstance订阅关系变化
1. /**
2.  * 客户端订阅,重复订阅时会更新对应的filter信息
3.  */
4. @Override
5. public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
6.    Destination());
7.    //1、根据客户端要订阅的destination,到对应的CanalInstance
8.    CanalInstance canalInstance = (Destination());
9.    if (!MetaManager().isStart()) {
10.        MetaManager().start();
11.    }
一叶荻12.  //2、通过CanalInstance的CanalMetaManager组件进⾏元数据管理,记录⼀下当前这个CanalInstance有客户端在订阅
13.    MetaManager().subscribe(clientIdentity); // 执⾏⼀下meta订阅
14.  //3、获取客户端当前订阅的binlog位置(Position),⾸先尝试从CanalMetaManager中获取
15.    Position position = MetaManager().getCursor(clientIdentity);
16.    if (position == null) {
17.  //3.1 如果是第⼀次订阅,尝试从CanalEventStore中获取第⼀个binlog的位置,作为客户端订阅开始的位置。
18.        position = EventStore().getFirstPosition();// 获取⼀下store中的第⼀条
19.        if (position != null) {
20.            MetaManager().updateCursor(clientIdentity, position); // 更新⼀下cursor
21.        }
22.        logger.info("subscribe successfully, {} with first position:{} ", clientIdentity, position);
23.    } else {
24.        logger.info("subscribe successfully, use last cursor position:{} ", clientIdentity, position);
25.    }
26.    //4 通知下订阅关系变化
27.    canalInstance.subscribeChange(clientIdentity);
28. }
unsubscribe⽅法:
unsubscribe⽅法主要⽤于取消订阅关系。在下⾯的代码中,我们可以看到,其实就是到CanalInstance对应的CanalMetaManager,调⽤其unsubscribe取消这个订阅记录。需要注意的是,取消订阅并不意味着停⽌CanalInstance。当某个客户端取消了订阅,还会有新的client来订阅这个Ca
nalInstance,所以不能停。
1. /**
2.  * 取消订阅
3.  */
4. @Override
5. public void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException {
6.    CanalInstance canalInstance = (Destination());
7.    MetaManager().unsubscribe(clientIdentity); // 执⾏⼀下meta订阅
8.    logger.info("unsubscribe successfully, {}", clientIdentity);
9. }
listAllSubscribe⽅法:

本文发布于:2023-05-13 06:53:09,感谢您对本站的认可!

本文链接:https://patent.en369.cn/patent/2/97344.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:请求   客户端   订阅   处理   委派   实现   获取   位置
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2022 Comsenz Inc.Powered by © 369专利查询检索平台 豫ICP备2021025688号-20 网站地图