diff --git a/common/src/main/java/org/apache/uniffle/common/audit/RpcAuditContext.java b/common/src/main/java/org/apache/uniffle/common/audit/RpcAuditContext.java index fbe85efd0b..f27fc5ae22 100644 --- a/common/src/main/java/org/apache/uniffle/common/audit/RpcAuditContext.java +++ b/common/src/main/java/org/apache/uniffle/common/audit/RpcAuditContext.java @@ -18,6 +18,7 @@ package org.apache.uniffle.common.audit; import java.io.Closeable; +import java.util.Optional; import org.slf4j.Logger; @@ -25,17 +26,25 @@ /** Context for rpc audit logging. */ public abstract class RpcAuditContext implements Closeable { + private static final ThreadLocal<RpcAuditContext> RPC_AUDIT_CONTEXT_THREAD_LOCAL = + new ThreadLocal<>(); private final Logger log; private String command; private String statusCode; private String args; private String returnValue; + private String context; private String from; private long creationTimeNs; protected long executionTimeNs; public RpcAuditContext(Logger log) { this.log = log; + RPC_AUDIT_CONTEXT_THREAD_LOCAL.set(this); + } + + public static final Optional<RpcAuditContext> getRpcAuditContext() { + return Optional.ofNullable(RPC_AUDIT_CONTEXT_THREAD_LOCAL.get()); } protected abstract String content(); @@ -78,6 +87,21 @@ public RpcAuditContext withStatusCode(StatusCode statusCode) { return this; } + /** + * Sets the context field. This method can be invoked multiple times; each additional part is appended to the existing context, separated by ", ". + * + * @param contextPart the new context part + * @return this {@link RpcAuditContext} instance + */ + public RpcAuditContext withContext(String contextPart) { + if (context == null) { + context = contextPart; + } else { + this.context += ", " + contextPart; + } + return this; + } + /** * Sets status code field. 
* @@ -140,6 +164,9 @@ public String toString() { if (returnValue != null) { line += String.format("\treturn{%s}", returnValue); } + if (context != null) { + line += String.format("\tcontext{%s}", context); + } return line; } } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java index 5cce3a3a8b..23e19a48b8 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java @@ -806,6 +806,8 @@ public void reportShuffleResult( appId, shuffleId); } + auditContext.withContext("updatedBlockCount=" + updatedBlockCount); + auditContext.withContext("expectedBlockCount=" + expectedBlockCount); } catch (Exception e) { status = StatusCode.INTERNAL_ERROR; msg = "error happened when report shuffle result, check shuffle server for detail"; @@ -878,6 +880,7 @@ public void getShuffleResult( } auditContext.withStatusCode(status); + auditContext.withReturnValue("serializedBlockIdsBytes=" + serializedBlockIdsBytes.size()); reply = GetShuffleResultResponse.newBuilder() .setStatus(status.toProto()) @@ -906,7 +909,7 @@ public void getShuffleResultForMultiPart( auditContext.withAppId(appId).withShuffleId(shuffleId); auditContext.withArgs( - "partitionsListSize=" + partitionsList.size() + ", blockIdLayout=" + blockIdLayout); + "partitionsList=" + partitionsList + ", blockIdLayout=" + blockIdLayout); StatusCode status = verifyRequest(appId); if (status != StatusCode.SUCCESS) { @@ -947,6 +950,7 @@ public void getShuffleResultForMultiPart( LOG.error("Error happened when get shuffle result for {}", requestInfo, e); } + auditContext.withReturnValue("serializedBlockIdsBytes=" + serializedBlockIdsBytes.size()); auditContext.withStatusCode(status); reply = GetShuffleResultForMultiPartResponse.newBuilder() diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java index 226682e638..e3280dd06f 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java @@ -58,6 +58,7 @@ import org.apache.uniffle.common.ShuffleIndexResult; import org.apache.uniffle.common.ShufflePartitionedBlock; import org.apache.uniffle.common.ShufflePartitionedData; +import org.apache.uniffle.common.audit.RpcAuditContext; import org.apache.uniffle.common.config.RssBaseConf; import org.apache.uniffle.common.exception.FileNotFoundException; import org.apache.uniffle.common.exception.InvalidRequestException; @@ -641,9 +642,20 @@ public byte[] getFinishedBlockIds( Roaring64NavigableMap res = Roaring64NavigableMap.bitmapOf(); for (Map.Entry<Integer, Set<Integer>> entry : bitmapIndexToPartitions.entrySet()) { Set<Integer> requestPartitions = entry.getValue(); - Roaring64NavigableMap bitmap = blockIds[entry.getKey()]; + int bitmapIndex = entry.getKey(); + Roaring64NavigableMap bitmap = blockIds[bitmapIndex]; getBlockIdsByPartitionId(requestPartitions, bitmap, res, blockIdLayout); + RpcAuditContext.getRpcAuditContext() + .ifPresent( + context -> + context.withContext( + String.format( + "bitmap[%d].=<%d,%d>", + bitmapIndex, bitmap.getLongCardinality(), bitmap.getLongSizeInBytes()))); } + RpcAuditContext.getRpcAuditContext() + .ifPresent( + context -> context.withContext("partitionBlockCount=" + res.getLongCardinality())); if (res.getLongCardinality() != expectedBlockNumber) { throw new RssException(