Skip to content

Commit

Permalink
feat: implemented maxcompute sdk changes
Browse files Browse the repository at this point in the history
  • Loading branch information
solsticemj25 committed Nov 13, 2024
1 parent aaf4ff8 commit e5e5e23
Showing 1 changed file with 93 additions and 132 deletions.
225 changes: 93 additions & 132 deletions plugins/extractors/maxcompute/maxcompute.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ package maxcompute
import (
"context"
_ "embed" // used to print the embedded assets
"fmt"
"sync"
"time"

client2 "github.com/alibabacloud-go/darabonba-openapi/v2/client"
maxcomputeclient "github.com/alibabacloud-go/maxcompute-20220104/client"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/goto/meteor/models"
Expand All @@ -33,14 +31,14 @@ type Config struct {
Schemas []string `mapstructure:"schemas"`
Tables []string `mapstructure:"tables"`
} `mapstructure:"exclude,omitempty"`
MaxPageSize int32 `mapstructure:"max_page_size,omitempty"`
Concurrency int `mapstructure:"concurrency,omitempty"`
MixValues bool `mapstructure:"mix_values,omitempty"`
IncludeColumnProfile bool `mapstructure:"include_column_profile,omitempty"`
BuildViewLineage bool `mapstructure:"build_view_lineage,omitempty"`
IsCollectTableUsage bool `mapstructure:"collect_table_usage,omitempty"`
UsagePeriodInDay int `mapstructure:"usage_period_in_day,omitempty"`
UsageProjectNames []string `mapstructure:"usage_project_names,omitempty"`
Concurrency int `mapstructure:"concurrency,omitempty"`
BuildViewLineage bool `mapstructure:"build_view_lineage,omitempty"`
// MaxPageSize int32 `mapstructure:"max_page_size,omitempty"`
// MixValues bool `mapstructure:"mix_values,omitempty"`
// IncludeColumnProfile bool `mapstructure:"include_column_profile,omitempty"`
// IsCollectTableUsage bool `mapstructure:"collect_table_usage,omitempty"`
// UsagePeriodInDay int `mapstructure:"usage_period_in_day,omitempty"`
// UsageProjectNames []string `mapstructure:"usage_project_names,omitempty"`
}

