• 热门专题

Flume-NG源码阅读之SourceRunner,及选择器selector和拦截器interceptor的执行

作者:玖疯  发布日期:2014-05-26 22:43:29
Tag标签:源码  
  •   在AbstractConfigurationProvider类中loadSources方法会将所有的source进行封装成SourceRunner放到了Map<String, SourceRunner> sourceRunnerMap之中。相关代码如下:

     1       Map<String, String> selectorConfig = context.getSubProperties(
     2               BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);
     3 
     4           ChannelSelector selector = ChannelSelectorFactory.create(
     5               sourceChannels, selectorConfig);
     6 
     7           ChannelProcessor channelProcessor = new ChannelProcessor(selector);
     8           Configurables.configure(channelProcessor, context);
     9           source.setChannelProcessor(channelProcessor);
    10           sourceRunnerMap.put(sourceName,
    11               SourceRunner.forSource(source));

      每个source都有selector。上述代码会获取配置文件中关于source的selector配置信息;然后构造ChannelSelector对象selector;并封装selector对象成ChannelProcessor对象channelProcessor;执行channelProcessor.configure方法进行配置;设置soure的channelprocessor,最后封装为sourceRunner和source名称一起放入sourceRunnerMap中。 

      一、ChannelSelector selector = ChannelSelectorFactory.create(sourceChannels, selectorConfig)会根据配置文件中指定的类型实例化一个ChannelSelector(共两种ReplicatingChannelSelector复制和MultiplexingChannelSelector复用)如果没有指定类型默认是ReplicatingChannelSelector,也就是配置文件中不用配置selector会将每个event复制发送到多个channel;selector.setChannels(channels);对此slector进行配置configure(context)。这两中selector都实现了三个方法getRequiredChannels(Event event)、getOptionalChannels(Event event) 以及configure(Context context)。其实Event要发送到的channel有两种组成:RequiredChannels和OptionalChannels,对应两个方法。

      (1)ReplicatingChannelSelector的configure(context)方法会获得通过'optional'在配置文件中指定的可选发送的channels(可以多个,通过空格分割);获取requiredChannels是此source对应的channel中可以活动的channel列表;然后获取所有channel的名字及其与channel的映射channelNameMap;然后将可选的channel加入optionalChannels并从requiredChannels去掉有对应的channel,在这里并没有检查可选channel的合法性以及可以配置此source指定的channel之外的channel,requiredChannels和optionalChannels不能有交集。getOptionalChannels方法就是直接返回optionalChannels列表。getRequiredChannels方法返回requiredChannels列表,如果requiredChannels为null,则返回全部的可以活动的channel列表。

      (2)MultiplexingChannelSelector的configure(context)先获取要匹配的event的header,headerName;获得默认发送到的channel列表defaultChannels;获得mapping的各个子值,及对应的channel名称mapConfig;用来存储header不同的值及其对应的要发送到的channel列表,可以发送到多个channel的channelMapping;optionalChannels是配置的可选的发送channel,channelMapping中已经出现的channel不允许再次在optionalChannels出现,optionalChannels存储的是对应header的各个值及其等于该值的event要发送到的可选择的channel列表。getOptionalChannels(Event event)方法返回的是optionalChannels中该event的指定header对应的可选择的channel列表。getRequiredChannels(Event event)方法返回的是channelMapping中该event的指定header对应的channel列表,如果为null(表示由于该event的headers没有匹配的channel就发送到默认的channel中)就返回默认发送列表defaultChannels。

      二、 ChannelProcessor channelProcessor = new ChannelProcessor(selector)这个是封装选择器构造channelprocessor。其构造方法会赋值selector并构造一个InterceptorChain对象interceptorChain。ChannelProcessor类负责管理选择器selector和拦截器interceptor。

      三、执行channelProcessor.configure(Context)进行必要的配置,该方法会调用channelProcessor.configureInterceptors(context)对拦截器们进行获取和配置,configureInterceptors方法会先从配置文件中获取interceptor的组件名字interceptorNames[](可以多个),然后获取所有的“interceptors.”的配置信息interceptorContexts,然后遍历所有interceptorNames从配置文件中获取属于这个interceptor的配置信息及类型(type),根据类型构建相应的interceptor并进行配置configure,加入interceptors列表(用来存放实例化的interceptor);最后将列表传递给interceptorChain。关于更多interceptor的信息可以看这篇Flume-NG源码阅读之Interceptor(原创) 。  

      四、source.setChannelProcessor(channelProcessor)赋值。各个source通过getChannelProcessor()方法获取processor调用其processEventBatch(events)或者processEvent(event)来将event送到channel中。

      五、sourceRunnerMap.put(sourceName,SourceRunner.forSource(source))将source封装成SourceRunner放入sourceRunnerMap。SourceRunner.forSource会根据这个source所实现的接口封装成不同的Runner,有两种接口PollableSource和EventDrivenSource,前者是有自己线程来驱动的需要实现process方法,后者是没有单独的线程来驱动的没有process方法。

     1 public static SourceRunner forSource(Source source) {
     2     SourceRunner runner = null;
     3 
     4     if (source instanceof PollableSource) {
     5       runner = new PollableSourceRunner();
     6       ((PollableSourceRunner) runner).setSource((PollableSource) source);
     7     } else if (source instanceof EventDrivenSource) {
     8       runner = new EventDrivenSourceRunner();
     9       ((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);
    10     } else {
    11       throw new IllegalArgumentException('No known runner type for source '
    12           + source);
    13     }
    14 
    15     return runner;
    16   }

      (1)PollableSourceRunner的start()方法会获取source的ChannelProcessor,然后执行其initialize()方法,该方法会调用interceptorChain.initialize()方法对拦截器们进行初始化(遍历所有拦截器然后执行拦截器的initialize()方法);然后执行source.start()启动source;再启动一个线程PollingRunner,它的run方法会始终执行source.process()并根据返回的状态值做一些统计工作。

      (2)EventDrivenSourceRunner的start()方法会获取source的ChannelProcessor,然后执行其initialize()方法,该方法会调用interceptorChain.initialize()方法对拦截器们进行初始化(遍历所有拦截器然后执行拦截器的initialize()方法);然后执行source.start()启动source。

      这样就完成了sourceRunnerMap的组装。当在Application中的startAllComponents方法中通过materializedConfiguration.getSourceRunners()获取所有的SourceRunner并放入supervisor.supervise中去执行,会调用到SourceRunner.start()方法,即上面刚讲到的内容。这样source就启动了。然后当将封装的Events或者Event发送到channel时,需要使用对应的方法ChannelProcessor.processEventBatch(List<Event> events)或者ChannelProcessor.processEvent(Event event)就可以将数据从source传输到channel中,这两个方法都会在开始调用interceptorChain.intercept(events)或者interceptorChain.intercept(event)对event增加headers(如果有多个interceptor会遍历interceptors处理每个event)。ChannelProcessor都是通过在source中直接调用getChannelProcessor()(在所有的source的父类AbstractSource中实现的)获得。看一看processEventBatch(List<Event> events)代码:

     1 public void processEventBatch(List<Event> events) {
     2     Preconditions.checkNotNull(events, 'Event list must not be null');
     3 
     4     events = interceptorChain.intercept(events);
     5 
     6     Map<Channel, List<Event>> reqChannelQueue =        //需要发送到的每个channel及其要发送到这个channel的event列表
     7         new LinkedHashMap<Channel, List<Event>>();
     8 
     9     Map<Channel, List<Event>> optChannelQueue =        //可选的每个channel及其要发送到这个channel的event列表
    10         new LinkedHashMap<Channel, List<Event>>();
    11 
    12     for (Event event : events) {
    13       List<Channel> reqChannels = selector.getRequiredChannels(event);    //获取需要发送到的所有channel
    14 
    15       for (Channel ch : reqChannels) {
    16         List<Event> eventQueue = reqChannelQueue.get(ch);
    17         if (eventQueue == null) {
    18           eventQueue = new ArrayList<Event>();
    19           reqChannelQueue.put(ch, eventQueue);
    20         }
    21         eventQueue.add(event);        //将event放入对应channel的event列表
    22       }
    23 
    24       List<Channel> optChannels = selector.getOptionalChannels(event);    //获取可选的要发送到的所有channel
    25 
    26       for (Channel ch: optChannels) {
    27         List<Event> eventQueue = optChannelQueue.get(ch);
    28         if (eventQueue == null) {
    29           eventQueue = new ArrayList<Event>();
    30           optChannelQueue.put(ch, eventQueue);
    31         }
    32 
    33         eventQueue.add(event);        //将event放入对应channel的event列表
    34       }
    35     }
    36 
    37     // Process required channels
    38     for (Channel reqChannel : reqChannelQueue.keySet()) {
    39       Transaction tx = reqChannel.getTransaction();    //创建事务
    40       Preconditions.checkNotNull(tx, 'Transaction object must not be null');
    41       try {
    42         tx.begin();
    43 
    44         List<Event> batch = reqChannelQueue.get(reqChannel);
    45 
    46         for (Event event : batch) {        //发送到需要发送到的channel
    47           reqChannel.put(event);
    48         }
    49 
    50         tx.commit();
    51       } catch (Throwable t) {
    52         tx.rollback();    //事务回滚
    53         if (t instanceof Error) {
    54           LOG.error('Error while writing to required channel: ' +
    55               reqChannel, t);
    56           throw (Error) t;
    57         } else {
    58           throw new ChannelException('Unable to put batch on required ' +
    59               'channel: ' + reqChannel, t);
    60         }
    61       } finally {
    62         if (tx != null) {
    63           tx.close();
    64         }
    65       }
    66     }

      上述代码不复杂,会获得所有需要发送到的channel和所有可选的channel,然后针对每个channel,将所有event放入一个列表与该channel组成映射;然后会遍历两种channel列表中的每个channel将它对应的所有event发送到对应的channel中。这个方法写的不够友好,还可以再优化,因为方法的参数本身就是一个列表可以省去一层for循环,直接将reqChannelQueue.put(ch, eventQueue)和optChannelQueue.put(ch, eventQueue)中的eventQueue改为传递过来的参数List<Event> events就可以达到优化的目的。

      processEvent(Event event)方法就更简单了,将这个event发送到这两种channel列表中每个channel就可以。

      在发送到channel的过程中我们也发现都会有事务的创建(getTransaction())、开始(tx.begin())、提交(tx.commit())、回滚(tx.rollback())、关闭(tx.close())等操作,这是必须的。在sink中这些操作需要显示的去调用,而在source端则封装在processEvent和processEventBatch方法中,不需要显示的调用了,但不是不调用。

      至此,sourceRunner的配置、初始化、执行就讲解完毕了。在配置文件中看到的interceptor和selector都是在这里进行配置及执行的。通过了解上述,我们自定义source组件是不是更容易了。呵呵

     

About IT165 - 广告服务 - 隐私声明 - 版权申明 - 免责条款 - 网站地图 - 网友投稿 - 联系方式
本站内容来自于互联网,仅供用于网络技术学习,学习中请遵循相关法律法规