根据
How do we set maximum_bad_records when loading a Bigquery table from dataflow?从数据流将数据加载到BigQuery时,当前无法设置maxBadRecords配置。建议在将数据流作业中的行插入BigQuery之前对其进行验证。

如果我有TableSchemaTableRow,该如何确保该行可以安全地插入表中?

比遍历模式中的字段,查看它们的类型和查看行中值的类,必须有一种更简单的方法来执行此操作,对吗?这似乎容易出错,并且该方法必须是防呆的,因为如果无法加载单行,则整个管道都会失败。

更新:

我的用例是一个ETL作业,该作业最初将在JSON(每行一个对象)上运行,并登录Cloud Storage并批量写入BigQuery,但稍后将从PubSub读取对象并连续写入BigQuery。这些对象包含BigQuery不需要的大量信息,还包含甚至无法在架构中描述的部分(基本上是自由形式的JSON有效负载)。时间戳等内容也需要进行格式化才能与BigQuery一起使用。此作业将有几种变体,它们在不同的输入上运行并写入不同的表。

从理论上讲,这不是一个很困难的过程,它需要一个对象,提取一些属性(50-100),格式化其中一些属性,然后将该对象输出到BigQuery。我或多或少地只是遍历属性名称列表,从源对象中提取值,查看配置以查看属性是否应该以某种方式进行格式化,并在必要时应用格式化(这可能是精简的方式,除以毫秒的时间戳记) ,然后从URL中提取主机名等),然后将该值写入TableRow对象。

我的问题是数据困惑。有几亿个对象,有些看上去并不像预期的那样,这很罕见,但是由于数量如此之多,还是有罕见的事情发生。有时,应包含字符串的属性包含整数,反之亦然。有时,应该有一个字符串的数组或对象。

理想情况下,我想将自己的TableRow传递给TableSchema并询问“这工作吗?”。

由于这是不可能的,所以我要做的是查看TableSchema对象,然后尝试自己验证/转换值。如果TableSchema表示某个属性的类型为STRING,则在将其添加到value.toString()之前,我先运行TableRow。如果它是INTEGER,则检查它是IntegerLong还是BigInteger,依此类推。这种方法的问题在于,我只是在猜测BigQuery中可以使用的方法。 FLOAT可以接受哪些Java数据类型?对于TIMESTAMP?我认为我的验证/广播解决了大多数问题,但是总是有异常(exception)和极端情况。

根据我的经验,这非常有限,如果单行未通过BigQuery的验证,整个工作管道(作业?工作流程?不确定正确的术语)就会失败(就像常规加载一样,除非将maxBadRecords设置为足够大的数字) 。它还会失败,并显示诸如BigQuery导入作业“dataflow_job_xxx”之类的表面有益的消息失败。原因:(5db0b2cdab1557e0):项目“xxx”中的BigQuery作业“dataflow_job_xxx”完成,错误为:errorResult:为非记录字段指定了JSON映射,错误:为非记录字段指定了JSON映射,错误:指定了JSON映射对于非记录字段,错误:为非记录字段指定了JSON映射,错误:为非记录字段指定了JSON映射,错误:为非记录字段指定的JSON映射”。也许在某个地方可以看到更详细的错误消息,该错误消息可以告诉我它是哪个属性以及值是什么?没有这些信息,它也可能会说“坏数据”。

据我所知,至少在以批处理模式运行时,Dataflow会将TableRow对象写入Cloud Storage的登台区域,然后在一切就绪后开始加载。这意味着我无处可去捕获任何错误,并且在加载BigQuery时我的代码不再运行。我还没有在流模式下运行任何作业,但是我不确定那里有什么不同,因为我(公认有限)了解基本原理是相同的,只是批大小较小。

人们使用Dataflow和BigQuery,因此,不必总是担心由于单个错误的输入而导致整个管道停止运行,就可以使这项工作变得不可能。人们是如何做到的?

最佳答案

我假设您将文件中的JSON反序列化为Map<String, Object>。然后,您应该可以使用TableSchema递归地对其进行类型检查。

我建议通过以下两个步骤来开发模式验证的迭代方法。

  • 编写一个PTransform<Map<String, Object>, TableRow>,将您的JSON行转换为TableRow对象。 TableSchema也应该是该函数的构造函数参数。您可以开始使此功能真正严格-要求JSON直接将输入解析为Integer(例如,当找到BigQuery INTEGER模式时)-并大胆地声明错误的记录。基本上,请通过严格处理来确保不输出任何无效记录。

    我们的code here做了一些类似的事情-给定BigQuery生成并以JSON格式写入GCS的文件,我们递归遍历该模式并进行一些类型转换。但是,我们不需要验证,因为BigQuery本身已写入数据。

    请注意,TableSchema对象不是Serializable。我们已通过将TableSchemaDoFn构造函数中的PTransform转换为JSON String并返回的方法来解决。参见the code in BigQueryIO.java that uses the jsonTableSchema variable
  • 使用此blog post中描述的“死信”策略来处理不良记录-从PTransform输出有问题的Map<String, Object>行并将其写入文件。这样,您可以稍后检查未通过验证的行。

  • 您可能会从一些小文件开始,并使用DirectPipelineRunner而不是DataflowPipelineRunner。直接运行程序在您的计算机上而不是Google Cloud Dataflow服务上运行管道,并且使用BigQuery流式写入。我相信,当这些写入失败时,您会收到更好的错误消息。

    (我们将GCS-> BigQuery加载作业模式用于批处理作业,因为它效率更高且更具成本效益,但是BigQuery流式传输是在Streaming作业中写入的,因为它们的延迟时间很短。)

    最后,就记录信息而言:
  • 肯定地检查Cloud Logging(通过遵循“日志”面板上的Worker Logs链接。
  • 如果您运行 bq command-line utility:bq show -j PROJECT:dataflow_job_XXXXXXX,则可能会得到有关为何批处理数据流触发的加载作业失败的更好信息。
  • 关于google-bigquery - 从数据流插入BigQuery之前先验证行,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/35180012/

    10-13 08:35