不使用SeekToCurrentErrorHandler的DeadLetterPublishingRecoverer
创始人
2024-12-29 08:00:39
0

如果您需要在不使用SeekToCurrentErrorHandler的情况下实现Dead Letter Publishing(DLP)恢复器,可以使用ErrorHandlingDeserializer和自定义的ErrorHandler来处理反序列化错误。

以下是一个示例代码,演示了如何在不使用SeekToCurrentErrorHandler的情况下实现DLP恢复器:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory consumerFactory() {
        Map props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, DeadLetterMessage.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); // 可信的包路径

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public KafkaListenerContainerFactory> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setErrorHandler(new DlpErrorHandler()); // 设置自定义的错误处理器
        return factory;
    }
}

@Component
public class DlpErrorHandler implements ErrorHandler {

    private final KafkaTemplate kafkaTemplate;
    private final String deadLetterTopic = "dead-letter-topic";

    public DlpErrorHandler(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public void handle(Exception thrownException, ConsumerRecord record) {
        // 将失败的消息发送到Dead Letter Topic
        kafkaTemplate.send(deadLetterTopic, record.value().toString());
    }
}

在上面的代码中,我们首先在consumerFactory中配置了ErrorHandlingDeserializer,它用于处理反序列化错误。我们还使用了JsonDeserializer来反序列化消息,将其转换为DeadLetterMessage对象。JsonDeserializer.TRUSTED_PACKAGES属性用于设置可信的包路径。

然后,我们创建了一个DlpErrorHandler类来自定义错误处理逻辑。在handle方法中,我们将失败的消息发送到Dead Letter Topic,这里使用了一个KafkaTemplate实例来发送消息。

最后,在kafkaListenerContainerFactory中设置了自定义的错误处理器DlpErrorHandler

这样,当消费者在处理消息时发生反序列化错误时,将会触发DlpErrorHandlerhandle方法,将消息发送到Dead Letter Topic。

请注意,这只是一个简单的示例,您可以根据实际需求进行修改和扩展。

相关内容

热门资讯

透视练习!newpoker脚本... 透视练习!newpoker脚本(透视)epoker透视(辅助)一贯一直都是有插件(哔哩哔哩)1、任何...
目前!菠萝德普辅助器免费版在哪... 目前!菠萝德普辅助器免费版在哪里(透视)兴动互娱技巧(果然是真的辅助下载)-哔哩哔哩1、下载好兴动互...
经核实!wepoker辅助软件... 经核实!wepoker辅助软件视频(透视)金虎爷有挂吗(其实有辅助插件)-哔哩哔哩1、在wepoke...
透视积累!红龙poker辅助(... 透视积累!红龙poker辅助(透视)pokerrrr2辅助(辅助)果然一直总是有工具(哔哩哔哩);1...
现有关情况通报如下!pokem... 现有关情况通报如下!pokemmo手机脚本辅助器(透视)透视辅助功能插件(好像真的是有辅助工具)-哔...
透视步骤!werplan怎么作... 透视步骤!werplan怎么作必弊(透视)拱趴大菠萝有挂吗(辅助)切实是有方法(哔哩哔哩)1、玩家可...
随着!扑克之星辅助(透视)浙江... 随着!扑克之星辅助(透视)浙江温州游戏辅助器(真是真的是有辅助工具)-哔哩哔哩1、浙江温州游戏辅助器...
透视学习!epoker免费透视... 透视学习!epoker免费透视脚本(透视)werplan免费挂下载(辅助)都是真的是有插件(哔哩哔哩...
据权威媒体报道!we poke... 据权威媒体报道!we poker游戏下(透视)创思维激k看底牌辅助开发商(原来有辅助神器)-哔哩哔哩...
透视演示!德州局透视(透视)i... 透视演示!德州局透视(透视)impoker辅助(辅助)切实一直总是有教程(哔哩哔哩)1、金币登录送、...