The following Beam code reads data from Kafka and writes it to files, starting a new file for every 10-second window:
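import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window

# Kafka configuration (fill in for your environment)
KAFKA_BOOTSTRAP_SERVERS = ''
KAFKA_TOPIC = ''

# Beam pipeline configuration
OUTPUT_DIR = ''


def run():
    # Kafka is an unbounded source, so the pipeline must run in streaming mode.
    options = PipelineOptions(streaming=True)
    with beam.Pipeline(options=options) as p:
        (
            p
            # ReadFromKafka is a cross-language (Java) transform; by default
            # each element is a (key, value) tuple of bytes.
            | 'ReadFromKafka' >> ReadFromKafka(
                consumer_config={'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS},
                topics=[KAFKA_TOPIC],
            )
            # Decode the message value from bytes to str.
            | 'Format' >> beam.Map(lambda kv: kv[1].decode('utf-8'))
            # Assign elements to 10-second fixed windows.
            | 'Window' >> beam.WindowInto(window.FixedWindows(10))
            # WriteToText is primarily designed for bounded (batch) input;
            # fileio.WriteToFiles writes windowed streaming data, one set of
            # files per window.
            | 'WriteToFiles' >> fileio.WriteToFiles(
                path=OUTPUT_DIR,
                sink=lambda dest: fileio.TextSink(),
                file_naming=fileio.default_file_naming(prefix='output', suffix='.txt'),
                shards=1,
            )
        )


if __name__ == '__main__':
    run()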
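This code defines a Beam pipeline with four steps: read records from Kafka, decode each record's value, assign records to fixed windows, and write the data to text files. The window size is 10 seconds, so each 10-second window of data is written to its own output file(s).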
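To make the 10-second window concrete, here is a minimal Beam-free sketch of how a fixed window is chosen for a record's timestamp. `fixed_window_bounds` is a hypothetical helper, not part of Beam's API; it assumes timestamps in seconds and uses the start = t - ((t - offset) mod size) arithmetic that fixed windowing is based on. Records whose timestamps fall into the same [start, end) interval belong to the same window, and therefore to the same output file(s).

```python
from typing import Tuple


def fixed_window_bounds(timestamp, size=10, offset=0) -> Tuple[float, float]:
    """Hypothetical helper: return the [start, end) bounds of the fixed
    window that contains `timestamp` (windows aligned to `offset`)."""
    # Python's % is non-negative for a positive size, so this also works
    # for timestamps that lie before `offset`.
    start = timestamp - (timestamp - offset) % size
    return (start, start + size)


# Hypothetical event timestamps (seconds) for a few Kafka records.
for ts in [0, 3, 9, 10, 17, 25]:
    print(ts, fixed_window_bounds(ts))

print(fixed_window_bounds(17))  # → (10, 20)
```

Timestamps 0, 3 and 9 share the window [0, 10), while 10 and 17 fall into [10, 20); in the pipeline above, each of those windows would produce its own file(s).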