Skip to content

Commit

Permalink
merge: #966
Browse files Browse the repository at this point in the history
966: Properly support Signal broadcasting r=remcowesterhoud a=remcowesterhoud

## Description

<!-- Please explain the changes you made here. -->

I only implemented half of the Signal support 🙈 . This PR adds the remainder and adds a test to verify it's possible to broadcast signals.

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #964

<!-- Cut-off marker
_All lines under and including the cut-off marker will be removed from the merge commit message_

## Definition of Ready

Please check the items that apply, before requesting a review.

You can find more details about these items in our wiki page about [Pull Requests and Code Reviews](https://github.com/camunda/zeebe/wiki/Pull-Requests-and-Code-Reviews).

* [ ] I've reviewed my own code
* [ ] I've written a clear changelist description
* [ ] I've narrowly scoped my changes
* [ ] I've separated structural from behavioural changes
-->

## Definition of Done

<!-- Please check the items that apply, before merging or (if possible) before requesting a review. -->

_Not all items need to be done depending on the issue and the pull request._

Code changes:
* [x] The changes are backwards compatibility with previous versions
* [x] If it fixes a bug then PRs are created to backport the fix

Testing:
* [x] There are unit/integration tests that verify all acceptance criterias of the issue
* [x] New tests are written to ensure backwards compatibility with further versions
* [ ] The behavior is tested manually

Documentation:
* [ ] Javadoc has been written
* [ ] The documentation is updated


Co-authored-by: Remco Westerhoud <[email protected]>
  • Loading branch information
zeebe-bors-camunda[bot] and remcowesterhoud authored Nov 9, 2023
2 parents 1618aa7 + e10a176 commit 45b6ed5
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivatedJob;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.BroadcastSignalRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.BroadcastSignalResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.CancelProcessInstanceRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.CancelProcessInstanceResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.CompleteJobRequest;
Expand Down Expand Up @@ -59,6 +61,7 @@
import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceCreationRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceResultRecord;
import io.camunda.zeebe.protocol.impl.record.value.signal.SignalRecord;
import io.camunda.zeebe.protocol.impl.record.value.variable.VariableDocumentRecord;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.Intent;
Expand Down Expand Up @@ -93,7 +96,8 @@ class GrpcResponseMapper {
Map.entry(ResolveIncidentRequest.class, this::createResolveIncidentResponse),
Map.entry(SetVariablesRequest.class, this::createSetVariablesResponse),
Map.entry(UpdateJobRetriesRequest.class, this::createJobUpdateRetriesResponse),
Map.entry(ModifyProcessInstanceRequest.class, this::createModifyProcessInstanceResponse));
Map.entry(ModifyProcessInstanceRequest.class, this::createModifyProcessInstanceResponse),
Map.entry(BroadcastSignalRequest.class, this::createBroadcastSignalResponse));

GeneratedMessageV3 map(
final Class<? extends GeneratedMessageV3> requestType,
Expand Down Expand Up @@ -287,6 +291,13 @@ private GeneratedMessageV3 createModifyProcessInstanceResponse() {
return ModifyProcessInstanceResponse.newBuilder().build();
}

private GeneratedMessageV3 createBroadcastSignalResponse() {
final SignalRecord signal = new SignalRecord();
signal.wrap(valueBufferView);

return BroadcastSignalResponse.newBuilder().setKey(key).build();
}

private GeneratedMessageV3 createResolveIncidentResponse() {
final IncidentRecord incident = new IncidentRecord();
incident.wrap(valueBufferView);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,11 +493,15 @@ public void broadcastSignal(
final var requestId =
gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver);

final SignalRecord command = new SignalRecord().setSignalName(request.getSignalName());

if (!request.getVariables().isEmpty()) {
command.setVariables(
BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(request.getVariables())));
}

writer.writeCommandWithoutKey(
new SignalRecord()
.setSignalName(request.getSignalName())
.setVariables(
BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(request.getVariables()))),
command,
prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.SIGNAL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.camunda.zeebe.client.api.command.ClientException;
import io.camunda.zeebe.client.api.response.ActivateJobsResponse;
import io.camunda.zeebe.client.api.response.ActivatedJob;
import io.camunda.zeebe.client.api.response.BroadcastSignalResponse;
import io.camunda.zeebe.client.api.response.BrokerInfo;
import io.camunda.zeebe.client.api.response.DeploymentEvent;
import io.camunda.zeebe.client.api.response.EvaluateDecisionResponse;
Expand Down Expand Up @@ -1013,4 +1014,55 @@ void shouldDeployForm() {
assertThat(form.getFormKey()).isPositive();
assertThat(form.getFormId()).isEqualTo("Form_0w7r08e");
}

@Test
void shouldBroadcastSignal() {
// given
zeebeClient
.newDeployResourceCommand()
.addProcessModel(
Bpmn.createExecutableProcess("simpleProcess")
.startEvent()
.signal("signal")
.endEvent()
.done(),
"simpleProcess.bpmn")
.send()
.join();

// when
final BroadcastSignalResponse response =
zeebeClient.newBroadcastSignalCommand().signalName("signal").send().join();

// then
assertThat(response.getKey()).isPositive();
}

@Test
void shouldBroadcastSignalWithVariables() {
// given
zeebeClient
.newDeployResourceCommand()
.addProcessModel(
Bpmn.createExecutableProcess("simpleProcess")
.startEvent()
.signal("signal")
.endEvent()
.done(),
"simpleProcess.bpmn")
.send()
.join();

// when
final BroadcastSignalResponse response =
zeebeClient
.newBroadcastSignalCommand()
.signalName("signal")
.variable("foo", "bar")
.send()
.join();

// then
assertThat(response.getKey()).isPositive();
}
}

0 comments on commit 45b6ed5

Please sign in to comment.