Accessing information (metadata) in the file names of inputs to a Beam pipeline

Question

My file names contain information that I need in my pipeline: for example, the identifier for my data points is part of the file name rather than a field in the data. For instance, every wind turbine generates a file such as turbine-loc-001-007.csv, and I need the loc value within the pipeline.

Recommended answer

Java (sdk 2.9.0):

Beam's TextIO readers do not give access to the file name itself. For these use cases, use FileIO to match the files and gain access to the information stored in the file name. Unlike TextIO, reading the file has to be handled by the user in transforms downstream of the FileIO read. The result of a FileIO read is a PCollection<ReadableFile>; the ReadableFile class carries the file name as metadata, which can be used along with the contents of the file.

ReadableFile does have a convenience method, readFullyAsUTF8String(), which reads the entire file into a String object. Note that this loads the whole file into memory first. If memory is a concern, you can work with the file directly using utility classes such as FileSystems.

From the documentation:

PCollection<KV<String, String>> filesAndContents = p
     .apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
     // withCompression can be omitted - by default compression is detected from the filename.
     .apply(FileIO.readMatches().withCompression(GZIP))
     .apply(MapElements
         // uses imports from TypeDescriptors
         .into(KVs(strings(), strings()))
         .via((ReadableFile f) -> KV.of(
             f.getMetadata().resourceId().toString(), f.readFullyAsUTF8String())));

Python (sdk 2.9.0):

With the Python SDK 2.9.0, you need to collect the list of URIs outside the Dataflow pipeline and feed it to the pipeline as a parameter. For example, use FileSystems to expand a glob pattern into a list of files, then pass that list to a PCollection for processing.

Once fileio (see PR https://github.com/apache/beam/pull/7791/) is available, the following code would also be an option for Python.

import apache_beam as beam
from apache_beam.io import fileio

with beam.Pipeline() as p:
  # Match the files, then turn each match into a ReadableFile.
  readable_files = (p
                    | fileio.MatchFiles('hdfs://path/to/*.txt')
                    | fileio.ReadMatches()
                    | beam.Reshuffle())
  # Pair each file's path (from its metadata) with its full contents.
  files_and_contents = (readable_files
                        | beam.Map(lambda x: (x.metadata.path,
                                              x.read_utf8())))
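
Once the path is available next to the contents, the identifier can be pulled out of the file name with ordinary string handling. The sketch below is illustrative only: parse_turbine_filename is a hypothetical helper, and it assumes the file name has the dash-separated layout of the example in the question (turbine-loc-001-007.csv), with the loc value in the second position.

```python
import posixpath


def parse_turbine_filename(path):
    """Split a name like 'turbine-loc-001-007.csv' into its parts.

    Assumption: fields are dash-separated and 'loc' is the second field.
    """
    name = posixpath.basename(path)           # drop the directory / URI prefix
    stem, _ext = posixpath.splitext(name)     # drop the '.csv' extension
    prefix, loc, *rest = stem.split("-")
    return {"prefix": prefix, "loc": loc, "ids": rest}


parts = parse_turbine_filename("hdfs://path/to/turbine-loc-001-007.csv")
print(parts["loc"], parts["ids"])
```

Inside the pipeline, the same helper could be applied to x.metadata.path in the beam.Map step, so that each element is keyed by the loc value instead of the full path.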
