This article describes how to use a schema to convert ConsumerRecord values to a DataFrame in spark-kafka, which may be a useful reference for anyone solving the same problem.

Problem Description

I am using Spark 2.0.2 with Kafka 0.11.0, and I am trying to consume messages from Kafka in Spark Streaming. Following is the code:

import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val topics = "notes"
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:7092",
  "schema.registry.url" -> "http://localhost:7070",
  "group.id" -> "connect-cluster1",
  "value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
  "key.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer"
)
val topicSet: Set[String] = Set(topics)
val stream = KafkaUtils.createDirectStream[String, String](
  SparkStream.ssc,
  PreferConsistent,
  Subscribe[String, String](topicSet, kafkaParams)
)
stream.foreachRDD ( rdd => {
  rdd.foreachPartition(iterator => {
    while (iterator.hasNext) {
      val next = iterator.next()
      println(next.value())
    }
  })
})

If the Kafka message contains records, the output would be:

{"id": "4164a489-a0bb-4ea1-a259-b4e2a4519eee", "createdat": 1505312886984, "createdby": "karthik", "notes": "testing20"}
{"id": "4164a489-a0bb-4ea1-a259-b4e2a4519eee", "createdat": 1505312890472, "createdby": "karthik", "notes": "testing21"}

Thus, the received message is Avro-decoded, as can be seen from the ConsumerRecord's value. Now I need those records in DataFrame format, but I do not know how to proceed from here, even with the schema at hand as follows:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDecoder
import org.apache.avro.Schema

val schemaRegistry: CachedSchemaRegistryClient = new CachedSchemaRegistryClient("http://localhost:7070", 1000)
val m = schemaRegistry.getLatestSchemaMetadata(topics + "-value")
val schemaId = m.getId
val schemaString = m.getSchema

val decoder: KafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry)
val parser = new Schema.Parser()
val avroSchema = parser.parse(schemaString)
println(avroSchema)

The schema prints as follows:

{"type":"record","name":"notes","namespace":"db","fields":[{"name":"id","type":["null","string"],"default":null},{"name":"createdat","type":["null",{"type":"long","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}],"default":null},{"name":"createdby","type":["null","string"],"default":null},{"name":"notes","type":["null","string"],"default":null}],"connect.name":"db.notes"}

Can anyone help me understand how to get a DataFrame from the consumer record's value? I have looked at other questions such as Use schema to convert AVRO messages with Spark to DataFrame and Handling schema changes in running Spark Streaming application, but they do not deal with the ConsumerRecord in the first place.

Recommended Answer

You can use the snippet below, where stream is the DStream of ConsumerRecord returned by the kafka010 KafkaUtils API:

import org.apache.spark.sql.SQLContext

stream.foreachRDD(rdd =>
    if (!rdd.isEmpty()) {
        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
        import sqlContext.implicits._
        // The Avro-decoded value's toString is JSON, so each record
        // can be rendered as a JSON string and parsed by the JSON reader.
        val topicValueStrings = rdd.map(record => (record.value()).toString)
        val df = sqlContext.read.json(topicValueStrings)
        df.show()
    })
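If you want the DataFrame columns to carry the exact types from the Avro schema instead of relying on JSON schema inference, one option is to convert the schema already fetched from the registry into a Spark StructType and pass it to the reader. Below is a minimal sketch, assuming the databricks spark-avro package (which provides SchemaConverters for Spark 2.x) is on the classpath and that it runs inside the same foreachRDD block, so sqlContext, topicValueStrings, and avroSchema are in scope:

import org.apache.spark.sql.types.StructType
import com.databricks.spark.avro.SchemaConverters

// Translate the Avro schema into a Spark SQL StructType; unions such as
// ["null","string"] become nullable columns of the corresponding type.
val sqlSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]

// Parse the JSON strings against the explicit schema instead of inferring one.
val typedDf = sqlContext.read.schema(sqlSchema).json(topicValueStrings)
typedDf.printSchema()

This is only a sketch: depending on the spark-avro version, the timestamp-millis logical type on createdat may still surface as a plain long column.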

This concludes the article on using a schema to convert ConsumerRecord values to a DataFrame in spark-kafka. We hope the recommended answer is helpful.
