Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new NVTX range for task GPU ownership #11596

Open
wants to merge 3 commits into
base: branch-24.12
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{NvtxColor, NvtxRange}
import ai.rapids.cudf.{NvtxColor, NvtxRange, NvtxUniqueRange}
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion

import org.apache.spark.TaskContext
Expand Down Expand Up @@ -162,7 +162,7 @@ object GpuSemaphore {
* this is considered to be okay as there are other mechanisms in place, and it should be rather
* rare.
*/
private final class SemaphoreTaskInfo(val taskAttemptId: Long) extends Logging {
private final class SemaphoreTaskInfo(val stageId: Int, val taskAttemptId: Long) extends Logging {
/**
* This holds threads that are not on the GPU yet. Most of the time they are
* blocked waiting for the semaphore to let them on, but it may hold one
Expand All @@ -179,6 +179,7 @@ private final class SemaphoreTaskInfo(val taskAttemptId: Long) extends Logging {
*/
private val activeThreads = new util.LinkedHashSet[Thread]()
private lazy val numPermits = GpuSemaphore.computeNumPermits(SQLConf.get)
private lazy val trackSemaphore = RapidsConf.TRACE_TASK_GPU_OWNERSHIP.get(SQLConf.get)
/**
* If this task holds the GPU semaphore or not.
*/
Expand All @@ -187,6 +188,8 @@ private final class SemaphoreTaskInfo(val taskAttemptId: Long) extends Logging {

type GpuBackingSemaphore = PrioritySemaphore[Long]

var nvtxRange: Option[NvtxUniqueRange] = None

/**
* Does this task have the GPU semaphore or not. Be careful because it can change at
* any point in time. So only use it for logging.
Expand Down Expand Up @@ -258,6 +261,11 @@ private final class SemaphoreTaskInfo(val taskAttemptId: Long) extends Logging {
// We now own the semaphore so we need to wake up all of the other tasks that are
// waiting.
hasSemaphore = true
if (trackSemaphore) {
nvtxRange =
Some(new NvtxUniqueRange(s"Stage ${stageId} Task ${taskAttemptId} owning GPU",
NvtxColor.ORANGE))
}
moveToActive(t)
notifyAll()
done = true
Expand Down Expand Up @@ -309,6 +317,8 @@ private final class SemaphoreTaskInfo(val taskAttemptId: Long) extends Logging {
semaphore.release(numPermits)
hasSemaphore = false
lastHeld = System.currentTimeMillis()
nvtxRange.foreach(_.close())
nvtxRange = None
}
// It should be impossible for the current thread to be blocked when releasing the semaphore
// because no blocked thread should ever leave `blockUntilReady`, which is where we put it in
Expand All @@ -324,7 +334,9 @@ private final class GpuSemaphore() extends Logging {

type GpuBackingSemaphore = PrioritySemaphore[Long]
private val semaphore = new GpuBackingSemaphore(MAX_PERMITS)
// Keep track of all tasks that are both active on the GPU and blocked waiting on the GPU
// A map of taskAttemptId => semaphoreTaskInfo.
// This map keeps track of all tasks that are both active on the GPU and blocked waiting
// on the GPU.
private val tasks = new ConcurrentHashMap[Long, SemaphoreTaskInfo]

def tryAcquire(context: TaskContext): TryAcquireResult = {
Expand All @@ -333,7 +345,7 @@ private final class GpuSemaphore() extends Logging {
val taskAttemptId = context.taskAttemptId()
val taskInfo = tasks.computeIfAbsent(taskAttemptId, _ => {
onTaskCompletion(context, completeTask)
new SemaphoreTaskInfo(taskAttemptId)
new SemaphoreTaskInfo(context.stageId(), taskAttemptId)
})
if (taskInfo.tryAcquire(semaphore, taskAttemptId)) {
GpuDeviceManager.initializeFromTask()
Expand All @@ -357,7 +369,7 @@ private final class GpuSemaphore() extends Logging {
val taskAttemptId = context.taskAttemptId()
val taskInfo = tasks.computeIfAbsent(taskAttemptId, _ => {
onTaskCompletion(context, completeTask)
new SemaphoreTaskInfo(taskAttemptId)
new SemaphoreTaskInfo(context.stageId(), taskAttemptId)
})
taskInfo.blockUntilReady(semaphore)
GpuDeviceManager.initializeFromTask()
Expand Down Expand Up @@ -426,4 +438,4 @@ private final class GpuSemaphore() extends Logging {
logDebug(s"shutting down with ${tasks.size} tasks still registered")
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2383,6 +2383,13 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.
.booleanConf
.createWithDefault(true)

// Internal debug switch (off by default). When enabled, SemaphoreTaskInfo reads this via
// RapidsConf.TRACE_TASK_GPU_OWNERSHIP and opens an NvtxUniqueRange
// ("Stage <id> Task <id> owning GPU", orange) each time a task acquires the GPU semaphore,
// closing it on release — so per-task GPU ownership intervals show up in NVTX profiles.
val TRACE_TASK_GPU_OWNERSHIP = conf("spark.rapids.sql.nvtx.traceTaskGpuOwnership")
    .doc("Enable tracing of the GPU ownership of tasks. This can be useful for debugging " +
      "deadlocks and other issues related to GPU semaphore.")
    .internal()
    .booleanConf
    .createWithDefault(false)

private def printSectionHeader(category: String): Unit =
println(s"\n### $category")

Expand Down