博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Storm在Java中的应用
阅读量:2068 次
发布时间:2019-04-29

本文共 7018 字,大约阅读时间需要 23 分钟。

序言

      Storm 说的是 语言无关性: Storm的topology和消息处理组件(Bolt)可以用任何语言来定义, 这一点使得任何人都可以使用storm.

这里将说明下StormI在Java中的使用,java程序是基于Springboot.这最重要的是storm如何拆解任务.其它的都是流程化的东西.(cuiyaonan2000@163.com)

参考资料:

 

整合思路

      Storm主要的三个Component:Topology、Spout、Bolt。Topology作为主进程控制着spout、bolt线程的运行,他们相当于独立运行的容器分布于storm集群中的各个机器节点。

       根据如上的描述我们要在代码中创建 3个东西:

  • Topology : 主进程,控制Spout和Bolt
  • Spout :  用于从Kafka,Mysql,Redis,Oracle 等等地方获取要处理的数据,并发送给Bolt
  • Bolt :  接收Spout发送来的Tuple并进行处理------------这里可以得出一个很重要的结论,就是Storm不负责插接任务,需要你自己拆解任务.并封装到Tuple中cuiyaonan2000@163.com

 

Storm程序就是创建如上的内容

    还有就是要连接数据库,连接kafka,连接Redis等等的东西,这里就需要跟我们传统的框架进行整合了.

在SpringBoot程序中如何提交storm的Topolgy?

storm是通过提交Topolgy来确定如何启动的,一般使用过运行main方法来启动,但是SpringBoot启动方式一般也是通过main方法启动的。所以应该怎么样解决呢?

  • 解决思路:将storm的Topology写在SpringBoot启动的主类中,随着SpringBoot启动而启动。
  • 实验结果:可以一起启动(按理来说也是可以的)。但是随之而来的是下一个问题,bolt和spout类无法使用spring注解。

2 如何让bolt和spout类使用spring注解?

  • 解决思路:在了解到spout和bolt类是由nimbus端实例化,然后通过序列化传输到supervisor,再反向序列化,因此无法使用注解,所以这里可以换个思路,既然不能使用注解,那么就动态获取Spring的bean就好了。
  • 实验结果:使用动态获取bean的方法之后,可以成功启动storm了。

 

 

代码贴图

Topology

