微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

将 GenericRecord 转换为 SpecificRecord 抛出错误

如何解决将 GenericRecord 转换为 SpecificRecord 抛出错误

我正在尝试使用 avro 序列化将数据发送到 kafka 主题。整个想法是将 Json 数据转换为 avro 并发送到 kafka。我没有在两者之间使用任何 Pojo,只是直接处理 json 并转换为通用数据。我实现了一个自定义的 AvroSerializer。下面显示了教程 here 中使用的序列化程序代码


public class AvroSerializer<T extends SpecificRecordBase> implements Serializer<T> {

  private static final Logger LOGGER = LoggerFactory.getLogger(AvroSerializer.class);

  @Override
  public void close() {
    // No-op
  }

  @Override
  public void configure(Map<String,?> arg0,boolean arg1) {
    // No-op
  }

  @Override
  public byte[] serialize(String topic,T data) {
    try {
      byte[] result = null;

      if (data != null) {
        LOGGER.debug("data='{}'",data);


        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        BinaryEncoder binaryEncoder =
            EncoderFactory.get().binaryEncoder(byteArrayOutputStream,null);

        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(data.getSchema());
        datumWriter.write(data,binaryEncoder);

        binaryEncoder.flush();
        byteArrayOutputStream.close();

        result = byteArrayOutputStream.toByteArray();
        LOGGER.debug("serialized data='{}'",DatatypeConverter.printHexBinary(result));
      }
      return result;
    } catch (IOException ex) {
      throw new SerializationException(
          "Can't serialize data='" + data + "' for topic='" + topic + "'",ex);
    }
  }
}

当我使用这个序列化器向 kafka 主题发送通用记录时,它会出现以下错误

org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.specific.SpecificRecord
    at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285) ~[avro-1.8.2.jar:1.8.2]
    at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218) ~[avro-1.8.2.jar:1.8.2]
    at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215) ~[avro-1.8.2.jar:1.8.2]

当我尝试使用以下代码将通用记录转换为特定记录时。

            GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(data.getSchema());
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            Encoder encoder = EncoderFactory.get().binaryEncoder(out,null);
            writer.write(data,encoder);
            encoder.flush();

            byte[] avroData = out.toByteArray();
            out.close();

            SpecificDatumReader<SpecificRecord> reader = new SpecificDatumReader<SpecificRecord>(SpecificRecord.class);
            Decoder decoder = DecoderFactory.get().binaryDecoder(avroData,null);
            SpecificRecord myCustomrecord = reader.read(null,decoder);


这会引发另一个错误

2021-03-11 21:14:04.774 ERROR 10464 --- [nio-8088-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing Failed; nested exception is org.apache.avro.AvroRuntimeException: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.specific.SpecificRecord] with root cause

org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.specific.SpecificRecord
    at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285) ~[avro-1.8.2.jar:1.8.2]
    at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218) ~[avro-1.8.2.jar:1.8.2]
    at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215) ~[avro-1.8.2.jar:1.8.2]

属性如下所示。

        Properties properties = new Properties();

        // normal producer
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.setProperty("acks","all");
        properties.setProperty("retries","10");
        // avro part

        properties.setProperty("key.serializer",StringSerializer.class.getName());
        properties.setProperty("value.serializer",AvroSerializer.class.getName());

有人可以建议一种方法来克服这个问题。我已经参考了大多数类似的 stackoverflow 问题,但似乎没有一个我有用。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?