Skip to content

Commit

Permalink
feat: auto dependency resolution support for maxcompute (#285)
Browse files Browse the repository at this point in the history
* feat: add upstream identifier for maxcompute

* fix: linter

* test: add test case for maxcompute identifier

* fix: linter

* feat: support tick quote in one part of table
  • Loading branch information
deryrahman committed Nov 6, 2024
1 parent 989b176 commit 33a350a
Show file tree
Hide file tree
Showing 11 changed files with 385 additions and 148 deletions.
30 changes: 19 additions & 11 deletions plugin/plugin_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type EvaluatorFactory interface {

type UpstreamIdentifierFactory interface {
GetBQUpstreamIdentifier(ctx context.Context, svcAcc string, evaluators ...evaluator.Evaluator) (upstreamidentifier.UpstreamIdentifier, error)
GetMaxcomputeUpstreamIdentifier(ctx context.Context, evaluators ...evaluator.Evaluator) (upstreamidentifier.UpstreamIdentifier, error)
}

type PluginService struct {
Expand Down Expand Up @@ -108,20 +109,27 @@ func (s PluginService) IdentifyUpstreams(ctx context.Context, taskName string, c
evaluators = append(evaluators, evaluator)
}

if parserType != plugin.BQParser {
switch parserType {
case plugin.MaxcomputeParser:
upstreamIdentifier, err := s.upstreamIdentifierFactory.GetMaxcomputeUpstreamIdentifier(ctx, evaluators...)
if err != nil {
return nil, err
}
upstreamIdentifiers = append(upstreamIdentifiers, upstreamIdentifier)
case plugin.BQParser:
svcAcc, ok := compiledConfig[bqSvcAccKey]
if !ok {
return nil, fmt.Errorf("secret " + bqSvcAccKey + " required to generate upstream is not found")
}
upstreamIdentifier, err := s.upstreamIdentifierFactory.GetBQUpstreamIdentifier(ctx, svcAcc, evaluators...)
if err != nil {
return nil, err
}
upstreamIdentifiers = append(upstreamIdentifiers, upstreamIdentifier)
default:
s.l.Warn("parserType %s is not supported", parserType)
continue
}
// for now parser type is only scoped for bigquery, so that it uses bigquery as upstream identifier
svcAcc, ok := compiledConfig[bqSvcAccKey]
if !ok {
return nil, fmt.Errorf("secret " + bqSvcAccKey + " required to generate upstream is not found")
}
upstreamIdentifier, err := s.upstreamIdentifierFactory.GetBQUpstreamIdentifier(ctx, svcAcc, evaluators...)
if err != nil {
return nil, err
}
upstreamIdentifiers = append(upstreamIdentifiers, upstreamIdentifier)
}

// identify all upstream resource urns by all identifier from given asset
Expand Down
37 changes: 37 additions & 0 deletions plugin/plugin_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,43 @@ func (_m *UpstreamIdentifierFactory) GetBQUpstreamIdentifier(ctx context.Context
return r0, r1
}

// GetMaxcomputeUpstreamIdentifier provides a mock function with given fields: ctx, evaluators
func (_m *UpstreamIdentifierFactory) GetMaxcomputeUpstreamIdentifier(ctx context.Context, evaluators ...evaluator.Evaluator) (upstreamidentifier.UpstreamIdentifier, error) {
_va := make([]interface{}, len(evaluators))
for _i := range evaluators {
_va[_i] = evaluators[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)

if len(ret) == 0 {
panic("no return value specified for GetMaxcomputeUpstreamIdentifier")
}

var r0 upstreamidentifier.UpstreamIdentifier
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, ...evaluator.Evaluator) (upstreamidentifier.UpstreamIdentifier, error)); ok {
return rf(ctx, evaluators...)
}
if rf, ok := ret.Get(0).(func(context.Context, ...evaluator.Evaluator) upstreamidentifier.UpstreamIdentifier); ok {
r0 = rf(ctx, evaluators...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(upstreamidentifier.UpstreamIdentifier)
}
}

if rf, ok := ret.Get(1).(func(context.Context, ...evaluator.Evaluator) error); ok {
r1 = rf(ctx, evaluators...)
} else {
r1 = ret.Error(1)
}

return r0, r1
}

// NewUpstreamIdentifierFactory creates a new instance of UpstreamIdentifierFactory. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewUpstreamIdentifierFactory(t interface {
Expand Down
3 changes: 2 additions & 1 deletion plugin/upstream_identifier/bq_upstream_identifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/goto/optimus/core/resource"
"github.com/goto/optimus/ext/store/bigquery"
"github.com/goto/optimus/internal/errors"
"github.com/goto/optimus/plugin/upstream_identifier/parser"
)

type (
Expand Down Expand Up @@ -129,7 +130,7 @@ func NewBQUpstreamIdentifier(logger log.Logger, parserFunc ParserFunc, bqExtract

return &BQUpstreamIdentifier{
logger: logger,
parserFunc: parserFunc,
parserFunc: parser.BQURNDecorator(parserFunc),
extractorFunc: bqExtractorDecorator(logger, bqExtractorFunc),
evaluatorFuncs: sanitizedEvaluatorFuncs,
}, nil
Expand Down
19 changes: 9 additions & 10 deletions plugin/upstream_identifier/bq_upstream_identifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestIdentifyResources(t *testing.T) {
defer bqExtractorFunc.AssertExpectations(t)

evaluatorFunc.On("Execute", assets).Return(assets["./query.sql"])
parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"bigquery://project1:dataset1.name1"})
parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"project1.dataset1.name1"})
bqExtractorFunc.On("Execute", ctx, mock.Anything).Return(nil, errors.New("some error"))

