📜  Apache Storm-工作示例(1)

📅  最后修改于: 2023-12-03 15:13:26.511000             🧑  作者: Mango

Apache Storm 工作示例

Apache Storm 是一款开源的分布式实时计算系统,它可以处理海量数据,支持高实时性、高可靠性的数据处理。下面,我们将通过一个简单的示例来了解 Apache Storm 的使用。

示例需求

我们有一个数据平台,收集并存储用户的访问日志,现在需要通过分析日志,实时计算出每个用户访问平台的次数,并将结果输出到控制台。

环境设置

首先,需要安装 Apache Storm,可以参考官方文档进行安装:

https://storm.apache.org/releases/2.2.0/index.html

在本示例中,我们假定 Storm 已经安装完成,并且已经正确配置了环境变量。

编写代码
Spout

我们需要实现一个 Spout 来读取访问日志,代码如下:

public class LogSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private BufferedReader reader;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            FileReader fileReader = new FileReader("access.log");
            reader = new BufferedReader(fileReader);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        try {
            String line = reader.readLine();
            if (line != null) {
                collector.emit(new Values(line));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}

这里的 LogSpout 继承了 BaseRichSpout 类,并实现了 opennextTupledeclareOutputFields 三个方法。其中,open 方法在 Spout 初始化时调用,用于创建文件读取器;nextTuple 方法会不断读取文件中的一行数据,并以 Values 对象的形式发送到下一个 Bolt 中;declareOutputFields 方法用于声明输出字段名。

Bolt

我们需要实现一个 Bolt 来统计每个用户的访问次数并输出结果,代码如下:

public class CountBolt extends BaseRichBolt {
    private Map<String, Integer> counts;
    private OutputCollector collector;
  
    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        counts = new HashMap<>();
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String line = tuple.getStringByField("line");
        String[] fields = line.split(" ");
        String user = fields[0];
        counts.put(user, counts.getOrDefault(user, 0) + 1);
        collector.emit(new Values(user, counts.get(user)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("user", "count"));
    }
}

这里的 CountBolt 继承了 BaseRichBolt 类,并实现了 prepareexecutedeclareOutputFields 三个方法。其中,prepare 方法在 Bolt 初始化时调用,用于创建计数器;execute 方法会从 Spout 中接收到一行数据,统计访问次数,并以 Values 对象的形式发送到下一个 Bolt 中;declareOutputFields 方法用于声明输出字段名。

Topology

我们需要将 Spout 和 Bolt 连接起来,形成一个完整的数据处理流程,代码如下:

public class UserVisitTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("log-spout", new LogSpout());
        builder.setBolt("count-bolt", new CountBolt())
            .shuffleGrouping("log-spout");
        Config config = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("user-visit", config, builder.createTopology());
        Thread.sleep(60000);
        cluster.shutdown();
    }
}

这里的 UserVisitTopology 是我们的 Topology 类,它将 LogSpoutCountBolt 连接起来,构成了一个简单的数据处理流程。在 main 方法中,我们创建了一个 TopologyBuilder 对象,并使用 builder.setSpoutbuilder.setBolt 方法来添加 Spout 和 Bolt;然后,我们创建了一个 LocalCluster 对象,并将 Topology 提交到本地集群中执行;最后,为了避免程序过早退出,我们使用 Thread.sleep 方法暂停了 60 秒钟,然后关闭了集群。

运行程序

我们在项目根目录下创建一个名为 access.log 的文件,并写入以下数据:

user1 /api/foo 2019-01-01T00:00:00
user2 /api/bar 2019-01-01T00:01:00
user1 /api/baz 2019-01-01T00:02:00
user3 /api/qux 2019-01-01T00:03:00
user1 /api/quux 2019-01-01T00:04:00
user2 /api/corge 2019-01-01T00:05:00

然后,我们运行 UserVisitTopology 类,等待程序输出结果:

[user1, 1]
[user2, 1]
[user1, 2]
[user3, 1]
[user1, 3]
[user2, 2]

可以看到,程序输出了每个用户的访问次数。

总结

本示例介绍了如何使用 Apache Storm 构建一个简单的实时数据处理系统。通过这个示例,我们可以学习到 Spout、Bolt 和 Topology 的基本概念,以及如何在本地集群中运行程序。实际运用中,Apache Storm 可以处理更加复杂的数据处理任务,能够帮助我们应对海量数据的处理问题。