本文介绍了如何在收到记录的年,月和日中将流写入S3?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个简单的流,它从Kafka主题中读取一些数据:

I have a simple streams that reads some data from a Kafka topic:

 val ds = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1")
      .option("subscribe", "topic1")
      .option("startingOffsets", "earliest")
      .load()

val df = ds.selectExpr("cast (value as string) as json")
      .select(from_json($"json", schema).as("data"))
      .select("data.*")

我想根据收到日期将该数据存储在S3中,所以类似:

I want to store this data in S3 based on the day it's received, so something like:

s3_bucket/year/month/day/data.json

当我想写数据时,我要做:

When I want to write the data I do:

df.writeStream
  .format("json")
  .outputMode("append")
  .option("path", s3_path)
  .start()

但是,如果我这样做,我只能指定一个路径.有没有一种方法可以根据日期动态更改s3路径?

But if I do this I get to only specify one path. Is there a way to change the s3 path dynamically based on the date?

推荐答案

使用partitionBy子句:

import org.apache.spark.sql.functions._

df.select(
    dayofmonth(current_date()) as "day",
    month(current_date()) as "month",
    year(current_date()) as "year",
    $"*")
  .writeStream
  .partitionBy("year", "month", "day")
  ... // all other options

这篇关于如何在收到记录的年,月和日中将流写入S3?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

11-01 08:11