Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Optimus spec based siren alerts #299

Merged
merged 5 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ NAME = "github.com/goto/optimus"
LAST_COMMIT := $(shell git rev-parse --short HEAD)
LAST_TAG := "$(shell git rev-list --tags --max-count=1)"
OPMS_VERSION := "$(shell git describe --tags ${LAST_TAG})-next"
PROTON_COMMIT := "526e657b03d243a4c9f880e6c4ffbe15b116afd5"
PROTON_COMMIT := "8f4ba32fa794363bd48938bf415250cff0d1d504"


.PHONY: build test test-ci generate-proto unit-test-ci integration-test vet coverage clean install lint
Expand Down
6 changes: 6 additions & 0 deletions client/local/model/job_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type JobSpecBehaviorNotifier struct {
On string `yaml:"on"`
Config map[string]string `yaml:"config"`
Channels []string `yaml:"channels"`
Severity string `yaml:"severity"`
Team string `yaml:"team"`
}

type WebhookEndpoint struct {
Expand Down Expand Up @@ -215,6 +217,8 @@ func (j *JobSpec) getProtoJobSpecBehavior() *pb.JobSpecification_Behavior {
On: pb.JobEvent_Type(pb.JobEvent_Type_value[utils.ToEnumProto(notify.On, "type")]),
Channels: notify.Channels,
Config: notify.Config,
Severity: notify.Severity,
Team: notify.Team,
}
}
}
Expand Down Expand Up @@ -602,6 +606,8 @@ func toJobSpecBehavior(protoBehavior *pb.JobSpecification_Behavior, dependsOnPas
On: utils.FromEnumProto(protoNotifier.On.String(), "type"),
Config: protoNotifier.Config,
Channels: protoNotifier.Channels,
Severity: protoNotifier.Severity,
Team: protoNotifier.Team,
}
notifiers = append(notifiers, notifier)
}
Expand Down
8 changes: 5 additions & 3 deletions config/config_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ type TelemetryConfig struct {
}

// AlertingConfig configures alert dispatch for the server: the external
// event-manager (alert-manager) endpoint, links embedded into notifications,
// and per-channel enable flags.
type AlertingConfig struct {
	// EventManager holds the alert-manager endpoint configuration.
	EventManager EventManagerConfig `mapstructure:"alert_manager"`
	// Dashboard is presumably a dashboard URL included in alerts — TODO confirm against alert templates.
	Dashboard string `mapstructure:"dashboard"`
	// DataConsole is presumably the data-console base URL linked from alerts — TODO confirm.
	DataConsole string `mapstructure:"data_console"`
	// EnableSlack toggles the Slack notification channel.
	EnableSlack bool `mapstructure:"enable_slack"`
	// EnablePagerDuty toggles the PagerDuty notification channel.
	EnablePagerDuty bool `mapstructure:"enable_pager_duty"`
}

type EventManagerConfig struct {
Expand Down
3 changes: 2 additions & 1 deletion core/job/handler/v1beta1/job_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func toAlerts(notifiers []*pb.JobSpecification_Behavior_Notifiers) ([]*job.Alert
if err != nil {
return nil, err
}
alertConfig, err := job.NewAlertSpec(alertOn, notify.Channels, config)
alertConfig, err := job.NewAlertSpec(alertOn, notify.Channels, config, notify.GetSeverity(), notify.GetTeam())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -350,6 +350,7 @@ func fromAlerts(jobAlerts []*job.AlertSpec) []*pb.JobSpecification_Behavior_Noti
On: pb.JobEvent_Type(pb.JobEvent_Type_value[utils.ToEnumProto(alert.On(), "type")]),
Channels: alert.Channels(),
Config: alert.Config(),
Severity: alert.Severity(),
})
}
return notifiers
Expand Down
1 change: 1 addition & 0 deletions core/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ type AlertAttrs struct {
Tenant tenant.Tenant
EventTime time.Time
ChangeType ChangeType
Job *Spec
}

