Problem description

I need to execute something like: sed '1d' simple.tsv > noHeader.tsv

This removes the first line from my large flow file (> 1 GB).

The thing is, I need to run it against my flow file, so it would be:

sed '1d' myFlowFile > myFlowFile

The question is: how should I configure the ExecuteStreamCommand processor so that it runs the command on my flow file and writes the result back to the flow file? If sed is not the best option, I can consider doing this another way (e.g. tail).
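As a point of reference outside NiFi, the in-place form `sed '1d' myFlowFile > myFlowFile` does not work in a plain shell either, because the shell truncates the output file before sed gets to read it. The sketch below, assuming a POSIX shell with sed and tail, shows the streaming form (stdin in, stdout out), the tail equivalent, and the truncation pitfall. The sample data and file names are hypothetical.

```shell
#!/bin/sh
# Hypothetical sample data standing in for the TSV flow file.
workdir=$(mktemp -d)
printf 'id\tname\n1\talice\n2\tbob\n' > "$workdir/simple.tsv"

# Drop the first line with sed, reading stdin and writing stdout.
sed_result=$(sed '1d' < "$workdir/simple.tsv")

# Same result with tail: start the output at line 2.
tail_result=$(tail -n +2 < "$workdir/simple.tsv")

# Pitfall: redirecting onto the input file truncates it before sed reads it.
cp "$workdir/simple.tsv" "$workdir/clobbered.tsv"
sed '1d' "$workdir/clobbered.tsv" > "$workdir/clobbered.tsv"
clobbered_size=$(wc -c < "$workdir/clobbered.tsv" | tr -d ' ')

printf '%s\n' "$sed_result"
echo "size after in-place redirect: $clobbered_size bytes"
rm -rf "$workdir"
```

Both commands work on a stream, so neither needs the whole file in memory, which matters for a 1 GB input.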

Thanks, Michal

Edit 2 (solution):

Below is the final ExecuteStreamCommand config that does what I need (remove the first line from the flow file). @Andy, thanks a lot for all the valuable hints.
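The configuration itself is not reproduced in this copy of the thread. ExecuteStreamCommand pipes the incoming flow file content to the command's standard input and uses the command's standard output as the content of the outgoing flow file, so a configuration along the following lines would have the described effect. The property values in the comments are an assumption for illustration, not the author's actual settings, and the function name is hypothetical.

```shell
#!/bin/sh
# Assumed ExecuteStreamCommand properties (NOT the original configuration):
#   Command Path      : sed
#   Command Arguments : 1d
#   Ignore STDIN      : false   (flow file content is piped to the command)
# With settings like these, the processor behaves roughly like the pipeline below.

run_stream_command() {
    # stdin  = content of the incoming flow file
    # stdout = content of the outgoing flow file
    sed '1d'
}

new_content=$(printf 'header\nrow1\nrow2\n' | run_stream_command)
printf '%s\n' "$new_content"
```

Because the content arrives on stdin, the command line never needs to name the flow file.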

Recommended answer

Michal,

I want to make sure I'm understanding your problem correctly, because I think there are better solutions.

Problem:

You have a 1GB TSV loaded into NiFi and you want to remove the first line.

Solution:

If your file were smaller, the best solution would be to use a ReplaceText processor with the following processor properties:

  • Search Value: ^.*\n
  • Replacement Value: (empty string)

That would strip the first line out without having to send the 1GB content out of NiFi to the command-line and then re-ingest the results. Unfortunately, to use a regular expression, you need to set a Maximum Buffer Size, which means the entire contents need to be read into heap memory to perform this operation.

With a 1GB file, if you know the exact value of the first line (and therefore its length in bytes), you should try ModifyBytes, which lets you trim a byte count from the beginning and/or end of the flowfile content. You could then simply instruct the processor to drop the first n bytes of the content. Because of NiFi's copy-on-write content repository, you will still have ~2GB of data, but the copy is done in a streaming manner using an 8192-byte buffer.
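To make the byte-count idea concrete, here is a minimal shell sketch that measures the first line and drops exactly that many bytes. It assumes the header line is known in advance. The sample content is hypothetical, and the "Start Offset" property name in the comments is my recollection of ModifyBytes, so check it against the processor documentation.

```shell
#!/bin/sh
# Hypothetical sample content; in NiFi this would be the flow file content.
sample=$(mktemp)
printf 'id\tname\n1\talice\n2\tbob\n' > "$sample"

# Byte length of the first line, including its trailing newline.
header_bytes=$(head -n 1 "$sample" | wc -c | tr -d ' ')

# Dropping the first n bytes is what a start offset of n bytes would do
# (assumed to correspond to the ModifyBytes "Start Offset" property).
# tail -c +K starts the output at byte K, so skipping n bytes means K = n + 1.
trimmed=$(tail -c +"$((header_bytes + 1))" "$sample")

echo "bytes to drop: ${header_bytes}"
printf '%s\n' "$trimmed"
rm -f "$sample"
```

The count has to include the line terminator, and a header ending in \r\n needs one extra byte compared with \n.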

My best suggestion is to use an ExecuteScript processor. This processor allows you to write custom code in a variety of languages (Groovy, Python, Ruby, Lua, JS) and have it execute on the flowfile. Using a Groovy script like the one below, you could remove the first line and copy the remainder in a streaming fashion so the heap does not get unnecessarily taxed.

I tested this with 1MB files and it took about 1.06 seconds for each flowfile (MacBook Pro 2015, 16 GB RAM, OS X 10.11.6). On a better machine you'll obviously get better throughput, and you can scale that up to your larger files.

import org.apache.nifi.processor.io.StreamCallback

def flowfile = session.get()
if (!flowfile) return

try {
    // Read the current flowfile content and write the new content in one streaming pass
    flowfile = session.write(flowfile, { inputStream, outputStream ->
        def bufferedReader = new BufferedReader(new InputStreamReader(inputStream))
        def bufferedWriter = new BufferedWriter(new OutputStreamWriter(outputStream))

        // Read and discard the first line
        bufferedReader.readLine()

        def line
        int i = 0

        // Copy each remaining line until the end of the stream (readLine() returns null)
        while ((line = bufferedReader.readLine()) != null) {
            bufferedWriter.write(line)
            // newLine() writes the platform line separator, so line endings may be normalized
            bufferedWriter.newLine()
            i++
        }

        // By default, INFO doesn't show in the logs and WARN will appear in the processor bulletins
        log.warn("Wrote ${i} lines to output")

        // Closing the writer flushes any buffered output
        bufferedReader.close()
        bufferedWriter.close()
    } as StreamCallback)

    session.transfer(flowfile, REL_SUCCESS)
} catch (Exception e) {
    // Pass a message plus the exception so the stack trace is logged
    log.error("Failed to remove the first line from the flowfile", e)
    session.transfer(flowfile, REL_FAILURE)
}

One side note: in general, a good practice in NiFi is to split giant text files into smaller component flowfiles (using something like SplitText) when possible, to get the benefits of parallel processing. If the 1GB input were video, this wouldn't apply, but since you mentioned TSV, splitting the initial flowfile into smaller pieces and operating on them in parallel (or even sending them to other nodes in the cluster for load balancing) will likely help your performance here.
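As a rough command-line analogy for that splitting step (not NiFi itself), the sketch below drops the header once and then cuts the remaining rows into fixed-size chunks, similar in spirit to what SplitText does with a line count per split. The sample data, the chunk size of 2 and the part_ prefix are hypothetical.

```shell
#!/bin/sh
# Hypothetical sample data: one header line plus five rows.
workdir=$(mktemp -d)
printf 'id\tname\n1\ta\n2\tb\n3\tc\n4\td\n5\te\n' > "$workdir/simple.tsv"

# Drop the header once, then cut the remainder into chunks of 2 lines each.
# split reads stdin when the file operand is "-" and writes part_aa, part_ab, ...
tail -n +2 "$workdir/simple.tsv" | split -l 2 - "$workdir/part_"

chunk_count=$(ls "$workdir"/part_* | wc -l | tr -d ' ')
first_chunk=$(cat "$workdir/part_aa")

echo "chunks: $chunk_count"
printf '%s\n' "$first_chunk"
rm -rf "$workdir"
```

Each chunk can then be processed independently, which is where the parallelism comes from.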

I realized I did not answer your original question -- how to get the content of a flowfile into the ExecuteStreamCommand processor command-line invocation. If you wanted to operate on the value of an attribute, you could reference the attribute value with the Expression Language syntax ${attribute_name} in the Arguments field. However, as the content is not referenceable from the EL, and you don't want to destroy the heap by moving the 1GB content into an attribute, the best solution there would be to write the contents out to a file using PutFile, run the sed command against the provided filename and write it to another file, and then use GetFile to read those contents back into a flowfile in NiFi.

Here is a template which demonstrates using ExecuteStreamCommand with both rev and sed against flowfile content and putting the output into the content of the new flowfile. You can run the flow and monitor logs/nifi-app.log to see the output or use the data provenance query to examine the modification that each processor performs.
