Skip to content

Commit

Permalink
[YUNIKORN-2368] Shim: Send updated resource requests to core (#912)
Browse files Browse the repository at this point in the history
Closes: #912
  • Loading branch information
craigcondit committed Sep 9, 2024
1 parent cd60dd9 commit 8add17d
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 54 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ go 1.22.0
toolchain go1.22.5

require (
github.com/apache/yunikorn-core v0.0.0-20240827175300-6939b13d1d0e
github.com/apache/yunikorn-core v0.0.0-20240908061623-6f06490bcfa3
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240827015655-68e8c6cca28a
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI=
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
github.com/apache/yunikorn-core v0.0.0-20240827175300-6939b13d1d0e h1:VaihjHjtmsDK7HEOjlX8KCz7QDxmZSf71CSCuOgjqcc=
github.com/apache/yunikorn-core v0.0.0-20240827175300-6939b13d1d0e/go.mod h1:HYeyzHhZt43oG54pasKHrwHM+Jeji8nFoAE2bcLWLYg=
github.com/apache/yunikorn-core v0.0.0-20240908061623-6f06490bcfa3 h1:ySu0cpFSYFGNtf+PZw4ulzO+cWOyJMYJs+AjmwGWM80=
github.com/apache/yunikorn-core v0.0.0-20240908061623-6f06490bcfa3/go.mod h1:HYeyzHhZt43oG54pasKHrwHM+Jeji8nFoAE2bcLWLYg=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240827015655-68e8c6cca28a h1:3WRXGTvhunGBZj8AVZDxx7Bs/AXiH9mvf2jYcuDyklA=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240827015655-68e8c6cca28a/go.mod h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
Expand Down
12 changes: 7 additions & 5 deletions pkg/cache/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,13 +546,14 @@ func (app *Application) onReservationStateChange() {

for _, t := range app.getTasks(TaskStates().Bound) {
if t.placeholder {
if _, ok := desireCounts[t.taskGroupName]; ok {
desireCounts[t.taskGroupName]--
taskGroupName := t.GetTaskGroupName()
if _, ok := desireCounts[taskGroupName]; ok {
desireCounts[taskGroupName]--
} else {
log.Log(log.ShimCacheApplication).Debug("placeholder taskGroupName set on pod is unknown for application",
zap.String("application", app.applicationID),
zap.String("podName", t.GetTaskPod().Name),
zap.String("taskGroupName", t.taskGroupName))
zap.String("taskGroupName", taskGroupName))
}
}
}
Expand Down Expand Up @@ -659,12 +660,13 @@ func (app *Application) handleAppTaskCompletedEvent() {
}

func (app *Application) publishPlaceholderTimeoutEvents(task *Task) {
if app.originatingTask != nil && task.IsPlaceholder() && task.terminationType == si.TerminationType_name[int32(si.TerminationType_TIMEOUT)] {
taskTerminationType := task.GetTaskTerminationType()
if app.originatingTask != nil && task.IsPlaceholder() && taskTerminationType == si.TerminationType_name[int32(si.TerminationType_TIMEOUT)] {
log.Log(log.ShimCacheApplication).Debug("trying to send placeholder timeout events to the original pod from application",
zap.String("appID", app.applicationID),
zap.Stringer("app request originating pod", app.originatingTask.GetTaskPod()),
zap.String("taskID", task.taskID),
zap.String("terminationType", task.terminationType))
zap.String("terminationType", taskTerminationType))
events.GetRecorder().Eventf(app.originatingTask.GetTaskPod().DeepCopy(), nil, v1.EventTypeWarning, "GangScheduling",
"PlaceholderTimeOut", "Application %s placeholder has been timed out", app.applicationID)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/cache/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func (ctx *Context) updateYuniKornPod(appID string, pod *v1.Pod) {
app := ctx.getApplication(appID)
if app != nil {
if task := app.GetTask(taskID); task != nil {
task.setTaskPod(pod)
task.SetTaskPod(pod)
}
}

Expand Down Expand Up @@ -1194,7 +1194,7 @@ func (ctx *Context) HandleContainerStateUpdate(request *si.UpdateContainerSchedu
Reason: "SchedulingSkipped",
Message: request.Reason,
}) {
events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
events.GetRecorder().Eventf(task.GetTaskPod().DeepCopy(), nil,
v1.EventTypeNormal, "PodUnschedulable", "PodUnschedulable",
"Task %s is skipped from scheduling because the queue quota has been exceed", task.alias)
}
Expand All @@ -1209,7 +1209,7 @@ func (ctx *Context) HandleContainerStateUpdate(request *si.UpdateContainerSchedu
Reason: v1.PodReasonUnschedulable,
Message: request.Reason,
}) {
events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
events.GetRecorder().Eventf(task.GetTaskPod().DeepCopy(), nil,
v1.EventTypeNormal, "PodUnschedulable", "PodUnschedulable",
"Task %s is pending for the requested resources become available", task.alias)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/placeholder_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (mgr *PlaceholderManager) createAppPlaceholders(app *Application) error {
// map task group to count of already created placeholders
tgCounts := make(map[string]int32)
for _, ph := range app.getPlaceHolderTasks() {
tgCounts[ph.getTaskGroupName()]++
tgCounts[ph.GetTaskGroupName()]++
}

// iterate all task groups, create placeholders for all the min members
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/scheduler_callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (callback *AsyncRMCallback) UpdateAllocation(response *si.AllocationRespons
task.setAllocationKey(alloc.AllocationKey)

if err := callback.context.AssumePod(alloc.AllocationKey, alloc.NodeID); err != nil {
task.failWithEvent(err.Error(), "AssumePodError")
task.FailWithEvent(err.Error(), "AssumePodError")
return err
}

Expand Down
110 changes: 71 additions & 39 deletions pkg/cache/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,27 @@ import (
)

type Task struct {
taskID string
alias string
applicationID string
application *Application
taskID string
alias string
applicationID string
application *Application
podStatus v1.PodStatus // pod status, maintained separately for efficiency reasons
context *Context
createTime time.Time
placeholder bool
originator bool
sm *fsm.FSM

// mutable resources, require locking
allocationKey string
resource *si.Resource
pod *v1.Pod
podStatus v1.PodStatus // pod status, maintained separately for efficiency reasons
context *Context
nodeName string
createTime time.Time
taskGroupName string
placeholder bool
terminationType string
originator bool
schedulingState TaskSchedulingState
sm *fsm.FSM
lock *locking.RWMutex
resource *si.Resource
pod *v1.Pod

lock *locking.RWMutex
}

func NewTask(tid string, app *Application, ctx *Context, pod *v1.Pod) *Task {
Expand Down Expand Up @@ -135,14 +138,10 @@ func (task *Task) GetTaskPod() *v1.Pod {
}

func (task *Task) GetTaskID() string {
task.lock.RLock()
defer task.lock.RUnlock()
return task.taskID
}

func (task *Task) IsPlaceholder() bool {
task.lock.RLock()
defer task.lock.RUnlock()
return task.placeholder
}

Expand All @@ -157,19 +156,25 @@ func (task *Task) setTaskGroupName(groupName string) {
task.taskGroupName = groupName
}

func (task *Task) setTaskTerminationType(terminationTyp string) {
func (task *Task) setTaskTerminationType(terminationType string) {
task.lock.Lock()
defer task.lock.Unlock()
task.terminationType = terminationTyp
task.terminationType = terminationType
}

func (task *Task) getTaskGroupName() string {
// GetTaskTerminationType returns the termination type currently recorded on
// the task. The value is read while holding the task's read lock.
func (task *Task) GetTaskTerminationType() string {
	task.lock.RLock()
	terminationType := task.terminationType
	task.lock.RUnlock()
	return terminationType
}

// GetTaskGroupName returns the task group name currently recorded on the
// task. The value is read while holding the task's read lock.
func (task *Task) GetTaskGroupName() string {
	task.lock.RLock()
	groupName := task.taskGroupName
	task.lock.RUnlock()
	return groupName
}

func (task *Task) getNodeName() string {
func (task *Task) GetNodeName() string {
task.lock.RLock()
defer task.lock.RUnlock()
return task.nodeName
Expand Down Expand Up @@ -222,8 +227,6 @@ func (task *Task) initialize() {
}

func (task *Task) IsOriginator() bool {
task.lock.RLock()
defer task.lock.RUnlock()
return task.originator
}

Expand Down Expand Up @@ -286,13 +289,33 @@ func (task *Task) handleSubmitTaskEvent() {
log.Log(log.ShimCacheTask).Debug("scheduling pod",
zap.String("podName", task.pod.Name))

// send update allocation event to core
task.updateAllocation()

if !utils.PodAlreadyBound(task.pod) {
// if this is a new request, add events to pod
events.GetRecorder().Eventf(task.pod.DeepCopy(), nil, v1.EventTypeNormal, "Scheduling", "Scheduling",
"%s is queued and waiting for allocation", task.alias)
// if this task belongs to a task group, that means the app has gang scheduling enabled
// in this case, post an event to indicate the task is being gang scheduled
if !task.placeholder && task.taskGroupName != "" {
events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
v1.EventTypeNormal, "GangScheduling", "TaskGroupMatch",
"Pod belongs to the taskGroup %s, it will be scheduled as a gang member", task.taskGroupName)
}
}
}

// updateAllocation updates the core scheduler when task information changes.
// This function must be called with the task lock held.
func (task *Task) updateAllocation() {
// build preemption policy
preemptionPolicy := &si.PreemptionPolicy{
AllowPreemptSelf: task.isPreemptSelfAllowed(),
AllowPreemptOther: task.isPreemptOtherAllowed(),
}

// submit allocation ask
// submit allocation
rr := common.CreateAllocationForTask(
task.applicationID,
task.taskID,
Expand All @@ -305,22 +328,9 @@ func (task *Task) handleSubmitTaskEvent() {
preemptionPolicy)
log.Log(log.ShimCacheTask).Debug("send update request", zap.Stringer("request", rr))
if err := task.context.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(rr); err != nil {
log.Log(log.ShimCacheTask).Debug("failed to send scheduling request to scheduler", zap.Error(err))
log.Log(log.ShimCacheTask).Debug("failed to send allocation to scheduler", zap.Error(err))
return
}

if !utils.PodAlreadyBound(task.pod) {
// if this is a new request, add events to pod
events.GetRecorder().Eventf(task.pod.DeepCopy(), nil, v1.EventTypeNormal, "Scheduling", "Scheduling",
"%s is queued and waiting for allocation", task.alias)
// if this task belongs to a task group, that means the app has gang scheduling enabled
// in this case, post an event to indicate the task is being gang scheduled
if !task.placeholder && task.taskGroupName != "" {
events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
v1.EventTypeNormal, "GangScheduling", "TaskGroupMatch",
"Pod belongs to the taskGroup %s, it will be scheduled as a gang member", task.taskGroupName)
}
}
}

// this is called after task reaches PENDING state,
Expand Down Expand Up @@ -604,20 +614,42 @@ func (task *Task) UpdatePodCondition(podCondition *v1.PodCondition) (bool, *v1.P
return false, pod
}

// GetAllocationKey returns the allocation key currently stored on the task.
// The value is read while holding the task's read lock.
func (task *Task) GetAllocationKey() string {
	task.lock.RLock()
	key := task.allocationKey
	task.lock.RUnlock()
	return key
}

// setAllocationKey stores the given allocation key on the task. The write is
// performed while holding the task's write lock.
func (task *Task) setAllocationKey(allocationKey string) {
	task.lock.Lock()
	task.allocationKey = allocationKey
	task.lock.Unlock()
}

// FailWithEvent is the locking entry point for failing a task: it takes the
// task's read lock and delegates to failWithEvent with the same arguments.
// The lock is released via defer, so it is dropped even if the delegate panics.
func (task *Task) FailWithEvent(errorMessage, actionReason string) {
task.lock.RLock()
defer task.lock.RUnlock()
task.failWithEvent(errorMessage, actionReason)
}

// failWithEvent dispatches a fail-task event for this task (carrying the
// application ID, task ID and errorMessage) and records a warning event on a
// deep copy of the task's pod, using actionReason for both the reason and the
// action fields.
//
// This function reads task.pod without taking the task lock itself; the
// exported FailWithEvent wrapper acquires the read lock before calling it.
func (task *Task) failWithEvent(errorMessage, actionReason string) {
	dispatcher.Dispatch(NewFailTaskEvent(task.applicationID, task.taskID, errorMessage))
	// errorMessage is arbitrary text (typically err.Error()); pass it as an
	// argument rather than as the format string so that any '%' it contains is
	// emitted literally instead of being interpreted as a formatting verb.
	events.GetRecorder().Eventf(task.pod.DeepCopy(),
		nil, v1.EventTypeWarning, actionReason, actionReason, "%s", errorMessage)
}

func (task *Task) setTaskPod(pod *v1.Pod) {
// SetTaskPod replaces the pod tracked by the task. When the resources derived
// from the new pod differ from the resources currently stored on the task, the
// stored resources are replaced and the allocation is re-sent to the core via
// updateAllocation. The whole operation runs under the task's write lock.
func (task *Task) SetTaskPod(pod *v1.Pod) {
	task.lock.Lock()
	defer task.lock.Unlock()

	task.pod = pod

	previous := task.resource
	current := common.GetPodResource(pod)
	if common.Equals(previous, current) {
		// pod resources are unchanged, nothing to send to the core
		return
	}

	// pod resources have changed: remember them and update the allocation in core
	task.resource = current
	task.updateAllocation()
}
4 changes: 2 additions & 2 deletions pkg/cache/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func TestReleaseTaskAllocation(t *testing.T) {
// bind a task is a async process, wait for it to happen
err = utils.WaitForCondition(
func() bool {
return task.getNodeName() == "node-1"
return task.GetNodeName() == "node-1"
},
100*time.Millisecond,
3*time.Second,
Expand Down Expand Up @@ -481,7 +481,7 @@ func TestSetTaskGroup(t *testing.T) {
}
task := NewTask("task01", app, mockedContext, pod)
task.setTaskGroupName("test-group")
assert.Equal(t, task.getTaskGroupName(), "test-group")
assert.Equal(t, task.GetTaskGroupName(), "test-group")
}

//nolint:funlen
Expand Down

0 comments on commit 8add17d

Please sign in to comment.