不使用SeekToCurrentErrorHandler的DeadLetterPublishingRecoverer
创始人
2024-12-29 08:00:39
0

如果您需要在不使用SeekToCurrentErrorHandler的情况下实现Dead Letter Publishing(DLP)恢复器,可以使用ErrorHandlingDeserializer和自定义的ErrorHandler来处理反序列化错误。

以下是一个示例代码,演示了如何在不使用SeekToCurrentErrorHandler的情况下实现DLP恢复器:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory consumerFactory() {
        Map props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, DeadLetterMessage.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); // 可信的包路径

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public KafkaListenerContainerFactory> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setErrorHandler(new DlpErrorHandler()); // 设置自定义的错误处理器
        return factory;
    }
}

@Component
public class DlpErrorHandler implements ErrorHandler {

    private final KafkaTemplate kafkaTemplate;
    private final String deadLetterTopic = "dead-letter-topic";

    public DlpErrorHandler(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public void handle(Exception thrownException, ConsumerRecord record) {
        // 将失败的消息发送到Dead Letter Topic
        kafkaTemplate.send(deadLetterTopic, record.value().toString());
    }
}

在上面的代码中,我们首先在consumerFactory中配置了ErrorHandlingDeserializer,它用于处理反序列化错误。我们还使用了JsonDeserializer来反序列化消息,将其转换为DeadLetterMessage对象。JsonDeserializer.TRUSTED_PACKAGES属性用于设置可信的包路径。

然后,我们创建了一个DlpErrorHandler类来自定义错误处理逻辑。在handle方法中,我们将失败的消息发送到Dead Letter Topic,这里使用了一个KafkaTemplate实例来发送消息。

最后,在kafkaListenerContainerFactory中设置了自定义的错误处理器DlpErrorHandler

这样,当消费者在处理消息时发生反序列化错误时,将会触发DlpErrorHandlerhandle方法,将消息发送到Dead Letter Topic。

请注意,这只是一个简单的示例,您可以根据实际需求进行修改和扩展。

相关内容

热门资讯

盘点辅助!手机透视辅助器app... 盘点辅助!手机透视辅助器app,开心泉州小程序辅助,玩家必用开挂(有挂总结);无需打开直接搜索加薇1...
正版辅助!葫芦娃七子连心攻略,... >>您好:葫芦娃七子连心攻略确实是有挂的,很多玩家在这款葫芦娃七子连心攻略游戏中打牌都会发现很多用户...
科技辅助!随意玩透视科技游戏,... 您好:随意玩透视科技游戏这款游戏可以开挂的,确实是有挂的,很多玩家在这款游戏中打牌都会发现很多用户的...
传授辅助!丰城瓜瓜棋牌辅助,四... 传授辅助!丰城瓜瓜棋牌辅助,四川途游辅助软件下载,一分钟揭秘开挂(有挂教程);无需打开直接搜索微信(...
发现辅助!约局吧可以看到别人底... 发现辅助!约局吧可以看到别人底牌,兴动互娱辅助脚本,透视黑科技开挂(有挂猫腻)约局吧可以看到别人底牌...
实测辅助!微信小程序微乐房间怎... 实测辅助!微信小程序微乐房间怎么辅助,萍乡滚筒四幅攻略,普及知识开挂(果真有挂)您好:微信小程序微乐...
发现辅助!人海大厅挂件怎么买,... 人海大厅挂件怎么买是一款可以让一直输的玩家,快速成为一个“必胜”的ai辅助神器,有需要的用户可以加我...
传授辅助!yy比鸡辅助有哪些功... 传授辅助!yy比鸡辅助有哪些功能,九九山城辅助免费,记者揭秘开挂(有挂透明挂);无需打开直接搜索加(...
分享辅助!吉祥填大坑脚本,新九... 分享辅助!吉祥填大坑脚本,新九天辅助,一分钟带你了解开挂(有挂规律)1、下载安装好吉祥填大坑脚本,进...
详细辅助!新518互娱脚本下载... 较多好评“微乐万能挂官网”开挂(透视)辅助教程 了解更多开挂安装加(136704302)微信号是一款...