http://blog.csdn.net/sqh201030412/article/details/78426812


一,Storm集群的安装

1.下载安装包:


2.解压移动到软件安装目录

tar -zxvf   xxxx

3.修改配置文件, conf目录下:
vim storm.yaml


storm.zookeeper.servers:
      - "hadoop6"
      - "hadoop7"
      - "hadoop8"
nimbus.host: "hadoop5"
nimbus.childopts: "-Xmx1024m"
#
supervisor.childopts: "-Xmx1024m"
#
worker.childopts: "-Xmx1024m"
#
ui.childopts: "-Xmx1024m"
#
supervisor.slots.ports:
      - 6700
      - 6701
      - 6702
      - 6703

4.分发到运行的节点上:

scp -r storm  hadoop@hadoop8:/home/hadoop/app/
......

5.在各个节点启动服务:

  • nimbus.host所属的机器上启动 nimbus服务

cd /export/servers/storm/bin/

nohup ./storm nimbus &

  • nimbus.host所属的机器上启动ui服务

cd /export/servers/storm/bin/

nohup ./storm ui &

  • 在其它个点击上启动supervisor服务

cd /export/servers/storm/bin/

nohup ./storm supervisor &


6.查看


访问nimbus.host:/8080,即可看到stormui界面。


二,Storm得使用Demo

1.官方统计字数例子

bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.7.jar storm.starter.WordCountTopology wordcount

2.运行结果


3.自己书写一个字符统计的Demo

[java]  view plain  copy
  1. package com.demo.storm.wordcount;  
  2.   
  3. import java.util.HashMap;  
  4. import java.util.Map;  
  5.   
  6. import org.apache.storm.Config;  
  7. import org.apache.storm.StormSubmitter;  
  8. import org.apache.storm.generated.AlreadyAliveException;  
  9. import org.apache.storm.generated.AuthorizationException;  
  10. import org.apache.storm.generated.InvalidTopologyException;  
  11. import org.apache.storm.spout.SpoutOutputCollector;  
  12. import org.apache.storm.task.OutputCollector;  
  13. import org.apache.storm.task.TopologyContext;  
  14. import org.apache.storm.topology.OutputFieldsDeclarer;  
  15. import org.apache.storm.topology.TopologyBuilder;  
  16. import org.apache.storm.topology.base.BaseRichBolt;  
  17. import org.apache.storm.topology.base.BaseRichSpout;  
  18. import org.apache.storm.tuple.Fields;  
  19. import org.apache.storm.tuple.Tuple;  
  20. import org.apache.storm.tuple.Values;  
  21.   
  22. /** 
  23.  * @Description: Storm 单词统计主类 
  24.  * @author: songqinghu 
  25.  * @date: 2017年11月2日 下午2:18:19 
  26.  * Version:1.0 
  27.  */  
  28. public class WordCountStart {  
  29.   
  30.   
  31.     public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {  
  32.         //建造者  
  33.         TopologyBuilder builder = new TopologyBuilder();  
  34.         //构建spout模块  
  35.         builder.setSpout("mySpout"new MyWordSpout(), Integer.valueOf(4));  
  36.         //构建bolt模块 --切割  定义 mySpout为随机发送规则  
  37.         builder.setBolt("mySplitBolt"new MySplitWordBolt(), Integer.valueOf(5)).shuffleGrouping("mySpout");  
  38.         //构建bolt模块 --统计  定义 mySplitBolt为Hash分发  
  39.         builder.setBolt("myCountBolt"new MyCountWordBolt(), Integer.valueOf(5)).fieldsGrouping("mySplitBolt"new Fields("word"));  
  40.   
  41.         Config config = new Config();  
  42.   
  43.         config.setNumWorkers(Integer.valueOf(3));  
  44.         //向集群提交任务  
  45.         if(args.length==1){  
  46.             StormSubmitter.submitTopology(args[0],config, builder.createTopology());  
  47.         }else{  
  48.             StormSubmitter.submitTopology("mywordcount",config, builder.createTopology());  
  49.         }  
  50.     }  
  51. }  
  52. /** 
  53.  * @Description: 对接受到的数据进行处理 --统计 
  54.  * @author: songqinghu 
  55.  * @date: 2017年11月2日 下午2:42:33 
  56.  * Version:1.0 
  57.  */  
  58. class MyCountWordBolt extends BaseRichBolt {  
  59.   
  60.   
  61.     private Map<String, Integer> wordCount = new HashMap<String,Integer>();  
  62.   
  63.     @Override  
  64.     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {  
  65.   
  66.     }  
  67.   
  68.     @Override  
  69.     public void execute(Tuple input) {  
  70.         String word = input.getString(0);  
  71.   
  72.         if(wordCount.containsKey(word)){  
  73.             wordCount.put(word, wordCount.get(word)+1);  
  74.         }else{  
  75.             wordCount.put(word, 1);  
  76.         }  
  77.   
  78.         System.out.println(wordCount);  
  79.     }  
  80.   
  81.     @Override  
  82.     public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  83.         //不在向下发送  
  84.     }  
  85.   
  86. }  
  87.   
  88.   
  89. /** 
  90.  * @Description: 对接受到的数据进行处理 --切割  分组发送 
  91.  * @author: songqinghu 
  92.  * @date: 2017年11月2日 下午2:42:33 
  93.  * Version:1.0 
  94.  */  
  95. class MySplitWordBolt extends BaseRichBolt {  
  96.   
  97.     private OutputCollector collector;  
  98.   
  99.     @Override  
  100.     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {  
  101.         this.collector =collector;  
  102.     }  
  103.   
  104.     @Override  
  105.     public void execute(Tuple input) {  
  106.         String words = input.getString(0);  
  107.         String[] split = words.split(" ");  
  108.         for (String word : split) {  
  109.             collector.emit(new Values(word.trim()));  
  110.         }  
  111.     }  
  112.   
  113.     @Override  
  114.     public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  115.         declarer.declare(new Fields("word"));  
  116.     }  
  117.   
  118. }  
  119.   
  120.   
  121.   
  122. /** 
  123.  * @Description: 数据源接入类,完成数据源的接入和传递工作 
  124.  * @author: songqinghu 
  125.  * @date: 2017年11月2日 下午2:41:23 
  126.  * Version:1.0 
  127.  */  
  128. class MyWordSpout extends BaseRichSpout{  
  129.   
  130.     private SpoutOutputCollector collector;  
  131.   
  132.   
  133.     @Override  
  134.     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {  
  135.         //初始执行   
  136.         this.collector = collector;  
  137.     }  
  138.   
  139.     @Override  
  140.     public void nextTuple() {  
  141.         //发送数据  
  142.         collector.emit(new Values("My life is in these books Read these and know my heart"));  
  143.     }  
  144.   
  145.   
  146.     @Override  
  147.     public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  148.         //定义发送的tuple对应字段  
  149.         declarer.declare(new Fields("words"));  
  150.     }  
  151.   
  152. }  

4.打包运行即可:
bin/storm jar wordcount.jar  com.demo.storm.wordcount.WordCountStart mywordcount