在Apache Nifi中,您可以使用EvaluateJsonPath处理器来从JSON数据中提取特定字段的值。如果您需要处理多个输入,您可以使用MergeContent处理器将它们合并到一个流中,然后将该流作为EvaluateJsonPath的输入。
以下是一个使用多个输入的EvaluateJsonPath的示例流程:
在流程中添加一个MergeContent处理器,将多个输入流合并为一个流。配置MergeContent处理器的属性如下:
在MergeContent处理器后添加一个EvaluateJsonPath处理器。配置EvaluateJsonPath处理器的属性如下:
在EvaluateJsonPath处理器后添加一个LogAttribute处理器,用于将提取的字段值输出到日志中。
以下是一个示例的Groovy脚本,可以使用Apache Nifi的REST API将上述流程导入到Nifi中:
import groovyx.net.http.RESTClient
def baseUrl = 'http://localhost:8080/nifi-api'
def clientId = 'your-client-id'
def processGroupId = 'your-process-group-id'
def restClient = new RESTClient(baseUrl)
// 创建一个新的流程
def createProcessResponse = restClient.post(
path: '/process-groups/' + processGroupId + '/process-groups',
headers: [
'Content-Type': 'application/json',
'Accept': 'application/json'
],
body: [
'revision': [
'clientId': clientId,
'version': 0
],
'component': [
'name': 'Multi-input EvaluateJsonPath',
'position': [
'x': 0,
'y': 0
]
]
]
)
def processGroup = createProcessResponse.data
def processGroupId2 = processGroup.id
// 添加MergeContent处理器
def mergeContentProcessor = [
'revision': [
'clientId': clientId,
'version': 0
],
'component': [
'name': 'MergeContent',
'type': 'org.apache.nifi.processors.standard.MergeContent',
'position': [
'x': 200,
'y': 200
],
'bundle': [
'group': 'org.apache.nifi',
'artifact': 'nifi-standard-nar',
'version': 'x.y.z'
],
'config': {
'mergeStrategy': 'Defragment',
'minimumNumberOfEntries': '2'
}
]
]
def addMergeContentResponse = restClient.post(
path: '/process-groups/' + processGroupId2 + '/processors',
headers: [
'Content-Type': 'application/json',
'Accept': 'application/json'
],
body: mergeContentProcessor
)
def mergeContentProcessorId = addMergeContentResponse.data.id
// 添加EvaluateJsonPath处理器
def evaluateJsonPathProcessor = [
'revision': [
'clientId': clientId,
'version': 0
],
'component': [
'name': 'EvaluateJsonPath',
'type': 'org.apache.nifi.processors.standard.EvaluateJsonPath',
'position': [
'x': 400,
'y': 200
],
'bundle': [
'group': 'org.apache.nifi',
'artifact': 'nifi-standard-nar',
'version': 'x.y.z'
],
'config': {
'jsonPathExpression': '$.field',
'destination': 'field'
}
]
]
def addEvaluateJsonPathResponse = restClient.post(
path: '/process-groups/' + processGroupId2 + '/processors',
headers: [
'Content-Type': 'application/json',
'Accept': 'application/json'
],
body: evaluateJsonPathProcessor
)
def evaluateJsonPathProcessorId = addEvaluateJsonPathResponse.data.id
// 添加LogAttribute处理器
def logAttributeProcessor = [
'revision': [
'clientId': clientId,
'version': 0
],
'component': [
'name': 'LogAttribute',
'type': 'org.apache.nifi.processors.attributes.LogAttribute',
'position': [
'x': 600,
'y': 200
],
'bundle': [
'group': 'org.apache