http://blog.csdn.net/sqh201030412/article/details/78426812
一,Storm集群的安装
1.下载安装包:
2.解压移动到软件安装目录
tar -zxvf xxxx
3.修改配置文件, conf目录下:
vim storm.yaml
storm.zookeeper.servers:
- "hadoop6"
- "hadoop7"
- "hadoop8"
nimbus.host: "hadoop5"
#
nimbus.childopts: "-Xmx1024m"
#
supervisor.childopts: "-Xmx1024m"
#
worker.childopts: "-Xmx1024m"
#
ui.childopts: "-Xmx1024m"
#
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
4.分发安装包到集群其它各个节点:
scp -r storm hadoop@hadoop8:/home/hadoop/app/
......
5.在各个节点启动服务:
- 在nimbus.host所属的机器上启动 nimbus服务
cd /export/servers/storm/bin/
nohup ./storm nimbus &
- 在nimbus.host所属的机器上启动ui服务
cd /export/servers/storm/bin/
nohup ./storm ui &
- 在其它节点上启动supervisor服务
cd /export/servers/storm/bin/
nohup ./storm supervisor &
6.查看
访问 http://nimbus.host:8080 ,即可看到storm的ui界面。
二,Storm的使用Demo
1.官方统计字数例子
bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.7.jar storm.starter.WordCountTopology wordcount
2.运行结果
3.自己书写一个字符统计的Demo
- package com.demo.storm.wordcount;
- import java.util.HashMap;
- import java.util.Map;
- import org.apache.storm.Config;
- import org.apache.storm.StormSubmitter;
- import org.apache.storm.generated.AlreadyAliveException;
- import org.apache.storm.generated.AuthorizationException;
- import org.apache.storm.generated.InvalidTopologyException;
- import org.apache.storm.spout.SpoutOutputCollector;
- import org.apache.storm.task.OutputCollector;
- import org.apache.storm.task.TopologyContext;
- import org.apache.storm.topology.OutputFieldsDeclarer;
- import org.apache.storm.topology.TopologyBuilder;
- import org.apache.storm.topology.base.BaseRichBolt;
- import org.apache.storm.topology.base.BaseRichSpout;
- import org.apache.storm.tuple.Fields;
- import org.apache.storm.tuple.Tuple;
- import org.apache.storm.tuple.Values;
- /**
- * @Description: Storm 单词统计主类
- * @author: songqinghu
- * @date: 2017年11月2日 下午2:18:19
- * Version:1.0
- */
- public class WordCountStart {
- public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
- //建造者
- TopologyBuilder builder = new TopologyBuilder();
- //构建spout模块
- builder.setSpout("mySpout", new MyWordSpout(), Integer.valueOf(4));
- //构建bolt模块 --切割 定义 mySpout为随机发送规则
- builder.setBolt("mySplitBolt", new MySplitWordBolt(), Integer.valueOf(5)).shuffleGrouping("mySpout");
- //构建bolt模块 --统计 定义 mySplitBolt为Hash分发
- builder.setBolt("myCountBolt", new MyCountWordBolt(), Integer.valueOf(5)).fieldsGrouping("mySplitBolt", new Fields("word"));
- Config config = new Config();
- config.setNumWorkers(Integer.valueOf(3));
- //向集群提交任务
- if(args.length==1){
- StormSubmitter.submitTopology(args[0],config, builder.createTopology());
- }else{
- StormSubmitter.submitTopology("mywordcount",config, builder.createTopology());
- }
- }
- }
- /**
- * @Description: 对接受到的数据进行处理 --统计
- * @author: songqinghu
- * @date: 2017年11月2日 下午2:42:33
- * Version:1.0
- */
- class MyCountWordBolt extends BaseRichBolt {
- private Map<String, Integer> wordCount = new HashMap<String,Integer>();
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- }
- @Override
- public void execute(Tuple input) {
- String word = input.getString(0);
- if(wordCount.containsKey(word)){
- wordCount.put(word, wordCount.get(word)+1);
- }else{
- wordCount.put(word, 1);
- }
- System.out.println(wordCount);
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- //不在向下发送
- }
- }
- /**
- * @Description: 对接受到的数据进行处理 --切割 分组发送
- * @author: songqinghu
- * @date: 2017年11月2日 下午2:42:33
- * Version:1.0
- */
- class MySplitWordBolt extends BaseRichBolt {
- private OutputCollector collector;
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector =collector;
- }
- @Override
- public void execute(Tuple input) {
- String words = input.getString(0);
- String[] split = words.split(" ");
- for (String word : split) {
- collector.emit(new Values(word.trim()));
- }
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word"));
- }
- }
- /**
- * @Description: 数据源接入类,完成数据源的接入和传递工作
- * @author: songqinghu
- * @date: 2017年11月2日 下午2:41:23
- * Version:1.0
- */
- class MyWordSpout extends BaseRichSpout{
- private SpoutOutputCollector collector;
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- //初始执行
- this.collector = collector;
- }
- @Override
- public void nextTuple() {
- //发送数据
- collector.emit(new Values("My life is in these books Read these and know my heart"));
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- //定义发送的tuple对应字段
- declarer.declare(new Fields("words"));
- }
- }
4.打包运行即可:
bin/storm jar wordcount.jar com.demo.storm.wordcount.WordCountStart mywordcount