/**
 * Entry point of the organization service.
 *
 * <p>Besides bootstrapping Spring, this class registers the {@link #send()} function bean.
 * Mapper interfaces are scanned from {@code com.study.organization.mapper}.
 */
@RefreshScope
@SpringBootApplication
@MapperScan("com.study.organization.mapper")
public class OrganizationServiceApplication {

    /**
     * Starts the application.
     *
     * @param args command-line arguments, forwarded unchanged to {@code SpringApplication.run}
     */
    public static void main(String[] args) {
        SpringApplication.run(OrganizationServiceApplication.class, args);
    }

    /**
     * Provides a function bean that upper-cases the value it receives.
     *
     * <p>NOTE(review): presumably this function backs the {@code send-out-0} binding used
     * when publishing messages - confirm against the stream binding configuration.
     *
     * @return a function mapping a string to its upper-case form
     */
    @Bean
    public Function<String, String> send() {
        // Use a fixed locale: the no-arg toUpperCase() depends on the JVM default locale and
        // yields surprising results under e.g. Turkish ("i" -> "İ"). The type is written fully
        // qualified so the file's import list does not have to change.
        return value -> value.toUpperCase(java.util.Locale.ROOT);
    }
}
发布消息
下一步是创建发布消息的逻辑:
/**
 * Publishes organization change notifications through a {@link StreamBridge}.
 *
 * <p>The bridge is supplied via {@link #setStreamBridge(StreamBridge)}.
 */
@Slf4j
@Component
public class SimpleSourceBean {

    /** Name of the output binding that change messages are sent to. */
    private static final String OUTPUT_BINDING = "send-out-0";

    private StreamBridge streamBridge;

    /**
     * Builds an {@link OrganizationChangeModel} for the given action and organization and
     * sends it to the {@code send-out-0} binding.
     *
     * <p>The message carries the model's type name, the action's string form, the
     * organization id, and the correlation id read from the current
     * {@code UserContextHolder} context. If the bridge reports that the message was not
     * sent, a warning is logged; no exception is raised by this method for that case.
     *
     * @param action         the kind of change being announced
     * @param organizationId identifier of the organization the change applies to
     */
    public void publishOrganizationChange(ActionEnum action,
                                          String organizationId) {
        log.debug("Sending Kafka message {} for Organization Id: {}",
                action, organizationId);

        // The payload is a plain Java object built from the arguments and the current context.
        OrganizationChangeModel change = new OrganizationChangeModel(
                OrganizationChangeModel.class.getTypeName(),
                action.toString(),
                organizationId,
                UserContextHolder.getContext().getCorrelationId()
        );

        // StreamBridge.send returns false when the message could not be sent; surface that
        // instead of silently dropping the outcome.
        boolean sent = streamBridge.send(OUTPUT_BINDING, change);
        if (!sent) {
            log.warn("Failed to send message {} for Organization Id: {} to binding {}",
                    action, organizationId, OUTPUT_BINDING);
        }
    }

    /**
     * Injects the bridge used to send messages.
     *
     * @param streamBridge the bridge to publish through
     */
    @Autowired
    public void setStreamBridge(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }
}
/**
 * Kinds of action that can be reported for an organization.
 *
 * <p>The constant's string form is what gets placed into the published change message,
 * so names and declaration order are kept stable.
 */
public enum ActionEnum {
    GET, CREATED, UPDATED, DELETED;
}