bqUpstreamIdentifier, err := upstreamidentifier.NewBQUpstreamIdentifier(logger, parserFunc.Execute, bqExtractorFunc.Execute, evaluatorFunc.Execute)
Expand All @@ -105,9 +105,8 @@ func TestIdentifyResources(t *testing.T) {
defer bqExtractorFunc.AssertExpectations(t)

evaluatorFunc.On("Execute", assets).Return(assets["./query.sql"])
parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"broken://project1;dataset1.name1"})
// bq extractor should receives empty resource urn, since the urn construction is fail
bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{}).Return(map[bigquery.ResourceURN]string{}, nil)
parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"project1;dataset1.name1"})
// bq extractor should not be executed since the result of parser is empty

bqUpstreamIdentifier, err := upstreamidentifier.NewBQUpstreamIdentifier(logger, parserFunc.Execute, bqExtractorFunc.Execute, evaluatorFunc.Execute)
assert.NoError(t, err)
Expand Down Expand Up @@ -135,13 +134,13 @@ func TestIdentifyResources(t *testing.T) {
sqlView2 := "select 1 from `project1.dataset1.name1` join `project1.dataset1.name3` on true"

evaluatorFunc.On("Execute", assets).Return(assets["./query.sql"])
parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"bigquery://project1:dataset1.name1"})
parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"project1.dataset1.name1"})
bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{resourceURN1}).Return(map[bigquery.ResourceURN]string{resourceURN1: sqlView1}, nil)

parserFunc.On("Execute", sqlView1).Return([]string{"bigquery://project1:dataset1.name2"})
parserFunc.On("Execute", sqlView1).Return([]string{"project1.dataset1.name2"})
bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{resourceURN2}).Return(map[bigquery.ResourceURN]string{resourceURN2: sqlView2}, nil)

parserFunc.On("Execute", sqlView2).Return([]string{"bigquery://project1:dataset1.name1", "bigquery://project1:dataset1.name3"})
parserFunc.On("Execute", sqlView2).Return([]string{"project1.dataset1.name1", "project1.dataset1.name3"})
bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{resourceURN1, resourceURN3}).Return(map[bigquery.ResourceURN]string{resourceURN1: sqlView1, resourceURN3: ""}, nil)

parserFunc.On("Execute", "").Return([]string{})
Expand Down Expand Up @@ -172,13 +171,13 @@ func TestIdentifyResources(t *testing.T) {
sqlView2 := "select 1 from `project1.dataset1.name3`"

evaluatorFunc.On("Execute", assets).Return(assets["./query.sql"])
parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"bigquery://project1:dataset1.name1"})
parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"project1.dataset1.name1"})
bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{resourceURN1}).Return(map[bigquery.ResourceURN]string{resourceURN1: sqlView1}, nil)

parserFunc.On("Execute", sqlView1).Return([]string{"bigquery://project1:dataset1.name2", "bigquery://project1:dataset1.name3"})
parserFunc.On("Execute", sqlView1).Return([]string{"project1.dataset1.name2", "project1.dataset1.name3"})
bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{resourceURN2, resourceURN3}).Return(map[bigquery.ResourceURN]string{resourceURN2: sqlView2, resourceURN3: ""}, nil)

parserFunc.On("Execute", sqlView2).Return([]string{"bigquery://project1:dataset1.name3"})
parserFunc.On("Execute", sqlView2).Return([]string{"project1.dataset1.name3"})
bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{resourceURN3}).Return(map[bigquery.ResourceURN]string{resourceURN3: ""}, nil)

parserFunc.On("Execute", "").Return([]string{})
Expand Down
74 changes: 74 additions & 0 deletions plugin/upstream_identifier/maxcompute_upstream_identifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package upstreamidentifier

import (
"context"
"fmt"

"github.com/goto/salt/log"

"github.com/goto/optimus/core/resource"
"github.com/goto/optimus/internal/errors"
"github.com/goto/optimus/plugin/upstream_identifier/parser"
)

type MaxcomputeUpstreamIdentifier struct {
logger log.Logger
parserFunc ParserFunc
evaluatorFuncs []EvalAssetFunc
}

