# 在organization服务中编写消息生产者

每当组织数据被添加、更新或删除时，organization 服务将向Kafka主题发布一条消息，表示发生了组织变更事件。发布的消息将包括与变更事件相关联的组织ID以及发生的操作（添加、更新或删除）。

## 添加依赖

在 pom.xml 中，我们需要添加两个依赖项：

```xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
```

## 绑定消息代理

接下来，需要告诉应用程序它将绑定到一个Spring Cloud Stream消息代理。

{% code overflow="wrap" %}

```java
@RefreshScope
@SpringBootApplication
@MapperScan("com.study.organization.mapper")
public class OrganizationServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(OrganizationServiceApplication.class, args);
    }

    @Bean
    public Function<String, String> send() {
        return value -> value.toUpperCase();
    }
}
```

{% endcode %}

## 发布消息

下一步是创建发布消息的逻辑：

```java
@Slf4j
@Component
public class SimpleSourceBean {

    private StreamBridge streamBridge;

    public void publishOrganizationChange(ActionEnum action,
                                          String organizationId) {
        log.debug("Sending Kafka message {} for Organization Id: {}",
                action, organizationId);
        // Publishes a Java POJO message
        OrganizationChangeModel change = new OrganizationChangeModel(
                OrganizationChangeModel.class.getTypeName(),
                action.toString(),
                organizationId,
                UserContextHolder.getContext().getCorrelationId()
        );
        // Sends the message from a channel defined in the Source class
        streamBridge.send("send-out-0", change);
    }

    @Autowired
    public void setStreamBridge(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }
}
```

```java
public enum ActionEnum {
   GET,
   CREATED,
   UPDATED,
   DELETED
}
```

消息的实际发布发生在 publishOrganizationChange() 方法中。该方法构建一个名为 OrganizationChangeModel 的 Java POJO。

> ### <mark style="color:orange;">**在事件中始终包含关联 ID 对于通过我们的服务跟踪和调试消息流非常有帮助。**</mark>

## 配置 binding

```yaml
spring:
  cloud:
    function:
      definition: send
    stream:
      bindings:
        send-out-0:
          destination: orgChangeTopic
          contentType: application/json
      kafka:
        binder:
          brokers: 192.168.10.110:9094
          requiredAcks: all
```

> ## 在消息中放入什么数据？
>
> **在上面的示例中，仅返回了发生更改的组织记录的组织 ID，**&#x4ECE;未在消息中放入数据更改的副本。此外，基于系统事件告知其他服务数据状态已更改，并**始终强制其他服务返回主服务（拥有数据的服务）检索数据的新副本**。
>
> * 从执行时间的角度来看，这种方法更昂贵，但它保证我们始终拥有数据的最新副本。但存在这样一种可能性，在从源系统读取数据后，数据可能会在处理期间发生变化。但是，这种情况发生的可能性较小，远不及直接从队列中获取信息的可能性大。
> * 需要仔细考虑传递的数据量和迟早会遇到的数据“陈旧”问题。这可能是因为问题导致它在消息队列中停留的时间太长，或者之前包含数据的消息发送失败，现在传递的数据表示不一致状态的数据。如果将数据传递给消息，请确保包含日期时间戳或版本号，以便消费数据的服务可以检查传递给它的数据，并确保其不比其已有的数据副本更旧。
>
> **在消息中放入什么数据取决于系统的具体要求和约束。**


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://bohans.gitbook.io/ji-chu/spring/group-1/spring-cloud/spring-cloud-stream/bian-xie-yi-ge-jian-dan-de-xiao-xi-sheng-chan-zhe-he-xiao-fei-zhe/zai-organization-fu-wu-zhong-bian-xie-xiao-xi-sheng-chan-zhe.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
