From 5400dfb12015a1e7bf75e955ea7c2acc66c4e142 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 13 Aug 2024 14:57:45 -0700 Subject: [PATCH 1/3] Add nvtx range for task owning GPU Signed-off-by: Jihoon Son --- .../com/nvidia/spark/rapids/GpuSemaphore.scala | 15 ++++++++++++++- .../com/nvidia/spark/rapids/RapidsConf.scala | 7 +++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala index 78d05efb0c2..17ea163b1af 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala @@ -22,7 +22,8 @@ import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue} import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import ai.rapids.cudf.{NvtxColor, NvtxRange} +import ai.rapids.cudf.{NvtxColor, NvtxRange, NvtxUniqueRange} +import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import org.apache.spark.TaskContext @@ -179,6 +180,7 @@ private final class SemaphoreTaskInfo(val taskAttemptId: Long) extends Logging { */ private val activeThreads = new util.LinkedHashSet[Thread]() private lazy val numPermits = GpuSemaphore.computeNumPermits(SQLConf.get) + private lazy val trackSemaphore = RapidsConf.TRACE_TASK_GPU_OWNERSHIP.get(SQLConf.get) /** * If this task holds the GPU semaphore or not. */ @@ -187,6 +189,8 @@ private final class SemaphoreTaskInfo(val taskAttemptId: Long) extends Logging { type GpuBackingSemaphore = PrioritySemaphore[Long] + var nvtxRange: Option[NvtxUniqueRange] = None + /** * Does this task have the GPU semaphore or not. Be careful because it can change at * any point in time. So only use it for logging. @@ -258,6 +262,10 @@ private final class SemaphoreTaskInfo(val taskAttemptId: Long) extends Logging { // We now own the semaphore so we need to wake up all of the other tasks that are // waiting. hasSemaphore = true + if (trackSemaphore) { + nvtxRange = + Some(new NvtxUniqueRange(s"Sem-${taskAttemptId}", NvtxColor.ORANGE)) + } moveToActive(t) notifyAll() done = true @@ -309,6 +317,10 @@ private final class SemaphoreTaskInfo(val taskAttemptId: Long) extends Logging { semaphore.release(numPermits) hasSemaphore = false lastHeld = System.currentTimeMillis() + nvtxRange match { + case Some(range) => range.safeClose() + case _ => // do nothing + } } // It should be impossible for the current thread to be blocked when releasing the semaphore // because no blocked thread should ever leave `blockUntilReady`, which is where we put it in @@ -325,6 +337,7 @@ private final class GpuSemaphore() extends Logging { type GpuBackingSemaphore = PrioritySemaphore[Long] private val semaphore = new GpuBackingSemaphore(MAX_PERMITS) // Keep track of all tasks that are both active on the GPU and blocked waiting on the GPU + // taskAttemptId => semaphoreTaskInfo private val tasks = new ConcurrentHashMap[Long, SemaphoreTaskInfo] def tryAcquire(context: TaskContext): TryAcquireResult = { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 50dc457268c..2147773a4de 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -2383,6 +2383,13 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression. .booleanConf .createWithDefault(true) + val TRACE_TASK_GPU_OWNERSHIP = conf("spark.rapids.sql.traceTaskGpuOwnership") + .doc("Enable tracing of the GPU ownership of tasks. This can be useful for debugging " + + "deadlocks and other issues related to GPU semaphore.") + .internal() + .booleanConf + .createWithDefault(false) + private def printSectionHeader(category: String): Unit = println(s"\n### $category") From 5d522462a7723abec95367b6002dea525760ae04 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 22 Oct 2024 16:06:38 -0700 Subject: [PATCH 2/3] review comments --- .../nvidia/spark/rapids/GpuSemaphore.scala | 20 +++++++++---------- .../com/nvidia/spark/rapids/RapidsConf.scala | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala index 17ea163b1af..e2f1cf9b133 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala @@ -163,7 +163,7 @@ object GpuSemaphore { * this is considered to be okay as there are other mechanisms in place, and it should be rather * rare. */ -private final class SemaphoreTaskInfo(val taskAttemptId: Long) extends Logging { +private final class SemaphoreTaskInfo(val stageId: Int, val taskAttemptId: Long) extends Logging { /** * This holds threads that are not on the GPU yet. Most of the time they are * blocked waiting for the semaphore to let them on, but it may hold one @@ -264,7 +264,8 @@ private final class SemaphoreTaskInfo(val taskAttemptId: Long) extends Logging { hasSemaphore = true if (trackSemaphore) { nvtxRange = - Some(new NvtxUniqueRange(s"Sem-${taskAttemptId}", NvtxColor.ORANGE)) + Some(new NvtxUniqueRange(s"Stage ${stageId} Task ${taskAttemptId} owning GPU", + NvtxColor.ORANGE)) } moveToActive(t) notifyAll() @@ -317,10 +318,8 @@ private final class SemaphoreTaskInfo(val taskAttemptId: Long) extends Logging { semaphore.release(numPermits) hasSemaphore = false lastHeld = System.currentTimeMillis() - nvtxRange match { - case Some(range) => range.safeClose() - case _ => // do nothing - } + nvtxRange.foreach(_.close()) + nvtxRange = None } // It should be impossible for the current thread to be blocked when releasing the semaphore // because no blocked thread should ever leave `blockUntilReady`, which is where we put it in @@ -336,8 +335,9 @@ private final class GpuSemaphore() extends Logging { type GpuBackingSemaphore = PrioritySemaphore[Long] private val semaphore = new GpuBackingSemaphore(MAX_PERMITS) - // Keep track of all tasks that are both active on the GPU and blocked waiting on the GPU - // taskAttemptId => semaphoreTaskInfo + // A map of taskAttemptId => semaphoreTaskInfo. + // This map keeps track of all tasks that are both active on the GPU and blocked waiting + // on the GPU. private val tasks = new ConcurrentHashMap[Long, SemaphoreTaskInfo] def tryAcquire(context: TaskContext): TryAcquireResult = { @@ -346,7 +346,7 @@ private final class GpuSemaphore() extends Logging { val taskAttemptId = context.taskAttemptId() val taskInfo = tasks.computeIfAbsent(taskAttemptId, _ => { onTaskCompletion(context, completeTask) - new SemaphoreTaskInfo(taskAttemptId) + new SemaphoreTaskInfo(context.stageId(), taskAttemptId) }) if (taskInfo.tryAcquire(semaphore, taskAttemptId)) { GpuDeviceManager.initializeFromTask() @@ -370,7 +370,7 @@ private final class GpuSemaphore() extends Logging { val taskAttemptId = context.taskAttemptId() val taskInfo = tasks.computeIfAbsent(taskAttemptId, _ => { onTaskCompletion(context, completeTask) - new SemaphoreTaskInfo(taskAttemptId) + new SemaphoreTaskInfo(context.stageId(), taskAttemptId) }) taskInfo.blockUntilReady(semaphore) GpuDeviceManager.initializeFromTask() diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 2147773a4de..18480f1260a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -2383,7 +2383,7 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression. .booleanConf .createWithDefault(true) - val TRACE_TASK_GPU_OWNERSHIP = conf("spark.rapids.sql.traceTaskGpuOwnership") + val TRACE_TASK_GPU_OWNERSHIP = conf("spark.rapids.sql.nvtx.traceTaskGpuOwnership") .doc("Enable tracing of the GPU ownership of tasks. This can be useful for debugging " + "deadlocks and other issues related to GPU semaphore.") .internal() From 3fd32c2087dc6e204b04aa6f26a1e4ebdfb05329 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 24 Oct 2024 09:45:36 -0700 Subject: [PATCH 3/3] Unused import --- .../src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala index e2f1cf9b133..719c4525373 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala @@ -23,7 +23,6 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import ai.rapids.cudf.{NvtxColor, NvtxRange, NvtxUniqueRange} -import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import org.apache.spark.TaskContext @@ -439,4 +438,4 @@ private final class GpuSemaphore() extends Logging { logDebug(s"shutting down with ${tasks.size} tasks still registered") } } -} \ No newline at end of file +}