Question
I'm using the Python Beam SDK 0.6.0, and I would like to write my output to JSON in Google Cloud Storage. What is the best way to do this?
I guess I can use WriteToText from the Text IO sink, but then I have to format each row separately, right? How do I save my result into valid JSON files that contain lists of objects?
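To make the concern concrete, here is a small sketch in plain Python (no Beam involved) of what per-row formatting produces. The sample `rows` are hypothetical. Serializing each row separately gives one JSON object per line, which parses line by line but is not a single valid JSON document; a file holding a list needs the surrounding brackets and comma separators.

```python
import json

# Hypothetical sample records, standing in for the pipeline output.
rows = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]

# One json.dumps call per row, one row per line: per-row text output.
json_lines = "\n".join(json.dumps(row) for row in rows)

# Each line parses on its own...
parsed_per_line = [json.loads(line) for line in json_lines.splitlines()]

# ...but the text as a whole is not one JSON document.
try:
    json.loads(json_lines)
    whole_file_is_valid = True
except ValueError:
    whole_file_is_valid = False

# A valid JSON list needs brackets around the rows and commas between them.
json_array = "[\n" + ",\n".join(json.dumps(row) for row in rows) + "\n]\n"
parsed_array = json.loads(json_array)
```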
Recommended answer
OK, for reference, I solved this by writing my own sink, building on the _TextSink used by WriteToText in the Beam SDK.
I'm not sure if this is the best way to do it, but it works well so far. Hope it might help someone else.
import json

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.iobase import Write
from apache_beam.transforms import PTransform


class _JsonSink(beam.io.FileSink):
    """A Dataflow sink for writing JSON files."""

    def __init__(self,
                 file_path_prefix,
                 file_name_suffix='',
                 num_shards=0,
                 shard_name_template=None,
                 coder=coders.ToStringCoder(),
                 compression_type=beam.io.CompressionTypes.AUTO):
        super(_JsonSink, self).__init__(
            file_path_prefix,
            file_name_suffix=file_name_suffix,
            num_shards=num_shards,
            shard_name_template=shard_name_template,
            coder=coder,
            mime_type='text/plain',
            compression_type=compression_type)
        # Holds the most recent row per open file so that it can be written
        # later, once we know whether it needs a trailing comma.
        self.last_rows = dict()

    def open(self, temp_path):
        """Open the file and start the JSON list."""
        file_handle = super(_JsonSink, self).open(temp_path)
        file_handle.write('[\n')
        return file_handle

    def write_record(self, file_handle, value):
        """Write the previously held row as JSON, followed by a comma, and
        hold the current row back until the next call or ``close()``."""
        if file_handle in self.last_rows:
            file_handle.write(self.coder.encode(
                json.dumps(self.last_rows[file_handle])))
            file_handle.write(',\n')
        self.last_rows[file_handle] = value

    def close(self, file_handle):
        """Finalize the JSON list and close the file handle returned from
        ``open()``. Called after all records are written.
        """
        if file_handle is not None:
            # Write the last row without a comma. A shard that received no
            # records has no held row, so this step is skipped.
            if file_handle in self.last_rows:
                file_handle.write(self.coder.encode(
                    json.dumps(self.last_rows.pop(file_handle))))
            # Close the list and then the file.
            file_handle.write('\n]\n')
            file_handle.close()


class WriteToJson(PTransform):
    """PTransform for writing to JSON files."""

    def __init__(self,
                 file_path_prefix,
                 file_name_suffix='',
                 num_shards=0,
                 shard_name_template=None,
                 coder=coders.ToStringCoder(),
                 compression_type=beam.io.CompressionTypes.AUTO):
        self._sink = _JsonSink(file_path_prefix, file_name_suffix, num_shards,
                               shard_name_template, coder, compression_type)

    def expand(self, pcoll):
        return pcoll | Write(self._sink)
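The core trick in the sink is to hold each row back until the next one arrives, so that only the final row is written without a comma. A minimal sketch of that idea in plain Python, outside Beam, might look like the following. The function name `write_json_list` and the sample records are hypothetical and only serve the illustration.

```python
import io
import json


def write_json_list(file_handle, records):
    """Write records as a JSON list, one per line, with no trailing comma."""
    file_handle.write('[\n')
    have_pending = False
    pending = None
    for record in records:
        if have_pending:
            # The held-back row is now known not to be the last one.
            file_handle.write(json.dumps(pending))
            file_handle.write(',\n')
        pending = record
        have_pending = True
    if have_pending:
        # The final row is written without a comma.
        file_handle.write(json.dumps(pending))
    file_handle.write('\n]\n')


buffer = io.StringIO()
write_json_list(buffer, [{"id": 1}, {"id": 2}, {"id": 3}])
text = buffer.getvalue()
```

Holding one row in memory avoids having to know the number of records in advance, which a streaming-style writer generally cannot know.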
Using the sink is similar to how you use the text sink:
pcol | WriteToJson('gs://path/to/file', file_name_suffix='.json')
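Note that the output may be split across several shard files, each of which is a JSON list on its own. The sketch below shows one way to read such shards back and combine them. It assumes the shards have been copied to a local directory and follow a `file-SSSSS-of-NNNNN.json` naming pattern; the helper `read_json_shards`, the file names, and the sample rows are all hypothetical stand-ins.

```python
import glob
import json
import os
import tempfile


def read_json_shards(pattern):
    """Load every shard matching the pattern and concatenate their lists."""
    records = []
    for path in sorted(glob.glob(pattern)):
        with open(path) as shard:
            records.extend(json.load(shard))
    return records


# Stand-in shards in a temporary directory, mimicking the sink's layout.
shard_dir = tempfile.mkdtemp()
for name, rows in [('file-00000-of-00002.json', [{'id': 1}, {'id': 2}]),
                   ('file-00001-of-00002.json', [{'id': 3}])]:
    with open(os.path.join(shard_dir, name), 'w') as shard:
        shard.write('[\n' + ',\n'.join(json.dumps(r) for r in rows) + '\n]\n')

all_records = read_json_shards(os.path.join(shard_dir, 'file-*.json'))
```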