Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync main branch with Apache main branch #37

Merged
merged 11 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
9 changes: 9 additions & 0 deletions NOTICE
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Apache KIE
Copyright 2023 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).

The Initial Developer of some parts of the framework, which are copied from, derived from, or
inspired by KIE (Knowledge Is Everthing) group, is Red Hat, Inc (https://www.redhat.com/).
Copyright Red Hat, Inc. and/or its affiliates.. All Rights Reserved.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
*/
package org.kie.kogito.monitoring.core.common.process;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.kie.api.event.process.ErrorEvent;
import org.kie.api.event.process.ProcessCompletedEvent;
import org.kie.api.event.process.ProcessNodeLeftEvent;
import org.kie.api.event.process.ProcessStartedEvent;
Expand All @@ -31,12 +32,15 @@
import org.kie.kogito.KogitoGAV;
import org.kie.kogito.internal.process.event.DefaultKogitoProcessEventListener;
import org.kie.kogito.internal.process.runtime.KogitoNodeInstance;
import org.kie.kogito.internal.process.runtime.KogitoProcessInstance;
import org.kie.kogito.internal.process.runtime.KogitoWorkItemNodeInstance;
import org.kie.kogito.internal.process.runtime.KogitoWorkflowProcessInstance;
import org.kie.kogito.internal.utils.KogitoTags;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Counter.Builder;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
Expand All @@ -56,102 +60,129 @@ public MetricsProcessEventListener(String identifier, KogitoGAV gav, MeterRegist
this.meterRegistry = meterRegistry;
}

private Counter getNumberOfProcessInstancesStartedCounter(String appId, String processId) {
return Counter
.builder("kogito_process_instance_started_total")
.description("Started Process Instances")
.tags(Arrays.asList(Tag.of("app_id", appId), Tag.of("process_id", processId), Tag.of("artifactId", gav.getArtifactId()), Tag.of("version", gav.getVersion())))
.register(meterRegistry);
protected Counter buildCounter(String name, String description, String processId, Tag... tags) {
Builder builder = Counter.builder(name)
.description(description)
.tag("app_id", identifier).tag("process_id", processId).tag("artifactId", gav.getArtifactId()).tag("version", gav.getVersion());
for (Tag tag : tags) {
builder.tag(tag.getKey(), tag.getValue());
}
return builder.register(meterRegistry);
}

private Counter getNumberOfSLAsViolatedCounter(String appId, String processId, String nodeName) {
return Counter
.builder("kogito_process_instance_sla_violated_total")
.description("Process Instances SLA Violated")
.tags(Arrays.asList(Tag.of("app_id", appId), Tag.of("process_id", processId), Tag.of("node_name", nodeName), Tag.of("artifactId", gav.getArtifactId()),
Tag.of("version", gav.getVersion())))
.register(meterRegistry);
protected AtomicInteger buildGauge(String name, String description, String processId, Tag... tags) {
AtomicInteger atomicInteger = new AtomicInteger(0);
io.micrometer.core.instrument.Gauge.Builder<AtomicInteger> builder = Gauge.builder(name, atomicInteger, AtomicInteger::doubleValue)
.description(description)
.tag("app_id", identifier).tag("process_id", processId).tag("artifactId", gav.getArtifactId()).tag("version", gav.getVersion());
for (Tag tag : tags) {
builder.tag(tag.getKey(), tag.getValue());
}
builder.register(meterRegistry);
return atomicInteger;
}

private Counter getNumberOfProcessInstancesCompletedCounter(String appId, String processId, String nodeName) {
return Counter
.builder("kogito_process_instance_completed_total")
.description("Completed Process Instances")
.tags(Arrays.asList(Tag.of("app_id", appId), Tag.of("process_id", processId), Tag.of("node_name", nodeName), Tag.of("artifactId", gav.getArtifactId()),
Tag.of("version", gav.getVersion())))
.register(meterRegistry);
protected DistributionSummary buildDistributionSummary(String name, String description, Tag... tags) {
io.micrometer.core.instrument.DistributionSummary.Builder builder = DistributionSummary.builder(name)
.description(description).tag("artifactId", gav.getArtifactId()).tag("version", gav.getVersion());
for (Tag tag : tags) {
builder.tag(tag.getKey(), tag.getValue());
}
return builder.register(meterRegistry);
}

private AtomicInteger getRunningProcessInstancesGauge(String appId, String processId) {
if (gaugeMap.containsKey(appId + processId)) {
return gaugeMap.get(appId + processId);
}
AtomicInteger atomicInteger = new AtomicInteger(0);
Gauge.builder("kogito_process_instance_running_total", atomicInteger, AtomicInteger::doubleValue)
.description("Running Process Instances")
.tags(Arrays.asList(Tag.of("app_id", appId), Tag.of("process_id", processId), Tag.of("artifactId", gav.getArtifactId()), Tag.of("version", gav.getVersion())))
.register(meterRegistry);
gaugeMap.put(appId + processId, atomicInteger);
return atomicInteger;
private Counter getNumberOfProcessInstancesStartedCounter(String processId) {
return buildCounter("kogito_process_instance_started_total", "Started Process Instances", processId);
}

private Counter getErrorCounter(String processId, String errorMessage) {
return buildCounter("kogito_process_instance_error", "Number of errors that has occurred", processId, Tag.of("error_message", errorMessage));
}

private Counter getNumberOfSLAsViolatedCounter(String processId, String nodeName) {
return buildCounter("kogito_process_instance_sla_violated_total", "Number of SLA violations that has ocurred", processId, Tag.of("node_name", nodeName));
}

private Counter getNumberOfProcessInstancesCompletedCounter(String processId, String state) {
return buildCounter("kogito_process_instance_completed_total", "Completed Process Instances", processId, Tag.of("process_state", state));
}

private AtomicInteger getRunningProcessInstancesGauge(String processId) {
return gaugeMap.computeIfAbsent(identifier + processId, k -> buildGauge("kogito_process_instance_running_total", "Running Process Instances", processId));
}

private DistributionSummary getProcessInstancesDurationSummary(String appId, String processId) {
return DistributionSummary.builder("kogito_process_instance_duration_seconds")
.description("Process Instances Duration")
.tags(Arrays.asList(Tag.of("app_id", appId), Tag.of("process_id", processId), Tag.of("artifactId", gav.getArtifactId()), Tag.of("version", gav.getVersion())))
.register(meterRegistry);
private DistributionSummary getProcessInstancesDurationSummary(String processId) {
return buildDistributionSummary("kogito_process_instance_duration_seconds",
"Process Instances Duration", Tag.of("process_id", processId), Tag.of("app_id", identifier));
}

private DistributionSummary getWorkItemsDurationSummary(String name) {
return DistributionSummary.builder("kogito_work_item_duration_seconds")
.description("Work Items Duration")
.tags(Arrays.asList(Tag.of("name", name), Tag.of("artifactId", gav.getArtifactId()), Tag.of("version", gav.getVersion())))
.register(meterRegistry);
return buildDistributionSummary("kogito_work_item_duration_seconds",
"Work Items Duration", Tag.of("name", name));
}

private DistributionSummary getNodeInstancesDurationSummary(String processId, String nodeName) {
return buildDistributionSummary("kogito_node_instance_duration_milliseconds", "Relevant nodes duration in milliseconds", Tag.of("process_id", processId),
Tag.of("node_name", nodeName));
}

protected void recordRunningProcessInstance(String containerId, String processId) {
getRunningProcessInstancesGauge(containerId, processId).incrementAndGet();
protected void recordRunningProcessInstance(String processId) {
getRunningProcessInstancesGauge(processId).incrementAndGet();
}

protected static double millisToSeconds(long millis) {
return millis / 1000.0;
return TimeUnit.MILLISECONDS.toSeconds(millis);
}

@Override
public void afterProcessStarted(ProcessStartedEvent event) {
LOGGER.debug("After process started event: {}", event);
final ProcessInstance processInstance = event.getProcessInstance();
getNumberOfProcessInstancesStartedCounter(identifier, processInstance.getProcessId()).increment();
recordRunningProcessInstance(identifier, processInstance.getProcessId());
getNumberOfProcessInstancesStartedCounter(processInstance.getProcessId()).increment();
recordRunningProcessInstance(processInstance.getProcessId());
}

@Override
public void afterProcessCompleted(ProcessCompletedEvent event) {
LOGGER.debug("After process completed event: {}", event);
final KogitoWorkflowProcessInstance processInstance = (KogitoWorkflowProcessInstance) event.getProcessInstance();
getRunningProcessInstancesGauge(identifier, processInstance.getProcessId()).decrementAndGet();
getRunningProcessInstancesGauge(processInstance.getProcessId()).decrementAndGet();

getNumberOfProcessInstancesCompletedCounter(identifier, processInstance.getProcessId(), String.valueOf(processInstance.getState())).increment();
getNumberOfProcessInstancesCompletedCounter(processInstance.getProcessId(), fromState(processInstance.getState())).increment();

if (processInstance.getStartDate() != null) {
final double duration = millisToSeconds(processInstance.getEndDate().getTime() - processInstance.getStartDate().getTime());
getProcessInstancesDurationSummary(identifier, processInstance.getProcessId()).record(duration);
getProcessInstancesDurationSummary(processInstance.getProcessId()).record(duration);
LOGGER.debug("Process Instance duration: {}s", duration);
}
}

@Override
public void onError(ErrorEvent event) {
LOGGER.debug("After Error event: {}", event);
final KogitoWorkflowProcessInstance processInstance = (KogitoWorkflowProcessInstance) event.getProcessInstance();
getErrorCounter(processInstance.getProcessId(), processInstance.getErrorMessage()).increment();
}

@Override
public void beforeNodeLeft(ProcessNodeLeftEvent event) {
LOGGER.debug("Before Node left event: {}", event);
final KogitoNodeInstance nodeInstance = (KogitoNodeInstance) event.getNodeInstance();
if (nodeInstance instanceof KogitoWorkItemNodeInstance) {
KogitoWorkItemNodeInstance wi = (KogitoWorkItemNodeInstance) nodeInstance;
if (wi.getTriggerTime() != null) {
final String name = (String) wi.getWorkItem().getParameters().getOrDefault("TaskName", wi.getWorkItem().getName());
final double duration = millisToSeconds(wi.getLeaveTime().getTime() - wi.getTriggerTime().getTime());
getWorkItemsDurationSummary(name).record(duration);
LOGGER.debug("Work Item {}, duration: {}s", name, duration);
}
recordNodeDuration(getWorkItemsDurationSummary((String) wi.getWorkItem().getParameters().getOrDefault("TaskName", wi.getWorkItem().getName())), nodeInstance, TimeUnit.SECONDS);
}
String nodeName = (String) nodeInstance.getNode().getMetaData().get(KogitoTags.METRIC_NAME_METADATA);
if (nodeName != null) {
recordNodeDuration(getNodeInstancesDurationSummary(event.getProcessInstance().getProcessId(), nodeName), nodeInstance, TimeUnit.MILLISECONDS);
}
}

private void recordNodeDuration(DistributionSummary summary, KogitoNodeInstance instance, TimeUnit target) {
if (instance.getTriggerTime() != null) {
double duration = target.convert(instance.getLeaveTime().getTime() - instance.getTriggerTime().getTime(), TimeUnit.MILLISECONDS);
summary.record(duration);
LOGGER.debug("Recorded {} {} because of node {} for summary {}", duration, target, instance.getNode().getName(), summary.getId().getName());
}
}

Expand All @@ -160,7 +191,22 @@ public void afterSLAViolated(SLAViolatedEvent event) {
LOGGER.debug("After SLA violated event: {}", event);
final ProcessInstance processInstance = event.getProcessInstance();
if (processInstance != null && event.getNodeInstance() != null) {
getNumberOfSLAsViolatedCounter(identifier, processInstance.getProcessId(), event.getNodeInstance().getNodeName()).increment();
getNumberOfSLAsViolatedCounter(processInstance.getProcessId(), event.getNodeInstance().getNodeName()).increment();
}
}

private static String fromState(int state) {
switch (state) {
case KogitoProcessInstance.STATE_ABORTED:
return "Aborted";
case KogitoProcessInstance.STATE_COMPLETED:
return "Completed";
case KogitoProcessInstance.STATE_ERROR:
return "Error";
default:
case KogitoProcessInstance.STATE_ACTIVE:
return "Active";
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,17 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;

public class PostgreSQLCorrelationRepository {
public class JDBCCorrelationRepository {

static final String INSERT = "INSERT INTO correlation_instances (id, encoded_correlation_id, correlated_id, correlation) VALUES (?, ?, ?, ?::json)";
static final String INSERT = "INSERT INTO correlation_instances (id, encoded_correlation_id, correlated_id, correlation) VALUES (?, ?, ?, ?)";
static final String DELETE = "DELETE FROM correlation_instances WHERE encoded_correlation_id = ?";
private static final String FIND_BY_ENCODED_ID = "SELECT correlated_id, correlation FROM correlation_instances WHERE encoded_correlation_id = ?";
private static final String FIND_BY_CORRELATED_ID = "SELECT encoded_correlation_id, correlation FROM correlation_instances WHERE correlated_id = ?";

private DataSource dataSource;
private ObjectMapper objectMapper;

public PostgreSQLCorrelationRepository(DataSource dataSource) {
public JDBCCorrelationRepository(DataSource dataSource) {
this.dataSource = dataSource;
this.objectMapper = ObjectMapperFactory.get().copy();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
import org.kie.kogito.correlation.CorrelationService;
import org.kie.kogito.event.correlation.MD5CorrelationEncoder;

public class PostgreSQLCorrelationService implements CorrelationService {
public class JDBCCorrelationService implements CorrelationService {

private PostgreSQLCorrelationRepository repository;
private JDBCCorrelationRepository repository;
private CorrelationEncoder correlationEncoder;

public PostgreSQLCorrelationService(DataSource dataSource) {
this.repository = new PostgreSQLCorrelationRepository(dataSource);
public JDBCCorrelationService(DataSource dataSource) {
this.repository = new JDBCCorrelationRepository(dataSource);
this.correlationEncoder = new MD5CorrelationEncoder();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
CREATE TABLE process_instances
(
id CHAR(36) NOT NULL,
payload BLOB NOT NULL,
process_id VARCHAR(4000) NOT NULL,
version BIGINT(19),
process_version VARCHAR(4000),
id character(36) NOT NULL,
payload varbinary(1000000) NOT NULL,
process_id character varying(4000) NOT NULL,
version bigint,
process_version character varying(4000),
CONSTRAINT process_instances_pkey PRIMARY KEY (id)
);
CREATE INDEX idx_process_instances_process_id ON process_instances (process_id, id, process_version);
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE TABLE correlation_instances
(
id character(36) NOT NULL,
encoded_correlation_id character varying(36) NOT NULL UNIQUE,
correlated_id character varying(36) NOT NULL,
correlation character varying(8000) NOT NULL,
version bigint,
CONSTRAINT correlation_instances_pkey PRIMARY KEY (id)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE correlation_instances
ALTER COLUMN correlation TYPE character varying;
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.kie.kogito.correlation.CompositeCorrelation;
import org.kie.kogito.correlation.CorrelationInstance;
import org.kie.kogito.correlation.SimpleCorrelation;
import org.kie.kogito.persistence.jdbc.correlation.PostgreSQLCorrelationService;
import org.kie.kogito.persistence.jdbc.correlation.JDBCCorrelationService;
import org.kie.kogito.testcontainers.KogitoPostgreSqlContainer;
import org.postgresql.ds.PGSimpleDataSource;
import org.testcontainers.containers.JdbcDatabaseContainer;
Expand All @@ -42,15 +42,15 @@ public class JDBCCorrelationServiceIT {
@Container
private static final KogitoPostgreSqlContainer PG_CONTAINER = new KogitoPostgreSqlContainer();
private static PGSimpleDataSource dataSource;
private static PostgreSQLCorrelationService correlationService;
private static JDBCCorrelationService correlationService;

@BeforeAll
public static void setUp() {
dataSource = new PGSimpleDataSource();
dataSource.setUrl(PG_CONTAINER.getJdbcUrl());
dataSource.setUser(PG_CONTAINER.getUsername());
dataSource.setPassword(PG_CONTAINER.getPassword());
correlationService = new PostgreSQLCorrelationService(dataSource);
correlationService = new JDBCCorrelationService(dataSource);
//create table
// DDLRunner.init(new GenericRepository(dataSource), true);
initMigration(PG_CONTAINER, "postgresql");
Expand Down
1 change: 0 additions & 1 deletion api/kogito-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
<groupId>jakarta.xml.bind</groupId>
<artifactId>jakarta.xml.bind-api</artifactId>
<scope>provided</scope>
<version>4.0.1</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ public static boolean isNotEmpty(String value) {
return !isEmpty(value);
}

public static String sanitizeString(String string) {
return string.replaceAll("\"", "\\\\\"");
}

public static String sanitizeClassName(String className) {
return sanitizeJavaName(className, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ public class KogitoTags {
public static final String INPUT_TAG = "input";
public static final String OUTPUT_TAG = "output";

public static final String METRIC_NAME_METADATA = "MetricName";

}
Loading
Loading