# Patch summary (preamble; text before the first "diff --git" header is not part of any hunk).
#
# Purpose: tighten locking around cache.Task and let a pod update push changed
# resources to the scheduler core.
#
# go.mod / go.sum
#   * Moves github.com/apache/yunikorn-core from pseudo-version
#     v0.0.0-20240827175300-6939b13d1d0e to v0.0.0-20240908061623-6f06490bcfa3.
#
# pkg/cache/task.go
#   * Regroups the Task struct: allocationKey, nodeName, taskGroupName, terminationType,
#     schedulingState, resource and pod are placed under the comment
#     "mutable resources, require locking"; the remaining fields sit above it.
#   * GetTaskID, IsPlaceholder and IsOriginator stop taking the read lock.
#   * Exports getTaskGroupName -> GetTaskGroupName, getNodeName -> GetNodeName and
#     setTaskPod -> SetTaskPod; renames the setTaskTerminationType parameter to terminationType.
#   * Adds GetTaskTerminationType and GetAllocationKey (read-locked getters) and
#     FailWithEvent, which takes the read lock and then calls failWithEvent.
#   * Moves the allocation-building code out of handleSubmitTaskEvent into a new
#     updateAllocation helper, documented as requiring the task lock to be held, and changes
#     its failure log text to "failed to send allocation to scheduler".
#   * SetTaskPod now also compares task.resource with common.GetPodResource(pod) via
#     common.Equals; when they differ it stores the new resource and calls updateAllocation.
#
# Callers
#   * application.go: onReservationStateChange reads the group name through
#     GetTaskGroupName; publishPlaceholderTimeoutEvents reads GetTaskTerminationType once
#     and reuses the value for the comparison and the log field.
#   * context.go: updateYuniKornPod calls SetTaskPod; HandleContainerStateUpdate uses
#     task.GetTaskPod() instead of task.pod for both PodUnschedulable events.
#   * placeholder_manager.go, scheduler_callback.go, task_test.go: switched to the exported
#     GetTaskGroupName / FailWithEvent / GetNodeName names.
#
# NOTE(review): the removed code in handleSubmitTaskEvent returned before posting the
#   "Scheduling" and "GangScheduling" events when SchedulerAPI.UpdateAllocation failed. The
#   "return" now lives inside updateAllocation, so handleSubmitTaskEvent posts those events
#   regardless of the outcome. Confirm this change is intended.
# NOTE(review): SetTaskPod calls updateAllocation, and therefore SchedulerAPI.UpdateAllocation,
#   while holding the task write lock, and the hunk shows no check of the task state before
#   doing so. Confirm the core call cannot call back into this Task and that sending an
#   update for a task that has not been submitted yet is acceptable.
# NOTE(review): dropping the lock from GetTaskID / IsPlaceholder / IsOriginator presumably
#   relies on taskID, placeholder and originator never being written after construction;
#   the hunks shown here do not establish that. Verify against NewTask and its siblings.
# NOTE(review): as stored here the patch has lost its line breaks (all hunks run together on
#   a few physical lines); they must be restored before the patch can be applied.
#
diff --git a/go.mod b/go.mod index 3b7c2d118..13ad673e8 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ go 1.22.0 toolchain go1.22.5 require ( - github.com/apache/yunikorn-core v0.0.0-20240827175300-6939b13d1d0e + github.com/apache/yunikorn-core v0.0.0-20240908061623-6f06490bcfa3 github.com/apache/yunikorn-scheduler-interface v0.0.0-20240827015655-68e8c6cca28a github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 diff --git a/go.sum b/go.sum index 2f048f158..f1203810b 100644 --- a/go.sum +++ b/go.sum @@ -8,8 +8,8 @@ github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c= github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI= github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g= -github.com/apache/yunikorn-core v0.0.0-20240827175300-6939b13d1d0e h1:VaihjHjtmsDK7HEOjlX8KCz7QDxmZSf71CSCuOgjqcc= -github.com/apache/yunikorn-core v0.0.0-20240827175300-6939b13d1d0e/go.mod h1:HYeyzHhZt43oG54pasKHrwHM+Jeji8nFoAE2bcLWLYg= +github.com/apache/yunikorn-core v0.0.0-20240908061623-6f06490bcfa3 h1:ySu0cpFSYFGNtf+PZw4ulzO+cWOyJMYJs+AjmwGWM80= +github.com/apache/yunikorn-core v0.0.0-20240908061623-6f06490bcfa3/go.mod h1:HYeyzHhZt43oG54pasKHrwHM+Jeji8nFoAE2bcLWLYg= github.com/apache/yunikorn-scheduler-interface v0.0.0-20240827015655-68e8c6cca28a h1:3WRXGTvhunGBZj8AVZDxx7Bs/AXiH9mvf2jYcuDyklA= github.com/apache/yunikorn-scheduler-interface v0.0.0-20240827015655-68e8c6cca28a/go.mod h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= diff --git a/pkg/cache/application.go b/pkg/cache/application.go index 085465ce7..8438957ba 100644 --- a/pkg/cache/application.go +++ b/pkg/cache/application.go @@ -546,13 +546,14 @@ func (app *Application) onReservationStateChange() { for _, t := range 
app.getTasks(TaskStates().Bound) { if t.placeholder { - if _, ok := desireCounts[t.taskGroupName]; ok { - desireCounts[t.taskGroupName]-- + taskGroupName := t.GetTaskGroupName() + if _, ok := desireCounts[taskGroupName]; ok { + desireCounts[taskGroupName]-- } else { log.Log(log.ShimCacheApplication).Debug("placeholder taskGroupName set on pod is unknown for application", zap.String("application", app.applicationID), zap.String("podName", t.GetTaskPod().Name), - zap.String("taskGroupName", t.taskGroupName)) + zap.String("taskGroupName", taskGroupName)) } } } @@ -659,12 +660,13 @@ func (app *Application) handleAppTaskCompletedEvent() { } func (app *Application) publishPlaceholderTimeoutEvents(task *Task) { - if app.originatingTask != nil && task.IsPlaceholder() && task.terminationType == si.TerminationType_name[int32(si.TerminationType_TIMEOUT)] { + taskTerminationType := task.GetTaskTerminationType() + if app.originatingTask != nil && task.IsPlaceholder() && taskTerminationType == si.TerminationType_name[int32(si.TerminationType_TIMEOUT)] { log.Log(log.ShimCacheApplication).Debug("trying to send placeholder timeout events to the original pod from application", zap.String("appID", app.applicationID), zap.Stringer("app request originating pod", app.originatingTask.GetTaskPod()), zap.String("taskID", task.taskID), - zap.String("terminationType", task.terminationType)) + zap.String("terminationType", taskTerminationType)) events.GetRecorder().Eventf(app.originatingTask.GetTaskPod().DeepCopy(), nil, v1.EventTypeWarning, "GangScheduling", "PlaceholderTimeOut", "Application %s placeholder has been timed out", app.applicationID) } diff --git a/pkg/cache/context.go b/pkg/cache/context.go index 61fe1ea02..aaea690aa 100644 --- a/pkg/cache/context.go +++ b/pkg/cache/context.go @@ -299,7 +299,7 @@ func (ctx *Context) updateYuniKornPod(appID string, pod *v1.Pod) { app := ctx.getApplication(appID) if app != nil { if task := app.GetTask(taskID); task != nil { - task.setTaskPod(pod) 
+ task.SetTaskPod(pod) } } @@ -1194,7 +1194,7 @@ func (ctx *Context) HandleContainerStateUpdate(request *si.UpdateContainerSchedu Reason: "SchedulingSkipped", Message: request.Reason, }) { - events.GetRecorder().Eventf(task.pod.DeepCopy(), nil, + events.GetRecorder().Eventf(task.GetTaskPod().DeepCopy(), nil, v1.EventTypeNormal, "PodUnschedulable", "PodUnschedulable", "Task %s is skipped from scheduling because the queue quota has been exceed", task.alias) } @@ -1209,7 +1209,7 @@ func (ctx *Context) HandleContainerStateUpdate(request *si.UpdateContainerSchedu Reason: v1.PodReasonUnschedulable, Message: request.Reason, }) { - events.GetRecorder().Eventf(task.pod.DeepCopy(), nil, + events.GetRecorder().Eventf(task.GetTaskPod().DeepCopy(), nil, v1.EventTypeNormal, "PodUnschedulable", "PodUnschedulable", "Task %s is pending for the requested resources become available", task.alias) } diff --git a/pkg/cache/placeholder_manager.go b/pkg/cache/placeholder_manager.go index 230c77c6c..7a44ea065 100644 --- a/pkg/cache/placeholder_manager.go +++ b/pkg/cache/placeholder_manager.go @@ -79,7 +79,7 @@ func (mgr *PlaceholderManager) createAppPlaceholders(app *Application) error { // map task group to count of already created placeholders tgCounts := make(map[string]int32) for _, ph := range app.getPlaceHolderTasks() { - tgCounts[ph.getTaskGroupName()]++ + tgCounts[ph.GetTaskGroupName()]++ } // iterate all task groups, create placeholders for all the min members diff --git a/pkg/cache/scheduler_callback.go b/pkg/cache/scheduler_callback.go index d058be396..643b4c719 100644 --- a/pkg/cache/scheduler_callback.go +++ b/pkg/cache/scheduler_callback.go @@ -64,7 +64,7 @@ func (callback *AsyncRMCallback) UpdateAllocation(response *si.AllocationRespons task.setAllocationKey(alloc.AllocationKey) if err := callback.context.AssumePod(alloc.AllocationKey, alloc.NodeID); err != nil { - task.failWithEvent(err.Error(), "AssumePodError") + task.FailWithEvent(err.Error(), "AssumePodError") return 
err } diff --git a/pkg/cache/task.go b/pkg/cache/task.go index b3d6915c5..5c0be9dec 100644 --- a/pkg/cache/task.go +++ b/pkg/cache/task.go @@ -40,24 +40,27 @@ import ( ) type Task struct { - taskID string - alias string - applicationID string - application *Application + taskID string + alias string + applicationID string + application *Application + podStatus v1.PodStatus // pod status, maintained separately for efficiency reasons + context *Context + createTime time.Time + placeholder bool + originator bool + sm *fsm.FSM + + // mutable resources, require locking allocationKey string - resource *si.Resource - pod *v1.Pod - podStatus v1.PodStatus // pod status, maintained separately for efficiency reasons - context *Context nodeName string - createTime time.Time taskGroupName string - placeholder bool terminationType string - originator bool schedulingState TaskSchedulingState - sm *fsm.FSM - lock *locking.RWMutex + resource *si.Resource + pod *v1.Pod + + lock *locking.RWMutex } func NewTask(tid string, app *Application, ctx *Context, pod *v1.Pod) *Task { @@ -135,14 +138,10 @@ func (task *Task) GetTaskPod() *v1.Pod { } func (task *Task) GetTaskID() string { - task.lock.RLock() - defer task.lock.RUnlock() return task.taskID } func (task *Task) IsPlaceholder() bool { - task.lock.RLock() - defer task.lock.RUnlock() return task.placeholder } @@ -157,19 +156,25 @@ func (task *Task) setTaskGroupName(groupName string) { task.taskGroupName = groupName } -func (task *Task) setTaskTerminationType(terminationTyp string) { +func (task *Task) setTaskTerminationType(terminationType string) { task.lock.Lock() defer task.lock.Unlock() - task.terminationType = terminationTyp + task.terminationType = terminationType } -func (task *Task) getTaskGroupName() string { +func (task *Task) GetTaskTerminationType() string { + task.lock.RLock() + defer task.lock.RUnlock() + return task.terminationType +} + +func (task *Task) GetTaskGroupName() string { task.lock.RLock() defer 
task.lock.RUnlock() return task.taskGroupName } -func (task *Task) getNodeName() string { +func (task *Task) GetNodeName() string { task.lock.RLock() defer task.lock.RUnlock() return task.nodeName @@ -222,8 +227,6 @@ func (task *Task) initialize() { } func (task *Task) IsOriginator() bool { - task.lock.RLock() - defer task.lock.RUnlock() return task.originator } @@ -286,13 +289,33 @@ func (task *Task) handleSubmitTaskEvent() { log.Log(log.ShimCacheTask).Debug("scheduling pod", zap.String("podName", task.pod.Name)) + // send update allocation event to core + task.updateAllocation() + + if !utils.PodAlreadyBound(task.pod) { + // if this is a new request, add events to pod + events.GetRecorder().Eventf(task.pod.DeepCopy(), nil, v1.EventTypeNormal, "Scheduling", "Scheduling", + "%s is queued and waiting for allocation", task.alias) + // if this task belongs to a task group, that means the app has gang scheduling enabled + // in this case, post an event to indicate the task is being gang scheduled + if !task.placeholder && task.taskGroupName != "" { + events.GetRecorder().Eventf(task.pod.DeepCopy(), nil, + v1.EventTypeNormal, "GangScheduling", "TaskGroupMatch", + "Pod belongs to the taskGroup %s, it will be scheduled as a gang member", task.taskGroupName) + } + } +} + +// updateAllocation updates the core scheduler when task information changes. +// This function must be called with the task lock held. 
+func (task *Task) updateAllocation() { // build preemption policy preemptionPolicy := &si.PreemptionPolicy{ AllowPreemptSelf: task.isPreemptSelfAllowed(), AllowPreemptOther: task.isPreemptOtherAllowed(), } - // submit allocation ask + // submit allocation rr := common.CreateAllocationForTask( task.applicationID, task.taskID, @@ -305,22 +328,9 @@ func (task *Task) handleSubmitTaskEvent() { preemptionPolicy) log.Log(log.ShimCacheTask).Debug("send update request", zap.Stringer("request", rr)) if err := task.context.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(rr); err != nil { - log.Log(log.ShimCacheTask).Debug("failed to send scheduling request to scheduler", zap.Error(err)) + log.Log(log.ShimCacheTask).Debug("failed to send allocation to scheduler", zap.Error(err)) return } - - if !utils.PodAlreadyBound(task.pod) { - // if this is a new request, add events to pod - events.GetRecorder().Eventf(task.pod.DeepCopy(), nil, v1.EventTypeNormal, "Scheduling", "Scheduling", - "%s is queued and waiting for allocation", task.alias) - // if this task belongs to a task group, that means the app has gang scheduling enabled - // in this case, post an event to indicate the task is being gang scheduled - if !task.placeholder && task.taskGroupName != "" { - events.GetRecorder().Eventf(task.pod.DeepCopy(), nil, - v1.EventTypeNormal, "GangScheduling", "TaskGroupMatch", - "Pod belongs to the taskGroup %s, it will be scheduled as a gang member", task.taskGroupName) - } - } } // this is called after task reaches PENDING state, @@ -604,20 +614,42 @@ func (task *Task) UpdatePodCondition(podCondition *v1.PodCondition) (bool, *v1.P return false, pod } +func (task *Task) GetAllocationKey() string { + task.lock.RLock() + defer task.lock.RUnlock() + return task.allocationKey +} + func (task *Task) setAllocationKey(allocationKey string) { task.lock.Lock() defer task.lock.Unlock() task.allocationKey = allocationKey } +func (task *Task) FailWithEvent(errorMessage, actionReason string) { + 
task.lock.RLock() + defer task.lock.RUnlock() + task.failWithEvent(errorMessage, actionReason) +} + func (task *Task) failWithEvent(errorMessage, actionReason string) { dispatcher.Dispatch(NewFailTaskEvent(task.applicationID, task.taskID, errorMessage)) events.GetRecorder().Eventf(task.pod.DeepCopy(), nil, v1.EventTypeWarning, actionReason, actionReason, errorMessage) } -func (task *Task) setTaskPod(pod *v1.Pod) { +func (task *Task) SetTaskPod(pod *v1.Pod) { task.lock.Lock() defer task.lock.Unlock() + task.pod = pod + oldResource := task.resource + newResource := common.GetPodResource(pod) + if !common.Equals(oldResource, newResource) { + // pod resources have changed + task.resource = newResource + + // update allocation in core + task.updateAllocation() + } } diff --git a/pkg/cache/task_test.go b/pkg/cache/task_test.go index 1ec5f7d8d..d1fcc3369 100644 --- a/pkg/cache/task_test.go +++ b/pkg/cache/task_test.go @@ -203,7 +203,7 @@ func TestReleaseTaskAllocation(t *testing.T) { // bind a task is a async process, wait for it to happen err = utils.WaitForCondition( func() bool { - return task.getNodeName() == "node-1" + return task.GetNodeName() == "node-1" }, 100*time.Millisecond, 3*time.Second, @@ -481,7 +481,7 @@ func TestSetTaskGroup(t *testing.T) { } task := NewTask("task01", app, mockedContext, pod) task.setTaskGroupName("test-group") - assert.Equal(t, task.getTaskGroupName(), "test-group") + assert.Equal(t, task.GetTaskGroupName(), "test-group") } //nolint:funlen