diff --git a/.agents/skills/flyway-decoupled/SKILL.md b/.agents/skills/flyway-decoupled/SKILL.md
new file mode 100644
index 0000000..582366a
--- /dev/null
+++ b/.agents/skills/flyway-decoupled/SKILL.md
@@ -0,0 +1,54 @@
+---
+name: flyway-decoupled
+description: Use when changing schema migrations, Flyway container wiring, or startup ordering for the template's decoupled migration flow.
+---
+
+# Flyway Decoupled
+
+Use when a schema or startup change must preserve this repository's pattern of running Flyway outside the Spring Boot process.
+
+## Destination
+
+- `resources/flyway/db/migration/*.sql`
+- `resources/flyway/pom.xml`
+- `resources/flyway/run-migration.sh`
+- `Dockerfile`
+- `.docker-compose-local/application.yaml`
+- `.docker-compose-local/infrastructure.yaml`
+- `README.md` only when developer-facing execution guidance changes
+
+## Use For
+
+- adding new versioned SQL migrations
+- changing Flyway runner commands, locations, or connection variables
+- adjusting startup ordering between the database, migration step, and application
+- keeping local and deployment-like migration flows reproducible
+
+## Inputs
+
+1. Schema change or migration behavior to implement
+2. Expected database URL, credentials, and migration location
+3. Whether the change affects local startup, deployment startup, or both
+4. Compatibility, rollback, or rollout expectations
+
+## Workflow
+
+1. Add or update versioned SQL files under `resources/flyway/db/migration/` using Flyway naming (see the naming sketch after this list).
+2. Keep migration execution decoupled from the Spring Boot startup path; prefer the dedicated runner script or migration service over in-app hooks.
+3. Align `resources/flyway/pom.xml`, `run-migration.sh`, and compose environment variables when connection settings or migration locations change.
+4. Preserve explicit startup ordering so the application waits for the migration step in `.docker-compose-local/application.yaml` (see the ordering sketch at the end of this skill).
+5. Keep `.docker-compose-local/infrastructure.yaml` and the application startup flow coherent when the local stack also needs Flyway changes.
+6. If the work changes operational recovery or rollout procedure, route to `docs-runbook`; if it changes long-lived migration strategy, route to `docs-design-doc` or `docs-adr`.
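+
+A minimal sketch of step 1, using a hypothetical migration file (the version number, file name, and columns are illustrative, not taken from this repository):
+
+```sql
+-- resources/flyway/db/migration/V2__add_user_address.sql (hypothetical name)
+-- Flyway applies V<version>__<description>.sql files exactly once, in version order.
+ALTER TABLE users
+    ADD COLUMN address VARCHAR(255) NULL;
+```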
+
+## Output Shape
+
+1. Versioned migration files
+2. Minimal wiring changes for Dockerfile, compose, or Flyway runner files
+3. README or runbook follow-up only when operator behavior changed
+
+## Rules
+
+- Do not move schema migration responsibility back into the application startup unless the architecture intentionally changes.
+- Keep migrations append-only and versioned; prefer new scripts over rewriting applied ones.
+- Keep connection settings and migration locations aligned across compose files and runner scripts.
+- Use `make run-all` as the default validation target when startup wiring changes.
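+
+A minimal sketch of the startup ordering preserved by workflow step 4, assuming hypothetical service names (`mysql`, `flyway-migration`, `application`) rather than the repository's exact compose keys:
+
+```yaml
+# Hypothetical excerpt of .docker-compose-local/application.yaml
+services:
+  flyway-migration:
+    depends_on:
+      mysql:
+        condition: service_healthy                # migrate only once the database accepts connections
+  application:
+    depends_on:
+      flyway-migration:
+        condition: service_completed_successfully # start the app only after migrations finish
+```
+
+Validate the wiring with `make run-all` afterwards.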
diff --git a/.agents/skills/observability/SKILL.md b/.agents/skills/observability/SKILL.md
new file mode 100644
index 0000000..a8ad0c5
--- /dev/null
+++ b/.agents/skills/observability/SKILL.md
@@ -0,0 +1,54 @@
+---
+name: observability
+description: Use when changing the local OpenTelemetry, Prometheus, Grafana, or Jaeger stack and its wiring to the application.
+---
+
+# Observability
+
+Use when telemetry collection, export, or local observability services change and the template's metrics and traces setup must stay coherent.
+
+## Destination
+
+- `.docker-compose-local/observability.yaml`
+- `.docker-compose-local/config/otel-collector.yaml`
+- `.docker-compose-local/config/prometheus.yaml`
+- `.docker-compose-local/config/ds-prometheus.yaml`
+- `.docker-compose-local/application.yaml`
+- `README.md` only when access URLs or startup guidance changes
+
+## Use For
+
+- changing OTLP endpoints or application telemetry environment variables
+- adjusting OpenTelemetry Collector pipelines, exporters, or processors
+- changing Prometheus scraping or remote-write configuration
+- updating Grafana datasource wiring or Jaeger exposure
+- troubleshooting missing local metrics, traces, or dashboards
+
+## Inputs
+
+1. The affected signal or user-visible behavior: metrics, traces, dashboards, or collector flow
+2. The config or service being changed
+3. Expected URLs, ports, networks, and dependencies
+4. Any application environment changes required for the new telemetry path
+
+## Workflow
+
+1. Start from `.docker-compose-local/application.yaml` and confirm how the app points to OTLP endpoints.
+2. Follow the signal path through `.docker-compose-local/config/otel-collector.yaml` (see the collector sketch after this list).
+3. Keep downstream exporters and services aligned in `.docker-compose-local/observability.yaml`.
+4. Preserve Grafana `:3000`, Prometheus `:9090`, Jaeger `:16686`, and OTLP `:4317/:4318` access paths unless a deliberate change is required.
+5. Keep Prometheus and Grafana configuration in sync with the collector and service ports.
+6. If the change affects operational response or support workflows, route to `docs-runbook`; if it changes cross-cutting platform design, route to `docs-design-doc`.
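+
+A minimal sketch of steps 1-3, assuming default OTLP ports and common exporter names (the repository's actual collector pipeline may differ):
+
+```yaml
+# Hypothetical excerpt of .docker-compose-local/config/otel-collector.yaml
+receivers:
+  otlp:
+    protocols:
+      grpc:
+        endpoint: 0.0.0.0:4317
+      http:
+        endpoint: 0.0.0.0:4318
+exporters:
+  otlp/jaeger:
+    endpoint: jaeger:4317        # traces onward to Jaeger
+    tls:
+      insecure: true
+  prometheus:
+    endpoint: 0.0.0.0:8889       # metrics endpoint scraped by Prometheus
+service:
+  pipelines:
+    traces:
+      receivers: [otlp]
+      exporters: [otlp/jaeger]
+    metrics:
+      receivers: [otlp]
+      exporters: [prometheus]
+```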
+
+## Output Shape
+
+1. Minimal stack or config changes across application wiring, collector, Prometheus, Grafana, or Jaeger
+2. A clear telemetry path that remains coherent end to end
+3. README or runbook updates only when developer-visible behavior changed
+
+## Rules
+
+- Keep the telemetry path coherent end to end: app -> OTEL collector -> Jaeger or Prometheus -> Grafana.
+- Prefer config-as-code under `.docker-compose-local/` over manual container tweaks.
+- Do not change developer-facing ports casually; update docs when you do.
+- Use `make run-observability` as the default validation target, plus `make run-app` when application wiring changes (see the wiring sketch below).
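+
+A minimal sketch of the application-side wiring these rules protect, using the standard OpenTelemetry SDK variables (the service key and exact values are assumptions, not the repository's verbatim config):
+
+```yaml
+# Hypothetical excerpt of .docker-compose-local/application.yaml
+services:
+  application:
+    environment:
+      OTEL_EXPORTER_OTLP_ENDPOINT: http://otel-collector:4317   # collector's OTLP gRPC port
+      OTEL_EXPORTER_OTLP_PROTOCOL: grpc
+```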
diff --git a/AGENTS.md b/AGENTS.md
index 226ec23..943841b 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -62,6 +62,8 @@ Use these repository-native skills for common delivery workflows. For new produc
- `.agents/skills/archunit-guard/SKILL.md` for diagnosing or preserving architecture rules enforced by ArchUnit
- `.agents/skills/acceptance-scenario-scaffold/SKILL.md` for black-box acceptance coverage in the `acceptance-test/` module
- `.agents/skills/api-doc-auditor/SKILL.md` for keeping OpenAPI and AsyncAPI output aligned with source code
+- `.agents/skills/flyway-decoupled/SKILL.md` for decoupled database migration scripts, runner wiring, and startup ordering
+- `.agents/skills/observability/SKILL.md` for local OpenTelemetry, Prometheus, Grafana, and Jaeger stack wiring
## Documentation Conventions
diff --git a/README.md b/README.md
index d80b889..68dbcba 100644
--- a/README.md
+++ b/README.md
@@ -6,9 +6,10 @@
Here you will describe this project, what it does, and its goals, making it clear to everyone. Example:
-The **Java Architecture Template** is a project designed to serve as a template for creating applications, aiming for development with exceptional
-technical quality to ensure long-term maintainability.
-In this template, we provide a user registration endpoint that triggers an event in the broker when a user is registered. A listener will receive these creation events and enrich them with address data.
+The **Java Architecture Template** is a project designed to serve as a template for creating applications, aiming for development with exceptional
+technical quality to ensure long-term maintainability.
+In this template, we provide a user registration endpoint that publishes a **CloudEvent** to Kafka when a user is registered. A listener consumes
+CloudEvents of type `br.com.helpdev.sample.user.created` and enriches the user with address data.
📖 Read this in:
- 🇧🇷 [Português](README.pt.md)
@@ -16,6 +17,8 @@ In this template, we provide a user registration endpoint that triggers an event
## **Architecture**
+**Related skill:** [`hexagon-scaffold`](.agents/skills/hexagon-scaffold/SKILL.md) for adding feature slices that follow the template's hexagonal structure.
+
This project follows the **Hexagonal Architecture**, as proposed by Alistair Cockburn, which focuses on **decoupling the application’s core business logic from its input and output
mechanisms**. This design principle promotes adaptability, testability, and sustainability by encapsulating the application layer (business core) and
exposing defined ports for interactions with external systems.
@@ -64,6 +67,8 @@ acceptance-test
### **Architecture Tests**
+**Related skill:** [`archunit-guard`](.agents/skills/archunit-guard/SKILL.md) for preserving and evolving the repository's architecture rules safely.
+
This architecture is enforced by [**ArchUnit tests**](application/src/test/java/br/com/helpdev/sample/ArchitectureTest.java) to ensure the project's
compliance with the defined structure.
These tests validate the project's adherence to the Hexagonal Architecture principles,
@@ -75,6 +80,8 @@ _Read more about: [Garantindo a arquitetura de uma aplicação sem complexidade]
### **Acceptance Tests**
+**Related skill:** [`acceptance-scenario-scaffold`](.agents/skills/acceptance-scenario-scaffold/SKILL.md) for Docker-based black-box scenarios in the `acceptance-test/` module.
+
To ensure robust testing, the **acceptance-test** module encapsulates the application within a Docker image and executes integration tests in an
environment that closely mimics the real-world behavior of the application. This approach guarantees homogeneity in the application modules by
restricting unit tests to the main application module, while handling integration tests separately within the acceptance-test module.
@@ -144,6 +151,8 @@ environment.
### **The Flyway Database Migration Tool**
+**Related skill:** [`flyway-decoupled`](.agents/skills/flyway-decoupled/SKILL.md) for versioned migrations, Flyway runner wiring, and startup ordering.
+
To ensure better startup performance and avoid concurrency issues in Kubernetes environments, **Flyway** has been implemented as a decoupled database
migration tool. This design enables the migration process to run independently of the application.
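+
+As an illustration of the decoupled flow, the migration step can be invoked on its own before the application starts. The variable names below are assumptions; the actual contract lives in `resources/flyway/run-migration.sh`:
+
+```bash
+# Hypothetical standalone invocation of the decoupled migration step
+cd resources/flyway
+DB_URL=jdbc:mysql://localhost:3306/sample \
+DB_USER=root \
+DB_PASSWORD=secret \
+./run-migration.sh
+```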
@@ -159,6 +168,9 @@ This approach enhances deployment reliability and maintains a clean separation o
You can see an example of how to execute it in: [application docker-compose file](.docker-compose-local/application.yaml).
### **OpenAPI**
+
+**Related skill:** [`api-doc-auditor`](.agents/skills/api-doc-auditor/SKILL.md) for keeping generated API documentation aligned with the source code.
+
This project uses **Springdoc OpenAPI** to automatically document REST endpoints.
🔗 [Official OpenAPI site](https://swagger.io/specification/)
@@ -170,7 +182,11 @@ After starting the application, access:
- **OpenAPI specification in JSON**: [http://localhost:8080/v3/api-docs](http://localhost:8080/v3/api-docs)
### **AsyncAPI**
+
+**Related skill:** [`api-doc-auditor`](.agents/skills/api-doc-auditor/SKILL.md) for keeping async contracts and generated docs aligned with the source code.
+
This project uses **Springwolf** to document asynchronous events (Kafka, RabbitMQ, etc.) with **AsyncAPI**.
+Kafka messages on the `user-events` topic follow the **CloudEvents structured JSON** format (`application/cloudevents+json`).
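+
+An illustrative message shape, assembled from the field examples documented in the event records:
+
+```json
+{
+  "specversion": "1.0",
+  "id": "ef5f318c-4b7c-4fd7-a661-56293d8b8a91",
+  "source": "urn:helpdev:sample:user",
+  "type": "br.com.helpdev.sample.user.created",
+  "time": "2026-04-12T21:33:04Z",
+  "datacontenttype": "application/json",
+  "data": {
+    "userUuid": "f0f8cf3e-e856-4d61-a613-44f5df7742ca"
+  }
+}
+```
+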
🔗 [Official AsyncAPI site](https://www.asyncapi.com/)
@@ -187,6 +203,8 @@ functionalities:
#### Observability Services
+**Related skill:** [`observability`](.agents/skills/observability/SKILL.md) for the local OpenTelemetry, Prometheus, Grafana, and Jaeger stack.
+
See the stack: [docker-compose-observability.yaml](.docker-compose-local/observability.yaml)
- **Grafana**: Visualization and monitoring dashboard, available at [http://localhost:3000](http://localhost:3000).
@@ -204,12 +222,28 @@ These services are orchestrated using Docker Compose to ensure seamless setup an
-## **Architectural Decision Records (ADR)**
+## **Docs**
+
+**Related skills:** [`docs`](.agents/skills/docs/SKILL.md) as the entrypoint, then [`docs-spec`](.agents/skills/docs-spec/SKILL.md), [`docs-adr`](.agents/skills/docs-adr/SKILL.md), [`docs-design-doc`](.agents/skills/docs-design-doc/SKILL.md), [`docs-runbook`](.agents/skills/docs-runbook/SKILL.md), and [`docs-selective-persistence`](.agents/skills/docs-selective-persistence/SKILL.md).
+
+This template treats documentation as an active part of delivery, guided by repository skills instead of ad-hoc files. The entry point is
+`.agents/skills/docs/SKILL.md`, which decides whether the change needs durable documentation, whether an existing document should be updated, or whether
+to state explicitly that no new document is needed.
+
+The flow works like this:
+
+1. Clarify scope first through definition, spec, or plan mode.
+2. Route the work to the right artifact:
+ - **Spec** for scope, scenarios, constraints, and acceptance criteria (`docs/specs/` when created).
+ - **ADR** for long-lived architectural decisions and guardrails in [`docs/adr/`](./docs/adr/README.md).
+ - **Design Doc** for non-trivial structure, integrations, migrations, or risks in [`docs/design/`](./docs/design/README.md).
+ - **Runbook** for operations, rollout, rollback, support, and incident handling (`docs/runbooks/` when created).
+3. When required, ADRs and design docs are written and aligned before implementation starts.
+4. After planning or execution, `docs-selective-persistence` decides what remains durable and what should stay transient.
-The project includes a dedicated folder for **Architectural Decision Records (ADR)**, located in the `docs/adr` directory. This folder documents key
-architectural decisions made throughout the project, providing context, rationale, and implications for these choices.
+This keeps documentation lean, decision-oriented, and connected to execution instead of turning every discussion into a permanent artifact.
-To learn more about the ADRs and explore the documented decisions, refer to the [ADR README](./docs/adr/README.md).
+Read more about this perspective in [Documentação na era da IA: quando a documentação vira contexto de execução](https://medium.com/@guilherme.zarelli/documenta%C3%A7%C3%A3o-na-era-da-ia-quando-a-documenta%C3%A7%C3%A3o-vira-contexto-de-execu%C3%A7%C3%A3o-cb8d6fdf84ed) _(Portuguese)_.
## **Contribute**
diff --git a/README.pt.md b/README.pt.md
index beb5eb9..3fde0ea 100644
--- a/README.pt.md
+++ b/README.pt.md
@@ -6,8 +6,9 @@
Aqui você deve descrever seu projeto, seu funcionamento e seus objetivos, tornando-o claro para todos. Exemplo:
-O **Java Architecture Template** é um projeto criado para servir como modelo na criação de aplicações, visando um desenvolvimento com **qualidade técnica excepcional** para garantir **manutenção a longo prazo**.
-Neste template, fornecemos um **endpoint de cadastro de usuário**, que **dispara um evento no broker** quando um usuário é registrado. Um **listener recebe esses eventos** de criação e os enriquece com dados de endereço.
+O **Java Architecture Template** é um projeto criado para servir como modelo na criação de aplicações, visando um desenvolvimento com **qualidade técnica excepcional** para garantir **manutenção a longo prazo**.
+Neste template, fornecemos um **endpoint de cadastro de usuário** que publica um **CloudEvent** no Kafka quando um usuário é registrado. Um
+**listener consome CloudEvents** do tipo `br.com.helpdev.sample.user.created` e enriquece o usuário com dados de endereço.
📚 Leia em:
- 🇬🇧 [English](README.md)
@@ -15,6 +16,8 @@ Neste template, fornecemos um **endpoint de cadastro de usuário**, que **dispar
## **Architecture**
+**Skill relacionada:** [`hexagon-scaffold`](.agents/skills/hexagon-scaffold/SKILL.md) para adicionar novas fatias de funcionalidade seguindo a estrutura hexagonal do template.
+
Este projeto segue a **Arquitetura Hexagonal**, conforme proposta por **Alistair Cockburn**, focando em **desacoplar a lógica de negócio principal da aplicação de seus mecanismos de entrada e saída**. Esse princípio de design promove **adaptabilidade, testabilidade e sustentabilidade**, encapsulando a camada de aplicação (núcleo de negócio) e expondo portas definidas para interação com sistemas externos.
@@ -61,6 +64,8 @@ acceptance-test
### **Architecture Tests**
+**Skill relacionada:** [`archunit-guard`](.agents/skills/archunit-guard/SKILL.md) para preservar e evoluir com segurança as regras de arquitetura do repositório.
+
Esta arquitetura é garantida por meio de testes **ArchUnit**, que validam a conformidade do projeto com os princípios da Arquitetura Hexagonal, assegurando a separação de responsabilidades e a independência da lógica de negócio central em relação aos sistemas externos.
_Read more about: [Garantindo a arquitetura de uma aplicação sem complexidade](https://medium.com/luizalabs/garantindo-a-arquitetura-de-uma-aplica%C3%A7%C3%A3o-sem-complexidade-6f675653799c)_
@@ -69,6 +74,8 @@ _Read more about: [Garantindo a arquitetura de uma aplicação sem complexidade]
### **Acceptance Tests**
+**Skill relacionada:** [`acceptance-scenario-scaffold`](.agents/skills/acceptance-scenario-scaffold/SKILL.md) para cenários black-box baseados em Docker no módulo `acceptance-test/`.
+
Para garantir testes robustos, o módulo **acceptance-test** encapsula a aplicação dentro de uma imagem Docker e executa testes de integração em um ambiente que imita de perto o comportamento real da aplicação. Essa abordagem garante a homogeneidade nos módulos da aplicação ao restringir os testes unitários ao módulo principal, enquanto lida com testes de integração separadamente no módulo acceptance-test.
Esta separação garante:
@@ -134,6 +141,8 @@ Esta configuração garante uma experiência de desenvolvimento eficiente e cons
### **The Flyway Database Migration Tool**
+**Skill relacionada:** [`flyway-decoupled`](.agents/skills/flyway-decoupled/SKILL.md) para migrações versionadas, wiring do Flyway e ordenação de startup.
+
Para garantir um melhor desempenho de inicialização e evitar problemas de concorrência em ambientes Kubernetes, o **Flyway** foi implementado como uma ferramenta de migração de banco de dados desacoplada. Este design permite que o processo de migração seja executado de forma independente da aplicação.
Principais Características:
@@ -147,6 +156,8 @@ Essa abordagem melhora a confiabilidade da implantação e mantém uma separaç
Você pode ver um exemplo de como executar em: [arquivo docker-compose da aplicação](.docker-compose-local/application.yaml).
### **OpenAPI**
+**Skill relacionada:** [`api-doc-auditor`](.agents/skills/api-doc-auditor/SKILL.md) para manter a documentação gerada da API alinhada com o código-fonte.
+
Este projeto utiliza o **Springdoc OpenAPI** para documentar automaticamente os endpoints REST.
🔗 [Site oficial da OpenAPI](https://swagger.io/specification/)
@@ -158,7 +169,10 @@ Após iniciar a aplicação, acesse:
- **Especificação OpenAPI em JSON**: [http://localhost:8080/v3/api-docs](http://localhost:8080/v3/api-docs)
### **AsyncAPI**
+**Skill relacionada:** [`api-doc-auditor`](.agents/skills/api-doc-auditor/SKILL.md) para manter contratos assíncronos e documentação gerada alinhados com o código-fonte.
+
Este projeto utiliza o **Springwolf** para documentar eventos assíncronos (Kafka, RabbitMQ, etc.) com **AsyncAPI**.
+As mensagens Kafka no tópico `user-events` seguem o formato **CloudEvents structured JSON** (`application/cloudevents+json`).
🔗 [Site oficial da AsyncAPI](https://www.asyncapi.com/)
@@ -175,6 +189,8 @@ essenciais:
#### Observability Services
+**Skill relacionada:** [`observability`](.agents/skills/observability/SKILL.md) para a stack local de OpenTelemetry, Prometheus, Grafana e Jaeger.
+
Veja a stack: [docker-compose-observability.yaml](.docker-compose-local/observability.yaml)
- **Grafana**: Visualization and monitoring dashboard, available at [http://localhost:3000](http://localhost:3000).
@@ -191,11 +207,28 @@ Veja a stack: [docker-compose-infrastructure.yaml](.docker-compose-local/infrast
Esses serviços são orquestrados usando o Docker Compose para garantir configuração e operação perfeitas em um ambiente de desenvolvimento local.
-## **Architectural Decision Records (ADR)**
-O projeto inclui uma pasta dedicada para **Registros de Decisões Arquiteturais (ADR)**, localizada no diretório `docs/adr`. Esta pasta documenta as principais
-decisões arquiteturais tomadas ao longo do projeto, fornecendo contexto, justificativa e implicações para essas escolhas.
+## **Docs**
+
+**Skills relacionadas:** [`docs`](.agents/skills/docs/SKILL.md) como entrada, depois [`docs-spec`](.agents/skills/docs-spec/SKILL.md), [`docs-adr`](.agents/skills/docs-adr/SKILL.md), [`docs-design-doc`](.agents/skills/docs-design-doc/SKILL.md), [`docs-runbook`](.agents/skills/docs-runbook/SKILL.md) e [`docs-selective-persistence`](.agents/skills/docs-selective-persistence/SKILL.md).
+
+Este template trata a documentação como uma parte ativa da entrega, guiada por skills do repositório em vez de arquivos criados de forma ad hoc. O ponto
+de entrada é `.agents/skills/docs/SKILL.md`, que decide se a mudança precisa de documentação durável, se um documento existente deve ser atualizado, ou se
+explicitamente nenhum novo documento é necessário.
+
+O fluxo funciona assim:
+
+1. Primeiro, o escopo é esclarecido por definição, spec ou plan mode.
+2. Depois, a necessidade é roteada para o artefato correto:
+ - **Spec** para escopo, cenários, restrições e critérios de aceite (`docs/specs/`, quando existir).
+ - **ADR** para decisões arquiteturais duráveis e guardrails em [`docs/adr/`](./docs/adr/README.md).
+ - **Design Doc** para estrutura não trivial, integrações, migrações ou riscos em [`docs/design/`](./docs/design/README.md).
+ - **Runbook** para operação, rollout, rollback, suporte e incidentes (`docs/runbooks/`, quando existir).
+3. Quando necessários, ADRs e design docs são escritos e alinhados antes do início da implementação.
+4. Após o planejamento ou a execução, `docs-selective-persistence` decide o que permanece durável e o que deve continuar transitório.
+
+Com isso, a documentação fica enxuta, orientada a decisão e conectada à execução, em vez de transformar toda conversa em artefato permanente.
-Para saber mais sobre os ADRs e explorar as decisões documentadas, consulte o [README do ADR](./docs/adr/README.md).
+Leia mais em [Documentação na era da IA: quando a documentação vira contexto de execução](https://medium.com/@guilherme.zarelli/documenta%C3%A7%C3%A3o-na-era-da-ia-quando-a-documenta%C3%A7%C3%A3o-vira-contexto-de-execu%C3%A7%C3%A3o-cb8d6fdf84ed).
## **Contribua**
diff --git a/application/pom.xml b/application/pom.xml
index fc3268c..095f721 100644
--- a/application/pom.xml
+++ b/application/pom.xml
@@ -17,6 +17,7 @@
2024.0.0
3.4.1
1.3.0
+        <cloudevents.version>4.0.2</cloudevents.version>
8.4.0
2.8.1
1.9.0
@@ -52,6 +53,18 @@
            <version>${springwolf.version}</version>
        </dependency>
+
+        <dependency>
+            <groupId>io.cloudevents</groupId>
+            <artifactId>cloudevents-core</artifactId>
+            <version>${cloudevents.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.cloudevents</groupId>
+            <artifactId>cloudevents-json-jackson</artifactId>
+            <version>${cloudevents.version}</version>
+        </dependency>
+
        <dependency>
            <groupId>org.springframework.cloud</groupId>
diff --git a/application/src/main/java/br/com/helpdev/sample/adapters/input/kafka/UserEventListener.java b/application/src/main/java/br/com/helpdev/sample/adapters/input/kafka/UserEventListener.java
index b6513d7..8cd6a48 100644
--- a/application/src/main/java/br/com/helpdev/sample/adapters/input/kafka/UserEventListener.java
+++ b/application/src/main/java/br/com/helpdev/sample/adapters/input/kafka/UserEventListener.java
@@ -1,11 +1,17 @@
package br.com.helpdev.sample.adapters.input.kafka;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
import java.util.UUID;
import io.github.springwolf.bindings.kafka.annotations.KafkaAsyncOperationBinding;
import io.github.springwolf.core.asyncapi.annotations.AsyncListener;
import io.github.springwolf.core.asyncapi.annotations.AsyncMessage;
import io.github.springwolf.core.asyncapi.annotations.AsyncOperation;
+import io.cloudevents.core.format.EventFormat;
+import io.cloudevents.core.provider.EventFormatProvider;
+import io.cloudevents.jackson.JsonFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -14,9 +20,9 @@
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Controller;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import br.com.helpdev.sample.adapters.input.kafka.dto.UserEventDataDto;
import br.com.helpdev.sample.adapters.input.kafka.dto.UserEventDto;
import br.com.helpdev.sample.core.ports.input.UserEnricherPort;
@@ -25,6 +31,9 @@ class UserEventListener {
private static final String TOPIC_NAME = "user-events";
+ private static final EventFormat EVENT_FORMAT = Objects.requireNonNull(
+ EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE));
+
private final Logger logger = LoggerFactory.getLogger(UserEventListener.class);
private final ObjectMapper objectMapper;
@@ -41,8 +50,8 @@ class UserEventListener {
description = "Listen for user events",
message = @AsyncMessage(
name = "UserEventDto",
- contentType = "application/json",
- messageId = "uuid"
+ contentType = JsonFormat.CONTENT_TYPE,
+ messageId = "id"
),
headers = @AsyncOperation.Headers(
notUsed = true
@@ -52,16 +61,22 @@ class UserEventListener {
@KafkaAsyncOperationBinding(bindingVersion = "1.0.0")
@KafkaListener(topics = TOPIC_NAME)
@RetryableTopic(attempts = "3", backoff = @Backoff(delay = 1000, maxDelay = 10000, multiplier = 2), autoCreateTopics = "true")
- public void listen(final String message) throws JsonProcessingException {
- final var userEventDto = objectMapper.readValue(message, UserEventDto.class);
+ public void listen(final String message) throws IOException {
+ final var cloudEvent = EVENT_FORMAT.deserialize(message.getBytes(StandardCharsets.UTF_8));
+
+ if (UserEventDto.EVENT_TYPE_CREATED.equals(cloudEvent.getType())) {
+ final var eventData = cloudEvent.getData();
+ if (eventData == null) {
+ throw new IllegalArgumentException("CloudEvent data is required for user events");
+ }
+ final var userEventData = objectMapper.readValue(eventData.toBytes(), UserEventDataDto.class);
- if (UserEventDto.EVENT_CREATED.equals(userEventDto.event())) {
- userEnricherPort.enrichUser(UUID.fromString(userEventDto.uuid()));
- logger.info("User enriched: {}", userEventDto.uuid());
+ userEnricherPort.enrichUser(UUID.fromString(userEventData.userUuid()));
+ logger.info("User enriched: {}", userEventData.userUuid());
return;
}
- logger.info("User event ignored to enrich process: {}", userEventDto.event());
+ logger.info("User event ignored to enrich process: {}", cloudEvent.getType());
}
}
diff --git a/application/src/main/java/br/com/helpdev/sample/adapters/input/kafka/dto/UserEventDataDto.java b/application/src/main/java/br/com/helpdev/sample/adapters/input/kafka/dto/UserEventDataDto.java
new file mode 100644
index 0000000..aabcfe4
--- /dev/null
+++ b/application/src/main/java/br/com/helpdev/sample/adapters/input/kafka/dto/UserEventDataDto.java
@@ -0,0 +1,8 @@
+package br.com.helpdev.sample.adapters.input.kafka.dto;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+
+public record UserEventDataDto(
+ @Schema(title = "User UUID", example = "f0f8cf3e-e856-4d61-a613-44f5df7742ca") String userUuid)
+{
+}
diff --git a/application/src/main/java/br/com/helpdev/sample/adapters/input/kafka/dto/UserEventDto.java b/application/src/main/java/br/com/helpdev/sample/adapters/input/kafka/dto/UserEventDto.java
index ce01235..eace819 100644
--- a/application/src/main/java/br/com/helpdev/sample/adapters/input/kafka/dto/UserEventDto.java
+++ b/application/src/main/java/br/com/helpdev/sample/adapters/input/kafka/dto/UserEventDto.java
@@ -1,12 +1,19 @@
package br.com.helpdev.sample.adapters.input.kafka.dto;
+import java.time.OffsetDateTime;
+
import io.swagger.v3.oas.annotations.media.Schema;
public record UserEventDto(
- @Schema(title = "Event", example = "CREATED|UPDATED") String event,
- @Schema(title = "UUID", example = "uuid") String uuid)
+ @Schema(title = "CloudEvent Spec Version", example = "1.0") String specversion,
+ @Schema(title = "CloudEvent Identifier", example = "ef5f318c-4b7c-4fd7-a661-56293d8b8a91") String id,
+ @Schema(title = "CloudEvent Source", example = "urn:helpdev:sample:user") String source,
+ @Schema(title = "CloudEvent Type", example = "br.com.helpdev.sample.user.created") String type,
+ @Schema(title = "CloudEvent Time", example = "2026-04-12T21:33:04Z") OffsetDateTime time,
+ @Schema(title = "CloudEvent Data Content Type", example = "application/json") String datacontenttype,
+ @Schema(title = "CloudEvent Data") UserEventDataDto data)
{
- public static final String EVENT_CREATED = "CREATED";
+ public static final String EVENT_TYPE_CREATED = "br.com.helpdev.sample.user.created";
}
diff --git a/application/src/main/java/br/com/helpdev/sample/adapters/output/kafka/UserEvent.java b/application/src/main/java/br/com/helpdev/sample/adapters/output/kafka/UserEvent.java
index 72dfb73..5a071ed 100644
--- a/application/src/main/java/br/com/helpdev/sample/adapters/output/kafka/UserEvent.java
+++ b/application/src/main/java/br/com/helpdev/sample/adapters/output/kafka/UserEvent.java
@@ -1,27 +1,16 @@
package br.com.helpdev.sample.adapters.output.kafka;
-import java.util.UUID;
+import java.time.OffsetDateTime;
import io.swagger.v3.oas.annotations.media.Schema;
public record UserEvent(
- @Schema(title = "Event", example = "CREATED|UPDATED") String event,
- @Schema(title = "UUID", example = "uuid") String uuid) {
-
- public static final String EVENT_CREATED = "CREATED";
-
- public static final String EVENT_UPDATED = "UPDATED";
-
- public static UserEvent ofCreated(UUID uuid) {
- return new UserEvent(EVENT_CREATED, uuid.toString());
- }
-
- public static UserEvent ofUpdated(UUID uuid) {
- return new UserEvent(EVENT_UPDATED, uuid.toString());
- }
-
- public String toJson() {
- return String.format("{\"event\":\"%s\",\"uuid\":\"%s\"}", event, uuid);
- }
-
+ @Schema(title = "CloudEvent Spec Version", example = "1.0") String specversion,
+ @Schema(title = "CloudEvent Identifier", example = "ef5f318c-4b7c-4fd7-a661-56293d8b8a91") String id,
+ @Schema(title = "CloudEvent Source", example = "urn:helpdev:sample:user") String source,
+ @Schema(title = "CloudEvent Type", example = "br.com.helpdev.sample.user.created") String type,
+ @Schema(title = "CloudEvent Time", example = "2026-04-12T21:33:04Z") OffsetDateTime time,
+ @Schema(title = "CloudEvent Data Content Type", example = "application/json") String datacontenttype,
+ @Schema(title = "CloudEvent Data") UserEventData data)
+{
}
diff --git a/application/src/main/java/br/com/helpdev/sample/adapters/output/kafka/UserEventData.java b/application/src/main/java/br/com/helpdev/sample/adapters/output/kafka/UserEventData.java
new file mode 100644
index 0000000..f0524d4
--- /dev/null
+++ b/application/src/main/java/br/com/helpdev/sample/adapters/output/kafka/UserEventData.java
@@ -0,0 +1,8 @@
+package br.com.helpdev.sample.adapters.output.kafka;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+
+public record UserEventData(
+ @Schema(title = "User UUID", example = "f0f8cf3e-e856-4d61-a613-44f5df7742ca") String userUuid)
+{
+}
diff --git a/application/src/main/java/br/com/helpdev/sample/adapters/output/kafka/UserEventDispatcher.java b/application/src/main/java/br/com/helpdev/sample/adapters/output/kafka/UserEventDispatcher.java
index d32820d..c37f9bf 100644
--- a/application/src/main/java/br/com/helpdev/sample/adapters/output/kafka/UserEventDispatcher.java
+++ b/application/src/main/java/br/com/helpdev/sample/adapters/output/kafka/UserEventDispatcher.java
@@ -1,13 +1,28 @@
package br.com.helpdev.sample.adapters.output.kafka;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Objects;
+import java.util.UUID;
+
import io.github.springwolf.bindings.kafka.annotations.KafkaAsyncOperationBinding;
import io.github.springwolf.core.asyncapi.annotations.AsyncMessage;
import io.github.springwolf.core.asyncapi.annotations.AsyncOperation;
import io.github.springwolf.core.asyncapi.annotations.AsyncPublisher;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import io.cloudevents.core.format.EventFormat;
+import io.cloudevents.core.provider.EventFormatProvider;
+import io.cloudevents.jackson.JsonFormat;
+import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
import br.com.helpdev.sample.core.domain.entities.User;
import br.com.helpdev.sample.core.ports.output.UserEventDispatcherPort;
@@ -16,37 +31,67 @@ class UserEventDispatcher implements UserEventDispatcherPort {
private static final String USER_EVENTS_TOPIC = "user-events";
+ private static final String CLOUD_EVENT_SOURCE = "urn:helpdev:sample:user";
+
+ private static final String USER_CREATED_EVENT_TYPE = "br.com.helpdev.sample.user.created";
+
+ private static final String USER_ADDRESS_UPDATED_EVENT_TYPE = "br.com.helpdev.sample.user.address.updated";
+
+ private static final EventFormat EVENT_FORMAT = Objects.requireNonNull(
+ EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE));
+
private final KafkaTemplate<String, String> kafkaProducer;
- UserEventDispatcher(final KafkaTemplate<String, String> kafkaProducer) {
+ private final ObjectMapper objectMapper;
+
+ UserEventDispatcher(final KafkaTemplate<String, String> kafkaProducer, final ObjectMapper objectMapper) {
this.kafkaProducer = kafkaProducer;
+ this.objectMapper = objectMapper;
}
@Override
public void sendUserCreatedEvent(final User user) {
- publish(user, UserEvent.ofCreated(user.uuid()));
+ publish(user, USER_CREATED_EVENT_TYPE);
}
@Override
public void sendUserAddressUpdatedEvent(final User user) {
- publish(user, UserEvent.ofUpdated(user.uuid()));
+ publish(user, USER_ADDRESS_UPDATED_EVENT_TYPE);
}
- @AsyncPublisher(
- operation = @AsyncOperation(
- channelName = USER_EVENTS_TOPIC,
- description = "Publish user events",
- message = @AsyncMessage(
- name = "UserEvent",
- contentType = "application/json",
- messageId = "uuid"
- ),
- payloadType = UserEvent.class
- )
- )
- @KafkaAsyncOperationBinding(bindingVersion = "1.0.0")
- void publish(User user, UserEvent userEvent) {
- kafkaProducer.send(USER_EVENTS_TOPIC, user.uuid().toString(), userEvent.toJson());
+ @AsyncPublisher(
+ operation = @AsyncOperation(
+ channelName = USER_EVENTS_TOPIC,
+ description = "Publish user events",
+ message = @AsyncMessage(
+ name = "UserEvent",
+ contentType = JsonFormat.CONTENT_TYPE,
+ messageId = "id"
+ ),
+ payloadType = UserEvent.class
+ )
+ )
+ @KafkaAsyncOperationBinding(bindingVersion = "1.0.0")
+ void publish(final User user, final String eventType) {
+ final var cloudEvent = CloudEventBuilder
+ .v1()
+ .withId(UUID.randomUUID().toString())
+ .withType(eventType)
+ .withSource(URI.create(CLOUD_EVENT_SOURCE))
+ .withTime(OffsetDateTime.now(ZoneOffset.UTC))
+ .withData("application/json", serializeData(new UserEventData(user.uuid().toString())))
+ .build();
+
+ kafkaProducer.send(USER_EVENTS_TOPIC, user.uuid().toString(),
+ new String(EVENT_FORMAT.serialize(cloudEvent), StandardCharsets.UTF_8));
+ }
+
+ private byte[] serializeData(final UserEventData userEventData) {
+ try {
+ return objectMapper.writeValueAsBytes(userEventData);
+ } catch (JsonProcessingException exception) {
+ throw new KafkaException("Cannot serialize CloudEvent user payload", exception);
+ }
}
}
diff --git a/application/src/test/java/br/com/helpdev/sample/adapters/input/kafka/UserEventListenerTest.java b/application/src/test/java/br/com/helpdev/sample/adapters/input/kafka/UserEventListenerTest.java
index 5e7d49c..b243ba5 100644
--- a/application/src/test/java/br/com/helpdev/sample/adapters/input/kafka/UserEventListenerTest.java
+++ b/application/src/test/java/br/com/helpdev/sample/adapters/input/kafka/UserEventListenerTest.java
@@ -1,58 +1,99 @@
package br.com.helpdev.sample.adapters.input.kafka;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.time.OffsetDateTime;
+import java.util.Objects;
import java.util.UUID;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-import br.com.helpdev.sample.adapters.input.kafka.dto.UserEventDto;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import io.cloudevents.core.format.EventFormat;
+import io.cloudevents.core.provider.EventFormatProvider;
+import io.cloudevents.jackson.JsonFormat;
+
+import br.com.helpdev.sample.adapters.input.kafka.dto.UserEventDataDto;
import br.com.helpdev.sample.core.ports.input.UserEnricherPort;
@ExtendWith(MockitoExtension.class)
class UserEventListenerTest {
- @Mock
- private ObjectMapper objectMapper;
+ private static final String CLOUD_EVENT_SOURCE = "urn:helpdev:sample:user";
+
+ private static final EventFormat EVENT_FORMAT = Objects.requireNonNull(
+ EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE));
@Mock
private UserEnricherPort userEnricherPort;
- @InjectMocks
private UserEventListener userEventListener;
- @Test
- void testListen_UserCreatedEvent() throws JsonProcessingException {
- UserEventDto userEventDto = new UserEventDto(UserEventDto.EVENT_CREATED, UUID.randomUUID().toString());
- String message = "{\"event\":\"" + UserEventDto.EVENT_CREATED + "\",\"uuid\":\"" + userEventDto.uuid() + "\"}";
+ private ObjectMapper objectMapper;
+
+ @BeforeEach
+ void setUp() {
+ objectMapper = new ObjectMapper();
+ userEventListener = new UserEventListener(objectMapper, userEnricherPort);
+ }
- when(objectMapper.readValue(message, UserEventDto.class)).thenReturn(userEventDto);
+ @Test
+ void testListen_UserCreatedEvent() throws Exception {
+ final var userUuid = UUID.randomUUID();
+ final var message = cloudEventMessage("br.com.helpdev.sample.user.created", userUuid.toString());
userEventListener.listen(message);
- verify(userEnricherPort).enrichUser(UUID.fromString(userEventDto.uuid()));
+ verify(userEnricherPort).enrichUser(userUuid);
verifyNoMoreInteractions(userEnricherPort);
}
@Test
- void testListen_UserEventIgnored() throws JsonProcessingException {
- UserEventDto userEventDto = new UserEventDto("EVENT_UPDATED", UUID.randomUUID().toString());
- String message = "{\"event\":\"OTHER_EVENT\",\"uuid\":\"" + userEventDto.uuid() + "\"}";
-
- when(objectMapper.readValue(message, UserEventDto.class)).thenReturn(userEventDto);
+ void testListen_UserEventIgnored() throws Exception {
+ final var message = cloudEventMessage("br.com.helpdev.sample.user.address.updated", UUID.randomUUID().toString());
userEventListener.listen(message);
verifyNoInteractions(userEnricherPort);
}
-}
\ No newline at end of file
+
+ @Test
+ void testListen_UserCreatedEventWithoutDataShouldThrowException() {
+ final var cloudEvent = CloudEventBuilder
+ .v1()
+ .withId(UUID.randomUUID().toString())
+ .withType("br.com.helpdev.sample.user.created")
+ .withSource(URI.create(CLOUD_EVENT_SOURCE))
+ .withTime(OffsetDateTime.now())
+ .build();
+
+ final var message = new String(EVENT_FORMAT.serialize(cloudEvent), StandardCharsets.UTF_8);
+
+ assertThrows(IllegalArgumentException.class, () -> userEventListener.listen(message));
+ verifyNoInteractions(userEnricherPort);
+ }
+
+ private String cloudEventMessage(final String eventType, final String userUuid) throws Exception {
+ final var cloudEvent = CloudEventBuilder
+ .v1()
+ .withId(UUID.randomUUID().toString())
+ .withType(eventType)
+ .withSource(URI.create(CLOUD_EVENT_SOURCE))
+ .withTime(OffsetDateTime.now())
+ .withData("application/json", objectMapper.writeValueAsBytes(new UserEventDataDto(userUuid)))
+ .build();
+
+ return new String(EVENT_FORMAT.serialize(cloudEvent), StandardCharsets.UTF_8);
+ }
+}
diff --git a/application/src/test/java/br/com/helpdev/sample/adapters/output/kafka/UserEventDispatcherTest.java b/application/src/test/java/br/com/helpdev/sample/adapters/output/kafka/UserEventDispatcherTest.java
index d48d9d3..e49d317 100644
--- a/application/src/test/java/br/com/helpdev/sample/adapters/output/kafka/UserEventDispatcherTest.java
+++ b/application/src/test/java/br/com/helpdev/sample/adapters/output/kafka/UserEventDispatcherTest.java
@@ -1,52 +1,127 @@
package br.com.helpdev.sample.adapters.output.kafka;
+import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
+import java.time.OffsetDateTime;
+import java.util.Objects;
import java.util.UUID;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.InjectMocks;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.KafkaTemplate;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.format.EventFormat;
+import io.cloudevents.core.provider.EventFormatProvider;
+import io.cloudevents.jackson.JsonFormat;
+
import br.com.helpdev.sample.core.domain.entities.User;
import br.com.helpdev.sample.core.domain.vo.Email;
@ExtendWith(MockitoExtension.class)
class UserEventDispatcherTest {
+ private static final String CLOUD_EVENT_SOURCE = "urn:helpdev:sample:user";
+
+ private static final EventFormat EVENT_FORMAT = Objects.requireNonNull(
+ EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE));
+
@Mock
private KafkaTemplate<String, String> kafkaProducer;
- @InjectMocks
private UserEventDispatcher userEventDispatcher;
+ private ObjectMapper objectMapper;
+
private User user;
@BeforeEach
void setUp() {
final var userUuid = UUID.randomUUID();
+ objectMapper = new ObjectMapper();
+ userEventDispatcher = new UserEventDispatcher(kafkaProducer, objectMapper);
user = new User(1L, userUuid, "John Doe", Email.of("john.doe@example.com"), LocalDate.of(2000, 1, 1), null);
}
@Test
- void testSendUserCreatedEvent() {
+ void testSendUserCreatedEvent() throws Exception {
userEventDispatcher.sendUserCreatedEvent(user);
- verify(kafkaProducer).send("user-events", user.uuid().toString(), UserEvent.ofCreated(user.uuid()).toJson());
- verifyNoMoreInteractions(kafkaProducer);
+ assertPublishedEvent("br.com.helpdev.sample.user.created");
}
@Test
- void testSendUserAddressUpdatedEvent() {
+ void testSendUserAddressUpdatedEvent() throws Exception {
userEventDispatcher.sendUserAddressUpdatedEvent(user);
- verify(kafkaProducer).send("user-events", user.uuid().toString(), UserEvent.ofUpdated(user.uuid()).toJson());
+ assertPublishedEvent("br.com.helpdev.sample.user.address.updated");
+ }
+
+ @Test
+ void testSendUserCreatedEvent_WhenSerializationFailsShouldThrowKafkaException() throws Exception {
+ final var failingObjectMapper = mock(ObjectMapper.class);
+ userEventDispatcher = new UserEventDispatcher(kafkaProducer, failingObjectMapper);
+
+ when(failingObjectMapper.writeValueAsBytes(any(UserEventData.class))).thenThrow(new JsonProcessingException("boom") {
+ private static final long serialVersionUID = 1L;
+ });
+
+ assertThrows(KafkaException.class, () -> userEventDispatcher.sendUserCreatedEvent(user));
+ verifyNoInteractions(kafkaProducer);
+ }
+
+ @Test
+ void testUserEventRecordShouldExposeCloudEventFields() {
+ final var time = OffsetDateTime.now();
+ final var userEventData = new UserEventData(user.uuid().toString());
+ final var userEvent = new UserEvent("1.0", "event-id", CLOUD_EVENT_SOURCE, "br.com.helpdev.sample.user.created", time,
+ "application/json", userEventData);
+
+ assertEquals("1.0", userEvent.specversion());
+ assertEquals("event-id", userEvent.id());
+ assertEquals(CLOUD_EVENT_SOURCE, userEvent.source());
+ assertEquals("br.com.helpdev.sample.user.created", userEvent.type());
+ assertEquals(time, userEvent.time());
+ assertEquals("application/json", userEvent.datacontenttype());
+ assertEquals(user.uuid().toString(), userEvent.data().userUuid());
+ }
+
+ private void assertPublishedEvent(final String expectedType) throws Exception {
+ final var payloadCaptor = ArgumentCaptor.forClass(String.class);
+
+ verify(kafkaProducer).send(eq("user-events"), eq(user.uuid().toString()), payloadCaptor.capture());
verifyNoMoreInteractions(kafkaProducer);
+
+ final var cloudEvent = EVENT_FORMAT.deserialize(payloadCaptor.getValue().getBytes(StandardCharsets.UTF_8));
+
+ assertEquals(SpecVersion.V1, cloudEvent.getSpecVersion());
+ assertNotNull(cloudEvent.getId());
+ assertEquals(expectedType, cloudEvent.getType());
+ assertEquals(CLOUD_EVENT_SOURCE, cloudEvent.getSource().toString());
+ assertEquals("application/json", cloudEvent.getDataContentType());
+ assertNotNull(cloudEvent.getTime());
+ assertNotNull(cloudEvent.getData());
+
+ final var data = objectMapper.readValue(cloudEvent.getData().toBytes(), UserEventData.class);
+ assertEquals(user.uuid().toString(), data.userUuid());
}
-}
\ No newline at end of file
+}
diff --git a/docs/adr/0004-use-cloudevents-as-the-kafka-event-contract.md b/docs/adr/0004-use-cloudevents-as-the-kafka-event-contract.md
new file mode 100644
index 0000000..fce28f7
--- /dev/null
+++ b/docs/adr/0004-use-cloudevents-as-the-kafka-event-contract.md
@@ -0,0 +1,51 @@
+# 4. Use CloudEvents as the Kafka Event Contract
+
+## Context
+
+The application already uses Kafka to publish domain events on the `user-events` topic, but the current payload is a custom JSON shape that mixes transport metadata and business data. That makes the contract ad hoc, limits interoperability with external consumers, and leaves event metadata such as source, type, identifier, and content type without a standard representation.
+
+Issue #3 asks the project to implement the CloudEvents specification using the Java SDK, so the team needs a durable decision for how Kafka events should be represented from now on.
+
+## Decision
+
+We will publish and consume Kafka events as **CloudEvents v1.0 in structured JSON format**.
+
+The event contract will follow these rules (sketched in code just after this list):
+
+- use the Java CloudEvents SDK to build and parse events
+- serialize messages as `application/cloudevents+json`
+- use the CloudEvent `type` attribute to represent the domain event kind
+- use the CloudEvent `data` attribute for the business payload
+- keep the Kafka record key tied to the user identifier so ordering semantics stay unchanged for a given user
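+
+A minimal sketch of the resulting publish path, mirroring the adapter code (`dataBytes` stands in for the serialized business payload):
+
+```java
+import java.net.URI;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Objects;
+import java.util.UUID;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import io.cloudevents.core.format.EventFormat;
+import io.cloudevents.core.provider.EventFormatProvider;
+import io.cloudevents.jackson.JsonFormat;
+
+class CloudEventContractSketch {
+
+    byte[] toStructuredJson(final byte[] dataBytes) {
+        // Build the v1.0 envelope: `type` carries the domain event kind, `data` the business payload.
+        final CloudEvent event = CloudEventBuilder.v1()
+                .withId(UUID.randomUUID().toString())
+                .withSource(URI.create("urn:helpdev:sample:user"))
+                .withType("br.com.helpdev.sample.user.created")
+                .withTime(OffsetDateTime.now(ZoneOffset.UTC))
+                .withData("application/json", dataBytes)
+                .build();
+
+        // Serialize in structured mode as application/cloudevents+json.
+        final EventFormat format = Objects.requireNonNull(
+                EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE));
+        return format.serialize(event);
+    }
+}
+```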
+
+## Consequences
+
+- Kafka events become self-describing and interoperable with CloudEvents-aware tooling and consumers.
+- Event metadata is normalized through standard attributes instead of custom JSON conventions.
+- The async contract becomes easier to document consistently in AsyncAPI because the envelope is explicit.
+- Publishers and listeners must map between domain payloads and the CloudEvents envelope.
+- Consumers of the existing custom JSON payload need to understand the new CloudEvents envelope.
+
+## Alternatives Considered
+
+- **Keep the current custom JSON payload**: simpler short term, but it keeps the contract proprietary and pushes metadata conventions into custom code.
+- **Use CloudEvents binary mode over Kafka headers**: valid and efficient, but it spreads the contract across headers and payload, which makes the sample harder to inspect, test, and document than a self-contained structured JSON message.
+
+## Rationale
+
+Structured JSON CloudEvents gives the template a standards-based event contract without changing the existing topic topology or ordering behavior. It keeps each Kafka message self-contained, which fits the template's goals of clarity, learnability, and generated documentation while still aligning with the CloudEvents specification and SDK.
+
+## Date
+
+2026-04-12
+
+## Status
+
+Accepted
+
+## Links
+
+- [ADR 0003: Use Kafka for Event Streaming](0003-use-kafka-for-event-streaming.md)
+- [Issue #3](https://github.com/helpdeveloper/java-architecture-template/issues/3)
+- [CloudEvents Specification](https://cloudevents.io/)
+- [CloudEvents Java SDK](https://github.com/cloudevents/sdk-java)
diff --git a/docs/adr/README.md b/docs/adr/README.md
index 8701be6..d30f912 100644
--- a/docs/adr/README.md
+++ b/docs/adr/README.md
@@ -42,6 +42,7 @@ Follow these steps when creating a new ADR:
| 0001 | [Adopt REST for API Communication](0001-adopt-rest-for-api-communication.md) | Accepted | 2024-12-31 |
| 0002 | [Use MySQL as the Primary Database](0002-use-mysql.md) | Accepted | 2024-12-31 |
| 0003 | [Use Kafka for Event Streaming](0003-use-kafka-for-event-streaming.md) | Accepted | 2024-12-31 |
+| 0004 | [Use CloudEvents as the Kafka Event Contract](0004-use-cloudevents-as-the-kafka-event-contract.md) | Accepted | 2026-04-12 |
---