From 83c5eb683716a99a4cf11fd2eaff2b7b5fa81933 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Bigorajski?= <72501021+lukasz-bigorajski@users.noreply.github.com> Date: Tue, 6 Aug 2024 16:33:18 +0200 Subject: [PATCH] Add DB mocks and sample RTM scenario (#183) --- designer/application-customizations.conf | 35 +- docker-compose.yml | 4 +- flink/Dockerfile | 8 + mocks/Dockerfile | 17 +- mocks/db/mocks/__ddl/rtm_near_pos_example.sql | 61 ++ mocks/db/scripts/common.sh | 6 + mocks/db/scripts/configure.sh | 42 ++ mocks/db/scripts/postgres_operations.sh | 130 ++++ mocks/db/scripts/run_postgres.sh | 17 + .../kafka/generate-messages/geoLocations.sh | 25 + .../kafka/static-messages/geoLocations.txt | 3 + quickstart-setup/data/kafka/topics.txt | 1 + quickstart-setup/scripts/utils/lib.sh | 4 + quickstart-setup/setup/kafka/topics.txt | 4 + quickstart-setup/setup/nu/examples.txt | 1 + .../setup/nu/scenarios/RTMClientNearPOS.json | 660 ++++++++++++++++++ .../setup/schema-registry/active-schemas.txt | 4 + .../schemas/GeoLocations.schema.json | 38 + .../GeoLocationsOutputEmail.schema.json | 20 + .../GeoLocationsOutputPush.schema.json | 20 + .../schemas/GeoLocationsOutputSms.schema.json | 20 + 21 files changed, 1112 insertions(+), 8 deletions(-) create mode 100644 mocks/db/mocks/__ddl/rtm_near_pos_example.sql create mode 100755 mocks/db/scripts/common.sh create mode 100755 mocks/db/scripts/configure.sh create mode 100755 mocks/db/scripts/postgres_operations.sh create mode 100755 mocks/db/scripts/run_postgres.sh create mode 100755 quickstart-setup/data/kafka/generate-messages/geoLocations.sh create mode 100644 quickstart-setup/data/kafka/static-messages/geoLocations.txt create mode 100644 quickstart-setup/setup/nu/scenarios/RTMClientNearPOS.json create mode 100644 quickstart-setup/setup/schema-registry/schemas/GeoLocations.schema.json create mode 100644 quickstart-setup/setup/schema-registry/schemas/GeoLocationsOutputEmail.schema.json create mode 100644 quickstart-setup/setup/schema-registry/schemas/GeoLocationsOutputPush.schema.json create mode 100644 quickstart-setup/setup/schema-registry/schemas/GeoLocationsOutputSms.schema.json diff --git a/designer/application-customizations.conf b/designer/application-customizations.conf index 84f92b49..3b1177ff 100644 --- a/designer/application-customizations.conf +++ b/designer/application-customizations.conf @@ -3,11 +3,32 @@ scenarioTypes { "streaming" { # customize Flink streaming scenario type - - modelConfig: { + modelConfig { components { + customersDataEnricher { + providerType: databaseEnricher + config: { + databaseQueryEnricher { + name: "customers-data-query-enricher" + dbPool: ${rtmNearPosExampleDatabasePool} #refers to your database pool definition + } + databaseLookupEnricher { + name: "customers-data-lookup-enricher" + dbPool: ${rtmNearPosExampleDatabasePool} + } + } + } + posDataEnricher { + providerType: databaseEnricher + config: { + databaseLookupEnricher { + name: "pos-data-lookup-enricher" + dbPool: ${rtmNearPosExampleDatabasePool} + } + } + } "customerProfileOffers" { - providerType: "openAPI" + providerType: "openAPI" url: "http://mocks:8080/__admin/files/openapi/CustomerApi.yaml" rootUrl: "http://mocks:8080/" namePattern: "get.*" @@ -23,3 +44,11 @@ scenarioTypes { # customize Lite request-response scenario type } } + +rtmNearPosExampleDatabasePool { + driverClassName: "org.postgresql.Driver" + url: "jdbc:postgresql://mocks:5432/mocks" + username: "mocks" + password: "mocks_pass" + schema: "rtm_near_pos_example" +} diff --git a/docker-compose.yml b/docker-compose.yml index 8e0fb57a..3be6b530 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -13,7 +13,7 @@ services: deploy: resources: limits: - memory: 128M + memory: 256M ### Quickstart mocks mocks: @@ -86,7 +86,7 @@ services: condition: service_started mocks: condition: service_healthy - expose: + expose: - 8181 healthcheck: test: [ "CMD-SHELL", "curl localhost:8080/api/app/healthCheck" ] diff --git a/flink/Dockerfile b/flink/Dockerfile index e6d7cda9..63f64ec3 100644 --- a/flink/Dockerfile +++ b/flink/Dockerfile @@ -1,11 +1,19 @@ ARG FLINK_VERSION +FROM curlimages/curl:8.9.1 AS lib_provider + +# Adding custom libraries ('add other libraries' section): +# https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/docker/#further-customization +WORKDIR /libs +RUN curl -k --output /libs/postgresql-42.6.0.jar https://repo1.maven.org/maven2/org/postgresql/postgresql/42.6.0/postgresql-42.6.0.jar + FROM flink:${FLINK_VERSION} USER root RUN echo '#!/bin/sh' > /ex-docker-entrypoint.sh && \ echo 'export FLINK_PROPERTIES=$(cat /opt/flink/conf/flink-properties.yml) && /docker-entrypoint.sh "$@"' >> /ex-docker-entrypoint.sh && \ chmod +x /ex-docker-entrypoint.sh +COPY --from=lib_provider /libs/ /opt/flink/lib/ USER flink COPY flink-properties.yml /opt/flink/conf/ diff --git a/mocks/Dockerfile b/mocks/Dockerfile index e9d92cd6..939a65cf 100644 --- a/mocks/Dockerfile +++ b/mocks/Dockerfile @@ -1,6 +1,6 @@ FROM holomekc/wiremock-gui:3.8.1 AS wiremock -RUN apt-get update && \ +RUN apt-get update && \ apt-get install -y wget && \ wget -P /var/wiremock/extensions https://repo1.maven.org/maven2/org/wiremock/extensions/wiremock-faker-extension-standalone/0.2.0/wiremock-faker-extension-standalone-0.2.0.jar @@ -12,8 +12,13 @@ CMD ["/sbin/my_init"] # install USER root -RUN apt-get update -y && \ +RUN apt-get install -y --no-install-recommends curl ca-certificates && \ + install -d /usr/share/postgresql-common/pgdg && \ + curl -o /usr/share/postgresql-common/pgdg/apt.postgresql.org.asc --fail https://www.postgresql.org/media/keys/ACCC4CF8.asc && \ + echo "deb [signed-by=/usr/share/postgresql-common/pgdg/apt.postgresql.org.asc] https://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list && \ + apt-get update -y && \ apt -y install openjdk-11-jre-headless && \ + apt -y install postgresql-16 && \ apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* # WIREMOCK @@ -24,7 +29,13 @@ COPY http-service/mocks /home/wiremock/ COPY http-service/scripts /etc/service/http-service RUN mv /etc/service/http-service/run-wiremock.sh /etc/service/http-service/run +# POSTGRES +COPY db /home/postgres +RUN mkdir /etc/service/postgres && \ + mv /home/postgres/scripts/run_postgres.sh /etc/service/postgres/run + EXPOSE 8080 +EXPOSE 5432 HEALTHCHECK --interval=30s --timeout=10s --start-period=30s --retries=30 \ - CMD (curl -f http://localhost:8080/__admin/) || exit 1 \ No newline at end of file + CMD (curl -f http://localhost:8080/__admin/ && pg_isready -d mocks -U mocks) || exit 1 diff --git a/mocks/db/mocks/__ddl/rtm_near_pos_example.sql b/mocks/db/mocks/__ddl/rtm_near_pos_example.sql new file mode 100644 index 00000000..6079bbf0 --- /dev/null +++ b/mocks/db/mocks/__ddl/rtm_near_pos_example.sql @@ -0,0 +1,61 @@ +-- cleanup for the sake idempotent run +drop table if exists contact_history; +drop table if exists blocked_list; +drop table if exists client; +drop table if exists pos; +drop type if exists consent_enum; +drop type if exists client_type_enum; + +create table pos +( + id SERIAL PRIMARY KEY, + location_lat NUMERIC(10, 6) NOT NULL, + location_lon NUMERIC(10, 6) NOT NULL, + open_hour TIME NOT NULL, + close_hour TIME NOT NULL +); + +create type consent_enum AS ENUM ('SMS', 'EMAIL', 'PUSH', 'SMS_EMAIL', 'EMAIL_PUSH', 'SMS_PUSH', 'SMS_EMAIL_PUSH'); +create type client_type_enum AS ENUM ('INDIVIDUAL', 'BUSINESS'); + +create table client +( + id SERIAL PRIMARY KEY, + pos_id SERIAL REFERENCES pos(id) NOT NULL, + msisdn CHAR(11), + email VARCHAR(100), + consents consent_enum, + client_type client_type_enum NOT NULL +); + +create table blocked_list( + client_id INT PRIMARY KEY references client(id) +); + +create table contact_history( + id SERIAL PRIMARY KEY, + client_id INT NOT NULL references client(id), + event_time TIMESTAMP NOT NULL +); + +---- POS +insert into pos(id, location_lat, location_lon, open_hour, close_hour) VALUES (1, 52.237049, 21.017532, '00:00:00', '23:59:59'); +insert into pos(id, location_lat, location_lon, open_hour, close_hour) VALUES (2, 50.049683, 19.944544, '08:00:00', '15:00:00'); +insert into pos(id, location_lat, location_lon, open_hour, close_hour) VALUES (3, 51.107883, 17.038538, '00:00:00', '23:59:59'); + +---- Clients +insert into client(id, pos_id, msisdn, email, consents, client_type) VALUES (1, 1, '48500500500', 'jan.kowalski@nussknacker.io', 'SMS', 'INDIVIDUAL'); +insert into client(id, pos_id, msisdn, email, consents, client_type) VALUES (2, 1, '48500500501', 'zbigniew.paleta@nussknacker.io', 'SMS_EMAIL', 'BUSINESS'); +insert into client(id, pos_id, msisdn, email, consents, client_type) VALUES (3, 1, '48500500502', 'genia.nowak@nussknacker.io', 'PUSH', 'INDIVIDUAL'); +insert into client(id, pos_id, msisdn, email, consents, client_type) VALUES (4, 2, '48500500503', 'klaudia.wisniewska@nussknacker.io', 'SMS_EMAIL_PUSH', 'INDIVIDUAL'); +insert into client(id, pos_id, msisdn, email, consents, client_type) VALUES (5, 2, '48500500504', 'teofil.benc@nussknacker.io', 'EMAIL', 'BUSINESS'); +insert into client(id, pos_id, msisdn, email, consents, client_type) VALUES (6, 2, '48500500505', 'zdzislaw.lecina@nussknacker.io', null, 'BUSINESS'); +insert into client(id, pos_id, msisdn, email, consents, client_type) VALUES (7, 2, '48500500506', 'ksenia.gorka@nussknacker.io', 'EMAIL_PUSH', 'INDIVIDUAL'); +insert into client(id, pos_id, msisdn, email, consents, client_type) VALUES (8, 3, '48500500507', 'anna.milkowska@nussknacker.io', 'SMS_PUSH', 'BUSINESS'); +insert into client(id, pos_id, msisdn, email, consents, client_type) VALUES (9, 3, '48500500508', 'john.doe@nussknacker.io', 'EMAIL', 'INDIVIDUAL'); + +---- Blocked +insert into blocked_list(client_id) values (5); + +-- Contact history +insert into contact_history(client_id, event_time) VALUES (9, NOW() - INTERVAL '1 minutes'); diff --git a/mocks/db/scripts/common.sh b/mocks/db/scripts/common.sh new file mode 100755 index 00000000..d3790029 --- /dev/null +++ b/mocks/db/scripts/common.sh @@ -0,0 +1,6 @@ +#!/bin/bash -e + +strip_extension() { + local file="$1" + echo "${file%.*}" +} diff --git a/mocks/db/scripts/configure.sh b/mocks/db/scripts/configure.sh new file mode 100755 index 00000000..9ea4f667 --- /dev/null +++ b/mocks/db/scripts/configure.sh @@ -0,0 +1,42 @@ +#!/bin/bash -e + +cd "$(dirname "$0")" +. postgres_operations.sh +. common.sh + +init_db() { + init_bg_log_file + init_data_dir + init_custom_conf_dir + configure_pg_config + configure_authentication +} + +configure_users() { + create_user + create_custom_database + grant_privileges + alter_pg_user_pass +} + +execute_ddls() { + local schema_name + local ddl_content + for file in "$PG_DDL_DIR"/*; do + if [ -f "$file" ]; then + schema_name=$(basename "$(strip_extension "$file")") + echo "Creating schema: $schema_name" + create_schema "$PG_USER" "$schema_name" + ddl_content=$(wrap_sql_with_current_schema "$schema_name" "$(cat "$file")") + echo "Executing ddl: $file with content: $ddl_content" + echo "$ddl_content" | execute_sql "" "$PG_USER" "$PG_PASS" + fi + done +} + +init_db +start_bg +wait_until_started +configure_users +execute_ddls +stop diff --git a/mocks/db/scripts/postgres_operations.sh b/mocks/db/scripts/postgres_operations.sh new file mode 100755 index 00000000..37d79b9a --- /dev/null +++ b/mocks/db/scripts/postgres_operations.sh @@ -0,0 +1,130 @@ +#!/bin/bash -e + +init_data_dir() { + if [ ! -e "$PG_DATA_DIR" ]; then + mkdir -p "$PG_DATA_DIR" + chown postgres "$PG_DATA_DIR" + /sbin/setuser postgres "$PG_BIN_DIR"/initdb -D "$PG_DATA_DIR" + fi +} + +init_custom_conf_dir() { + if [ ! -e "$PG_CUSTOM_CONF_DIR" ]; then + mkdir -p "$PG_CUSTOM_CONF_DIR" + chown postgres "$PG_CUSTOM_CONF_DIR" + fi +} + +configure_authentication() { + if [ ! -f "$PG_HBA_FILE" ]; then + cp "$PG_DATA_DIR/pg_hba.conf" "$PG_HBA_FILE" + chown postgres "$PG_HBA_FILE" + echo "#" >> "$PG_HBA_FILE" + echo "host all all all md5" >> "$PG_HBA_FILE" + fi +} + +configure_pg_config() { + if [ ! -f "$PG_CONF_FILE" ]; then + cp "$PG_DATA_DIR/postgresql.conf" "$PG_CONF_FILE" + chown postgres "$PG_CONF_FILE" + echo "#" >> "$PG_CONF_FILE" + echo "listen_addresses = '*'" >> "$PG_CONF_FILE" + fi +} + +init_bg_log_file() { + local log_file + log_file="/var/log/postgres_bg.log" + if [ ! -f "$log_file" ]; then + touch "$log_file" + chown postgres "$log_file" + fi +} + +wait_until_started() { + local max_startup_timeout_in_s=10 + while ! pg_isready >/dev/null 2>&1; do + sleep 1 + max_startup_timeout_in_s=$((max_startup_timeout_in_s - 1)) + if ((max_startup_timeout_in_s <= 0)); then + echo "Postgres is not started" + exit 1 + fi + done + echo "Postgres started" +} + +create_custom_database() { + local db_name="${1:-$PG_DB_NAME}" + DB_EXISTS=$(echo "SELECT 1 FROM pg_database WHERE datname='$db_name'" | execute_sql "" "postgres" "" "-tA") + if [ "$DB_EXISTS" != "1" ]; then + echo "CREATE DATABASE \"$db_name\"" | execute_sql "" "postgres" "" + else + echo "DB already exists - creation skipped" + fi +} + +create_user() { + ROLE_EXISTS=$(echo "SELECT 1 FROM pg_roles WHERE rolname='$PG_USER'" | execute_sql "" "postgres" "" "-tA") + if [ "$ROLE_EXISTS" != "1" ]; then + echo "CREATE ROLE \"${PG_USER}\" WITH LOGIN PASSWORD '${PG_PASS}';" | execute_sql "" "postgres" "" + else + echo "ROLE already exists - creation skipped" + fi +} + +grant_privileges() { + local user="${1:-$PG_USER}" + local db_name="${2:-$PG_DB_NAME}" + execute_sql "" "postgres" "" < NOW() - INTERVAL '5 minutes'" + } + }, + { + "name": "Cache TTL", + "expression": { + "language": "spel", + "expression": "T(java.time.Duration).parse('PT1M')" + } + }, + { + "name": "arg1", + "expression": { + "language": "spel", + "expression": "#input.clientId" + } + } + ] + }, + "output": "contact_history_entry", + "additionalFields": { + "description": "Wer are fetching given client contact history from the last X days.\nFor the purpose of example we are fetching entries from the last 5 minutes instead of days or longer period.", + "layoutData": { + "x": 360, + "y": 180 + } + }, + "type": "Enricher" + }, + { + "nextFalse": [ + ], + "id": "client is not contacted in last X Days", + "expression": { + "language": "spel", + "expression": "#contact_history_entry.empty" + }, + "isDisabled": null, + "additionalFields": { + "description": "The main goal is not to send too many notifications to the given client.", + "layoutData": { + "x": 360, + "y": 360 + } + }, + "type": "Filter" + }, + { + "id": "enrich with data from list of blocked clients", + "service": { + "id": "customers-data-lookup-enricher", + "parameters": [ + { + "name": "Table", + "expression": { + "language": "spel", + "expression": "'blocked_list'" + } + }, + { + "name": "Cache TTL", + "expression": { + "language": "spel", + "expression": "T(java.time.Duration).parse('PT1M')" + } + }, + { + "name": "Key column", + "expression": { + "language": "spel", + "expression": "'client_id'" + } + }, + { + "name": "Key value", + "expression": { + "language": "spel", + "expression": "#input.clientId" + } + } + ] + }, + "output": "is_client_on_blocked_list", + "additionalFields": { + "description": null, + "layoutData": { + "x": 360, + "y": 540 + } + }, + "type": "Enricher" + }, + { + "nextFalse": [ + ], + "id": "client is not blocked", + "expression": { + "language": "spel", + "expression": "#is_client_on_blocked_list == null" + }, + "isDisabled": null, + "additionalFields": { + "description": null, + "layoutData": { + "x": 360, + "y": 720 + } + }, + "type": "Filter" + }, + { + "id": "enrich with client data", + "service": { + "id": "customers-data-lookup-enricher", + "parameters": [ + { + "name": "Table", + "expression": { + "language": "spel", + "expression": "'client'" + } + }, + { + "name": "Cache TTL", + "expression": { + "language": "spel", + "expression": "T(java.time.Duration).parse('PT5M')" + } + }, + { + "name": "Key column", + "expression": { + "language": "spel", + "expression": "'id'" + } + }, + { + "name": "Key value", + "expression": { + "language": "spel", + "expression": "#input.clientId" + } + } + ] + }, + "output": "clientData", + "additionalFields": { + "description": null, + "layoutData": { + "x": 360, + "y": 900 + } + }, + "type": "Enricher" + }, + { + "id": "extract consents from client data", + "varName": "consents", + "value": { + "language": "spel", + "expression": "#UTIL.split(#clientData.consents, '_')" + }, + "additionalFields": { + "description": "We are transforming and then assigning consents to variable consents", + "layoutData": { + "x": 360, + "y": 1080 + } + }, + "type": "Variable" + }, + { + "nextFalse": [ + ], + "id": "client has marketing consents", + "expression": { + "language": "spel", + "expression": "NOT #consents.isEmpty" + }, + "isDisabled": false, + "additionalFields": { + "description": "#clientData?.consents?", + "layoutData": { + "x": 360, + "y": 1260 + } + }, + "type": "Filter" + }, + { + "id": "enrich with POS info", + "service": { + "id": "pos-data-lookup-enricher", + "parameters": [ + { + "name": "Table", + "expression": { + "language": "spel", + "expression": "'pos'" + } + }, + { + "name": "Cache TTL", + "expression": { + "language": "spel", + "expression": "T(java.time.Duration).parse('PT10M')" + } + }, + { + "name": "Key column", + "expression": { + "language": "spel", + "expression": "'id'" + } + }, + { + "name": "Key value", + "expression": { + "language": "spel", + "expression": "#clientData.pos_id" + } + } + ] + }, + "output": "pos_data", + "additionalFields": { + "description": null, + "layoutData": { + "x": 360, + "y": 1440 + } + }, + "type": "Enricher" + }, + { + "nextFalse": [ + ], + "id": "POS is currently opened", + "expression": { + "language": "spel", + "expression": "#pos_data != null AND #pos_data.open_hour != null AND #pos_data.close_hour != null AND #DATE.isBetween(#DATE.nowAtZone('Europe/Warsaw').toLocalTime, #pos_data.open_hour.toLocalTime(), #pos_data.close_hour.toLocalTime())" + }, + "isDisabled": null, + "additionalFields": { + "description": "POS open weekdays should be checked too", + "layoutData": { + "x": 360, + "y": 1620 + } + }, + "type": "Filter" + }, + { + "id": "Count when distance to POS is lower than 1 km", + "outputVar": "agg_out", + "nodeType": "aggregate-session", + "parameters": [ + { + "name": "groupBy", + "expression": { + "language": "spel", + "expression": "#input.clientId + ''" + } + }, + { + "name": "aggregator", + "expression": { + "language": "spel", + "expression": "#AGG.countWhen" + } + }, + { + "name": "aggregateBy", + "expression": { + "language": "spel", + "expression": "#GEO.distanceInKm(#input.geo.lat, #input.geo.lon, #pos_data.location_lat, #pos_data.location_lon) <= 1" + } + }, + { + "name": "endSessionCondition", + "expression": { + "language": "spel", + "expression": "false" + } + }, + { + "name": "sessionTimeout", + "expression": { + "language": "spel", + "expression": "T(java.time.Duration).parse('PT10S')" + } + }, + { + "name": "emitWhen", + "expression": { + "language": "spel", + "expression": "T(pl.touk.nussknacker.engine.flink.util.transformer.aggregate.SessionWindowTrigger).OnEvent" + } + } + ], + "additionalFields": { + "description": "We are counting events when a customer is within one kilometre of the point of sale assigned to that customer. For the purpose of the example we set the timeout to 10 seconds, in the real scenario the value should be higher.", + "layoutData": { + "x": 360, + "y": 1800 + } + }, + "type": "CustomNode" + }, + { + "nextFalse": [ + ], + "id": "2 events received with location near POS from session of X time", + "expression": { + "language": "spel", + "expression": "#agg_out == 2" + }, + "isDisabled": null, + "additionalFields": { + "description": "If we receive at least 2 events where the customer location meets our requirements within 10 seconds, we process further.", + "layoutData": { + "x": 360, + "y": 1980 + } + }, + "type": "Filter" + }, + { + "id": "decisions priorities and notification content by consent and client type", + "service": { + "id": "decision-table", + "parameters": [ + { + "name": "Decision Table", + "expression": { + "language": "tabularDataDefinition", + "expression": "{\n \"rows\": [\n [\n \"SMS\",\n \"INDIVIDUAL\",\n \"3\",\n \"You are close to our shop come and see new offers!\"\n ],\n [\n \"EMAIL\",\n \"INDIVIDUAL\",\n \"1\",\n \"You are close to our shop come and see new offers!\"\n ],\n [\n \"PUSH\",\n \"INDIVIDUAL\",\n \"2\",\n \"You are close to our shop come and see new offers!\"\n ],\n [\n \"SMS\",\n \"BUSINESS\",\n \"2\",\n \"You are close to our shop come and see new business offers!\"\n ],\n [\n \"EMAIL\",\n \"BUSINESS\",\n \"3\",\n \"You are close to our shop come and see new business offers!\"\n ],\n [\n \"PUSH\",\n \"BUSINESS\",\n \"1\",\n \"You are close to our shop come and see new business offers!\"\n ]\n ],\n \"columns\": [\n {\n \"name\": \"Consent\",\n \"type\": \"java.lang.String\"\n },\n {\n \"name\": \"Client type\",\n \"type\": \"java.lang.String\"\n },\n {\n \"name\": \"Priority\",\n \"type\": \"java.lang.Integer\"\n },\n {\n \"name\": \"Notification content\",\n \"type\": \"java.lang.String\"\n }\n ]\n}" + } + }, + { + "name": "Match condition", + "expression": { + "language": "spel", + "expression": "#consents.^[#this == #ROW.Consent] != null AND #clientData.client_type == #ROW['Client type']" + } + } + ] + }, + "output": "decisionsWithPrioritiesAndNotificationContent", + "additionalFields": { + "description": "In the decision table, we define the business rules that we are trying to match with the 'matching condition' and then pass the matched decisions on.", + "layoutData": { + "x": 360, + "y": 2160 + } + }, + "type": "Enricher" + }, + { + "id": "choose best decision", + "varName": "decision", + "value": { + "language": "spel", + "expression": "#decisionsWithPrioritiesAndNotificationContent?.^[#this.Priority == 3] ?: (#decisionsWithPrioritiesAndNotificationContent?.^[#this.Priority == 2] ?: (#decisionsWithPrioritiesAndNotificationContent?.^[#this.Priority == 1]))" + }, + "additionalFields": { + "description": "We can apply various business rules to choose best decisions here", + "layoutData": { + "x": 360, + "y": 2340 + } + }, + "type": "Variable" + }, + { + "defaultNext": [ + ], + "nexts": [ + { + "expression": { + "language": "spel", + "expression": "#decision?.Consent == \"SMS\"" + }, + "nodes": [ + { + "id": "SMS", + "ref": { + "typ": "kafka", + "parameters": [ + { + "name": "Topic", + "expression": { + "language": "spel", + "expression": "'GeoLocationsOutputSms'" + } + }, + { + "name": "Schema version", + "expression": { + "language": "spel", + "expression": "'latest'" + } + }, + { + "name": "Key", + "expression": { + "language": "spel", + "expression": "" + } + }, + { + "name": "Raw editor", + "expression": { + "language": "spel", + "expression": "false" + } + }, + { + "name": "msisdn", + "expression": { + "language": "spel", + "expression": "#clientData.msisdn" + } + }, + { + "name": "content", + "expression": { + "language": "spel", + "expression": "#decision['Notification content']" + } + } + ] + }, + "endResult": null, + "isDisabled": null, + "additionalFields": { + "description": null, + "layoutData": { + "x": 0, + "y": 2700 + } + }, + "type": "Sink" + } + ] + }, + { + "expression": { + "language": "spel", + "expression": "#decision?.Consent == \"EMAIL\"" + }, + "nodes": [ + { + "id": "EMAIL", + "ref": { + "typ": "kafka", + "parameters": [ + { + "name": "Topic", + "expression": { + "language": "spel", + "expression": "'GeoLocationsOutputEmail'" + } + }, + { + "name": "Schema version", + "expression": { + "language": "spel", + "expression": "'latest'" + } + }, + { + "name": "Key", + "expression": { + "language": "spel", + "expression": "" + } + }, + { + "name": "Raw editor", + "expression": { + "language": "spel", + "expression": "false" + } + }, + { + "name": "email", + "expression": { + "language": "spel", + "expression": "#clientData.email" + } + }, + { + "name": "content", + "expression": { + "language": "spel", + "expression": "#decision['Notification content']" + } + } + ] + }, + "endResult": null, + "isDisabled": null, + "additionalFields": { + "description": null, + "layoutData": { + "x": 360, + "y": 2700 + } + }, + "type": "Sink" + } + ] + }, + { + "expression": { + "language": "spel", + "expression": "#decision?.Consent == 'PUSH'" + }, + "nodes": [ + { + "id": "PUSH", + "ref": { + "typ": "kafka", + "parameters": [ + { + "name": "Topic", + "expression": { + "language": "spel", + "expression": "'GeoLocationsOutputPush'" + } + }, + { + "name": "Schema version", + "expression": { + "language": "spel", + "expression": "'latest'" + } + }, + { + "name": "Key", + "expression": { + "language": "spel", + "expression": "" + } + }, + { + "name": "Raw editor", + "expression": { + "language": "spel", + "expression": "false" + } + }, + { + "name": "msisdn", + "expression": { + "language": "spel", + "expression": "#clientData.msisdn" + } + }, + { + "name": "content", + "expression": { + "language": "spel", + "expression": "#decision['Notification content']" + } + } + ] + }, + "endResult": null, + "isDisabled": null, + "additionalFields": { + "description": null, + "layoutData": { + "x": 720, + "y": 2700 + } + }, + "type": "Sink" + } + ] + } + ], + "id": "split by notification type", + "expression": null, + "exprVal": null, + "additionalFields": { + "description": "We direct the results to the appropriate topic", + "layoutData": { + "x": 360, + "y": 2520 + } + }, + "type": "Switch" + } + ], + "additionalBranches": [ + ] +} \ No newline at end of file diff --git a/quickstart-setup/setup/schema-registry/active-schemas.txt b/quickstart-setup/setup/schema-registry/active-schemas.txt index f9f28041..24ecb09e 100644 --- a/quickstart-setup/setup/schema-registry/active-schemas.txt +++ b/quickstart-setup/setup/schema-registry/active-schemas.txt @@ -5,3 +5,7 @@ ProcessedTransactions.schema.json Transactions.schema.json CustomerEvents.schema.json OfferProposalsBasedOnCustomerEvents.schema.json +GeoLocations.schema.json +GeoLocationsOutputEmail.schema.json +GeoLocationsOutputPush.schema.json +GeoLocationsOutputSms.schema.json diff --git a/quickstart-setup/setup/schema-registry/schemas/GeoLocations.schema.json b/quickstart-setup/setup/schema-registry/schemas/GeoLocations.schema.json new file mode 100644 index 00000000..6e2aaa34 --- /dev/null +++ b/quickstart-setup/setup/schema-registry/schemas/GeoLocations.schema.json @@ -0,0 +1,38 @@ +{ + "type": "object", + "additionalProperties": false, + "$schema": "http://json-schema.org/draft-07/schema", + "required": [ + "clientId", + "geo", + "eventTime" + ], + "properties": { + "geo": { + "type": "object", + "description": "This property contains geographical coordinates (latitude and longitude).", + "required": [ + "lat", + "lon" + ], + "properties": { + "lon": { + "type": "number", + "description": "Longitude coordinate." + }, + "lat": { + "type": "number", + "description": "Latitude coordinate." + } + } + }, + "clientId": { + "type": "integer", + "description": "A unique identifier for the client." + }, + "eventTime": { + "type": "integer", + "description": "A timestamp indicating the time of the event." + } + } +} diff --git a/quickstart-setup/setup/schema-registry/schemas/GeoLocationsOutputEmail.schema.json b/quickstart-setup/setup/schema-registry/schemas/GeoLocationsOutputEmail.schema.json new file mode 100644 index 00000000..2d550d39 --- /dev/null +++ b/quickstart-setup/setup/schema-registry/schemas/GeoLocationsOutputEmail.schema.json @@ -0,0 +1,20 @@ +{ + "type": "object", + "additionalProperties": false, + "description": "A schema for a topic used to send emails.", + "$schema": "http://json-schema.org/draft-07/schema", + "required": [ + "email", + "content" + ], + "properties": { + "email": { + "type": "string", + "description": "The email address of the client." + }, + "content": { + "type": "string", + "description": "The content or message to be associated with the email." + } + } +} diff --git a/quickstart-setup/setup/schema-registry/schemas/GeoLocationsOutputPush.schema.json b/quickstart-setup/setup/schema-registry/schemas/GeoLocationsOutputPush.schema.json new file mode 100644 index 00000000..d9d2d2e6 --- /dev/null +++ b/quickstart-setup/setup/schema-registry/schemas/GeoLocationsOutputPush.schema.json @@ -0,0 +1,20 @@ +{ + "type": "object", + "additionalProperties": false, + "description": "A schema for a topic used to send pushes.", + "$schema": "http://json-schema.org/draft-07/schema", + "required": [ + "msisdn", + "content" + ], + "properties": { + "msisdn": { + "type": "string", + "description": "The phone number of the client." + }, + "content": { + "type": "string", + "description": "The content or message to be associated with the email." + } + } +} diff --git a/quickstart-setup/setup/schema-registry/schemas/GeoLocationsOutputSms.schema.json b/quickstart-setup/setup/schema-registry/schemas/GeoLocationsOutputSms.schema.json new file mode 100644 index 00000000..c86f7635 --- /dev/null +++ b/quickstart-setup/setup/schema-registry/schemas/GeoLocationsOutputSms.schema.json @@ -0,0 +1,20 @@ +{ + "type": "object", + "additionalProperties": false, + "description": "A schema for a topic used to send smses.", + "$schema": "http://json-schema.org/draft-07/schema", + "required": [ + "msisdn", + "content" + ], + "properties": { + "msisdn": { + "type": "string", + "description": "The phone number of the client." + }, + "content": { + "type": "string", + "description": "The content or message to be associated with the email." + } + } +}