本文介绍了在 HDFS 上查找数据的 Kafka Streams的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 Kafka Streams (v0.10.0.1) 编写一个应用程序,并希望使用查找数据来丰富我正在处理的记录.这些数据(带时间戳的文件)每天(或每天 2-3 次)写入 HDFS 目录.

I'm writing an application with Kafka Streams (v0.10.0.1) and would like to enrich the records I'm processing with lookup data. This data (timestamped file) is written into a HDFS directory on daily basis (or 2-3 times a day).

如何在 Kafka Streams 应用程序中加载它并加入实际的 KStream?
当新文件到达那里时,从 HDFS 重新读取数据的最佳做法是什么?

How can I load this in the Kafka Streams application and join to the actual KStream?
What would be the best practice to reread the data from HDFS when a new file arrives there?

还是切换到 Kafka Connect 并将 RDBMS 表内容写入所有 Kafka Streams 应用程序实例都可以使用的 Kafka 主题会更好?

Or would it be better switching to Kafka Connect and write the RDBMS table content to a Kafka topic which can be consumed by all the Kafka Streams application instances?

更新:
正如所建议的那样,Kafka Connect 将是要走的路.因为查找数据在 RDBMS 中每天更新,我正在考虑将 Kafka Connect 作为预定的 一次性工作 而不是保持连接始终打开.是的,因为语义和保持连接始终打开并确保它不会被中断等的开销.对我来说,在这种情况下有一个预定的 fetch 看起来更安全.

Update:
As suggested Kafka Connect would be the way to go. Because the lookup data is updated in the RDBMS on a daily basis I was thinking about running Kafka Connect as a scheduled one-off job instead of keeping the connection always open. Yes, because of semantics and the overhead of keeping a connection always open and making sure that it won't be interrupted..etc. For me having a scheduled fetch in this case looks safer.

查找数据不大,记录可能被删除/添加/修改.我也不知道如何始终将完整转储到 Kafka 主题并截断以前的记录.启用日志压缩并为已删除的键发送空值可能不起作用,因为我不知道源系统中已删除的内容.另外,当压缩发生时我没有控制权.

The lookup data is not big and records may be deleted / added / modified. I don't know either how I can always have a full dump into a Kafka topic and truncate the previous records. Enabling log compaction and sending null values for the keys that have been deleted would probably won't work as I don't know what has been deleted in the source system. Additionally AFAIK I don't have a control when the compaction happens.

推荐答案

推荐的方法确实也是将查找数据摄取到 Kafka 中——例如通过 Kafka Connect——正如你上面自己建议的那样.

The recommend approach is indeed to ingest the lookup data into Kafka, too -- for example via Kafka Connect -- as you suggested above yourself.

但在这种情况下,我如何安排 Connect 作业每天运行,而不是连续从源表中获取,这在我的情况下是不必要的?

也许您可以更新您不希望连续运行 Kafka Connect 作业的问题?您是否关心资源消耗(DB 上的负载),如果不是每日更新",您是否关心处理的语义,或者...?

Perhaps you can update your question you do not want to have a continuous Kafka Connect job running? Are you concerned about resource consumption (load on the DB), are you concerned about the semantics of the processing if it's not "daily udpates", or...?

更新:正如所建议的,Kafka Connect 将是要走的路.由于 RDBMS 中的查找数据每天都在更新,因此我考虑将 Kafka Connect 作为计划的一次性作业运行,而不是始终保持连接打开.是的,因为语义和保持连接始终打开并确保它不会被中断等的开销.对我来说,在这种情况下有一个预定的 fetch 看起来更安全.

Kafka Connect 是安全的,并且 JDBC 连接器的构建正是为了以健壮、容错和高性能的方式将数据库表提供给 Kafka(已经有许多生产部署).所以我建议不要仅仅因为看起来更安全"而退回到批量更新"模式;就我个人而言,我认为触发每日摄取在操作上不如保持运行以进行连续(和实时!)摄取更方便,而且它还会给您的实际用例带来一些缺点(请参阅下一段).

Kafka Connect is safe, and the JDBC connector has been built for exactly the purpose of feeding DB tables into Kafka in a robust, fault-tolerant, and performant way (there are many production deployments already). So I would suggest to not fallback to "batch update" pattern just because "it looks safer"; personally, I think triggering daily ingestions is operationally less convenient than just keeping it running for continuous (and real-time!) ingestion, and it also leads to several downsides for your actual use case (see next paragraph).

当然,您的里程可能会有所不同——因此,如果您打算每天只更新一次,那就去做吧.但是你失去了 a) 在丰富发生的时间点用最新的数据库数据来丰富你的传入记录的能力,相反,b) 你实际上可能用过时/旧数据来丰富传入的记录,直到下一天更新已完成,这很可能会导致您向下游发送/提供给其他应用程序以供使用的错误数据.例如,如果客户更新了她的送货地址(在数据库中),但您每天只将此信息提供给您的流处理应用程序(可能还有许多其他应用程序),那么订单处理应用程序会将包裹运送到错误的地址,直到下一次每日摄取完成.

But of course, your mileage may vary -- so if you are set on updating just once a day, go for it. But you lose a) the ability to enrich your incoming records with the very latest DB data at the point in time when the enrichment happens, and, conversely, b) you might actually enrich the incoming records with stale/old data until the next daily update completed, which most probably will lead to incorrect data that you are sending downstream / making available to other applications for consumption. If, for example, a customer updates her shipping address (in the DB) but you only make this information available to your stream processing app (and potentially many other apps) once per day, then an order processing app will ship packages to the wrong address until the next daily ingest will complete.

查找数据不大,可能会删除/添加/修改记录.我也不知道如何始终将完整转储到 Kafka 主题并截断以前的记录.启用日志压缩并为已删除的键发送空值可能不起作用,因为我不知道源系统中已删除的内容.

Kafka Connect 的 JDBC 连接器已经为您自动处理了:1. 它确保数据库插入/更新/删除正确反映在 Kafka 主题中,以及 2. Kafka 的日志压缩确保目标主题不会增长越界.也许您可能想阅读文档中的 JDBC 连接器以了解您可以免费获得哪些功能:http://docs.confluent.io/current/connect/connect-jdbc/docs/ ?

The JDBC connector for Kafka Connect already handles this automatically for you: 1. it ensures that DB inserts/updates/deletes are properly reflected in a Kafka topic, and 2. Kafka's log compaction ensures that the target topic doesn't grow out of bounds. Perhaps you may want to read up on the JDBC connector in the docs to learn which functionality you just get for free: http://docs.confluent.io/current/connect/connect-jdbc/docs/ ?

这篇关于在 HDFS 上查找数据的 Kafka Streams的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-11 01:10