func NewMaxcomputeUpstreamIdentifier(logger log.Logger, parserFunc ParserFunc, evaluatorFuncs ...EvalAssetFunc) (*MaxcomputeUpstreamIdentifier, error) {
me := errors.NewMultiError("create maxcompute upstream generator errors")
if logger == nil {
me.Append(fmt.Errorf("logger is nil"))
}
if parserFunc == nil {
me.Append(fmt.Errorf("parserFunc is nil"))
}
sanitizedEvaluatorFuncs := []EvalAssetFunc{}
for _, evaluatorFunc := range evaluatorFuncs {
if evaluatorFunc != nil {
sanitizedEvaluatorFuncs = append(sanitizedEvaluatorFuncs, evaluatorFunc)
}
}
if len(sanitizedEvaluatorFuncs) == 0 {
me.Append(fmt.Errorf("non-nil evaluatorFuncs is needed"))
}
if me.ToErr() != nil {
return nil, me.ToErr()
}
return &MaxcomputeUpstreamIdentifier{
logger: logger,
parserFunc: parser.MaxcomputeURNDecorator(parserFunc),
evaluatorFuncs: evaluatorFuncs,
}, nil
}

func (g MaxcomputeUpstreamIdentifier) IdentifyResources(_ context.Context, assets map[string]string) ([]resource.URN, error) {
resourceURNs := []resource.URN{}

// generate resource urn with upstream from each evaluator
for _, evaluatorFunc := range g.evaluatorFuncs {
query := evaluatorFunc(assets)
if query == "" {
continue
}
resources := g.identifyResources(query)
resourceURNs = append(resourceURNs, resources...)
}
return resourceURNs, nil
}

func (g MaxcomputeUpstreamIdentifier) identifyResources(query string) []resource.URN {
resources := g.parserFunc(query)
resourceURNs := make([]resource.URN, len(resources))
for i, r := range resources {
resourceURN, err := resource.ParseURN(r)
if err != nil {
g.logger.Error("error when parsing resource urn %s", r)
continue
}
resourceURNs[i] = resourceURN
}
return resourceURNs
}
62 changes: 62 additions & 0 deletions plugin/upstream_identifier/maxcompute_upstream_identifier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package upstreamidentifier_test

import (
"context"
"testing"

"github.com/goto/salt/log"
"github.com/stretchr/testify/assert"

upstreamidentifier "github.com/goto/optimus/plugin/upstream_identifier"
)

func TestNewMaxcomputeUpstreamIdentifier(t *testing.T) {
logger := log.NewNoop()
parserFunc := func(string) []string { return nil }
evaluatorFunc := func(map[string]string) string { return "" }
t.Run("return error when logger is nil", func(t *testing.T) {
upstreamIdentifier, err := upstreamidentifier.NewMaxcomputeUpstreamIdentifier(nil, parserFunc, evaluatorFunc)
assert.Error(t, err)
assert.Nil(t, upstreamIdentifier)
})
t.Run("return error when parserFunc is nil", func(t *testing.T) {
upstreamIdentifier, err := upstreamidentifier.NewMaxcomputeUpstreamIdentifier(logger, nil, evaluatorFunc)
assert.Error(t, err)
assert.Nil(t, upstreamIdentifier)
})
t.Run("return error when no evaluators", func(t *testing.T) {
upstreamIdentifier, err := upstreamidentifier.NewMaxcomputeUpstreamIdentifier(logger, parserFunc)
assert.Error(t, err)
assert.Nil(t, upstreamIdentifier)
})
t.Run("return error when evaluatorFuncs is nil", func(t *testing.T) {
upstreamIdentifier, err := upstreamidentifier.NewMaxcomputeUpstreamIdentifier(logger, parserFunc, nil)
assert.Error(t, err)
assert.Nil(t, upstreamIdentifier)
})
t.Run("return success", func(t *testing.T) {
upstreamIdentifier, err := upstreamidentifier.NewMaxcomputeUpstreamIdentifier(logger, parserFunc, evaluatorFunc)
assert.NoError(t, err)
assert.NotNil(t, upstreamIdentifier)
})
}

func TestMaxcomputeUpstreamIdentifier_IdentifyResources(t *testing.T) {
ctx := context.Background()
logger := log.NewNoop()
assets := map[string]string{
"./query.sql": "select 1 from project1.schema1.name1",
}
// TODO: adding failure test cases
t.Run("return success", func(t *testing.T) {
parserFunc := func(string) []string { return []string{"project1.schema1.name1"} }
evaluatorFunc := func(map[string]string) string { return "./query.sql" }
upstreamIdentifier, err := upstreamidentifier.NewMaxcomputeUpstreamIdentifier(logger, parserFunc, evaluatorFunc)
assert.NoError(t, err)
assert.NotNil(t, upstreamIdentifier)
resourceURNs, err := upstreamIdentifier.IdentifyResources(ctx, assets)
assert.NoError(t, err)
assert.Len(t, resourceURNs, 1)
assert.Equal(t, "maxcompute://project1.schema1.name1", resourceURNs[0].String())
})
}
Loading

0 comments on commit 33a350a

Please sign in to comment.