This article describes how to write results to JSON files in GCS from Dataflow/Beam, and may be a useful reference for anyone tackling the same problem.

Problem description

I'm using the Python Beam SDK 0.6.0 and I would like to write my output to JSON in Google Cloud Storage. What is the best way to do this?

I guess I can use WriteToText from the Text IO sink, but then I have to format each row separately, right? How do I save my result into valid JSON files that contain lists of objects?
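For reference, the per-row approach mentioned above would look roughly like the sketch below (the input collection and the output path are placeholders). Note that it produces newline-delimited JSON, one object per line, rather than a single JSON list per file:

import json
import apache_beam as beam

# Sketch of the per-row formatting with the plain text sink: each element is
# serialized with json.dumps and written on its own line, so the output is
# newline-delimited JSON rather than one JSON list.
(pcol
 | 'FormatRows' >> beam.Map(json.dumps)
 | 'WriteRows' >> beam.io.WriteToText('gs://path/to/file',
                                      file_name_suffix='.json'))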

Recommended answer

OK, for reference, I solved this by writing my own sink, building on the _TextSink used by WriteToText in the Beam SDK.

Not sure if this is the best way to do it, but it works well so far. Hope it might help someone else.

import json

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.iobase import Write
from apache_beam.transforms import PTransform   

class _JsonSink(beam.io.FileSink):
    """A Dataflow sink for writing JSON files."""

    def __init__(self,
                 file_path_prefix,
                 file_name_suffix='',
                 num_shards=0,
                 shard_name_template=None,
                 coder=coders.ToStringCoder(),
                 compression_type=beam.io.CompressionTypes.AUTO):

        super(_JsonSink, self).__init__(
            file_path_prefix,
            file_name_suffix=file_name_suffix,
            num_shards=num_shards,
            shard_name_template=shard_name_template,
            coder=coder,
            mime_type='text/plain',
            compression_type=compression_type)
        self.last_rows = dict()

    def open(self, temp_path):
        """ Open file and initialize it w opening a list."""
        file_handle = super(_JsonSink, self).open(temp_path)
        file_handle.write('[\n')
        return file_handle

    def write_record(self, file_handle, value):
        """Buffers the record and writes the previously buffered one,
        JSON-encoded and followed by a comma; deferring each record by one
        write lets close() emit the final record without a trailing comma."""
        if self.last_rows.get(file_handle, None) is not None:
            file_handle.write(self.coder.encode(
                json.dumps(self.last_rows[file_handle])))
            file_handle.write(',\n')

        self.last_rows[file_handle] = value

    def close(self, file_handle):
        """Finalize the JSON list and close the file handle returned from
        ``open()``. Called after all records are written.
        """
        if file_handle is not None:
            # Write the last buffered row without a trailing comma (skip it
            # if this shard received no records at all)
            if file_handle in self.last_rows:
                file_handle.write(self.coder.encode(
                    json.dumps(self.last_rows[file_handle])))

            # Close the JSON list and then the file
            file_handle.write('\n]\n')
            file_handle.close()


class WriteToJson(PTransform):
    """PTransform for writing to JSON files."""

    def __init__(self,
                 file_path_prefix,
                 file_name_suffix='',
                 num_shards=0,
                 shard_name_template=None,
                 coder=coders.ToStringCoder(),
                 compression_type=beam.io.CompressionTypes.AUTO):

        self._sink = _JsonSink(file_path_prefix, file_name_suffix, num_shards,
                               shard_name_template, coder, compression_type)

    def expand(self, pcoll):
        return pcoll | Write(self._sink)

Using the sink is similar to how you use the text sink:

pcol | WriteToJson('gs://path/to/file', file_name_suffix='.json')
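As a quick end-to-end check, here is a minimal pipeline sketch using the transform above; the in-memory records and the GCS path are placeholders (point the path at a local directory to try it with the direct runner):

import apache_beam as beam

p = beam.Pipeline()
(p
 | 'CreateRecords' >> beam.Create([{'id': 1, 'name': 'foo'},
                                   {'id': 2, 'name': 'bar'}])
 | 'WriteJson' >> WriteToJson('gs://path/to/file', file_name_suffix='.json'))
p.run()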

