• 热门专题

stormfieldsGrouping按照字段分组

作者:  发布日期:2015-08-23 23:06:00
Tag标签:字段  
  • Shuffle Grouping: 随机分组, 随机派发stream里面的tuple, 保证bolt中的每个任务接收到的tuple数目相同.(它能实现较好的负载均衡)
    Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到同一任务, 而不同的userid则会被分配到不同的任务
    All Grouping: 广播发送,对于每一个tuple,Bolts中的所有任务都会收到.
    Global Grouping: 全局分组,这个tuple被分配到storm中的一个bolt的其中一个task.再具体一点就是分配给id值最低的那个task.
    Non Grouping: 不分组,意思是说stream不关心到底谁会收到它的tuple.目前他和Shuffle grouping是一样的效果,有点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程去执行.
    Direct Grouping: 直接分组,这是一种比较特别的分组方法,用这种分组意味着消息的发送者举鼎由消息接收者的哪个task处理这个消息.只有被声明为Direct Stream的消息流可以声明这种分组方法.而且这种消息tuple必须使用emitDirect方法来发射.消息处理者可以通过TopologyContext来或者处理它的消息的taskid (OutputCollector.emit方法也会返回taskid)

    import java.util.Map;


    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    import backtype.storm.utils.Utils;


    public class ClusterStormTopologyFieldsGrouping {

    public static class DataSourceSpout extends BaseRichSpout{
    private Map conf;
    private TopologyContext context;
    private SpoutOutputCollector collector;

    /**
    * 在本实例运行的时候被调用一次
    */
    public void open(Map conf, TopologyContext context,
    SpoutOutputCollector collector) {
    this.conf = conf;
    this.context = context;
    this.collector = collector;
    }
    /**
    * 死循环调用 心跳
    */
    int i=0;
    public void nextTuple() {
    System.err.println("spout :"+i);
    //values 就是value的list列表
    this.collector.emit(new Values(i%2,i++));
    Utils.sleep(1000);
    }
    /**
    * 声明字段名称
    */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    //fields就是field的列表
    declarer.declare(new Fields("flag","num"));
    }
    }

    public static class SumBolt extends BaseRichBolt{

    private Map stormConf;
    private TopologyContext context;
    private OutputCollector collector;
    /**
    * 只会被调用一次
    */
    public void prepare(Map stormConf, TopologyContext context,
    OutputCollector collector) {
    this.stormConf = stormConf;
    this.context = context;
    this.collector = collector;
    }
    /**
    * 死循环,循环的获取上一级发送过来的数据(spout/bolt)
    */
    int sum = 0;
    public void execute(Tuple input) {
    //input.getInteger(0);
    Integer count = input.getIntegerByField("num");
    //获取当前线程id及发送来的数据
    System.err.println(Thread.currentThread().getId()+"--"+count);
    }


    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
    }


    public static void main(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();
    String SPOUT_NAME = DataSourceSpout.class.getSimpleName();
    String BOLT_NAME = SumBolt.class.getSimpleName();

    builder.setSpout(SPOUT_NAME, new DataSourceSpout());

    //开启3个线程执行(上面按照基数、偶数分组!这里3个线程,会有一个线程没有数据)

    builder.setBolt(BOLT_NAME, new SumBolt(),3).fieldsGrouping(SPOUT_NAME, new Fields("flag"));
    Config config = new Config();
    try {
    StormSubmitter.submitTopology(ClusterStormTopologyFieldsGrouping.class.getSimpleName(), config, builder.createTopology());
    } catch (AlreadyAliveException e) {
    e.printStackTrace();
    } catch (InvalidTopologyException e) {
    e.printStackTrace();
    }


    }


    }

     

About IT165 - 广告服务 - 隐私声明 - 版权申明 - 免责条款 - 网站地图 - 网友投稿 - 联系方式
本站内容来自于互联网,仅供用于网络技术学习,学习中请遵循相关法律法规