一个易于使用,功能强大且可靠的系统来处理和分发数据。
一、特征
Apache NiFi支持强大且可扩展的数据路由,转换和系统中介逻辑的有向图。Apache NiFi的一些高级功能和目标包括:
- 基于Web的用户界面
- 设计,控制,反馈和监控之间的无缝体验
- 高度可配置
- 容忍损失与保证交付
- 低延迟vs高吞吐量
- 动态优先化
- 流量可以在运行时修改
- 背压
- 数据来源
- 跟踪数据流从头到尾
- 为扩展而设计
- 建立你自己的处理器等等
- 实现快速开发和有效测试
- 安全
- SSL,SSH,HTTPS,加密内容等…
- 多租户授权和内部授权/策略管理
二、安装
官网下载: http://nifi.apache.org/download.html
版本:1.6.0
wget http://mirrors.hust.edu.cn/apache/nifi/1.6.0/nifi-1.6.0-bin.tar.gz
解压:
tar -zxvf nifi-1.6.0-bin.tar.gz
简单配置:
vi nifi-1.6.0/conf/nifi.properties
133 nifi.web.war.directory=./lib
134 nifi.web.http.host=192.168.66.193 #本机ip
135 nifi.web.http.port=9191 #端口号
136 nifi.web.http.network.interface.default=
137 nifi.web.https.host=
138 nifi.web.https.port=
139 nifi.web.https.network.interface.default=
140 nifi.web.jetty.working.directory=./work/jetty
141 nifi.web.jetty.threads=200
142 nifi.web.max.header.size=16 KB
143 nifi.web.proxy.context.path=
144 nifi.web.proxy.host=
nifi启动:
前台运行:bin/nifi.sh run
后台运行:bin/nifi.sh start
重新启动:bin/nifi,sh restart
三、NiFi实例
1.读取某个文件夹下的所有文件,并移动到到另一个文件夹下。
如:
将/root/shirukai/nifi_test/1目录下所有的文件,读取,然后存到/root/shirukai/nifi_test/2下
a.添加一个GetFile处理器,用于读取文件
拖动处理器到画布,这时候会出现弹窗,需要我们选择合适的处理器,这时候我们在搜索框搜索 【GetFile】
然后点击【ADD】将处理器添加到画布,如下图:
配置GetFile处理器:选中处理器之后,右击鼠标,会弹出选项框,我们选择【Configure】进行处理器配置
在弹出的配置框中我们选择【PROPERTIES】tab页来进行相关配置,在这里我们仅仅设置【Input Directory】输入的文件目录为:/root/shirukai/nifi_test/1即可,其他的过滤文件的正则等配置暂时用默认的。
然后点击【APPLY】应用即可。
b 创建PutFile处理器
创建PutFile的过程跟上面一样,先将处理器拖到画布中,然后搜索【PutFile】处理添加到画布。
配置PutFile处理器,在【PROPERTIES】的【Directory】的value值填为我们要将文件保存到的路径,然后点击【APPLY】
c 建立连接,将GetFile与PutFile两个处理器连接
建立连接完成后,我们会发现GetFile处理器前面出现一个停止的状态
而PutFile处理前面为警告状态,将鼠标放到警告图标上,会出现警告信息,提示失败和成功关系无效,因为没有连接任何组件并且不会自动终止
这时候我们在PutFile的配置里选择【SETTINGS】将自动终止关系的【failure】和【success】设置为勾选状态。
d启动实例
在画布空白处右击会出现选项框,选择【start】运行实例
运行状态如下图所示:
这一个过程是一个实时的,只要/root/shirukai/nifi_test/1下有文件nifi就会移动到/root/shirukai/nifi_test/2目录下。
2.执行python脚本编写的wordCount实例
a 创建GenerateFlowFile处理器
将处理器图标拖到画布,并选择【GenerateFlowFile】处理器,用于生成流数据。
修改GenerateFlowFile处理器,添加自定义文本,用来做单词计数的源文件。
在处理器配置里【PROPERTIES】里的【Custom Text】的value栏,将以下文本填入其中:
Apache NiFi supports powerful and scalable directed graphs of data routing, transformation, and system mediation logic. Some of the high-level capabilities and objectives of Apache NiFi include:
b创建ExecuteScript处理器,用于执行脚本
如何编写脚本?可参考博客:https://blog.csdn.net/quguang65265/article/details/77916855
配置ExecuteScript处理器,在【PROPERTIES】里的【Script Engine】选择执行的脚本语言,并将事先写好的脚本加到【Script Body】中。python脚本代码如下:
import re
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class WordCount(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
words = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
words_array = re.split(r'(?:,|;|\s)\s*', words)
word_count = {}
for word in words_array:
if word in word_count:
tmp_count = word_count[word]
word_count[word] = tmp_count + 1
else:
word_count[word] = 1
word_str = ''
for key, val in word_count.items():
word_str += key + ":" + str(val) + "\n"
outputStream.write(bytearray(word_str.encode('utf-8')))
flowFile = session.get()
if (flowFile != None):
flowFile = session.write(flowFile, WordCount())
flowFile = session.putAttribute(flowFile, "filename", "wordCount")
session.transfer(flowFile, REL_SUCCESS)
session.commit()
c 添加PutFile处理器,用来将处理后的结果一文件的形式保存到指定目录下的文件里
配置Putfile处理器,添加要输出的文件目录
d建立关联
执行:
结果:
当然也可以从文件中读取数据,然后进行单词统计。只需添加一个FetchFile处理器,选择要获取数据的文件即可。