不使用SeekToCurrentErrorHandler的DeadLetterPublishingRecoverer
创始人
2024-12-29 08:00:39
0

如果您需要在不使用SeekToCurrentErrorHandler的情况下实现Dead Letter Publishing(DLP)恢复器,可以使用ErrorHandlingDeserializer和自定义的ErrorHandler来处理反序列化错误。

以下是一个示例代码,演示了如何在不使用SeekToCurrentErrorHandler的情况下实现DLP恢复器:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory consumerFactory() {
        Map props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, DeadLetterMessage.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); // 可信的包路径

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public KafkaListenerContainerFactory> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setErrorHandler(new DlpErrorHandler()); // 设置自定义的错误处理器
        return factory;
    }
}

@Component
public class DlpErrorHandler implements ErrorHandler {

    private final KafkaTemplate kafkaTemplate;
    private final String deadLetterTopic = "dead-letter-topic";

    public DlpErrorHandler(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public void handle(Exception thrownException, ConsumerRecord record) {
        // 将失败的消息发送到Dead Letter Topic
        kafkaTemplate.send(deadLetterTopic, record.value().toString());
    }
}

在上面的代码中,我们首先在consumerFactory中配置了ErrorHandlingDeserializer,它用于处理反序列化错误。我们还使用了JsonDeserializer来反序列化消息,将其转换为DeadLetterMessage对象。JsonDeserializer.TRUSTED_PACKAGES属性用于设置可信的包路径。

然后,我们创建了一个DlpErrorHandler类来自定义错误处理逻辑。在handle方法中,我们将失败的消息发送到Dead Letter Topic,这里使用了一个KafkaTemplate实例来发送消息。

最后,在kafkaListenerContainerFactory中设置了自定义的错误处理器DlpErrorHandler

这样,当消费者在处理消息时发生反序列化错误时,将会触发DlpErrorHandlerhandle方法,将消息发送到Dead Letter Topic。

请注意,这只是一个简单的示例,您可以根据实际需求进行修改和扩展。

相关内容

热门资讯

8分钟透明!乐宝数独有挂吗,微... 8分钟透明!乐宝数独有挂吗,微扑克有挂(详细透视辅助挂教程)1、让任何用户在无需AI插件第三方神器的...
8分钟了解!jj斗地主看底牌神... 8分钟了解!jj斗地主看底牌神器,来玩德州app外 挂(详细透视辅助app教程);玩家必备必赢加哟《...
七分钟技巧!碣石暗宝吗,wep... 这是一款非常优秀的碣石暗宝吗 ia辅助检测软件,能够让你了解到碣石暗宝吗中牌率当中全部隐藏参数,与同...
7分钟攻略!南通长牌辅助软件出... 7分钟攻略!南通长牌辅助软件出售,线上德州有后台操控(详细透视辅助app教程);原来确实真的有挂(需...
一分钟透明!约战丹东麻将怎么赢... 一分钟透明!约战丹东麻将怎么赢,眯眯扑克可以赢(详细透视辅助挂教程);《WPK辅助透视》‌:支持手机...
五分钟黑科技!闲逸碰胡怎么拿到... 五分钟黑科技!闲逸碰胡怎么拿到好牌,德州ai软件使用(详细透视辅助器教程)准备好在闲逸碰胡怎么拿到好...
4分钟教程!白金岛跑胡子辅助器... 4分钟教程!白金岛跑胡子辅助器,德州之星辅助器哪里买(详细透视辅助助手教程)1、很好的工具软件,可以...
七分钟介绍!吉祥辅牌器哪里搞,... 相信很多朋友都在电脑上玩过吉祥辅牌器哪里搞吧,但是很多朋友都在抱怨用电脑玩起来不方便。为此小编给大家...
9分钟攻略!温州游戏茶苑有没有... 9分钟攻略!温州游戏茶苑有没有外挂,wepower伙牌(详细透视辅助挂教程);(需添加指定薇7574...
四分钟黑科技!约战竞技场能开挂... 四分钟黑科技!约战竞技场能开挂吗,wopoker有外 挂(详细透视辅助黑科技教程);约战竞技场能开挂...