Skip to content

Commit

Permalink
fix: retain previous output during intermediate states (#106)
Browse files Browse the repository at this point in the history
* fix: retain previous output during intermediate states

* feat: add new resource labels to resource revision

* fix: firehose replicas should always be greater than 0

* fix: create resource revision only for user triggered updates

* feat: add revision reason to resource revisions

Co-authored-by: Rohil Surana <[email protected]>
  • Loading branch information
spy16 and rohilsurana authored Jan 12, 2023
1 parent 9a3050e commit 43e888f
Show file tree
Hide file tree
Showing 18 changed files with 68 additions and 45 deletions.
24 changes: 13 additions & 11 deletions core/mocks/resource_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/module/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Driver interface {
type Plan struct {
Resource resource.Resource
ScheduleRunAt time.Time
Reason string
}

// Loggable extension of driver allows streaming log data for a resource.
Expand Down
3 changes: 2 additions & 1 deletion core/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Store interface {
List(ctx context.Context, filter Filter) ([]Resource, error)

Create(ctx context.Context, r Resource, hooks ...MutationHook) error
Update(ctx context.Context, r Resource, hooks ...MutationHook) error
Update(ctx context.Context, r Resource, saveRevision bool, reason string, hooks ...MutationHook) error
Delete(ctx context.Context, urn string, hooks ...MutationHook) error

Revisions(ctx context.Context, selector RevisionsSelector) ([]Revision, error)
Expand Down Expand Up @@ -68,6 +68,7 @@ type RevisionsSelector struct {
type Revision struct {
ID int64 `json:"id"`
URN string `json:"urn"`
Reason string `json:"reason"`
Labels map[string]string `json:"labels"`
CreatedAt time.Time `json:"created_at"`

Expand Down
2 changes: 1 addition & 1 deletion core/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (s *Service) syncChange(ctx context.Context, urn string) (*resource.Resourc
return nil, err
}
} else {
if err := s.upsert(ctx, module.Plan{Resource: *res}, false); err != nil {
if err := s.upsert(ctx, module.Plan{Resource: *res}, false, false, ""); err != nil {
return nil, err
}
}
Expand Down
6 changes: 3 additions & 3 deletions core/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (s *Service) execAction(ctx context.Context, res resource.Resource, act mod
planned.Resource.UpdatedAt = s.clock()
}

if err := s.upsert(ctx, *planned, isCreate(act.Name)); err != nil {
if err := s.upsert(ctx, *planned, isCreate(act.Name), true, planned.Reason); err != nil {
return nil, err
}
return &planned.Resource, nil
Expand Down Expand Up @@ -101,7 +101,7 @@ func (s *Service) planChange(ctx context.Context, res resource.Resource, act mod
return planned, nil
}

func (s *Service) upsert(ctx context.Context, plan module.Plan, isCreate bool) error {
func (s *Service) upsert(ctx context.Context, plan module.Plan, isCreate bool, saveRevision bool, reason string) error {
var hooks []resource.MutationHook
hooks = append(hooks, func(ctx context.Context) error {
if plan.Resource.State.IsTerminal() {
Expand All @@ -122,7 +122,7 @@ func (s *Service) upsert(ctx context.Context, plan module.Plan, isCreate bool) e
if isCreate {
err = s.store.Create(ctx, plan.Resource, hooks...)
} else {
err = s.store.Update(ctx, plan.Resource, hooks...)
err = s.store.Update(ctx, plan.Resource, saveRevision, reason, hooks...)
}

if err != nil {
Expand Down
14 changes: 7 additions & 7 deletions core/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,8 @@ func TestService_UpdateResource(t *testing.T) {
Once()

resourceRepo.EXPECT().
Update(mock.Anything, mock.Anything, mock.Anything).
Run(func(ctx context.Context, r resource.Resource, hooks ...resource.MutationHook) {
Update(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Run(func(ctx context.Context, r resource.Resource, saveRevision bool, reason string, hooks ...resource.MutationHook) {
assert.Len(t, hooks, 1)
assert.NoError(t, hooks[0](ctx))
}).
Expand Down Expand Up @@ -453,9 +453,9 @@ func TestService_UpdateResource(t *testing.T) {
Return(&testResource, nil).Once()

resourceRepo.EXPECT().
Update(mock.Anything, mock.Anything, mock.Anything).
Update(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil).
Run(func(ctx context.Context, r resource.Resource, hooks ...resource.MutationHook) {
Run(func(ctx context.Context, r resource.Resource, saveRevision bool, reason string, hooks ...resource.MutationHook) {
assert.Len(t, hooks, 1)
assert.NoError(t, hooks[0](ctx))
}).
Expand Down Expand Up @@ -578,7 +578,7 @@ func TestService_DeleteResource(t *testing.T) {
Once()

resourceRepo.EXPECT().
Update(mock.Anything, mock.Anything, mock.Anything).
Update(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(testErr).
Once()

Expand Down Expand Up @@ -625,7 +625,7 @@ func TestService_DeleteResource(t *testing.T) {
Once()

resourceRepo.EXPECT().
Update(mock.Anything, mock.Anything, mock.Anything).
Update(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil).
Once()

Expand Down Expand Up @@ -780,7 +780,7 @@ func TestService_ApplyAction(t *testing.T) {
}, nil).
Once()
resourceRepo.EXPECT().
Update(mock.Anything, mock.Anything, mock.Anything).
Update(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil).
Once()

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/stretchr/testify v1.7.1
github.com/xeipuuv/gojsonschema v1.2.0
go.buf.build/odpf/gw/odpf/proton v1.1.122
go.buf.build/odpf/gwv/odpf/proton v1.1.133
go.buf.build/odpf/gwv/odpf/proton v1.1.172
go.opencensus.io v0.23.0
go.uber.org/zap v1.21.0
google.golang.org/grpc v1.46.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1409,8 +1409,8 @@ go.buf.build/odpf/gw/odpf/proton v1.1.122 h1:6NM4D8VwKIdq6F0A5nXnmxPp7LnzuwsGCeV
go.buf.build/odpf/gw/odpf/proton v1.1.122/go.mod h1:FySqyI0YPPldpzXULKDcIC/bMJIdGaO6j36i1ZKJSvE=
go.buf.build/odpf/gwv/envoyproxy/protoc-gen-validate v1.1.7/go.mod h1:2Tg6rYIoDhpl39Zd2+WBOF9uG4XxAOs0bK2Z2/bwTOc=
go.buf.build/odpf/gwv/grpc-ecosystem/grpc-gateway v1.1.46/go.mod h1:UrBCdmHgaY/pLapYUMOq01c1yuzwT8AEBTsgpmzq2zo=
go.buf.build/odpf/gwv/odpf/proton v1.1.133 h1:lYGtd7HoAA/KtEOi9ncfx44Pdi9RTr7HyyqdqCTytRI=
go.buf.build/odpf/gwv/odpf/proton v1.1.133/go.mod h1:V6NNZKrRPHjMkIPiSXvwUHks0D8bUGPXAjXUaujG/90=
go.buf.build/odpf/gwv/odpf/proton v1.1.172 h1:cGk4ctsVhBK4d6mV+QVrJD0rWkXtDO+ogCA8l3BCkhk=
go.buf.build/odpf/gwv/odpf/proton v1.1.172/go.mod h1:V6NNZKrRPHjMkIPiSXvwUHks0D8bUGPXAjXUaujG/90=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
Expand Down
1 change: 1 addition & 0 deletions internal/server/v1/resources/mappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func revisionToProto(revision resource.Revision) (*entropyv1beta1.ResourceRevisi
return &entropyv1beta1.ResourceRevision{
Id: strconv.FormatInt(revision.ID, decimalBase),
Urn: revision.URN,
Reason: revision.Reason,
Labels: revision.Labels,
CreatedAt: timestamppb.New(revision.CreatedAt),
Spec: spec,
Expand Down
24 changes: 13 additions & 11 deletions internal/store/postgres/resource_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,11 @@ func (st *Store) Create(ctx context.Context, r resource.Resource, hooks ...resou
return translateErr(err)
}

// TODO: Add labels for revisions
rev := resource.Revision{
URN: r.URN,
Spec: r.Spec,
Labels: map[string]string{},
Labels: r.Labels,
Reason: "resource created",
}

if err := insertRevision(ctx, tx, rev); err != nil {
Expand All @@ -135,7 +135,7 @@ func (st *Store) Create(ctx context.Context, r resource.Resource, hooks ...resou
return nil
}

func (st *Store) Update(ctx context.Context, r resource.Resource, hooks ...resource.MutationHook) error {
func (st *Store) Update(ctx context.Context, r resource.Resource, saveRevision bool, reason string, hooks ...resource.MutationHook) error {
updateResource := func(ctx context.Context, tx *sqlx.Tx) error {
id, err := translateURNToID(ctx, tx, r.URN)
if err != nil {
Expand Down Expand Up @@ -165,15 +165,17 @@ func (st *Store) Update(ctx context.Context, r resource.Resource, hooks ...resou
return err
}

// TODO: Add labels for revisions
rev := resource.Revision{
URN: r.URN,
Spec: r.Spec,
Labels: map[string]string{},
}
if saveRevision {
rev := resource.Revision{
URN: r.URN,
Spec: r.Spec,
Labels: r.Labels,
Reason: reason,
}

if err := insertRevision(ctx, tx, rev); err != nil {
return translateErr(err)
if err := insertRevision(ctx, tx, rev); err != nil {
return translateErr(err)
}
}

return runAllHooks(ctx, hooks)
Expand Down
3 changes: 2 additions & 1 deletion internal/store/postgres/revision_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ import (
type revisionModel struct {
ID int64 `db:"id"`
URN string `db:"urn"`
Reason string `db:"reason"`
CreatedAt time.Time `db:"created_at"`
SpecConfigs []byte `db:"spec_configs"`
}

func readRevisionRecord(ctx context.Context, r sqlx.QueryerContext, id int64, into *revisionModel) error {
cols := []string{"id", "urn", "created_at", "spec_configs"}
cols := []string{"id", "urn", "reason", "created_at", "spec_configs"}
builder := sq.Select(cols...).From(tableRevisions).Where(sq.Eq{"id": id})

query, args, err := builder.PlaceholderFormat(sq.Dollar).ToSql()
Expand Down
5 changes: 3 additions & 2 deletions internal/store/postgres/revision_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func (st *Store) getRevisionByID(ctx context.Context, id int64) (*resource.Revis
return &resource.Revision{
ID: rec.ID,
URN: rec.URN,
Reason: rec.Reason,
Labels: tagsToLabelMap(tags),
CreatedAt: rec.CreatedAt,
Spec: resource.Spec{
Expand All @@ -98,8 +99,8 @@ func insertRevision(ctx context.Context, tx *sqlx.Tx, rev resource.Revision) err

func insertRevisionRecord(ctx context.Context, runner sq.BaseRunner, r resource.Revision) (int64, error) {
q := sq.Insert(tableRevisions).
Columns("urn", "spec_configs").
Values(r.URN, r.Spec.Configs).
Columns("urn", "reason", "spec_configs").
Values(r.URN, r.Reason, r.Spec.Configs).
Suffix(`RETURNING "id"`).
PlaceholderFormat(sq.Dollar)

Expand Down
3 changes: 2 additions & 1 deletion internal/store/postgres/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,5 @@ CREATE TABLE IF NOT EXISTS modules (
updated_at timestamp with time zone NOT NULL DEFAULT current_timestamp
);

CREATE INDEX IF NOT EXISTS idx_modules_project ON modules (project);
CREATE INDEX IF NOT EXISTS idx_modules_project ON modules (project);
ALTER TABLE revisions ADD COLUMN IF NOT EXISTS reason TEXT DEFAULT '<none>' NOT NULL;
9 changes: 8 additions & 1 deletion modules/firehose/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (*firehoseModule) planCreate(res module.ExpandedResource, act module.Action
if reqConf.StopTime != nil {
plan.ScheduleRunAt = *reqConf.StopTime
}
plan.Reason = "firehose created"
return &plan, nil
}

Expand All @@ -70,6 +71,7 @@ func (*firehoseModule) planChange(res module.ExpandedResource, act module.Action
if conf.StopTime != nil {
plan.ScheduleRunAt = *conf.StopTime
}
plan.Reason = "firehose config updated"

case ScaleAction:
var scaleParams struct {
Expand All @@ -79,17 +81,21 @@ func (*firehoseModule) planChange(res module.ExpandedResource, act module.Action
return nil, errors.ErrInvalid.WithMsgf("invalid config json: %v", err)
}
conf.Firehose.Replicas = scaleParams.Replicas
plan.Reason = "firehose scaled"

case StartAction:
conf.State = stateRunning
plan.Reason = "firehose started"

case StopAction:
conf.State = stateStopped
plan.Reason = "firehose stopped"
}

r.Spec.Configs = conf.JSON()
r.State = resource.State{
Status: resource.StatusPending,
Output: res.State.Output,
ModuleData: moduleData{
PendingSteps: []string{releaseUpdate},
}.JSON(),
Expand Down Expand Up @@ -125,12 +131,13 @@ func (*firehoseModule) planReset(res module.ExpandedResource, act module.ActionR
r.Spec.Configs = conf.JSON()
r.State = resource.State{
Status: resource.StatusPending,
Output: res.State.Output,
ModuleData: moduleData{
PendingSteps: []string{releaseUpdate, consumerReset, releaseUpdate},
ResetTo: resetTo,
StateOverride: stateStopped,
}.JSON(),
}

return &module.Plan{Resource: r}, nil
return &module.Plan{Resource: r, Reason: "firehose consumer reset"}, nil
}
4 changes: 4 additions & 0 deletions modules/firehose/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func TestFirehoseModule_Plan(t *testing.T) {
ModuleData: []byte(`{"pending_steps":["release_create"]}`),
},
},
Reason: "firehose created",
},
},
{
Expand Down Expand Up @@ -96,6 +97,7 @@ func TestFirehoseModule_Plan(t *testing.T) {
ModuleData: []byte(`{"pending_steps":["release_update"]}`),
},
},
Reason: "firehose scaled",
},
},
{
Expand All @@ -119,6 +121,7 @@ func TestFirehoseModule_Plan(t *testing.T) {
ModuleData: []byte(`{"pending_steps":["release_update","consumer_reset","release_update"],"reset_to":"2022-06-22T00:00:00+00:00","state_override":"STOPPED"}`),
},
},
Reason: "firehose consumer reset",
},
},
{
Expand All @@ -143,6 +146,7 @@ func TestFirehoseModule_Plan(t *testing.T) {
},
},
ScheduleRunAt: parseTime("3022-07-13T00:40:14.028016Z"),
Reason: "firehose created",
},
},
}
Expand Down
3 changes: 2 additions & 1 deletion modules/firehose/schema/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
"properties": {
"replicas": {
"type": "number",
"default": 1
"default": 1,
"minimum": 1
},
"kafka_broker_address": {
"type": "string"
Expand Down
3 changes: 2 additions & 1 deletion modules/firehose/schema/scale.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
"type": "object",
"properties": {
"replicas": {
"type": "number"
"type": "number",
"minimum": 1
}
},
"required": ["replicas"]
Expand Down
2 changes: 1 addition & 1 deletion modules/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (m *kubeModule) Plan(ctx context.Context, res module.ExpandedResource, act
Status: resource.StatusCompleted,
Output: output,
}
return &module.Plan{Resource: res.Resource}, nil
return &module.Plan{Resource: res.Resource, Reason: "kubernetes cluster details updated"}, nil
}

func (*kubeModule) Sync(_ context.Context, res module.ExpandedResource) (*resource.State, error) {
Expand Down

0 comments on commit 43e888f

Please sign in to comment.