type Extractor struct {
Expand All @@ -66,16 +64,8 @@ exclude:
- schema_b
tables:
- schema_c.table_a
max_page_size: 100
concurrency: 10
mix_values: false
include_column_profile: true
build_view_lineage: true
collect_table_usage: false
usage_period_in_day: 7
usage_project_names:
- maxcompute-project-name
- other-maxcompute-project-name
`

var info = plugins.Info{
Expand All @@ -95,146 +85,120 @@ func New(logger log.Logger) *Extractor {
return e
}

// SetDefaults replaces zero-valued options with their fallback values:
// 30 preview rows, a concurrency of 10 and a usage period of 7 days.
// Options that already hold a non-zero value are left untouched.
func (c *Config) SetDefaults() {
	const (
		fallbackPreviewRows = 30
		fallbackConcurrency = 10
		fallbackUsageDays   = 7
	)

	if c.MaxPreviewRows == 0 {
		c.MaxPreviewRows = fallbackPreviewRows
	}

	if c.Concurrency == 0 {
		c.Concurrency = fallbackConcurrency
	}

	if c.UsagePeriodInDay == 0 {
		c.UsagePeriodInDay = fallbackUsageDays
	}
}

// Init forwards the plugin configuration to the extractor's BaseExtractor and
// returns the error reported by that call, or nil when it succeeds.
func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
	if err := e.BaseExtractor.Init(ctx, config); err != nil {
		return err
	}
	return nil
}

func (e *Extractor) Extract(_ context.Context, emit plugins.Emit) error {
apiClient, err := maxcomputeclient.NewClient(&client2.Config{
AccessKeyId: &e.config.AccessKey.ID,
AccessKeySecret: &e.config.AccessKey.Secret,
Endpoint: &e.config.EndpointProject,
})
if err != nil {
panic(err)
}
aliAccount := account.NewAliyunAccount(e.config.AccessKey.ID, e.config.AccessKey.Secret)
odpsIns := odps.NewOdps(aliAccount, e.config.EndpointProject)
project := odpsIns.Project(e.config.ProjectName)

if e.config.SchemaName == "" {
// Fetch all schemas
aliAccount := account.NewAliyunAccount(e.config.AccessKey.ID, e.config.AccessKey.Secret)
odpsIns := odps.NewOdps(aliAccount, e.config.EndpointProject)
project := odpsIns.Project(e.config.ProjectName)
schemas := project.Schemas()
err := schemas.List(func(schema *odps.Schema, err error) {
if err != nil {
e.logger.Error("failed to list schemas", "error", err)
return
}
e.fetchTablesFromSchema(apiClient, schema.Name(), emit)
})
if err != nil {
return err
}
} else {
// Fetch tables from the specified schema
e.fetchTablesFromSchema(apiClient, e.config.SchemaName, emit)
}

return nil
}
schemas := project.Schemas()

func (e *Extractor) fetchTablesFromSchema(apiClient *maxcomputeclient.Client, schemaName string, emit plugins.Emit) {
var wg sync.WaitGroup

var marker string
var counter int
for {
e.logger.Info("fetching tables", "schema", schemaName, "marker", marker)
resp, err := e.fetchTables(apiClient, schemaName, marker)
err := schemas.List(func(schema *odps.Schema, err error) {
if err != nil {
e.logger.Error("failed to fetch tables", "schema", schemaName, "error", err)
e.logger.Error("failed to list schemas", "error", err)
return
}

counter += len(resp.Body.Data.Tables)
e.logger.Info("fetched tables", "schema", schemaName, "count", counter)
wg.Add(len(resp.Body.Data.Tables))

for _, table := range resp.Body.Data.Tables {
go e.processTable(apiClient, table, emit, &wg)
if e.config.SchemaName != "" && schema.Name() != e.config.SchemaName {
return
}
wg.Wait()
if len(resp.Body.Data.Tables) == 0 || len(resp.Body.Data.Tables) < int(e.config.MaxPageSize) || *resp.Body.Data.Marker == "" {
break
if contains(e.config.Exclude.Schemas, schema.Name()) {
return
}
marker = *resp.Body.Data.Marker

newIns := odps.NewOdps(aliAccount, e.config.EndpointProject)
newIns.SetCurrentSchemaName(schema.Name())
newIns.SetDefaultProjectName(e.config.ProjectName)
newProj := newIns.Project(e.config.ProjectName)

e.fetchTablesFromSchema(newProj, emit)
})
if err != nil {
return err
}
return nil
}

func (e *Extractor) fetchTables(apiClient *maxcomputeclient.Client, schemaName, marker string) (*maxcomputeclient.ListTablesResponse, error) {
return apiClient.ListTables(&e.config.ProjectName, &maxcomputeclient.ListTablesRequest{
SchemaName: &schemaName,
MaxItem: &e.config.MaxPageSize,
Marker: &marker,
})
func (e *Extractor) fetchTablesFromSchema(project *odps.Project, emit plugins.Emit) {
var wg sync.WaitGroup
project.Tables().List(
func(t *odps.Table, err error) {
wg.Add(1)
if err != nil {
e.logger.Error("table list", err)
}
if contains(e.config.Exclude.Tables, fmt.Sprintf("%s.%s", t.SchemaName(), t.Name())) {
wg.Done()
return
}
go e.processTable(t, emit, &wg)
},
)

wg.Wait()
}

func (e *Extractor) processTable(apiClient *maxcomputeclient.Client, table *maxcomputeclient.ListTablesResponseBodyDataTables,
emit plugins.Emit, wg *sync.WaitGroup,
) {
func (e *Extractor) processTable(table *odps.Table, emit plugins.Emit, wg *sync.WaitGroup) {
defer wg.Done()

tableInfo, err := apiClient.GetTableInfo(&e.config.ProjectName, table.Name, &maxcomputeclient.GetTableInfoRequest{})
err := table.Load()
if err != nil {
e.logger.Error("failed to get table info", "table", *table.Name, "error", err)
e.logger.Error("failed to get table info", "table", table.Name(), "error", err)
return
}

asset, err := e.buildAsset(tableInfo)
fmt.Println(table.Name())

asset, err := e.buildAsset(table)
if err != nil {
e.logger.Error("failed to build asset", "table", *table.Name, "error", err)
e.logger.Error("failed to build asset", "table", table.Name(), "error", err)
return
}

emit(models.NewRecord(asset))
}

func (e *Extractor) buildAsset(tableInfo *maxcomputeclient.GetTableInfoResponse) (*v1beta2.Asset, error) {
func (e *Extractor) buildAsset(tableInfo *odps.Table) (*v1beta2.Asset, error) {
defaultSchema := "default"
if tableInfo.Body.Data.Schema == nil {
tableInfo.Body.Data.Schema = &defaultSchema
schemaName := tableInfo.SchemaName()
if schemaName == "" {
schemaName = defaultSchema
}

tableURN := plugins.MaxComputeURN(*tableInfo.Body.Data.ProjectName, *tableInfo.Body.Data.Schema, *tableInfo.Body.Data.Name)
tableURN := plugins.MaxComputeURN(tableInfo.ProjectName(), schemaName, tableInfo.Name())

schema := tableInfo.Schema()
asset := &v1beta2.Asset{
Urn: tableURN,
Name: *tableInfo.Body.Data.Name,
Type: *tableInfo.Body.Data.Type,
Description: *tableInfo.Body.Data.Comment,
CreateTime: convertInt64ToTimestamp(tableInfo.Body.Data.CreationTime),
UpdateTime: convertInt64ToTimestamp(tableInfo.Body.Data.LastModifiedTime),
Name: schema.TableName,
Type: tableInfo.Type().String(),
Description: schema.Comment,
CreateTime: timestamppb.New(tableInfo.CreatedTime()),
UpdateTime: timestamppb.New(tableInfo.LastModifiedTime()),
Service: "maxcompute",
// Labels: tableInfo.TableLabel(),
}

attributesData := buildAttributesData(tableInfo)

var columns []*v1beta2.Column
for _, col := range tableInfo.Body.Data.NativeColumns {
for _, col := range schema.Columns {
columnData := &v1beta2.Column{
Name: *col.Name,
DataType: *col.Type,
Description: *col.Comment,
Name: col.Name,
DataType: col.Type.Name(),
Description: col.Comment,
IsNullable: col.IsNullable,
}
columns = append(columns, columnData)
}

tableData := &v1beta2.Table{
Attributes: utils.TryParseMapToProto(attributesData),
Columns: columns,
UpdateTime: convertInt64ToTimestamp(tableInfo.Body.Data.LastDDLTime),
CreateTime: timestamppb.New(tableInfo.CreatedTime()),
UpdateTime: timestamppb.New(tableInfo.LastModifiedTime()),
}

table, err := anypb.New(tableData)
Expand All @@ -246,46 +210,43 @@ func (e *Extractor) buildAsset(tableInfo *maxcomputeclient.GetTableInfoResponse)
return asset, nil
}

func buildAttributesData(tableInfo *maxcomputeclient.GetTableInfoResponse) map[string]interface{} {
func buildAttributesData(tableInfo *odps.Table) map[string]interface{} {
attributesData := map[string]interface{}{}

if tableInfo == nil || tableInfo.Body == nil || tableInfo.Body.Data == nil {
if tableInfo == nil {
return attributesData
}

if tableInfo.Body.Data.ProjectName != nil {
attributesData["project_name"] = *tableInfo.Body.Data.ProjectName
if tableInfo.ProjectName() != "" {
attributesData["project_name"] = tableInfo.ProjectName()
}

if tableInfo.Body.Data.Schema != nil {
attributesData["schema"] = *tableInfo.Body.Data.Schema
if tableInfo.SchemaName() != "" {
attributesData["schema"] = tableInfo.SchemaName()
}

if tableInfo.Body.Data.OdpsPropertiesRolearn != nil {
attributesData["resource_url"] = *tableInfo.Body.Data.OdpsPropertiesRolearn
if tableInfo.ViewText() != "" {
attributesData["table_sql"] = tableInfo.ViewText
}

if tableInfo.Body.Data.CreationTime != nil {
attributesData["partition_field"] = *tableInfo.Body.Data.CreationTime
}
// if tableInfo.Body.Data.OdpsPropertiesRolearn != nil {

Check failure on line 232 in plugins/extractors/maxcompute/maxcompute.go

View workflow job for this annotation

GitHub Actions / golangci

commentedOutCode: may want to remove commented-out code (gocritic)
// attributesData["resource_url"] = *tableInfo.Body.Data.OdpsPropertiesRolearn
// }

if tableInfo.Body.Data.ViewText != nil {
attributesData["table_sql"] = *tableInfo.Body.Data.ViewText
}
// if tableInfo.Body.Data.CreationTime != nil {

Check failure on line 236 in plugins/extractors/maxcompute/maxcompute.go

View workflow job for this annotation

GitHub Actions / golangci

commentedOutCode: may want to remove commented-out code (gocritic)
// attributesData["partition_field"] = *tableInfo.Body.Data.CreationTime
// }

return attributesData
}

// contains reports whether item occurs in slice. The comparison is an exact,
// case-sensitive string match; a nil or empty slice never contains anything.
func contains(slice []string, item string) bool {
	for _, s := range slice {
		if s == item {
			return true
		}
	}
	return false
}

func init() {
Expand Down

0 comments on commit e5e5e23

Please sign in to comment.