在Spring Cloud Stream中混合使用avro和JSON
在 Spring Cloud Stream 中混合使用 avro 序列化 和 普通的 JSON 序列化时遇到了一个奇怪的问题。
主要代码
yaml配置
spring:
cloud:
function:
definition: send;sendAvro
stream:
bindings:
send-in-0:
autoStartup: false
group: organization
sendAvro-in-0:
autoStartup: false
group: organization
send-out-0:
destination: orgChangeTopic
contentType: application/json
producer:
useNativeEncoding: false
sendAvro-out-0:
destination: orgChangeTopicAvro
producer:
useNativeEncoding: true
kafka:
bindings:
sendAvro-out-0:
producer:
configuration:
schema.registry.url: http://kubernetes:8074
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
binder:
brokers: kubernetes:9094
requiredAcks: all发送消息的代码
@Slf4j
@Component
public class SimpleSourceBean {
private StreamBridge streamBridge;
public void publishOrganizationChange(ActionEnum action,
String organizationId,
boolean avro) {
log.debug("Sending Kafka message {} for Organization Id: {}",
action, organizationId);
if (avro) {
OrganizationChangeModel change = new OrganizationChangeModel(
OrganizationChangeModel.class.getTypeName(),
action.toString(),
organizationId,
UserContextHolder.getContext().getCorrelationId()
);
// Sends the message from a channel defined in the Source class
streamBridge.send("sendAvro-out-0", change);
} else {
// Publishes a Java POJO message
OrganizationChangeModel2 change = new OrganizationChangeModel2(
OrganizationChangeModel2.class.getTypeName(),
action.toString(),
organizationId,
UserContextHolder.getContext().getCorrelationId()
);
// Sends the message from a channel defined in the Source class
streamBridge.send("send-out-0", change);
}
}
@Autowired
public void setStreamBridge(StreamBridge streamBridge) {
this.streamBridge = streamBridge;
}
}在上述代码中,根据 avro 的值,发送消息到不同的主题,两个主题中的消息使用了不同的序列化方式:
send-out-0 使用普通的 JSON 序列化,由 Spring 的 MessageConverter 机制进行处理
而 sendAvro-out-0 使用 kafka 生产者的 value serializer 进行序列化
问题
发送消息的次序不同,产生了不同的异常:
先使用 send-out-0 发送消息,后使用 sendAvro-out-0 发送消息,遇到的异常
java.lang.NullPointerException: Cannot invoke "Object.getClass()" because "result" is null
at org.springframework.cloud.function.cloudevent.CloudEventsFunctionInvocationHelper.doPostProcessResult(CloudEventsFunctionInvocationHelper.java:138) ~[spring-cloud-function-context-4.0.5.jar:4.0.5]
at org.springframework.cloud.function.cloudevent.CloudEventsFunctionInvocationHelper.postProcessResult(CloudEventsFunctionInvocationHelper.java:114) ~[spring-cloud-function-context-4.0.5.jar:4.0.5]
at org.springframework.cloud.function.cloudevent.CloudEventsFunctionInvocationHelper.postProcessResult(CloudEventsFunctionInvocationHelper.java:48) ~[spring-cloud-function-context-4.0.5.jar:4.0.5]
at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:185) ~[spring-cloud-stream-4.0.4.jar:4.0.4]
at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:146) ~[spring-cloud-stream-4.0.4.jar:4.0.4]
at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:141) ~[spring-cloud-stream-4.0.4.jar:4.0.4]
at com.study.organization.message.SimpleSourceBean.publishOrganizationChange(SimpleSourceBean.java:35) ~[classes/:na]
...分析
为了找出上述问题的原因,使用 IDEA 的 DEBUG 对代码进行了追踪,最后发现出现问题的原因在于 StreamBridge 中的 getStreamBridgeFunction 方法:
上述代码中使用了一个名为 streamBridgeFunctionCache 的内部缓存,缓存的键是 outputContentType,而 avro 和 JSON 序列化方式的 outputContentType 值都是 application/json,最重要的是 skipOutputConversion 字段,两种序列化方式理应是完全不一样的,但是缓存的命中违背了这一要求!
skipOutputConversion 决定了是否对消息的载荷(payload)使用 MessageConverter 进行处理,avro 不能使用该方式进行处理,而 JSON 需要使用 MessageConverter 将对象转换成二进制数组。
消息序列化机制
根据上述内容,可以分析 StreamBridge 中消息序列化的机制如下:
首先,判断 binding 的 useNativeEncoding 值,该配置值赋给了 FunctionInvocationWrapper 的 skipOutputConversion
如果 useNativeEncoding 为 false,则使用 MessageConverter 将消息的载荷序列化成二进制数组(由 JsonMapper 完成)
如果 useNativeEncoding 为 true,不对消息载荷进行处理。
然后,将消息 Message 交给 KafkaTemplate,由 KafkaTemplate 使用配置的 value.serializer 进行序列化,如果没有设置,默认为 ByteArraySerializer。
解决方案
根据上述对问题的分析,只需要名为 sendAvro-out-0 的 binding 下增加 contentType 的配置即可解决问题:
增加 contentType 配置后,便可以不命中缓存。
Last updated