Skip to content

Commit

Permalink
feat: handled column type STRUCT
Browse files Browse the repository at this point in the history
  • Loading branch information
solsticemj25 committed Nov 13, 2024
1 parent 546297c commit 34cfcde
Showing 1 changed file with 52 additions and 19 deletions.
71 changes: 52 additions & 19 deletions plugins/extractors/maxcompute/maxcompute.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@ import (
"context"
_ "embed" // used to print the embedded assets
"fmt"
"sync"

"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"github.com/goto/meteor/models"
v1beta2 "github.com/goto/meteor/models/gotocompany/assets/v1beta2"
"github.com/goto/meteor/plugins"
"github.com/goto/meteor/registry"
"github.com/goto/meteor/utils"
"github.com/goto/salt/log"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -46,11 +47,11 @@ var summary string

var sampleConfig = `
project_name: goto_test
endpoint_project: maxcompute.ap-southeast-5.aliyuncs.com
endpoint_project: https://service.ap-southeast-5.maxcompute.aliyun.com/api
access_key:
id: access_key_id
secret: access_key_secret
schema_name: DEFAULT
schema_name: default
exclude:
schemas:
- schema_a
Expand Down Expand Up @@ -89,6 +90,8 @@ func (e *Extractor) Extract(_ context.Context, emit plugins.Emit) error {

schemas := project.Schemas()

eg := errgroup.Group{}

err := schemas.List(func(schema *odps.Schema, err error) {
if err != nil {
e.logger.Error("failed to list schemas", "error", err)
Expand All @@ -106,49 +109,52 @@ func (e *Extractor) Extract(_ context.Context, emit plugins.Emit) error {
newIns.SetDefaultProjectName(e.config.ProjectName)
newProj := newIns.Project(e.config.ProjectName)

e.fetchTablesFromSchema(newProj, emit)
eg.Go(func() error {
return e.fetchTablesFromSchema(newProj, emit)
})
})
if err != nil {
return err
}
return nil
return eg.Wait()
}

func (e *Extractor) fetchTablesFromSchema(project *odps.Project, emit plugins.Emit) {
var wg sync.WaitGroup
func (e *Extractor) fetchTablesFromSchema(project *odps.Project, emit plugins.Emit) error {
eg := errgroup.Group{}
project.Tables().List(
func(t *odps.Table, err error) {
wg.Add(1)
if err != nil {
e.logger.Error("table list", err)
return
}

if contains(e.config.Exclude.Tables, fmt.Sprintf("%s.%s", t.SchemaName(), t.Name())) {
wg.Done()
return
}
go e.processTable(t, emit, &wg)

eg.Go(func() error {
return e.processTable(t, emit)
})
},
)

wg.Wait()
return eg.Wait()
}

func (e *Extractor) processTable(table *odps.Table, emit plugins.Emit, wg *sync.WaitGroup) {
defer wg.Done()

func (e *Extractor) processTable(table *odps.Table, emit plugins.Emit) error {
err := table.Load()
if err != nil {
e.logger.Error("failed to get table info", "table", table.Name(), "error", err)
return
return err
}

asset, err := e.buildAsset(table)
if err != nil {
e.logger.Error("failed to build asset", "table", table.Name(), "error", err)
return
return err
}

emit(models.NewRecord(asset))
return nil
}

func (e *Extractor) buildAsset(tableInfo *odps.Table) (*v1beta2.Asset, error) {
Expand Down Expand Up @@ -177,10 +183,11 @@ func (e *Extractor) buildAsset(tableInfo *odps.Table) (*v1beta2.Asset, error) {
for i, col := range schema.Columns {
columnData := &v1beta2.Column{
Name: col.Name,
DataType: col.Type.Name(),
DataType: col.Type.ID().String(),
Description: col.Comment,
IsNullable: col.IsNullable,
Attributes: utils.TryParseMapToProto(buildColumnAttributesData(&schema.Columns[i])),
Columns: buildColumns(col.Type),
}
columns = append(columns, columnData)
}
Expand All @@ -201,6 +208,28 @@ func (e *Extractor) buildAsset(tableInfo *odps.Table) (*v1beta2.Asset, error) {
return asset, nil
}

func buildColumns(dataType datatype.DataType) []*v1beta2.Column {
if dataType.ID() != datatype.STRUCT {
return nil
}

structType, ok := dataType.(datatype.StructType)
if !ok {
return nil
}

var columns []*v1beta2.Column
for _, field := range structType.Fields {
column := &v1beta2.Column{
Name: field.Name,
DataType: field.Type.ID().String(),
Columns: buildColumns(field.Type),
}
columns = append(columns, column)
}
return columns
}

func buildTableAttributesData(tableInfo *odps.Table) map[string]interface{} {
attributesData := map[string]interface{}{}

Expand All @@ -217,7 +246,11 @@ func buildTableAttributesData(tableInfo *odps.Table) map[string]interface{} {
}

if tableInfo.ViewText() != "" {
attributesData["table_sql"] = tableInfo.ViewText
attributesData["table_sql"] = tableInfo.ViewText()
}

if tableInfo.ResourceUrl() != "" {
attributesData["resource_url"] = tableInfo.ResourceUrl()
}

return attributesData
Expand Down

0 comments on commit 34cfcde

Please sign in to comment.