En este tutorial voy a diseccionar la implementación del ejemplo de Event Sourcing y CQRS que presenté en la charla «Arquitecturas orientadas a eventos: de las notificaciones al Event Sourcing«, una aplicación para reservas de teatro.
1. Introducción
Recientemente publiqué una charla sobre Arquitecturas Orientadas a Eventos como parte de la serie #CompartimosExperiencias de Autentia.
Este tutorial es un complemento al capítulo sobre Event Sourcing y CQRS en esa charla. Así que, si aún no la has visto, te animo a hacerlo para entender mejor el contexto: https://youtu.be/gX0DUO171jc
Y es que el diablo está en los detalles. O tal y como lo expresa Linus Torvalds: “talk is cheap, show me the code”.
Los diagramas son muy bonitos y nunca fallan. Por eso en este tutorial vamos a hablar de código.
En concreto, aquí voy a diseccionar la implementación de Event Sourcing del proceso de reservas de teatro.
2. Entorno
- SLIMBOOK PROX (Intel® Core™ i7-8565U, 1.80GHz, 32GB RAM)
- Ubuntu 20.04.1 LTS (5.4.0-58-generic)
- Apache Maven 3.6.3
- OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.5+10)
- IntelliJ IDEA 2020.2.3 (Community Edition)
3. Lo más importante
Antes de empezar, insisto como ya dije en la charla, que esta manera de implementar una solución sólo tiene sentido si el dominio y el negocio lo justifican.
Primero, el dominio del problema debe consistir en un proceso cuyo progreso va dando lugar a una serie de eventos. Esto también debería verse reflejado en el UX de la solución.
Segundo, a negocio debe interesarle mucho que esos eventos queden registrados en un histórico. También, por las características de la arquitectura, la escalabilidad y la resiliencia deben ser requisitos importantes.
Recordad que con esta forma de hacer las cosas, sacrificamos la consistencia transaccional y la simplicidad de implementación y operación.
4. El modelo
También es importante modelar bien el dominio con DDD y EventStorming.
Se puede hacer desde la perspectiva de explorar y entender el problema o como táctica para diseñar las agregadas y el proceso en detalle.
Yo empecé por la primera aproximación y acabé refinando el modelo hasta el nivel de detalle que me permitió hacer una traducción casi directa al código Event Sourcing:
Aún así, aquí faltan dos detalles importantes:
- Los comandos que disparan eventos secundarios en otras agregadas.
Por ejemplo, justo antes del evento “Descuentos aplicados” estaría el comando “Aplicar descuentos”. - La lógica reactiva que encadena la secuencia de un evento de una agregada con el comando de otra agregada. Es decir, la saga del proceso de reservas.
Por ejemplo, el hecho de que a continuación de “Reserva confirmada” debe invocarse automáticamente el comando “Aplicar descuentos”.
Según esto, el modelo completo de lo que ocurre durante “Confirmar reserva” sería:
Es comprensible que el modelo no tenga este nivel de detalle pero obviamente todo esto sí debe estar explícito en el código.
5. La implementación
El código se puede descargar aquí: https://github.com/dav-garcia/reservas-teatro
No he utilizado ningún framework específico de Event Sourcing, aunque sí he sucumbido a la comodidad de usar Spring Boot y Lombok.
5.1. Configuración
Todos los beans del contexto de Spring se registran manualmente en clases de configuración en el paquete com.autentia.tutoriales.reservas.teatro.configuration del módulo reservas-teatro-es.
Hay una clase de configuración para cada agregada del modelo de escritura (Cliente, Pago, Representación, Reserva), para cada modelo de lectura (Histórico) y para la saga.
Esto facilita tener tests unitarios de cada componente levantando un contexto mínimo de Spring.
Las clases de configuración de las agregadas definen:
- El Repository para guardar el modelo de escritura.
- El EventPublisher de la agregada.
- Cualquier adaptador de proveedor externo (por ejemplo, PaymentGateway).
- El EventConsumer para que la agregada proyecte sus propios eventos en su modelo de escritura.
- El CommandContext, del que hablaremos más adelante.
- El CommandDispatcher con el que se le pueden enviar comandos a la agregada.
5.2. Infraestructura
Todo el código de infraestructura está en el módulo reservas-teatro-infra.
En el paquete raíz están las interfaces básicas de DDD y Event Sourcing: Entity, AggregateRoot, Command y Event.
Estas interfaces son necesarias porque hay unos contratos mínimos que esperan los dispatchers, productores, consumidores y repositorios.
Seguro que alguna de ellas podría ser reemplazada por una anotación, pero eso se sale del propósito de este ejemplo.
CommandDispatcher
En el subpaquete dispatcher está la interfaz CommandDispatcher con la que se puede solicitar que se ejecute un comando.
Imaginemos por ejemplo el controller REST para iniciar el proceso de reserva:
@PostMapping("/reserva") public ResponseEntity crearReserva(final @RequestBody CrearReservaRequest request) { final var idReserva = UUID.randomUUID(); final var command = new SeleccionarButacasCommand(request.getIdRepresentacion(), idReserva, request.getButacas(), request.getEmail()); representacionDispatcher.dispatch(command); // Invoca el comando return ResponseEntity.accepted().body(new CrearReservaResponse(idReserva)); }
Hay dos implementaciones de esta interfaz:
- SynchronizedCommandDispatcher, que fuerza synchronized en el método dispatch().
Esto es un single-writer intra-process, es decir, sólo funciona dentro de una misma JVM. - OccCommandDispatcher implementa Optimistic Concurrency Control apoyándose en que el EventPublisher controle el offset del stream de eventos.
El código es muy sencillo, ya que el marrón de la publicación atómica condicional se lo come el EventPublisher.
CommandContext
Un comando tiene como dependencias mínimas el Repository del modelo de escritura y el EventPublisher de su agregada. ¿Cómo se inyectan esas dependencias en cada instancia de comando?
Podría definir una factoría de comandos que sea usada por los clientes, pero he preferido que los comandos sean clases sencillas instanciables en cualquier momento y lugar.
Esto deja dos lugares posibles para pasar las dependencias:
- El propio cliente que ejecuta el comando. El resultado es que el cliente se llena de dependencias que ni le van ni le vienen, solo están ahí para poder instanciar comandos.
- El CommandDispatcher de la agregada. Esta es una buena opción salvo que la implementación de CommandDispatcher es genérica, así que no puede saber las dependencias concretas que necesitan los comandos de cada agregada.
He elegido la opción 2 con una variación: que las dependencias concretas vayan en una subclase de CommandContext y que CommandDispatcher dependa sólo de ella.
Esto se define en tiempo de configuración, por ejemplo para la agregada de pagos:
@Bean public PagoCommandContext pagoCommandContext(final Repository pagoRepository, final InMemoryEventPublisher pagoPublisher, final PaymentGateway paymentGateway) { return new PagoCommandContext(pagoRepository, pagoPublisher, paymentGateway); } @Bean public CommandDispatcher pagoDispatcher(final PagoCommandContext context) { return new OccCommandDispatcher(context); }
Y en runtime, el dispatcher pasa el contexto como parámetro al ejecutar el comando:
@Override public void dispatch(final Command command) { ... command.execute(context); ... }
EventPublisher
Debe crearse una instancia de EventPublisher por cada agregada que exista en el dominio.
Los comandos son los únicos que pueden publicar eventos usando el EventPublisher de su CommandContext.
La única implementación de esta interfaz, InMemoryEventPublisher, distribuye los eventos síncronamente.
El evento no se guarda de manera permanente, solo está en memoria mientras se está distribuyendo a los consumidores registrados.
Otro tema interesante es que la atomicidad del tryPublish() se consigue con una operación CAS (Compare And Set).
En concreto, se usa AtomicLong.compareAndExchange() para comprobar si la versión del modelo coincide con el offset del stream.
Obviamente, en un sistema real, la implementación del EventPublisher se integraría con el event journal.
Por ejemplo, en Kafka tendríamos que crear una transacción.
EventConsumer
Esta es la interfaz que implementa cualquier consumidor de eventos: los de la propia agregada, los proyectores de modelos de lectura y las sagas.
Cada adaptador de event journal tendrá su mecanismo para registrar los consumidores e invocarlos cuando haya nuevos eventos de la agregada.
En esta implementación, los consumidores se registran manualmente en su InMemoryEventPublisher durante la configuración. Por ejemplo:
@Bean public PagoEventConsumer pagoEventConsumer(final Repository pagoRepository, final InMemoryEventPublisher pagoPublisher) { final var result = new PagoEventConsumer(pagoRepository); pagoPublisher.registerEventConsumer(result); return result; }
Otros puertos y adaptadores
Existe un puerto PaymentGateway totalmente dummy para simular la dependencia con un proveedor externo.
También está el puerto de persistencia Repository con su implementación InMemoryRepository que guarda las instancias en un mapa en memoria.
Por último, hay un TaskScheduler que funciona bien cuando todo el sistema está corriendo en una única JVM (proceso único).
Internamente utiliza un ThreadPoolTaskScheduler y se utiliza para programar el timeout de una reserva.
5.3. Eventos
Los eventos están en subpaquetes aparte bajo com.autentia.tutoriales.reservas.teatro.event.
En teoría son parte de la agregada, en concreto, el aspecto observable del estado de la agregada. ¡De hecho son lo más importante de nuestro modelo DDD orientado a eventos!
Entonces, ¿por qué he preferido sacarlos a otro sitio? Por dos motivos:
- Precisamente por su relevancia, para no mezclarlos con comandos y entidades del modelo de escritura.
- Para facilitar su reutilización en otros consumidores, como los modelos de lectura y la saga.
Hay un subpaquete de eventos por cada agregada. En ellos también están incluidos los value objects usados en los eventos. Por ejemplo, en el caso de los pagos:
Todas estas clases son inmutables, así que están anotadas con @Value de Lombok.
5.4. Agregadas
Algunos frameworks de Event Sourcing obligan a crear una clase que represente el concepto de agregada como frontera de consistencia de un grupo de entidades y value objects. Yo creo que esta aproximación da lugar a clases demasiado grandes puesto que habría que definir un método para cada comando que se reciba y para cada evento que se aplique.
En vez de eso, he preferido que la agregada quede representada por un paquete que contiene clases de comando, el contexto de comandos, uno o más auto-consumidores de los eventos propios y las entidades del modelo de escritura. Por ejemplo:
Estos paquetes están colgados del paquete raíz com.autentia.tutoriales.reservas.teatro.command.
Sí, los comandos implementan el patrón de diseño Command y los consumidores de eventos el patrón Observer.
5.5. Comandos
Los comandos simples tienen dos partes: validación y publicación de eventos.
Los comandos con efectos laterales interactúan con un proveedor externo en medio de esos dos pasos y por tanto deben manejar la repetición.
Comandos simples
Por ejemplo, el comando para liberar butacas reservadas:
@Value public class LiberarButacasCommand implements Command { UUID aggregateRootId; Set butacas; @Override public void execute(final RepresentacionCommandContext context) { final var representacion = context.getRepository().load(aggregateRootId) .orElseThrow(() -> new CommandNotValidException("Representación no existe")); context.getEventPublisher().tryPublish(representacion.getVersion(), new ButacasLiberadasEvent(aggregateRootId, butacas)); } }
La validación comprueba que la representación existe y, si es así, publica el evento correspondiente.
Podría haber usado el patrón Template Method para estos comandos, teniendo una clase base parecida a esta:
public abstract class SimpleCommand<C extends CommandContext, T extends AggregateRoot, U> implements Command { @Override public void execute(final C context) { final var aggregateRoot = validateCommand(getAggregateRootId()); publishEvents(aggregateRoot); } /** * @throws CommandNotValidException si el comando no es válido */ protected abstract T validateCommand(final U aggregateRootId); protected abstract void publishEvents(final T aggregateRoot); }
Comandos con efectos laterales
El ejemplo más claro es “proponer pago”, que debe contactar con la pasarela de pagos para iniciar el proceso de pago por un canal separado.
He implementado dos estrategias para manejar la repetición del comando:
- ProponerPagoIdempotentCommand es idempotente y asegura que, aunque el comando se repita, el pago sólo se inicia una vez con el proveedor.
Esto funcionará siempre y cuando se mantenga el estado del comando entre repeticiones.
Primero se define una propiedad mutable:@NonFinal String codigoPago;
Y luego, entre otras cosas, solo se llama al proveedor si la propiedad es null:
if (codigoPago == null) { // Idempotencia con el proveedor externo en caso de repetición codigoPago = iniciarPago(context.getPaymentGateway()); }
- ProponerPagoRollbackCommand hace lo que dice en su nombre. Si el comando falla por inconsistencia de versiones (usando OCC), entonces se cancela el inicio de pago.
Al repetir el comando se vuelve a iniciar el pago.
5.6. Entidades
Con Lombok he podido definir de manera muy concisa la semántica del ciclo de vida de las entidades del modelo de escritura. Por ejemplo, en la entidad Cliente:
@Getter @Setter @AllArgsConstructor(access = AccessLevel.PRIVATE) @Builder(builderClassName = "Builder") public class Cliente implements AggregateRoot { private final String id; private long version; private boolean suscrito; private final List descuentos; ... }
Prefiero tener un Builder a un constructor con muchos parámetros.
Hay campos no modificables como el id y los descuentos, y otros que sí varían durante la vida de la entidad como la version y si el cliente está suscrito.
5.7. Auto-consumidores
Cada agregada tiene un consumidor para sus propios eventos. Esto es lo que garantiza que podemos reconstruir el modelo de escritura a partir de los eventos.
Este consumidor no tiene ningún misterio. Simplemente, traduce el evento a una o más entidades persistentes que se guardan en el repositorio. Por ejemplo, el que guarda una propuesta de pago:
private void apply(final long version, final PagoPropuestoEvent event) { final var pago = Pago.builder() .id(event.getAggregateRootId()) .version(version) .codigoPago(event.getCodigoPago()) .build(); repository.save(pago); }
5.8. Modelo de lectura
A modo de ejemplo, he definido un modelo de lectura que guarda un histórico de todas las reservas de un cliente, incluyendo su estado.
Esto está en el paquete com.autentia.tutoriales.reservas.teatro.query.
La clase HistoricoEventConsumer tiene un mapa de lambdas para recoger y aplicar todos los eventos al histórico persistente.
5.9. Saga
Probé dos maneras distintas de implementar la saga:
- Una única clase con consumidores de eventos internos para cada agregada.
- Clases separadas con la lógica para gestionar los eventos de cada agregada.
Al final opté por la segunda opción porque la clase de la saga estaba quedando demasiado grande, aún a costa de tener que repetir algunos fragmentos pequeños de código.
Todas estas clases están en el paquete com.autentia.tutoriales.reservas.teatro.saga.
También está definida ahí la entidad persistente que mantiene el estado del proceso de reserva:
@Getter @Setter @AllArgsConstructor(access = AccessLevel.PRIVATE) @Builder(builderClassName = "Builder") public class EstadoProceso implements Entity { private final UUID id; // Se utiliza el id de la reserva private final UUID representacion; private final String cliente; private final Set butacas; private UUID pago; }
Las clases de la saga son consumidores normales con métodos process() que reciben un evento como parámetro. Por ejemplo, este es el método que se ejecuta cuando se ha confirmado una reserva:
private void process(final ReservaConfirmadaEvent event) { final var estado = repository.load(event.getAggregateRootId()).orElseThrow(); final var maximo = estado.getButacas().stream() .mapToInt(Butaca::getPrecio) .sum(); clienteDispatcher.dispatch(new AplicarDescuentosCommand(estado.getCliente(), estado.getId(), maximo)); }
Esta parte de la aplicación es la más difícil de seguir. No sólo por la variedad de implementaciones de métodos process(), sino también por el orden en que se encadenan los comandos y los eventos.
Veamos a continuación algunos ejemplos interesantes.
Cancelación por timeout
La saga es responsable de garantizar invariantes temporales como los timeouts.
Aquí la reserva tiene un tiempo de vida, si no se paga en ese tiempo, se cancela.
Todo esto lo gestiona la clase ReservaSaga usando el TaskScheduler.
El temporizador se pone en marcha al crear la reserva. Tan sencillo como invocar el comando “abandonar reserva” cuando ocurra el timeout:
private void process(final ReservaCreadaEvent event) { final var id = event.getAggregateRootId(); taskScheduler.scheduleTask(TASK_TYPE, id, () -> reservaDispatcher.dispatch(new AbandonarReservaCommand(id)), timeout); }
Por otro lado, el timeout se cancela ante los eventos “reserva cancelada” y “reserva pagada”. Así es el código para este último evento:
private void process(final ReservaPagadaEvent event) { taskScheduler.cancelTask(TASK_TYPE, event.getAggregateRootId()); repository.delete(event.getAggregateRootId()); }
Consistencia entre agregadas
En la charla comenté que el orden de las operaciones es la base para garantizar la consistencia del proceso. Hay que asegurar que todas las transiciones llevan a estados legales.
Y en este proceso hay una situación muy delicada cuando se cancela una reserva, ya sea por petición del usuario o por timeout.
En primer lugar, hay que ejecutar varias acciones de compensación a modo de rollback:
- Liberar las butacas.
- Recuperar los descuentos aplicados.
- Cancelar la iniciación de pago, si la había.
En segundo lugar, la reserva no puede quedar cancelada o las butacas libres si el pago ya se ha confirmado. Parece fácil, pero pensad cómo se controla la concurrencia de confirmar el pago después del timeout pero antes de completar las acciones de compensación, o viceversa.
La solución al primer problema es encadenar automáticamente un evento con otro comando.
Hay varias combinaciones pero he elegido una que también sirve para controlar el problema de concurrencia.
Este es el flujo:
Recordad que sólo se garantiza la transaccionalidad dentro de cada agregada. Fuera de ellas, es la saga la que debe ocuparse de mantener un estado global eventualmente consistente a través de las acciones de compensación.
Entonces, para evitar la inconsistencia de “reserva abandonada/cancelada” con “pago confirmado”, lo que hago es sincronizar un estado válido dentro de la agregada de Pago. Ahí sí puedo asegurar que o bien el pago está pendiente o bien se ha confirmado.
Así que la primera acción de compensación debe ser intentar anular la propuesta de pago.
Si esto funciona, entonces es seguro que el pago aún no se ha confirmado y el resto de acciones de compensación pueden continuar con tranquilidad.
Si no funciona, significa que hay “pago confirmado”, incluso aunque exista el evento “reserva abandonada” o “reserva cancelada”.
Lo único extraño es que al final se emitirá un evento “reserva pagada” que puede parecer incongruente pero que refleja exactamente lo ocurrido en el mundo real. Pero lo importante es que el estado final será válido.
Por otro lado, no puede ocurrir que se confirme el pago en el proveedor si el estado ya es “pago anulado”. Ese evento sólo se publica después de haber solicitado la anulación al proveedor.
¿Se podrían haber resuelto estos problemas de otra manera? Seguramente, lo único que hay que recordar es que la transaccionalidad sólo existe dentro de cada agregada.
6. Conclusiones
Entre la charla y este tutorial espero haber cubierto los aspectos más relevantes del Event Sourcing con un enfoque práctico y cercano al mundo real.
A pesar de la cantidad de información, aún hay algunos detalles que me he dejado fuera como el cumplimiento de GDPR con eventos inmutables o una implementación más realista con Kafka o Event Store como event journals. Tampoco he hablado de “reproyectar” eventos y la necesidad de dejar fuera de esto a la lógica reactiva de las sagas.
En fin, son muchos detalles de un paradigma muy diferente al clásico CRUD. Más complicado en muchos casos, pero también pensad en cómo se implementarían todos esos invariantes y restricciones temporales y de concurrencia si no tuviéramos eventos…
Tampoco sería fácil, ¿verdad? Y al menos en Event Sourcing tenemos un histórico preciso de lo que ha ocurrido y en qué momento.
Como con todo, hay que valorar los aspectos positivos y negativos y ver si compensa. Lo importante es tener otra herramienta a nuestra disposición para cuando surja la necesidad.
6.1. Ejercicios y feedback
Os propongo también un par de ejercicios.
El primero es modificar el código del CommandDispatcher para devolver la nueva versión del modelo. Este cambio afectaría también al EventPublisher, ya que es él quien realmente se encarga de hacer el mapping entre números de versión y offsets del stream.
De este modo, el controller REST podría devolver una cabecera ETag con ese número de versión.
El segundo ejercicio es encontrar una posible condición de carrera en la saga que compensa la «reserva abandonada/cancelada». El resultado es una inconsistencia en la que la propuesta de pago se queda pendiente de cancelación.
Espero vuestros comentarios con dudas, sugerencias y correcciones. Me gustaría mucho conocer vuestras opiniones.
7. Referencias
Termino poniendo aquí algunas de las fuentes más importantes que he usado para la charla y el código:
- David Schmitz, Event Sourcing, you are doing it wrong.
- Greg Young, A Decade of DDD, CQRS, Event Sourcing.
- The big interview with Martin Kleppmann.
- Martin Fowler, What do you mean by “Event-Driven”?
- Martin Fowler, Event Sourcing.
- Teiva Harsanyi, 1 Year of Event Sourcing and CQRS.
- Bernd Rücker, Opportunities and Pitfalls of Event-driven Utopia.