在Spring Cloud Stream中混合使用avro和JSON
在 Spring Cloud Stream 中混合使用 avro 序列化 和 普通的 JSON 序列化时遇到了一个奇怪的问题。
主要代码
在上述代码中,根据 avro 的值,发送消息到不同的主题,两个主题中的消息使用了不同的序列化方式:
send-out-0 使用普通的 JSON 序列化,由 Spring 的 MessageConverter 机制进行处理
而 sendAvro-out-0 使用 kafka 生产者的 value serializer 进行序列化
问题
发送消息的次序不同,产生了不同的异常:
分析
为了找出上述问题的原因,使用 IDEA 的 DEBUG 对代码进行了追踪,最后发现出现问题的原因在于 StreamBridge 中的 getStreamBridgeFunction 方法:
private synchronized FunctionInvocationWrapper getStreamBridgeFunction(String outputContentType, ProducerProperties producerProperties) {
if (StringUtils.hasText(outputContentType) && this.streamBridgeFunctionCache.containsKey(outputContentType)) {
return this.streamBridgeFunctionCache.get(outputContentType);
}
else {
FunctionInvocationWrapper functionToInvoke = this.functionCatalog.lookup(STREAM_BRIDGE_FUNC_NAME, outputContentType.toString());
this.streamBridgeFunctionCache.put(outputContentType, functionToInvoke);
functionToInvoke.setSkipOutputConversion(producerProperties.isUseNativeEncoding());
return functionToInvoke;
}
}
上述代码中使用了一个名为 streamBridgeFunctionCache 的内部缓存,缓存的键是 outputContentType,而 avro 和 JSON 序列化方式的 outputContentType 值都是 application/json,最重要的是 skipOutputConversion 字段,两种序列化方式理应是完全不一样的,但是缓存的命中违背了这一要求!
skipOutputConversion 决定了是否对消息的载荷(payload)使用 MessageConverter 进行处理,avro 不能使用该方式进行处理,而 JSON 需要使用 MessageConverter 将对象转换成二进制数组。
消息序列化机制
根据上述内容,可以分析 StreamBridge 中消息序列化的机制如下:
首先,判断 binding 的 useNativeEncoding 值,该配置值赋给了 FunctionInvocationWrapper 的 skipOutputConversion
如果 useNativeEncoding 为 false,则使用 MessageConverter 将消息的载荷序列化成二进制数组(由 JsonMapper 完成)
如果 useNativeEncoding 为 true,不对消息载荷进行处理。
然后,将消息 Message 交给 KafkaTemplate,由 KafkaTemplate 使用配置的 value.serializer 进行序列化,如果没有设置,默认为 ByteArraySerializer。
解决方案
根据上述对问题的分析,只需要名为 sendAvro-out-0 的 binding 下增加 contentType 的配置即可解决问题:
sendAvro-out-0:
destination: orgChangeTopicAvro
contentType: application/*+avro
producer:
useNativeEncoding: true
增加 contentType 配置后,便可以不命中缓存。
Last updated