本文共 7018 字,大约阅读时间需要 23 分钟。
Storm 说的是 语言无关性: Storm的topology和消息处理组件(Bolt)可以用任何语言来定义, 这一点使得任何人都可以使用storm.
这里将说明下StormI在Java中的使用,java程序是基于Springboot.这最重要的是storm如何拆解任务.其它的都是流程化的东西.(cuiyaonan2000@163.com)
参考资料:
Storm主要的三个Component:Topology、Spout、Bolt。Topology作为主进程控制着spout、bolt线程的运行,他们相当于独立运行的容器分布于storm集群中的各个机器节点。
根据如上的描述我们要在代码中创建 3个东西:
还有就是要连接数据库,连接kafka,连接Redis等等的东西,这里就需要跟我们传统的框架进行整合了.
在SpringBoot程序中如何提交storm的Topolgy?
storm是通过提交Topolgy来确定如何启动的,一般使用过运行main方法来启动,但是SpringBoot启动方式一般也是通过main方法启动的。所以应该怎么样解决呢?
2 如何让bolt和spout类使用spring注解?
package cui.yao.nan.storm.demo;import org.apache.storm.Config;import org.apache.storm.LocalCluster;import org.apache.storm.StormSubmitter;import org.apache.storm.topology.TopologyBuilder;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;/** * * @author cuiyaonan2000@163.com * */@Componentpublic class CreateTopology { private static final Logger logger = LoggerFactory.getLogger(CreateTopology.class); public void createTopology(String[] args) {// 编写bolt,storm提供了两种bolt,BasicBolt和RichBolt,// RichBolt在执行execute后要手动提交ack或者fail,// BasicBolt在execute执行后会自动提交ack,但是只对FailedException异常捕获并自动执行fail方法,其他异常需自己处理。 // 定义一个拓扑 //Storm框架支持多语言,在Java环境下创建一个拓扑,需要使用TopologyBuilder进行构建 TopologyBuilder builder = new TopologyBuilder(); // 设置1个Executeor(线程),默认一个 builder.setSpout("theNameForSpout",new SpoutOjbect(), 1); //parallelism_hint 执行线程数 //setNumTasks 所有线程运行任务总数,以下配置是2个spout线程各自运行一个任务 //topologyBuilder.setSpout("worldCountSpout", new WorldCountSpout(), 2).setNumTasks(2); // shuffleGrouping:表示是随机分组 // 设置1个Executeor(线程),和两个task builder.setBolt("theNameForBolt", new BoltObject(), 1).shuffleGrouping("theNameForSpout"); //tuple随机分发给下一阶段的bolt ; parallelism_hint 执行线程数 ; setNumTasks 所有线程运行任务总数,以下配置是2个线程各自运行一个Bolt任务// topologyBuilder.setBolt("worldCutBolt", new WorldCutBasicBolt(), 2)// .setNumTasks(2)// .shuffleGrouping("worldCountSpout"); //启动topology的配置信息 Config conf = new Config(); //设置一个应答者 //conf.setNumAckers(1); //设置一个work //conf.setNumWorkers(1); //TOPOLOGY_DEBUG(setDebug),当他被设置成true的话,storm会记录下每个组件所发射的每条消息 //这在本地环境调试topology很有用。但是在线上这么做的话,会影响性能 conf.setDebug(false); try { // 有参数时,表示向集群提交作业,并把第一个参数当做topology名称 // 没有参数时,本地提交 if (args != null && args.length > 0) { logger.info("运行远程模式"); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else { // 启动本地模式 logger.info("运行本地模式"); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("TopologyApp", conf, builder.createTopology()); } } catch (Exception e) { logger.error("storm启动失败!程序退出!",e); System.exit(1); } logger.info("storm启动成功..."); }}
package cui.yao.nan.storm.demo;import java.util.Map;import org.apache.storm.spout.SpoutOutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.base.BaseRichSpout;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Values;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;/** * * @author cuiyaonan2000@163.com * */@Componentpublic class SpoutOjbect extends BaseRichSpout { Mapconf; TopologyContext context; SpoutOutputCollector collector; int num = 0 ; private static final Logger logger = LoggerFactory.getLogger(SpoutOjbect.class); /** * 该方法调用一次,主要由storm框架传入SpoutOutputCollector * @param stormConf * @param context */ @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { // TODO Auto-generated method stub this.conf = conf; this.context = context; this.collector = collector; } /** * nextTuple()方法是Spout实现的核心。 也就是主要执行方法,用于输出信息,通过collector.emit方法发射。 */ @Override public void nextTuple() { // TODO Auto-generated method stub //发送到bolt中 this.collector.emit(new Values(" i am the values")); // logger.info("----------------------nextTuple:" + ++num); } /** * declareOutputFields是在IComponent接口中定义,用于声明数据格式。 即输出的一个Tuple中,包含几个字段。 * 这个也只执行一次 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub Fields fields = new Fields("char");// declarer.declareStream( fields); declarer.declare(fields); logger.info("----------------------declareOutputFields:" ); } /** ** 当一个Tuple处理成功时,会调用这个方法 param obj emit方法中的msgId */// @Override// public void ack(Object obj) {// System.out.println("成功:" + obj);// } /** ** 当Topology停止时,会调用这个方法 */// @Override// public void close() { // } /** ** 当一个Tuple处理失败时,会调用这个方法 */// @Override// public void fail(Object obj) {// // } }
package cui.yao.nan.storm.demo;import java.util.Map;import org.apache.storm.spout.SpoutOutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.BasicOutputCollector;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.base.BaseBasicBolt;import org.apache.storm.tuple.Tuple;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;/** * * @author cuiyaonan2000@163.com * */@Componentpublic class BoltObject extends BaseBasicBolt { Mapconf; TopologyContext context; SpoutOutputCollector collector; private int num = 0 ; private static final Logger logger = LoggerFactory.getLogger(BoltObject.class); /** ** 在Bolt启动前执行,提供Bolt启动环境配置的入口 一般对于不可序列化的对象进行实例化。 */ @Override public void execute(Tuple input, BasicOutputCollector collector) { // TODO Auto-generated method stub String str = (String)input.getValueByField("char"); logger.info("----------------------execute:" + ++num); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { }}
本地模式类似storm集群是一个进程,用来编写和测试topology。在本地模式上运行topology类似在一个集群上运行topology。创建一个本地集群:
import backtype.storm.LocalCluster; LocalCluster cluster = new LocalCluster();
转载地址:http://qbcmf.baihongyu.com/