📜  Apache Storm-工作示例

📅  最后修改于: 2020-12-02 05:57:19             🧑  作者: Mango


我们已经了解了Apache Storm的核心技术细节,现在是时候编写一些简单的场景了。

方案–移动呼叫日志分析器

移动呼叫及其持续时间将作为Apache Storm的输入,Storm将处理和分组同一呼叫者和接收者之间的呼叫及其总呼叫数。

壶嘴创作

Spout是用于数据生成的组件。基本上,喷口将实现IRichSpout接口。 “ IRichSpout”界面具有以下重要方法-

  • 开放式-为壶嘴提供执行环境。执行者将运行此方法来初始化喷口。

  • nextTuple-通过收集器发出生成的数据。

  • 关闭-当口会关机时调用此方法。

  • clarifyOutputFields-声明元组的输出模式。

  • ack-确认已处理特定的元组

  • fail-指定特定元组不被处理并且不被重新处理。

打开

打开方法的签名如下-

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf-为该喷嘴提供风暴配置。

  • context-提供有关拓扑中喷口位置,其任务ID,输入和输出信息的完整信息。

  • collector-使我们能够发出将由螺栓处理的元组。

nextTuple

nextTuple方法的签名如下-

nextTuple()

从与ack()和fail()方法相同的循环中定期调用nextTuple()。当没有工作要做时,它必须释放线程的控制权,以便其他方法有机会被调用。因此,nextTuple的第一行将检查处理是否完成。如果是这样,它应该休眠至少一毫秒以减少返回之前的处理器负载。

close方法的签名如下-

close()

声明输出字段

clarifyOutputFields方法的签名如下-

declareOutputFields(OutputFieldsDeclarer declarer)

声明者-用于声明输出流ID,输出字段等。

此方法用于指定元组的输出模式。

确认

ack方法的签名如下-

ack(Object msgId)

此方法确认已处理特定的元组。

失败

nextTuple方法的签名如下-

ack(Object msgId)

此方法通知特定的元组尚未完全处理。 Storm将重新处理特定的元组。

FakeCallLogReaderSpout

在我们的方案中,我们需要收集呼叫日志详细信息。呼叫日志的信息包含。

  • 来电号码
  • 接收方号码
  • 持续时间

由于我们没有通话记录的实时信息,因此我们将生成伪造的通话记录。伪造的信息将使用Random类创建。完整的程序代码如下。

编码-FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeLogReaderSpout which implement IRichSpout interface 
   to access functionalities
    
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;
    
   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;
    
   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List mobileNumbers = new ArrayList();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
                
            while(fromMobileNumber == toMobileNumber) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }
                
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override 
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map getComponentConfiguration() {
      return null;
   }
}

螺栓创建

Bolt是一个将元组作为输入,处理该元组并生成新的元组作为输出的组件。螺栓将实现IRichBolt接口。在该程序中,两个螺栓类CallLogCreatorBoltCallLogCounterBolt用于执行操作。

IRichBolt接口具有以下方法-

  • 准备-为螺栓提供执行环境。执行者将运行此方法来初始化喷口。

  • execute-处理单个元组输入。

  • cleanup-螺栓即将关闭时调用。

  • clarifyOutputFields-声明元组的输出模式。

准备

准备方法的签名如下-

prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf-为该螺栓提供Storm配置。

  • 上下文-提供有关拓扑中螺栓位置,其任务ID,输入和输出信息等的完整信息。

  • collector-使我们能够发出已处理的元组。

执行

execute方法的签名如下-

execute(Tuple tuple)

这里的元组是要处理的输入元组。

execute方法一次处理一个元组。元组数据可以通过Tuple类的getValue方法访问。不必立即处理输入元组。可以处理多个元组并将其作为单个输出元组输出。可以使用OutputCollector类来发出已处理的元组。

清理

清除方法的签名如下-

cleanup()

声明输出字段

clarifyOutputFields方法的签名如下-

declareOutputFields(OutputFieldsDeclarer declarer)

在此,参数声明器用于声明输出流ID,输出字段等。

此方法用于指定元组的输出模式