package cui.yao.nan.storm.demo;import org.apache.storm.Config;import org.apache.storm.LocalCluster;import org.apache.storm.StormSubmitter;import org.apache.storm.topology.TopologyBuilder;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;/** *  * @author cuiyaonan2000@163.com * */@Componentpublic class CreateTopology {		private static final Logger logger = LoggerFactory.getLogger(CreateTopology.class);		public void createTopology(String[] args) {//		编写bolt,storm提供了两种bolt,BasicBolt和RichBolt,//		RichBolt在执行execute后要手动提交ack或者fail,//		BasicBolt在execute执行后会自动提交ack,但是只对FailedException异常捕获并自动执行fail方法,其他异常需自己处理。				// 定义一个拓扑		//Storm框架支持多语言,在Java环境下创建一个拓扑,需要使用TopologyBuilder进行构建		TopologyBuilder builder = new TopologyBuilder();				// 设置1个Executeor(线程),默认一个		builder.setSpout("theNameForSpout",new SpoutOjbect(), 1);		        //parallelism_hint 执行线程数 		//setNumTasks 所有线程运行任务总数,以下配置是2个spout线程各自运行一个任务        //topologyBuilder.setSpout("worldCountSpout", new WorldCountSpout(), 2).setNumTasks(2);                        				// shuffleGrouping:表示是随机分组		// 设置1个Executeor(线程),和两个task		builder.setBolt("theNameForBolt", new BoltObject(), 1).shuffleGrouping("theNameForSpout");		        //tuple随机分发给下一阶段的bolt ; parallelism_hint 执行线程数  ;  setNumTasks 所有线程运行任务总数,以下配置是2个线程各自运行一个Bolt任务//        topologyBuilder.setBolt("worldCutBolt", new WorldCutBasicBolt(), 2)//        .setNumTasks(2)//        .shuffleGrouping("worldCountSpout");		        		//启动topology的配置信息		Config conf = new Config();				//设置一个应答者		//conf.setNumAckers(1);				//设置一个work		//conf.setNumWorkers(1);				//TOPOLOGY_DEBUG(setDebug),当他被设置成true的话,storm会记录下每个组件所发射的每条消息        //这在本地环境调试topology很有用。但是在线上这么做的话,会影响性能        conf.setDebug(false);				try {			// 有参数时,表示向集群提交作业,并把第一个参数当做topology名称			// 没有参数时,本地提交			if (args != null && args.length > 0) { 								logger.info("运行远程模式");				StormSubmitter.submitTopology(args[0], conf, builder.createTopology());							} else {								// 启动本地模式				logger.info("运行本地模式");				LocalCluster cluster = new LocalCluster();				cluster.submitTopology("TopologyApp", conf, builder.createTopology());							}		} catch (Exception e) {						logger.error("storm启动失败!程序退出!",e);			System.exit(1);					}				logger.info("storm启动成功...");	}}

Spout

package cui.yao.nan.storm.demo;import java.util.Map;import org.apache.storm.spout.SpoutOutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.base.BaseRichSpout;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Values;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;/** *  * @author cuiyaonan2000@163.com * */@Componentpublic class SpoutOjbect extends BaseRichSpout {		Map
conf; TopologyContext context; SpoutOutputCollector collector; int num = 0 ; private static final Logger logger = LoggerFactory.getLogger(SpoutOjbect.class); /** * 该方法调用一次,主要由storm框架传入SpoutOutputCollector * @param stormConf * @param context */ @Override public void open(Map
conf, TopologyContext context, SpoutOutputCollector collector) { // TODO Auto-generated method stub this.conf = conf; this.context = context; this.collector = collector; } /** * nextTuple()方法是Spout实现的核心。 也就是主要执行方法,用于输出信息,通过collector.emit方法发射。 */ @Override public void nextTuple() { // TODO Auto-generated method stub //发送到bolt中 this.collector.emit(new Values(" i am the values")); // logger.info("----------------------nextTuple:" + ++num); } /** * declareOutputFields是在IComponent接口中定义,用于声明数据格式。 即输出的一个Tuple中,包含几个字段。 * 这个也只执行一次 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub Fields fields = new Fields("char");// declarer.declareStream( fields); declarer.declare(fields); logger.info("----------------------declareOutputFields:" ); } /** ** 当一个Tuple处理成功时,会调用这个方法 param obj emit方法中的msgId */// @Override// public void ack(Object obj) {// System.out.println("成功:" + obj);// } /** ** 当Topology停止时,会调用这个方法 */// @Override// public void close() { // } /** ** 当一个Tuple处理失败时,会调用这个方法 */// @Override// public void fail(Object obj) {// // } }

Bolt

package cui.yao.nan.storm.demo;import java.util.Map;import org.apache.storm.spout.SpoutOutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.BasicOutputCollector;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.base.BaseBasicBolt;import org.apache.storm.tuple.Tuple;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;/** *  * @author cuiyaonan2000@163.com * */@Componentpublic class BoltObject extends BaseBasicBolt {		Map
conf; TopologyContext context; SpoutOutputCollector collector; private int num = 0 ; private static final Logger logger = LoggerFactory.getLogger(BoltObject.class); /** ** 在Bolt启动前执行,提供Bolt启动环境配置的入口 一般对于不可序列化的对象进行实例化。 */ @Override public void execute(Tuple input, BasicOutputCollector collector) { // TODO Auto-generated method stub String str = (String)input.getValueByField("char"); logger.info("----------------------execute:" + ++num); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { }}

 

运行模式

  • 本地模式(Local Mode): 即Topology运行在本地机器的单一JVM上,这个模式主要用来开发、调试。
  • 远程模式(Remote Mode):在这个模式,我们把我们的Topology提交到集群,在这个模式中,Storm的所有组件都是线程安全的,因为它们都会运行在不同的Jvm或物理机器上,这个模式就是正式的生产模式。

 

本地模式

    本地模式类似storm集群是一个进程,用来编写和测试topology。在本地模式上运行topology类似在一个集群上运行topology。创建一个本地集群:

    import backtype.storm.LocalCluster;
    LocalCluster cluster = new LocalCluster();

  1. 提交集群使用submitTopology,
  2. 杀死集群使用killTopology
  3. 关闭一个本地集群使用cluster.shutdown();

远程模式

  • 提交topology :   为jar包指定参数: storm jar storm.jar 启动类 arg1 arg2 arg3
  • 杀死topology :   storm kill stormname

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

转载地址:http://qbcmf.baihongyu.com/

你可能感兴趣的文章
【LEETCODE】236-Lowest Common Ancestor of a Binary Tree
查看>>
【TED】处乱不惊-Daniel Levitin
查看>>
【LEETCODE】105-Construct Binary Tree from Preorder and Inorder Traversal
查看>>
【TED】只需专注10分钟-Andy Puddicombe
查看>>
【MachineLearning】数据挖掘中的分类和聚类的区别
查看>>
【LEETCODE】292-Nim Game
查看>>
【LEETCODE】237-Delete Node in a Linked List
查看>>
【LEETCODE】206-Reverse Linked List
查看>>
【LEETCODE】203-Remove Linked List Elements
查看>>
【LEETCODE】234-Palindrome Linked List
查看>>
【LEETCODE】141-Linked List Cycle
查看>>
【LEETCODE】142-Linked List Cycle II
查看>>
【LEETCODE】92-Reverse Linked List II
查看>>
【LEETCODE】283-Move Zeroes
查看>>
【LEETCODE】217-Contains Duplicate
查看>>
【LEETCODE】219-Contains Duplicate II
查看>>
【LEETCODE】220-Contains Duplicate III
查看>>
【LEETCODE】171-Excel Sheet Column Number
查看>>
【LEETCODE】169-Majority Element
查看>>
【LEETCODE】191-Number of 1 Bits
查看>>