Skip to content

Commit

Permalink
Add context to rpc audit log to output necessary context
Browse files Browse the repository at this point in the history
  • Loading branch information
maobaolong committed Aug 28, 2024
1 parent e62fe7c commit 0414673
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,33 @@
package org.apache.uniffle.common.audit;

import java.io.Closeable;
import java.util.Optional;

import org.slf4j.Logger;

import org.apache.uniffle.common.rpc.StatusCode;

/** Context for rpc audit logging. */
public abstract class RpcAuditContext implements Closeable {
private static final ThreadLocal<RpcAuditContext> RPC_AUDIT_CONTEXT_THREAD_LOCAL =
new ThreadLocal<>();
private final Logger log;
private String command;
private String statusCode;
private String args;
private String returnValue;
private String context;
private String from;
private long creationTimeNs;
protected long executionTimeNs;

public RpcAuditContext(Logger log) {
this.log = log;
RPC_AUDIT_CONTEXT_THREAD_LOCAL.set(this);
}

public static final Optional<RpcAuditContext> getRpcAuditContext() {
return Optional.ofNullable(RPC_AUDIT_CONTEXT_THREAD_LOCAL.get());
}

protected abstract String content();
Expand Down Expand Up @@ -119,6 +128,21 @@ public RpcAuditContext withFrom(String from) {
return this;
}

/**
* Sets context field, context can be concat by invoke multiply time.
*
* @param contextPart the new context part
* @return this {@link RpcAuditContext} instance
*/
public RpcAuditContext withContext(String contextPart) {
if (context == null) {
context = contextPart;
} else {
this.context += ", " + contextPart;
}
return this;
}

@Override
public void close() {
if (log == null) {
Expand All @@ -140,6 +164,9 @@ public String toString() {
if (returnValue != null) {
line += String.format("\treturn{%s}", returnValue);
}
if (context != null) {
line += String.format("\tcontext{%s}", context);
}
return line;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,8 @@ public void reportShuffleResult(
appId,
shuffleId);
}
auditContext.withContext("updatedBlockCount=" + updatedBlockCount);
auditContext.withContext("expectedBlockCount=" + expectedBlockCount);
} catch (Exception e) {
status = StatusCode.INTERNAL_ERROR;
msg = "error happened when report shuffle result, check shuffle server for detail";
Expand Down Expand Up @@ -853,7 +855,6 @@ public void getShuffleResult(
}

String msg = "OK";
GetShuffleResultResponse reply;
byte[] serializedBlockIds = null;
String requestInfo =
"appId[" + appId + "], shuffleId[" + shuffleId + "], partitionId[" + partitionId + "]";
Expand All @@ -878,7 +879,8 @@ public void getShuffleResult(
}

auditContext.withStatusCode(status);
reply =
auditContext.withReturnValue("serializedBlockIdsBytes=" + serializedBlockIdsBytes.size());
GetShuffleResultResponse reply =
GetShuffleResultResponse.newBuilder()
.setStatus(status.toProto())
.setRetMsg(msg)
Expand Down Expand Up @@ -906,7 +908,7 @@ public void getShuffleResultForMultiPart(

auditContext.withAppId(appId).withShuffleId(shuffleId);
auditContext.withArgs(
"partitionsListSize=" + partitionsList.size() + ", blockIdLayout=" + blockIdLayout);
"partitionsList=" + partitionsList + ", blockIdLayout=" + blockIdLayout);

StatusCode status = verifyRequest(appId);
if (status != StatusCode.SUCCESS) {
Expand All @@ -922,7 +924,6 @@ public void getShuffleResultForMultiPart(
}

String msg = "OK";
GetShuffleResultForMultiPartResponse reply;
byte[] serializedBlockIds = null;
String requestInfo =
"appId[" + appId + "], shuffleId[" + shuffleId + "], partitions" + partitionsList;
Expand All @@ -947,8 +948,9 @@ public void getShuffleResultForMultiPart(
LOG.error("Error happened when get shuffle result for {}", requestInfo, e);
}

auditContext.withReturnValue("serializedBlockIdsBytes=" + serializedBlockIdsBytes.size());
auditContext.withStatusCode(status);
reply =
GetShuffleResultForMultiPartResponse reply =
GetShuffleResultForMultiPartResponse.newBuilder()
.setStatus(status.toProto())
.setRetMsg(msg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.audit.RpcAuditContext;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.exception.FileNotFoundException;
import org.apache.uniffle.common.exception.InvalidRequestException;
Expand Down Expand Up @@ -641,9 +642,20 @@ public byte[] getFinishedBlockIds(
Roaring64NavigableMap res = Roaring64NavigableMap.bitmapOf();
for (Map.Entry<Integer, Set<Integer>> entry : bitmapIndexToPartitions.entrySet()) {
Set<Integer> requestPartitions = entry.getValue();
Roaring64NavigableMap bitmap = blockIds[entry.getKey()];
int bitmapIndex = entry.getKey();
Roaring64NavigableMap bitmap = blockIds[bitmapIndex];
getBlockIdsByPartitionId(requestPartitions, bitmap, res, blockIdLayout);
RpcAuditContext.getRpcAuditContext()
.ifPresent(
context ->
context.withContext(
String.format(
"bitmap[%d].<size,byte>=<%d,%d>",
bitmapIndex, bitmap.getLongCardinality(), bitmap.getLongSizeInBytes())));
}
RpcAuditContext.getRpcAuditContext()
.ifPresent(
context -> context.withContext("partitionBlockCount=" + res.getLongCardinality()));

if (res.getLongCardinality() != expectedBlockNumber) {
throw new RssException(
Expand Down

0 comments on commit 0414673

Please sign in to comment.