使用avro

部署schema registry

schema-registry:
  image: docker.io/bitnami/schema-registry:7.5
  restart: always
  ports:
    - '8074:8081'
  depends_on:
    - kafka
  environment:
    - SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081
    - SCHEMA_REGISTRY_KAFKA_BROKERS=PLAINTEXT://kafka:9092

添加依赖

properties
<confluent.version>5.2.0</confluent.version>
<avro.version>1.11.3</avro.version>
dependencise
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>${avro.version}</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-streams-avro-serde</artifactId>
    <version>${confluent.version}</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>${confluent.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
repositories
<repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
</repository>
plugins
<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>${avro.version}</version>
    <configuration>
        <sourceDirectory>src/main/resources/avro</sourceDirectory>
        <outputDirectory>src/main/java</outputDirectory>
    </configuration>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
                <goal>protocol</goal>
                <goal>idl-protocol</goal>
            </goals>
        </execution>
    </executions>
</plugin>

注意

为了让 pom.xml 中配置的仓库地址能够生效,需要修改 Maven 的 settings 文件,让镜像不要代理 confluent 仓库,如下所示:

<mirror>
  <id>aliyunmaven</id>
  <mirrorOf>*,!confluent</mirrorOf>
  <name>阿里云公共仓库</name>
  <url>https://maven.aliyun.com/repository/public</url>
</mirror>

使用avro插件生成类

avsc 文件
{
  "namespace": "com.study.organization.model",
  "type": "record",
  "name": "OrganizationChangeModel",
  "fields": [
    {
      "name": "typeName",
      "type": "string"
    },
    {
      "name": "action",
      "type": "string"
    },
    {
      "name": "organizationId",
      "type": "string"
    },
    {
      "name": "correlationId",
      "type": ["null", "string"],
      "default": null
    }
  ]
}

执行命令:

mvn avro:schema

配置 channel

生产者

spring:
  cloud:
    function:
      definition: send
    stream:
      bindings:
        send-out-0:
          destination: orgChangeTopic
          producer:
            useNativeEncoding: true
      kafka:
        bindings:
          send-out-0:
            producer:
              configuration:
                schema.registry.url: http://192.168.10.110:8074
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
        binder:
          brokers: 192.168.10.110:9094
          requiredAcks: all

消费者

spring:
  cloud:
    function:
      definition: consume
    stream:
      bindings:
        consume-in-0:
          destination: orgChangeTopic
          group: license
          contentType: application/*+avro
          consumer:
            useNativeDecoding: true
      kafka:
        bindings:
          consume-in-0:
            consumer:
              configuration:
                schema.registry.url: http://192.168.10.110:8074
                value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
                specific.avro.reader: true
        binder:
          brokers: 192.168.10.110:9094
          requiredAcks: all

Last updated