From 280b2350ef132a41dcd9da462c844244b62bed0c Mon Sep 17 00:00:00 2001 From: Remco Westerhoud Date: Thu, 9 Nov 2023 15:09:10 +0100 Subject: [PATCH 1/3] test: verify broadcasting of signals Verify we can broadcast signals and retrieve a response with a key (cherry picked from commit 766a700ce658497148b81b3046ac95c6ff68f627) --- .../process/test/engine/EngineClientTest.java | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java b/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java index 230993240..600d296b8 100644 --- a/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java +++ b/engine/src/test/java/io/camunda/zeebe/process/test/engine/EngineClientTest.java @@ -15,6 +15,7 @@ import io.camunda.zeebe.client.api.command.ClientException; import io.camunda.zeebe.client.api.response.ActivateJobsResponse; import io.camunda.zeebe.client.api.response.ActivatedJob; +import io.camunda.zeebe.client.api.response.BroadcastSignalResponse; import io.camunda.zeebe.client.api.response.BrokerInfo; import io.camunda.zeebe.client.api.response.DeploymentEvent; import io.camunda.zeebe.client.api.response.EvaluateDecisionResponse; @@ -988,4 +989,55 @@ void shouldIncreaseTheTime() { assertThat(processCompleted).isNotEmpty(); }); } + + @Test + void shouldBroadcastSignal() { + // given + zeebeClient + .newDeployResourceCommand() + .addProcessModel( + Bpmn.createExecutableProcess("simpleProcess") + .startEvent() + .signal("signal") + .endEvent() + .done(), + "simpleProcess.bpmn") + .send() + .join(); + + // when + final BroadcastSignalResponse response = + zeebeClient.newBroadcastSignalCommand().signalName("signal").send().join(); + + // then + assertThat(response.getKey()).isPositive(); + } + + @Test + void shouldBroadcastSignalWithVariables() { + // given + zeebeClient + .newDeployResourceCommand() + .addProcessModel( + Bpmn.createExecutableProcess("simpleProcess") + .startEvent() + .signal("signal") + .endEvent() + .done(), + "simpleProcess.bpmn") + .send() + .join(); + + // when + final BroadcastSignalResponse response = + zeebeClient + .newBroadcastSignalCommand() + .signalName("signal") + .variable("foo", "bar") + .send() + .join(); + + // then + assertThat(response.getKey()).isPositive(); + } } From b6ea7f69a8e926dc9f30143a9c9b080765b66dea Mon Sep 17 00:00:00 2001 From: Remco Westerhoud Date: Thu, 9 Nov 2023 15:09:41 +0100 Subject: [PATCH 2/3] fix: don't set variables if empty If the variables are empty we cannot convert them to MsgPack. Instead, we should ignore them and not add them to the record. (cherry picked from commit 77901f4b5ed5b416735a84c73b79f2589395b4d0) --- .../process/test/engine/GrpcToLogStreamGateway.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java index 5f01f2c74..b88828c41 100644 --- a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java +++ b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java @@ -493,11 +493,15 @@ public void broadcastSignal( final var requestId = gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver); + final SignalRecord command = new SignalRecord().setSignalName(request.getSignalName()); + + if (!request.getVariables().isEmpty()) { + command.setVariables( + BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(request.getVariables()))); + } + writer.writeCommandWithoutKey( - new SignalRecord() - .setSignalName(request.getSignalName()) - .setVariables( - BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(request.getVariables()))), + command, prepareRecordMetadata() .requestId(requestId) .valueType(ValueType.SIGNAL) From 2a7ec55e88750fe8f3fc91197c31808d44a22dcb Mon Sep 17 00:00:00 2001 From: Remco Westerhoud Date: Thu, 9 Nov 2023 15:11:35 +0100 Subject: [PATCH 3/3] fix: map signal response Maps a response for signals. The response from the gateway contains the key and the tenant id. I left the tenant id out for now as it's not yet supported in ZPT (cherry picked from commit e10a176c3a9e0c63611f20f6d39f8159803924c6) --- .../process/test/engine/GrpcResponseMapper.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcResponseMapper.java b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcResponseMapper.java index 44773463b..ee2356bc6 100644 --- a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcResponseMapper.java +++ b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcResponseMapper.java @@ -14,6 +14,8 @@ import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsRequest; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsResponse; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivatedJob; +import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.BroadcastSignalRequest; +import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.BroadcastSignalResponse; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.CancelProcessInstanceRequest; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.CancelProcessInstanceResponse; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.CompleteJobRequest; @@ -58,6 +60,7 @@ import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord; import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceCreationRecord; import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceResultRecord; +import io.camunda.zeebe.protocol.impl.record.value.signal.SignalRecord; import io.camunda.zeebe.protocol.impl.record.value.variable.VariableDocumentRecord; import io.camunda.zeebe.protocol.record.RejectionType; import io.camunda.zeebe.protocol.record.intent.Intent; @@ -92,7 +95,8 @@ class GrpcResponseMapper { Map.entry(ResolveIncidentRequest.class, this::createResolveIncidentResponse), Map.entry(SetVariablesRequest.class, this::createSetVariablesResponse), Map.entry(UpdateJobRetriesRequest.class, this::createJobUpdateRetriesResponse), - Map.entry(ModifyProcessInstanceRequest.class, this::createModifyProcessInstanceResponse)); + Map.entry(ModifyProcessInstanceRequest.class, this::createModifyProcessInstanceResponse), + Map.entry(BroadcastSignalRequest.class, this::createBroadcastSignalResponse)); GeneratedMessageV3 map( final Class requestType, @@ -275,6 +279,13 @@ private GeneratedMessageV3 createModifyProcessInstanceResponse() { return ModifyProcessInstanceResponse.newBuilder().build(); } + private GeneratedMessageV3 createBroadcastSignalResponse() { + final SignalRecord signal = new SignalRecord(); + signal.wrap(valueBufferView); + + return BroadcastSignalResponse.newBuilder().setKey(key).build(); + } + private GeneratedMessageV3 createResolveIncidentResponse() { final IncidentRecord incident = new IncidentRecord(); incident.wrap(valueBufferView);