使用自定义编码器实例化kafka生产者时,无法实例化

采取行动

我是kafka的新手。我想在我的春季项目中使用kafka生产者/消费者来替换activeMQ(jms)。我需要的是kafka生产者将我的消息对象发布到某个主题,而消费者则从该主题中订阅它。

首先是我的自定义编码器,与解码器相同(对于我的消息类ConfigurationActionMsg):

@Component
public class ConfigActionMessageEncoder implements Encoder<ConfigurationActionMsg> {

    public ConfigActionMessageEncoder() {
    /* This constructor must be present for successful compile. */
    }
    public ConfigActionMessageEncoder(VerifiableProperties verifiableProperties) {
    /* This constructor must be present for successful compile. */
    }

    @Override
    public byte[] toBytes(ConfigurationActionMsg actionMsg){
        return SerializationUtils.serialize(actionMsg);
    }}

以下是我针对处理器和使用者的配置

@Configuration
@ComponentScan(basePackages = {"XXX"})
public class KafkaConfig {
@Bean
public KafkaProducer<String,ConfigurationActionMsg> kafkaProducer(){
    Properties props = new Properties();
    props.put("zk.connect", "127.0.0.1:2181");
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "com.atlas.configengine2.XXX.ConfigActionMessageEncoder");

    return new KafkaProducer<>(props);
}

@Bean
public KafkaConsumer<String, ConfigurationActionMsg> kafkaConsumer(){

    Properties props = new Properties();
    props.put("zk.connect", "127.0.0.1:2181");
    props.put("bootstrap.servers", "localhost:9092");
    //We should only have one process running for consumer
    props.put("group.id", "resolverActionTrigger");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "com.atlas.configengine2.XXX.ConfigActionMessageDecoder");
    KafkaConsumer<String, ConfigurationActionMsg> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("configAction"));
    return consumer;
}}

我不确定这是否是实例化生产者/消费者的正确方法。但是这种方式行不通。因为我的kafkaProducer无法实例化。

一些调试信息:

引起原因:org.apache.kafka.common.KafkaException:com.atlas.configengine2.jms.ConfigActionMessageEncoder不是org.apache.kafka.common.serialization.Serializer的实例

但是我不确定这是否是唯一的问题?以及如何编写自定义编码器呢?

卡玛尔·昌德拉珀卡什

你应该实现org.apache.kafka.common.serialization.SerializerEncoder

请参见CustomSerializer示例。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

使用自定义编码器实例化kafka生产者时,无法实例化

来自分类Dev

如何在PCollection <KV <String,B >>中使用自定义编码器?

来自分类Dev

编译Kafka的自定义生产者

来自分类Dev

无法使用情节提要自定义实例化窗口控制器

来自分类Dev

如何在Spark 2.X数据集中创建自定义编码器?

来自分类Dev

WCF。在自定义编码器中绑定请求和响应

来自分类Dev

WCF。在自定义编码器中绑定请求和响应

来自分类Dev

使用NSData自定义编码,初始化但崩溃

来自分类Dev

如何在Eclipse下使用自定义Kafka生产者修复NoClassDefFoundError?

来自分类Dev

WCF自定义编码器-在运行时分配消息内容类型

来自分类Dev

我是否需要通过CDI中的生产者进行所有对象实例化

来自分类Dev

使用自定义初始化程序swift以编程方式实例化和推送视图控制器

来自分类Dev

WordPress和自定义编码

来自分类Dev

无法使用 dockerized 生产者生产到 Kafka

来自分类Dev

通过GeneralizedNewtypeDeriving派生实例时使用自定义实例

来自分类Dev

如何定义运行使用自定义初始化程序的结构实例时应运行的自定义闭包(SwiftUI)

来自分类Dev

SAP Fiori Launchpad 自定义应用程序错误:无法实例化对象:缺少“新”

来自分类常见问题

无法使用Springboot构建Kafka生产者

来自分类Dev

Kafka 生产者和消费者在单独的 EC2 实例中

来自分类Dev

实例化自定义Java(NIO)文件系统的问题

来自分类Dev

以正确的方式以编程方式实例化自定义视图

来自分类Dev

实例化泛型类型以支持自定义对象

来自分类Dev

Django:自定义主键的自动实例化?

来自分类Dev

与实例化自定义对象数组的混淆

来自分类Dev

实例化后,为Slim设置自定义视图

来自分类Dev

从GameObject继承的“实例化”自定义类型

来自分类Dev

实例化泛型类型以支持自定义对象

来自分类Dev

无法在生产中实例化模块

来自分类Dev

无法在生产中实例化模块

Related 相关文章

  1. 1

    使用自定义编码器实例化kafka生产者时,无法实例化

  2. 2

    如何在PCollection <KV <String,B >>中使用自定义编码器?

  3. 3

    编译Kafka的自定义生产者

  4. 4

    无法使用情节提要自定义实例化窗口控制器

  5. 5

    如何在Spark 2.X数据集中创建自定义编码器?

  6. 6

    WCF。在自定义编码器中绑定请求和响应

  7. 7

    WCF。在自定义编码器中绑定请求和响应

  8. 8

    使用NSData自定义编码,初始化但崩溃

  9. 9

    如何在Eclipse下使用自定义Kafka生产者修复NoClassDefFoundError?

  10. 10

    WCF自定义编码器-在运行时分配消息内容类型

  11. 11

    我是否需要通过CDI中的生产者进行所有对象实例化

  12. 12

    使用自定义初始化程序swift以编程方式实例化和推送视图控制器

  13. 13

    WordPress和自定义编码

  14. 14

    无法使用 dockerized 生产者生产到 Kafka

  15. 15

    通过GeneralizedNewtypeDeriving派生实例时使用自定义实例

  16. 16

    如何定义运行使用自定义初始化程序的结构实例时应运行的自定义闭包(SwiftUI)

  17. 17

    SAP Fiori Launchpad 自定义应用程序错误:无法实例化对象:缺少“新”

  18. 18

    无法使用Springboot构建Kafka生产者

  19. 19

    Kafka 生产者和消费者在单独的 EC2 实例中

  20. 20

    实例化自定义Java(NIO)文件系统的问题

  21. 21

    以正确的方式以编程方式实例化自定义视图

  22. 22

    实例化泛型类型以支持自定义对象

  23. 23

    Django:自定义主键的自动实例化?

  24. 24

    与实例化自定义对象数组的混淆

  25. 25

    实例化后,为Slim设置自定义视图

  26. 26

    从GameObject继承的“实例化”自定义类型

  27. 27

    实例化泛型类型以支持自定义对象

  28. 28

    无法在生产中实例化模块

  29. 29

    无法在生产中实例化模块

热门标签

归档