diff --git a/src/docs/asciidoc/cdc-configuration.adoc b/src/docs/asciidoc/cdc-configuration.adoc index f46b865..2412545 100644 --- a/src/docs/asciidoc/cdc-configuration.adoc +++ b/src/docs/asciidoc/cdc-configuration.adoc @@ -396,7 +396,7 @@ If polling fails, reader will try again using the specified interval. | Pipeline property name | eventuate.database.schema -| Schema which is listened by the CDC service +| Schema which is listened by the CDC service (use `NONE` for default from jdbc connection) | eventuate | eventuateDatabaseSchema @@ -580,7 +580,7 @@ The Eventuate Tram CDC supports `eventuate-tram` and `eventuate-local` | - | eventuateDatabaseSchema -| The schema of the transaction outbox table +| The schema of the transaction outbox table (use `NONE` for default from jdbc connection). | `eventuate` | sourceTableName @@ -595,6 +595,95 @@ The Eventuate Tram CDC supports `eventuate-tram` and `eventuate-local` |=== +==== Configuring a cleaner + +Cdc can automatically remove old messages from message table and received_message table. +To enable cleaning is necessary to define one or more named cleaners. +A cleaner can clean message and/or received_message tables. +Example configuration: + +---- +EVENTUATE_CDC_CLEANER_CLEANER1_PIPELINE: PIPELINE1 +EVENTUATE_CDC_CLEANER_CLEANER1_MESSAGE_CLEANING_ENABLED: "true" +EVENTUATE_CDC_CLEANER_CLEANER1_MESSAGES_MAX_AGE_IN_SECONDS: 1 +EVENTUATE_CDC_CLEANER_CLEANER1_RECEIVED_MESSAGE_CLEANING_ENABLED: "true" +EVENTUATE_CDC_CLEANER_CLEANER1_RECEIVED_MESSAGES_MAX_AGE_IN_SECONDS: 1 +EVENTUATE_CDC_CLEANER_CLEANER1_INTERVAL_IN_SECONDS: 1 +... +---- + +Here is specified message cleaner with name CLEANER1 (EVENTUATE_CDC_CLEANER is root property name). +It used database connection from PIPELINE1. +Cleaning enabled for message and received_message table. +Max message and received message age is one second. +Cleaning will be started on each second. + + +If pipeline is specified, pipeline database configuration is used. +If not, please specify additional database parameters explicitly: + +---- +EVENTUATE_CDC_CLEANER_CLEANER1_DATASOURCE_URL: jdbc:postgresql://postgreswalpipeline/eventuate +EVENTUATE_CDC_CLEANER_CLEANER1_DATASOURCE_USERNAME: eventuate +EVENTUATE_CDC_CLEANER_CLEANER1_DATASOURCE_PASSWORD: eventuate +EVENTUATE_CDC_CLEANER_CLEANER1_DATASOURCE_DRIVER_CLASSNAME: org.postgresql.Driver +... +---- + +Full list of configuration properties + +[cols=3, options="header"] +|=== +| Name +| Description +| Default Value + +| dataSourceUrl +| jdbc connection url +| - + +| dataSourceUserName +| jdbc username +| - + +| dataSourcePassword +| jdbc password +| - + +| dataSourceDriverClassName +| jdbc driver class name +| - + +| eventuateSchema +| database schema +| depends on RDBMS + +| pipeline +| pipeline name (used instead of jdbc configuration) +| - + +| messageCleaningEnabled +| enables message table cleaning +| false + +| messagesMaxAgeInSeconds +| max age of message to remove +| 2 days + +| receivedMessageCleaningEnabled +| enables received_message table cleaning +| false + +| receivedMessagesMaxAgeInSeconds +| max age of received_message to remove +| 2 days + +| intervalInSeconds +| how often cdc starts cleaning +| every minute + +|=== + === Configuring the publisher The publisher is invoked by the pipeline to publish a message/event to the message broker. @@ -807,16 +896,178 @@ The CDC service requires various infrastructure services including: The Eventuate CDC service requires several tables. The Eventuate MySQL and Postgres images define these tables. + +In the default Eventuate images, tables are located in 'eventuate' schema. +If you created necessary tables (see further) in some custom schema, please specify it in `eventuate.database.schema` property or use value `NONE` for it. +See <> and <> + +==== MySQL schema example for Eventuate Local + +---- +create table events ( + id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, + event_id VARCHAR(255), + event_type LONGTEXT, + event_data LONGTEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL, + entity_type VARCHAR(255) NOT NULL, + entity_id VARCHAR(255) NOT NULL, + triggering_event LONGTEXT, + metadata LONGTEXT, + published TINYINT DEFAULT 0 +); + +create table entities ( + entity_type VARCHAR(255), + entity_id VARCHAR(255), + entity_version LONGTEXT NOT NULL, + PRIMARY KEY(entity_type, entity_id) +); + +create table snapshots ( + entity_type VARCHAR(255), + entity_id VARCHAR(255), + entity_version VARCHAR(255), + snapshot_type LONGTEXT NOT NULL, + snapshot_json LONGTEXT NOT NULL, + triggering_events LONGTEXT, + PRIMARY KEY(entity_type, entity_id, entity_version) +); +---- + +For details see: + +link:https://github.com/eventuate-foundation/eventuate-common/blob/master/mysql/1.initialize-database.sql[MySQL schema for Eventuate Local] + +link:https://github.com/eventuate-foundation/eventuate-common/blob/master/mysql/4.initialize-database-db-id.sql[MySQL schema for Eventuate Local with database generated id] + +link:https://github.com/eventuate-foundation/eventuate-common/tree/master/postgres/1.initialize-database.sql[Postgres schema for Eventuate Local] + +link:https://github.com/eventuate-foundation/eventuate-common/tree/master/postgres/5.initialize-database-db-id.sql[Postgres schema for Eventuate Local with database generated id] + +link:https://github.com/eventuate-foundation/eventuate-common/tree/master/mssql/1.setup.sql[MSSQL schema for Eventuate Local] + +link:https://github.com/eventuate-foundation/eventuate-common/tree/master/mssql/4.setup-db-id.sql[MSSQL schema for Eventuate Local with database generated id] + +==== MySQL schema example for Eventuate Tram + +---- +CREATE TABLE message ( + dbid BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, + id LONGTEXT, + destination LONGTEXT NOT NULL, + headers LONGTEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL, + payload LONGTEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL, + published SMALLINT DEFAULT 0, + creation_time BIGINT +); + +CREATE TABLE received_messages ( + consumer_id VARCHAR(255), + message_id VARCHAR(255), + creation_time BIGINT, + published SMALLINT DEFAULT 0, + PRIMARY KEY(consumer_id, message_id) +); +---- + +For details see: + +link:https://github.com/eventuate-foundation/eventuate-common/blob/master/mysql/2.initialize-database.sql[MySQL schema for Eventuate Tram] + +link:https://github.com/eventuate-foundation/eventuate-common/blob/master/mysql/4.initialize-database-db-id.sql[MySQL schema for Eventuate Tram with database generated id] + +link:https://github.com/eventuate-foundation/eventuate-common/tree/master/postgres/2.initialize-database.sql[Postgres schema for Eventuate Tram] + +link:https://github.com/eventuate-foundation/eventuate-common/tree/master/postgres/5.initialize-database-db-id.sql[Postgres schema for Eventuate Tram with database generated id] + +link:https://github.com/eventuate-foundation/eventuate-common/tree/master/mssql/2.setup.sql[MSSQL schema for Eventuate Tram] + +link:https://github.com/eventuate-foundation/eventuate-common/tree/master/mssql/4.setup-db-id.sql[MSSQL schema for Eventuate Tram with database generated id] + +==== MySQL schema example for Eventuate Tram Saga + +---- +CREATE TABLE saga_instance_participants ( + saga_type VARCHAR(255) NOT NULL, + saga_id VARCHAR(100) NOT NULL, + destination VARCHAR(100) NOT NULL, + resource VARCHAR(100) NOT NULL, + PRIMARY KEY(saga_type, saga_id, destination, resource) +); + +CREATE TABLE saga_instance( + saga_type VARCHAR(255) NOT NULL, + saga_id VARCHAR(100) NOT NULL, + state_name VARCHAR(100) NOT NULL, + last_request_id VARCHAR(100), + end_state INT(1), + compensating INT(1), + saga_data_type VARCHAR(1000) NOT NULL, + saga_data_json VARCHAR(1000) NOT NULL, + PRIMARY KEY(saga_type, saga_id) +); + +create table saga_lock_table( + target VARCHAR(100) PRIMARY KEY, + saga_type VARCHAR(255) NOT NULL, + saga_Id VARCHAR(100) NOT NULL +); + +create table saga_stash_table( + message_id VARCHAR(100) PRIMARY KEY, + target VARCHAR(100) NOT NULL, + saga_type VARCHAR(255) NOT NULL, + saga_id VARCHAR(100) NOT NULL, + message_headers VARCHAR(1000) NOT NULL, + message_payload VARCHAR(1000) NOT NULL +); +---- + +For details see: + +link:https://github.com/eventuate-tram/eventuate-tram-sagas/tree/master/mysql/tram-saga-schema.sql[MySQL schema for Eventuate Tram Saga] + +link:https://github.com/eventuate-tram/eventuate-tram-sagas/tree/master/postgres/tram-saga-schema.sql[Postgres schema for Eventuate Tram Saga] + +link:https://github.com/eventuate-tram/eventuate-tram-sagas/tree/master/mssql/5.tram-saga-schema.sql[MSSQL schema for Eventuate tram Saga] + ==== `CDC_MONITORING` table The CDC service uses the `CDC_MONITORING` table to implement a 'heart beat' mechanism. Each reader that uses transaction log tailing (MySQL binlog/Postgres WAL) periodically updates a row in this table and measures the delay in receiving the update from the transaction log. +---- +create table cdc_monitoring ( + reader_id VARCHAR(255) PRIMARY KEY, + last_time BIGINT +); +---- + +For details see: + +link:https://github.com/eventuate-foundation/eventuate-common/blob/master/mysql/1.initialize-database.sql[MySQL schema for monitoring] + +link:https://github.com/eventuate-foundation/eventuate-common/tree/master/postgres/1.initialize-database.sql[Postgres schema for monitoring] + +link:https://github.com/eventuate-foundation/eventuate-common/tree/master/mssql/1.setup.sql[MSSQL schema for monitoring] ==== `OFFSET_STORE` table When publishing messages to Apache ActiveMQ, RabbitMQ, Redis, the MySql binlog reader records the current binlog position in this table. +---- +CREATE TABLE offset_store( + client_name VARCHAR(255) NOT NULL PRIMARY KEY, + serialized_offset LONGTEXT +); +---- + +link:https://github.com/eventuate-foundation/eventuate-common/blob/master/mysql/2.initialize-database.sql[MySQL schema for offset storage] + +link:https://github.com/eventuate-foundation/eventuate-common/tree/master/postgres/2.initialize-database.sql[Postgres schema offset storage] + +link:https://github.com/eventuate-foundation/eventuate-common/tree/master/mssql/2.setup.sql[MSSQL schema for offset storage] + === Apache Kafka broker configuration The MySQL binlog reader records the current binlog position in the `offsetStorageTopicName` topic. diff --git a/src/docs/asciidoc/getting-started-eventuate-tram.adoc b/src/docs/asciidoc/getting-started-eventuate-tram.adoc index 63097f7..cdc91ea 100644 --- a/src/docs/asciidoc/getting-started-eventuate-tram.adoc +++ b/src/docs/asciidoc/getting-started-eventuate-tram.adoc @@ -645,6 +645,10 @@ public class AbstractTramEventTestConfiguration { See this example of https://github.com/eventuate-tram-examples/eventuate-tram-core-quarkus-examples-basic/blob/main/eventuate-tram-examples-common/src/main/java/io/eventuate/tram/examples/basic/events/AbstractTramEventTestConfiguration.java[transaction events]. +Please note: if you want to have several dispatchers, you will need to use @Named annotation (https://docs.oracle.com/javaee/6/api/javax/inject/Named.html) +on DomainEventDispatcher bean definitions. To initialize DomainEventDispatcher objects, eventuate framework iterates over all beans +of that type and those beans should have unique names, otherwise only one of them will be retrieved from IoC container (micronaut specific behavior) +without any explicit error. === Transactional commands @@ -939,6 +943,7 @@ spring.datasource.url=jdbc:mysql://${DOCKER_HOST_IP}/eventuate spring.datasource.username=mysqluser spring.datasource.password=mysqlpw spring.datasource.driver.class.name=com.mysql.jdbc.driver +eventuate.database.schema=eventuate ---- ==== Micronaut @@ -950,6 +955,10 @@ datasources: driverClassName: com.mysql.jdbc.driver username: mysqluser password: mysqlpw + +eventuate: + database: + schema: eventuate ---- ==== Quarkus @@ -960,7 +969,13 @@ quarkus.datasource.password=mysqlpw quarkus.datasource.jdbc.url=jdbc:mysql://localhost/eventuate quarkus.datasource.db-kind=mysql eventuateDatabase=mysql +eventuate.database.schema=eventuate ---- +The `eventuate.database.schema` property is optional. +You can use it to define database schema where Eventuate tables are located +(Please read link:cdc-configuration.html[CDC Configuration document]). +By default `eventuate` schema is used. Specify `NONE` to use schema from jdbc connection. + In addition, you need to define message broker-specific properties.