这个问题通常是由于RabbitMQ通道在执行操作之前已经被关闭导致的。以下是一种可能的解决方法的代码示例:
import apache_beam as beam
import pika
# 创建 RabbitMQ 连接和通道
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 定义一个 Beam 自定义 PTransform 类来发送数据到 RabbitMQ
class SendToRabbitMQ(beam.DoFn):
def __init__(self, queue_name):
self.queue_name = queue_name
def start_bundle(self):
# 在每个 bundle 开始之前重新建立 RabbitMQ 通道
self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
self.channel = self.connection.channel()
def process(self, element):
# 将数据发送到 RabbitMQ 队列
self.channel.basic_publish(exchange='', routing_key=self.queue_name, body=element)
def finish_bundle(self):
# 在每个 bundle 结束之后关闭 RabbitMQ 通道
self.channel.close()
self.connection.close()
# 创建一个 PCollection,并将数据发送到 RabbitMQ 队列
with beam.Pipeline() as pipeline:
data = pipeline | beam.Create(['message 1', 'message 2', 'message 3'])
data | beam.ParDo(SendToRabbitMQ('my_queue'))
在上述代码示例中,我们定义了一个自定义的 SendToRabbitMQ
类,它继承自 beam.DoFn
。在 start_bundle
方法中,我们重新建立 RabbitMQ 连接和通道。在 process
方法中,我们将数据发送到 RabbitMQ 队列。在 finish_bundle
方法中,我们关闭 RabbitMQ 通道和连接。
通过在每个 bundle 开始之前重新建立 RabbitMQ 通道,并在每个 bundle 结束之后关闭通道,我们可以确保在执行操作之前通道是打开的,并且在操作完成后通道被正确关闭,从而避免了 "ChannelAlreadyClosedException" 错误。