通话记录创建者螺栓

呼叫日志创建者螺栓接收呼叫日志元组。呼叫日志元组具有呼叫者号码,接收者号码和呼叫持续时间。该螺栓简单地通过组合呼叫者号码和接收者号码来创建新值。新值的格式为“主叫方号码-接收方号码”,并将其命名为新字段“ call”。完整的代码如下。

编码-CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }
    
   @Override
   public Map getComponentConfiguration() {
      return null;
   }
}

通话记录计数器螺栓

呼叫日志计数器螺栓以元组形式接收呼叫及其持续时间。此螺栓在prepare方法中初始化字典(地图)对象。在execute方法中,它检查元组,并为元组中的每个新“调用”值在字典对象中创建一个新条目,并在字典对象中将值设置为1。对于字典中已经可用的条目,它只是增加其值。简而言之,此螺栓将调用及其计数保存在字典对象中。除了将调用及其计数保存在字典中之外,我们还可以将其保存到数据源中。完整的程序代码如下-

编码-CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);
        
      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }
        
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }
    
   @Override
   public Map getComponentConfiguration() {
      return null;
   }
    
}

创建拓扑

Storm拓扑基本上是Thrift结构。 TopologyBuilder类提供了创建复杂拓扑的简单方法。 TopologyBuilder类具有设置喷口(setSpout)和设置螺栓(setBolt)的方法。最后,TopologyBuilder具有createTopology来创建拓扑。使用以下代码片段创建拓扑-

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

shuffleGroupingfieldsGrouping方法有助于为喷口和螺栓设置流分组。

本地集群

出于开发目的,我们可以使用“ LocalCluster”对象创建本地集群,然后使用“ LocalCluster”类的“ submitTopology”方法提交拓扑。 “ submitTopology”的参数之一是“ Config”类的实例。 “ Config”类用于在提交拓扑之前设置配置选项。该配置选项将在运行时与集群配置合并,并通过prepare方法发送到所有任务(喷嘴和螺栓)。将拓扑提交到集群后,我们将等待10秒钟,等待集群计算提交的拓扑,然后使用“ LocalCluster”的“ shutdown”方法关闭集群。完整的程序代码如下-

编码-LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);
        
      //
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
            
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);
        
      //Stop the topology
        
      cluster.shutdown();
   }
}

构建和运行应用程序

完整的应用程序具有四个Java代码。他们是-

  • FakeCallLogReaderSpout.java
  • CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • LogAnalyerStorm.java

可以使用以下命令构建应用程序-

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

可以使用以下命令运行该应用程序-

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

输出

一旦启动应用程序,它将输出有关群集启动过程,喷口和螺栓处理以及最后群集关闭过程的完整详细信息。在“ CallLogCounterBolt”中,我们已打印呼叫及其计数详细信息。此信息将在控制台上显示,如下所示:

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

非JVM语言

Storm拓扑由Thrift接口实现,可以轻松以任何语言提交拓扑。 Storm支持Ruby, Python和许多其他语言。让我们看一下Python绑定。

Python绑定

Python是一种通用的解释型,交互式,面向对象的高级编程语言。 Storm支持Python来实现其拓扑。 Python支持发射,锚定,确认和记录操作。

如您所知,可以用任何语言定义螺栓。用另一种语言编写的螺栓作为子流程执行,Storm通过stdin / stdout上的JSON消息与这些子流程进行通信。首先,获取支持Python绑定的示例螺栓WordCount。

public static class WordCount implements IRichBolt {
   public WordSplit() {
      super("python", "splitword.py");
   }
    
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
}

在这里, WordCount类实现了IRichBolt接口,并使用指定的超级方法参数“ splitword.py”的Python实现运行。现在创建一个名为“ splitword.py”的Python实现。

import storm
   class WordCountBolt(storm.BasicBolt):
      def process(self, tup):
         words = tup.values[0].split(" ")
         for word in words:
         storm.emit([word])
WordCountBolt().run()

这是Python的示例实现,它对给定句子中的单词进行计数。同样,您也可以绑定其他支持语言。