Skip to content

Commit

Permalink
merge: #969
Browse files Browse the repository at this point in the history
969: [Backport stable/8.2] Properly support Signal broadcasting r=remcowesterhoud a=remcowesterhoud

# Description
Backport of #966 to `stable/8.2`.

relates to #964

Co-authored-by: Remco Westerhoud <[email protected]>
  • Loading branch information
zeebe-bors-camunda[bot] and remcowesterhoud authored Nov 9, 2023
2 parents 9f9f92d + ff967e9 commit d9ffbd9
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivatedJob;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.BroadcastSignalRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.BroadcastSignalResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.CancelProcessInstanceRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.CancelProcessInstanceResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.CompleteJobRequest;
Expand Down Expand Up @@ -58,6 +60,7 @@
import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceCreationRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceResultRecord;
import io.camunda.zeebe.protocol.impl.record.value.signal.SignalRecord;
import io.camunda.zeebe.protocol.impl.record.value.variable.VariableDocumentRecord;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.Intent;
Expand Down Expand Up @@ -92,7 +95,8 @@ class GrpcResponseMapper {
Map.entry(ResolveIncidentRequest.class, this::createResolveIncidentResponse),
Map.entry(SetVariablesRequest.class, this::createSetVariablesResponse),
Map.entry(UpdateJobRetriesRequest.class, this::createJobUpdateRetriesResponse),
Map.entry(ModifyProcessInstanceRequest.class, this::createModifyProcessInstanceResponse));
Map.entry(ModifyProcessInstanceRequest.class, this::createModifyProcessInstanceResponse),
Map.entry(BroadcastSignalRequest.class, this::createBroadcastSignalResponse));

GeneratedMessageV3 map(
final Class<? extends GeneratedMessageV3> requestType,
Expand Down Expand Up @@ -275,6 +279,13 @@ private GeneratedMessageV3 createModifyProcessInstanceResponse() {
return ModifyProcessInstanceResponse.newBuilder().build();
}

private GeneratedMessageV3 createBroadcastSignalResponse() {
final SignalRecord signal = new SignalRecord();
signal.wrap(valueBufferView);

return BroadcastSignalResponse.newBuilder().setKey(key).build();
}

private GeneratedMessageV3 createResolveIncidentResponse() {
final IncidentRecord incident = new IncidentRecord();
incident.wrap(valueBufferView);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,11 +493,15 @@ public void broadcastSignal(
final var requestId =
gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver);

final SignalRecord command = new SignalRecord().setSignalName(request.getSignalName());

if (!request.getVariables().isEmpty()) {
command.setVariables(
BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(request.getVariables())));
}

writer.writeCommandWithoutKey(
new SignalRecord()
.setSignalName(request.getSignalName())
.setVariables(
BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(request.getVariables()))),
command,
prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.SIGNAL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.camunda.zeebe.client.api.command.ClientException;
import io.camunda.zeebe.client.api.response.ActivateJobsResponse;
import io.camunda.zeebe.client.api.response.ActivatedJob;
import io.camunda.zeebe.client.api.response.BroadcastSignalResponse;
import io.camunda.zeebe.client.api.response.BrokerInfo;
import io.camunda.zeebe.client.api.response.DeploymentEvent;
import io.camunda.zeebe.client.api.response.EvaluateDecisionResponse;
Expand Down Expand Up @@ -988,4 +989,55 @@ void shouldIncreaseTheTime() {
assertThat(processCompleted).isNotEmpty();
});
}

@Test
void shouldBroadcastSignal() {
// given
zeebeClient
.newDeployResourceCommand()
.addProcessModel(
Bpmn.createExecutableProcess("simpleProcess")
.startEvent()
.signal("signal")
.endEvent()
.done(),
"simpleProcess.bpmn")
.send()
.join();

// when
final BroadcastSignalResponse response =
zeebeClient.newBroadcastSignalCommand().signalName("signal").send().join();

// then
assertThat(response.getKey()).isPositive();
}

@Test
void shouldBroadcastSignalWithVariables() {
// given
zeebeClient
.newDeployResourceCommand()
.addProcessModel(
Bpmn.createExecutableProcess("simpleProcess")
.startEvent()
.signal("signal")
.endEvent()
.done(),
"simpleProcess.bpmn")
.send()
.join();

// when
final BroadcastSignalResponse response =
zeebeClient
.newBroadcastSignalCommand()
.signalName("signal")
.variables(Map.of("foo", "bar"))
.send()
.join();

// then
assertThat(response.getKey()).isPositive();
}
}

0 comments on commit d9ffbd9

Please sign in to comment.