📜  Apache NiFi-自定义处理器(1)

📅  最后修改于: 2023-12-03 15:29:25.797000             🧑  作者: Mango

Apache NiFi-自定义处理器

Apache NiFi 是一个强大而易于使用的数据流处理工具,可以通过数据流管理、转换、集成和系统间通信来支持企业级数据处理流程。Apache NiFi 提供了许多内置的处理器。但是,有时候我们需要针对自己的特定需求定制化处理器。 在 Apache NiFi 中,可以通过自定义处理器来实现这样的要求。在本文中,我们将会介绍如何在 Apache NiFi 中创建和部署自定义处理器。

开始之前

在开始创建自定义处理器之前,需要先安装和配置 Apache NiFi 环境,该过程可通过阅读官方文档了解。

编写代码

创建自定义处理器的第一步是编写代码。所以,我们需要使用 Java 编写一个继承于 org.apache.nifi.processor.AbstractProcessor 的类,并在其中实现 process 方法来执行自定义逻辑。该方法会在处理器接收到每个流文件时被调用。

下面是一个简单的示例代码:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

@Tags({"example", "custom"})
@CapabilityDescription("Example custom processor")
public class CustomProcessor extends AbstractProcessor {

    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        
        if (flowFile == null) {
            return;
        }

        try {
            InputStream inputStream = session.read(flowFile);
            OutputStream outputStream = session.write(flowFile);

            byte[] buffer = new byte[4096];
            int bytesRead;
            while ((bytesRead = inputStream.read(buffer)) != -1) {
                outputStream.write(buffer, 0, bytesRead);
            }

            session.transfer(flowFile, REL_SUCCESS);
            session.commit();

        } catch (IOException e) {
            session.rollback();
            getLogger().error("Failed to process incoming flow file due to {}", e);
        }
    }
}
打包处理器

编写处理器代码后,我们需要将其打包为 JAR 文件。 将编写好的代码打包成 JAR,可以使用 Maven 进行构建,也可以使用 IDE中的打包功能。

在Maven项目中,在 pom.xml 文件中添加以下内容:

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.7.0</version>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-jar-plugin</artifactId>
      <version>3.2.0</version>
      <configuration>
        <archive>
          <manifest>
            <addClasspath>true</addClasspath>
            <mainClass>com.example.CustomProcessor</mainClass>
          </manifest>
        </archive>
      </configuration>
    </plugin>
  </plugins>
</build>

将以上代码添加到 pom.xml 文件中后,可以通过在 Maven 命令行中输入 mvn package 来生成 JAR 文件。该文件将会被保存在 target 目录下。

部署处理器

在打包处理器代码的步骤中,生成的文件会以 JAR 包的形式存在。接下来,我们需要将该 JAR 文件部署到 Apache NiFi 中。也就是说,我们需要将生成的 JAR 文件放到 NiFi 的类路径中。

将文件直接拷贝到目录中
  1. 打开 Apache NiFi 的安装目录。
  2. 找到 nifi/lib 目录。
  3. 将生成的 JAR 文件拷贝到 nifi/lib 目录下。
使用 NiFi UI 部署
  1. 打开 NiFi UI。
  2. 选择 Global menu 中的 Controller Settings。
  3. 在 Controller Settings 中,找到 “Additional Details” 部分。
  4. 在 “Additional Details” 中,找到 “Flow Configuration” 选项卡。
  5. 在选项卡中,可以添加要包含的处理器 JAR 文件。

image

使用自定义处理器

处理器成功部署后,我们可以在 Apache NiFi 中使用它。打开 NiFi UI,然后将创建的自定义处理器拖入到数据流中即可使用。

结论

本篇文章演示了如何编写、打包和部署自定义处理器,并在Apache NiFi 中进行使用。自定义处理器可以极大地增强 Apache NiFI 中的功能,并使它能够更好地与自己的数据处理流程集成。