使用avro
部署schema registry
schema-registry:
image: docker.io/bitnami/schema-registry:7.5
restart: always
ports:
- '8074:8081'
depends_on:
- kafka
environment:
- SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081
- SCHEMA_REGISTRY_KAFKA_BROKERS=PLAINTEXT://kafka:9092
添加依赖
dependencise
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-avro-serde</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
repositories
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
plugins
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<configuration>
<sourceDirectory>src/main/resources/avro</sourceDirectory>
<outputDirectory>src/main/java</outputDirectory>
</configuration>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
<goal>protocol</goal>
<goal>idl-protocol</goal>
</goals>
</execution>
</executions>
</plugin>
注意
为了让 pom.xml 中配置的仓库地址能够生效,需要修改 Maven 的 settings 文件,让镜像不要代理 confluent 仓库,如下所示:
<mirror>
<id>aliyunmaven</id>
<mirrorOf>*,!confluent</mirrorOf>
<name>阿里云公共仓库</name>
<url>https://maven.aliyun.com/repository/public</url>
</mirror>
使用avro插件生成类
avsc 文件
{
"namespace": "com.study.organization.model",
"type": "record",
"name": "OrganizationChangeModel",
"fields": [
{
"name": "typeName",
"type": "string"
},
{
"name": "action",
"type": "string"
},
{
"name": "organizationId",
"type": "string"
},
{
"name": "correlationId",
"type": ["null", "string"],
"default": null
}
]
}
执行命令:
mvn avro:schema
配置 channel
生产者
spring:
cloud:
function:
definition: send
stream:
bindings:
send-out-0:
destination: orgChangeTopic
producer:
useNativeEncoding: true
kafka:
bindings:
send-out-0:
producer:
configuration:
schema.registry.url: http://192.168.10.110:8074
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
binder:
brokers: 192.168.10.110:9094
requiredAcks: all
消费者
spring:
cloud:
function:
definition: consume
stream:
bindings:
consume-in-0:
destination: orgChangeTopic
group: license
contentType: application/*+avro
consumer:
useNativeDecoding: true
kafka:
bindings:
consume-in-0:
consumer:
configuration:
schema.registry.url: http://192.168.10.110:8074
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
specific.avro.reader: true
binder:
brokers: 192.168.10.110:9094
requiredAcks: all
Last updated