type UpdateImpact string
Expand Down
1 change: 1 addition & 0 deletions core/job/service/job_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1315,6 +1315,7 @@ func (j *JobService) raiseUpdateEvent(incomingJob *job.Job, impactType job.Updat
Tenant: incomingJob.Tenant(),
EventTime: time.Now(),
ChangeType: job.ChangeTypeUpdate,
Job: incomingJob.Spec(),
})
jobEvent, err := event.NewJobUpdateEvent(incomingJob, impactType)
if err != nil {
Expand Down
29 changes: 28 additions & 1 deletion core/job/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,8 @@ type AlertSpec struct {

channels []string
config Config
severity string
team string
}

type WebhookEndPoint struct {
Expand All @@ -560,14 +562,31 @@ type WebhookSpec struct {
Endpoints []WebhookEndPoint
}

func NewAlertSpec(on string, channels []string, config Config) (*AlertSpec, error) {
const (
	// DefaultSeverity is applied when a notifier supplies no severity or an
	// unrecognized value.
	// NOTE(review): the PR discussion agreed the default must be WARNING to
	// match the proto-side default — it was mistakenly INFO here.
	DefaultSeverity = "WARNING"

	WarningSeverity  = "WARNING"
	CriticalSeverity = "CRITICAL"
)

// getSeverity normalizes a user-supplied severity string. Case-insensitive
// WARNING and CRITICAL are accepted and returned upper-cased; every other
// value (including empty) falls back to DefaultSeverity.
func getSeverity(severity string) string {
	normalized := strings.ToUpper(severity)
	switch normalized {
	case WarningSeverity, CriticalSeverity:
		return normalized
	default:
		return DefaultSeverity
	}
}

// NewAlertSpec validates the notifier config map and assembles an AlertSpec
// for the given event type ("on"), target channels, severity, and owning
// team. The severity is normalized via getSeverity (unrecognized values fall
// back to the default); team is stored as-is. Returns an error when the
// config map fails validation.
func NewAlertSpec(on string, channels []string, config Config, severity, team string) (*AlertSpec, error) {
	if err := validateMap(config); err != nil {
		return nil, err
	}
	spec := &AlertSpec{
		on:       on,
		channels: channels,
		config:   config,
		team:     team,
		severity: getSeverity(severity),
	}
	return spec, nil
}

Expand All @@ -583,6 +602,14 @@ func (a AlertSpec) Config() Config {
return a.config
}

// Severity returns the normalized alert severity of this spec.
func (a AlertSpec) Severity() string {
	return a.severity
}

// Team returns the team configured to receive this alert (may be empty).
func (a AlertSpec) Team() string {
	return a.team
}

// TODO: reconsider whether we still need it or not
type SpecHTTPUpstream struct {
name string
Expand Down
2 changes: 1 addition & 1 deletion core/job/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestEntitySpec(t *testing.T) {
httpUpstreamHeader := map[string]string{"header-key": "sample-header-val"}
httpUpstream, _ := job.NewSpecHTTPUpstreamBuilder("sample-name", "sample-url").WithParams(httpUpstreamConfig).WithHeaders(httpUpstreamHeader).Build()
specUpstream, _ := job.NewSpecUpstreamBuilder().WithUpstreamNames([]job.SpecUpstreamName{"job-d"}).WithSpecHTTPUpstream([]*job.SpecHTTPUpstream{httpUpstream}).Build()
alert, _ := job.NewAlertSpec("sla_miss", []string{"sample-channel"}, jobAlertConfig)
alert, _ := job.NewAlertSpec("sla_miss", []string{"sample-channel"}, jobAlertConfig, "CRITICAL", "")
assetMap := map[string]string{"key": "value"}
asset, _ := job.AssetFrom(assetMap)
resourceRequestConfig := job.NewMetadataResourceConfig("250m", "128Mi")
Expand Down
21 changes: 13 additions & 8 deletions core/scheduler/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,21 @@ import (
)

// String aliases used across the scheduler event pipeline.
type (
	EventName    string
	EventStatus  string
	JobEventType string
	// EventCategory groups job event types for alert routing
	// (renamed from JobEventCategory in this change).
	EventCategory string
)

const (
EntityEvent = "event"

ISODateFormat = "2006-01-02T15:04:05Z"

EventCategorySLAMiss JobEventCategory = "sla_miss"
EventCategoryJobFailure JobEventCategory = "failure"
EventCategoryJobSuccess JobEventCategory = "success"
EventCategorySLAMiss EventCategory = "sla_miss"
EventCategoryJobFailure EventCategory = "failure"
EventCategoryJobSuccess EventCategory = "job_success"
EventCategoryReplay EventCategory = "replay_lifecycle"

SLAMissEvent JobEventType = "sla_miss"
JobFailureEvent JobEventType = "failure"
Expand Down Expand Up @@ -111,7 +112,11 @@ type Event struct {
SLAObjectList []*SLAObject
}

func (event JobEventType) IsOfType(category JobEventCategory) bool {
// String returns the category's raw string value.
func (e EventCategory) String() string {
	return string(e)
}

func (event JobEventType) IsOfType(category EventCategory) bool {
switch category {
case EventCategoryJobFailure:
if event == JobFailureEvent {
Expand Down
6 changes: 3 additions & 3 deletions core/scheduler/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,15 @@ func TestFromStringToEventType(t *testing.T) {
assert.Equal(t, &outputObj, output)
})
})
t.Run("IsOfType JobEventCategory", func(t *testing.T) {
positiveExpectationMap := map[scheduler.JobEventType]scheduler.JobEventCategory{
t.Run("IsOfType EventCategory", func(t *testing.T) {
positiveExpectationMap := map[scheduler.JobEventType]scheduler.EventCategory{
scheduler.JobFailureEvent: scheduler.EventCategoryJobFailure,
scheduler.SLAMissEvent: scheduler.EventCategorySLAMiss,
}
for eventType, category := range positiveExpectationMap {
assert.True(t, eventType.IsOfType(category))
}
negativeExpectationMap := map[scheduler.JobEventType]scheduler.JobEventCategory{
negativeExpectationMap := map[scheduler.JobEventType]scheduler.EventCategory{
scheduler.SLAMissEvent: scheduler.EventCategoryJobFailure,
scheduler.SensorRetryEvent: scheduler.EventCategoryJobFailure,
scheduler.SensorSuccessEvent: scheduler.EventCategorySLAMiss,
Expand Down
6 changes: 4 additions & 2 deletions core/scheduler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,11 @@ type Retry struct {
}

// Alert describes a notification rule attached to a scheduled job: which
// event category triggers it, the channels it is delivered to, and
// free-form per-notifier config.
type Alert struct {
	On       EventCategory
	Channels []string
	Config   map[string]string
	// Severity is the alert severity string — presumably already normalized
	// by job.NewAlertSpec (WARNING/CRITICAL/default); TODO confirm at the
	// mapping layer.
	Severity string
	Team     string
}

type WebhookEndPoint struct {
Expand All @@ -202,7 +204,7 @@ type WebhookEndPoint struct {
}

// Webhook routes a job event category to one or more HTTP endpoints.
type Webhook struct {
	On        EventCategory
	Endpoints []WebhookEndPoint
}

Expand Down
4 changes: 4 additions & 0 deletions core/scheduler/job_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ type AlertAttrs struct {
SchedulerHost string
Status EventStatus
JobEvent *Event

JobWithDetails *JobWithDetails
}

type ReplayNotificationAttrs struct {
Expand All @@ -85,6 +87,8 @@ type ReplayNotificationAttrs struct {
Tenant tenant.Tenant
JobURN string
State ReplayState

JobWithDetails *JobWithDetails
}

type WebhookAttrs struct {
Expand Down
22 changes: 16 additions & 6 deletions core/scheduler/service/events_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,15 @@ func (e *EventsService) Relay(ctx context.Context, event *scheduler.Event) error
} else {
status = scheduler.StatusResolved
}

e.alertManager.SendJobRunEvent(&scheduler.AlertAttrs{
Owner: jobDetails.JobMetadata.Owner,
JobURN: jobDetails.Job.URN(),
Title: "Optimus Job Alert",
SchedulerHost: schedulerHost,
Status: status,
JobEvent: event,
Owner: jobDetails.JobMetadata.Owner,
JobURN: jobDetails.Job.URN(),
Title: "Optimus Job Alert",
SchedulerHost: schedulerHost,
Status: status,
JobEvent: event,
JobWithDetails: jobDetails,
})
return nil
}
Expand Down Expand Up @@ -131,6 +133,10 @@ func (e *EventsService) Webhook(ctx context.Context, event *scheduler.Event) err
}

func (e *EventsService) Push(ctx context.Context, event *scheduler.Event) error {
if !(event.Type.IsOfType(scheduler.EventCategoryJobFailure) || event.Type.IsOfType(scheduler.EventCategorySLAMiss)) {
return nil
}

jobDetails, err := e.jobRepo.GetJobDetails(ctx, event.Tenant.ProjectName(), event.JobName)
if err != nil {
e.l.Error("error getting detail for job [%s]: %s", event.JobName, err)
Expand All @@ -146,6 +152,10 @@ func (e *EventsService) Push(ctx context.Context, event *scheduler.Event) error
for _, channel := range notify.Channels {
chanParts := strings.Split(channel, "://")
scheme := chanParts[0]
if _, ok := e.notifyChannels[scheme]; !ok {
e.l.Warn("Scheme: %s, is not enabled", scheme)
continue
}
route := chanParts[1]

e.l.Debug("notification event for job: %s , event: %+v", event.JobName, event)
Expand Down
4 changes: 3 additions & 1 deletion core/scheduler/service/events_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestNotificationService(t *testing.T) {
event := &scheduler.Event{
JobName: jobName,
Tenant: tnnt,
Type: scheduler.TaskStartEvent,
Type: scheduler.JobFailureEvent,
Values: map[string]any{},
}
err := notifyService.Push(ctx, event)
Expand Down Expand Up @@ -142,6 +142,8 @@ func TestNotificationService(t *testing.T) {
SchedulerHost: "localhost",
Status: scheduler.StatusFiring,
JobEvent: event,

JobWithDetails: &jobWithDetails,
})
tenantService := new(mockTenantService)
tenantWithDetails, _ := tenant.NewTenantDetails(project, namespace, []*tenant.PlainTextSecret{})
Expand Down
16 changes: 10 additions & 6 deletions core/scheduler/service/replay_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,17 @@ func (r *ReplayService) CreateReplay(ctx context.Context, t tenant.Tenant, jobNa
jobName.String(),
replayReq.State().String(),
).Inc()

jobWithDetails, err := r.jobRepo.GetJobDetails(ctx, t.ProjectName(), jobName)
if err != nil {
return uuid.Nil, err
}
r.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{
JobName: jobName.String(),
ReplayID: replayID.String(),
Tenant: t,
JobURN: jobName.GetJobURN(t),
State: scheduler.ReplayStateCreated,
JobName: jobName.String(),
ReplayID: replayID.String(),
Tenant: t,
JobURN: jobName.GetJobURN(t),
State: scheduler.ReplayStateCreated,
JobWithDetails: jobWithDetails,
})

go r.executor.Execute(replayID, replayReq.Tenant(), jobName) //nolint:contextcheck
Expand Down
25 changes: 21 additions & 4 deletions core/scheduler/service/replay_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,15 @@ func (w *ReplayWorker) Execute(replayID uuid.UUID, jobTenant tenant.Tenant, jobN
errMessage := err.Error()
if errors.Is(err, context.DeadlineExceeded) {
errMessage = "replay execution timed out"
w.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{
w.sendReplayEvent(ctx, scheduler.ReplayNotificationAttrs{
JobName: jobName.String(),
ReplayID: replayID.String(),
Tenant: jobTenant,
JobURN: jobName.GetJobURN(jobTenant),
State: scheduler.ReplayStateTimeout,
})
} else {
w.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{
w.sendReplayEvent(ctx, scheduler.ReplayNotificationAttrs{
JobName: jobName.String(),
ReplayID: replayID.String(),
Tenant: jobTenant,
Expand Down Expand Up @@ -159,7 +159,7 @@ func (w *ReplayWorker) startExecutionLoop(ctx context.Context, replayID uuid.UUI

if replayWithRun.Replay.IsTerminated() {
t := replayWithRun.Replay.Tenant()
w.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{
w.sendReplayEvent(ctx, scheduler.ReplayNotificationAttrs{
JobName: replayWithRun.Replay.JobName().String(),
ReplayID: replayID.String(),
Tenant: t,
Expand Down Expand Up @@ -254,7 +254,8 @@ func (w *ReplayWorker) finishReplay(ctx context.Context, replay *scheduler.Repla
if syncedRunStatus.IsAnyFailure() {
replayState = scheduler.ReplayStateFailed
}
w.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{

w.sendReplayEvent(ctx, scheduler.ReplayNotificationAttrs{
JobName: replay.JobName().String(),
ReplayID: replayID.String(),
Tenant: replay.Tenant(),
Expand Down Expand Up @@ -432,3 +433,19 @@ func (w *ReplayWorker) getRequestsToProcess(ctx context.Context, replays []*sche
replayReqLag.Set(maxLag)
return requestsToProcess
}

// sendReplayEvent enriches a replay notification with the job's full details
// (fetched from the job repository) before handing it to the alert manager.
// Failures to resolve the job name or fetch details are logged and returned;
// visible call sites ignore the returned error, so delivery is effectively
// best-effort.
func (w *ReplayWorker) sendReplayEvent(ctx context.Context, attr scheduler.ReplayNotificationAttrs) error {
	jobName, err := scheduler.JobNameFrom(attr.JobName)
	if err != nil {
		w.logger.Error("[ReplayID: %s] unable adapt job name from %s", attr.ReplayID, attr.JobName)
		return err
	}
	jobWithDetails, err := w.jobRepo.GetJobDetails(ctx, attr.Tenant.ProjectName(), jobName)
	if err != nil {
		w.logger.Error("[ReplayID: %s] unable get jobWithDetails for %s", attr.ReplayID, jobName)
		return err
	}
	// attr is received by value, so setting JobWithDetails here does not
	// mutate the caller's copy; the pointer handed to the alert manager is to
	// this local copy.
	attr.JobWithDetails = jobWithDetails
	w.alertManager.SendReplayEvent(&attr)
	return nil
}
Loading
Loading