Spark Structured Streaming 支持精确一次性语义。但是,需要注意的是,如果按照系统时间进行分区,则必须确保具有相同时间戳的事件会被放置到同一个分区中,以避免重复处理或数据丢失。
下面是一个示例代码,演示了如何将事件按照系统时间进行分区:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.ProcessingTime
val stream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "input_topic")
.option("startingOffsets", "earliest")
.load()
val partitionByTime = stream.selectExpr("*", "cast(timestamp as date) as system_time")
.writeStream
.format("parquet")
.option("path", "output_directory")
.partitionBy("system_time")
.option("checkpointLocation", "checkpoints_directory")
.trigger(ProcessingTime("1 minute"))
.start()
partitionByTime.awaitTermination()
在上述示例中,我们通过从 Kafka 主题 input_topic 接收数据,并将它们写入到输出目录 output_directory 中,使用日期作为分区键。代码处理时间为一分钟。
需要注意的是,这种分区策略只适用于按天处理数据的情况。如果要按小时、分钟或秒进行分区,则需要将日期和时间戳作为分区键。
上一篇:按系统分类的逐月SQL查询