Skip to content

Commit

Permalink
removed redundant atomic var
Browse files Browse the repository at this point in the history
  • Loading branch information
gauravAshok committed May 23, 2024
1 parent b384a88 commit c0a5cfe
Showing 1 changed file with 2 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ public class GroupedMessageSrc<O extends Offset> implements MessageSrc {
*/
private final AtomicReference<NextMsgsRequest> pendingRequest = new AtomicReference<>();

private final AtomicBoolean pendingRequestWaitingOnFreeGroups = new AtomicBoolean(false);

/**
* Attempt to fill the message array with one message from each group.
* Subsequent messages from a group are not fetched until the previous message is consumed.
Expand All @@ -81,7 +79,6 @@ public CompletableFuture<Integer> nextMessages(MessageTracker[] messages) {
throw new IllegalStateException(
"nextMessages method is not supposed to be called concurrently. There seems to be a pending nextMessage call");
}
pendingRequestWaitingOnFreeGroups.set(true);

// incomplete result is saved. trigger new message fetch.
optionallyFetchNewMessages();
Expand All @@ -95,9 +92,8 @@ public CompletableFuture<Integer> nextMessages(MessageTracker[] messages) {
}

private void tryCompletePendingRequest() {
if (pendingRequestWaitingOnFreeGroups.compareAndSet(true, false)) {
// ohh, a free group was there and I am able to grab the request for completion.
NextMsgsRequest request = pendingRequest.getAndSet(null);
NextMsgsRequest request;
if ((request = pendingRequest.getAndSet(null)) != null) {
request.result.complete(nextMessagesInternal(request.messages));
}
}
Expand Down

0 comments on commit c0a5cfe

Please sign in to comment.