Skip to content

Commit

Permalink
refactor: linter - fixed cognitive complexity
Browse files Browse the repository at this point in the history
  • Loading branch information
solsticemj25 committed Nov 11, 2024
1 parent 2f85f66 commit 44f63cb
Showing 1 changed file with 66 additions and 37 deletions.
103 changes: 66 additions & 37 deletions plugins/extractors/maxcompute/maxcompute.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,34 +112,17 @@ func (e *Extractor) Extract(_ context.Context, emit plugins.Emit) error {
var counter int
for {
e.logger.Info("fetching tables", "marker", marker)
resp, err := apiClient.ListTables(&e.config.ProjectName, &maxcomputeclient.ListTablesRequest{
MaxItem: &e.config.MaxPageSize,
Marker: &marker,
})
resp, err := e.fetchTables(apiClient, marker)
if err != nil {
panic(err)
return err
}

counter += len(resp.Body.Data.Tables)
e.logger.Info("fetched tables", "count", counter)
wg.Add(len(resp.Body.Data.Tables))

for _, table := range resp.Body.Data.Tables {
go func(table *maxcomputeclient.ListTablesResponseBodyDataTables) {
defer wg.Done()
tableInfo, err := apiClient.GetTableInfo(&e.config.ProjectName, table.Name, &maxcomputeclient.GetTableInfoRequest{})
if err != nil {
panic(err)
}

asset, err := e.buildAsset(tableInfo)
if err != nil {
e.logger.Error("failed to build asset", "table", *table.Name, "error", err)
return
}

emit(models.NewRecord(asset))
}(table)
go e.processTable(apiClient, table, emit, &wg)
}
wg.Wait()
if len(resp.Body.Data.Tables) == 0 || len(resp.Body.Data.Tables) < int(e.config.MaxPageSize) || *resp.Body.Data.Marker == "" {
Expand All @@ -151,6 +134,33 @@ func (e *Extractor) Extract(_ context.Context, emit plugins.Emit) error {
return nil
}

func (e *Extractor) fetchTables(apiClient *maxcomputeclient.Client, marker string) (*maxcomputeclient.ListTablesResponse, error) {
return apiClient.ListTables(&e.config.ProjectName, &maxcomputeclient.ListTablesRequest{
MaxItem: &e.config.MaxPageSize,
Marker: &marker,
})
}

func (e *Extractor) processTable(apiClient *maxcomputeclient.Client, table *maxcomputeclient.ListTablesResponseBodyDataTables,
emit plugins.Emit, wg *sync.WaitGroup,
) {
defer wg.Done()

tableInfo, err := apiClient.GetTableInfo(&e.config.ProjectName, table.Name, &maxcomputeclient.GetTableInfoRequest{})
if err != nil {
e.logger.Error("failed to get table info", "table", *table.Name, "error", err)
return
}

asset, err := e.buildAsset(tableInfo)
if err != nil {
e.logger.Error("failed to build asset", "table", *table.Name, "error", err)
return
}

emit(models.NewRecord(asset))
}

func (e *Extractor) buildAsset(tableInfo *maxcomputeclient.GetTableInfoResponse) (*v1beta2.Asset, error) {
defaultSchema := "default"
if tableInfo.Body.Data.Schema == nil {
Expand All @@ -159,20 +169,6 @@ func (e *Extractor) buildAsset(tableInfo *maxcomputeclient.GetTableInfoResponse)

tableURN := plugins.MaxComputeURN(*tableInfo.Body.Data.ProjectName, *tableInfo.Body.Data.Schema, *tableInfo.Body.Data.Name)

// TODO(mayur): Add all the P0 metadata
// Table Name ✅
// Project Name (Data Layer) ✅
// Table Type ✅
// Created_At ✅
// Last_Updated_At (for both DML & DDL) ✅ ✅
// Partition Field
// Table Schema ✅
// Table Description ✅
// Resource URL
// Table Refresh Frequency → previously get from Optimus, but we need to provide it as well with tengo script
// Table SQL
// User Query References

asset := &v1beta2.Asset{
Urn: tableURN,
Name: *tableInfo.Body.Data.Name,
Expand All @@ -183,12 +179,19 @@ func (e *Extractor) buildAsset(tableInfo *maxcomputeclient.GetTableInfoResponse)
Service: "maxcompute",
}

attributesData := map[string]interface{}{
"project_name": *tableInfo.Body.Data.ProjectName,
"schema": *tableInfo.Body.Data.Schema,
attributesData := buildAttributesData(tableInfo)

var columns []*v1beta2.Column
for _, col := range tableInfo.Body.Data.NativeColumns {
columnData := &v1beta2.Column{
Name: *col.Name,
}
columns = append(columns, columnData)
}

tableData := &v1beta2.Table{
Attributes: utils.TryParseMapToProto(attributesData),
Columns: columns,
UpdateTime: convertInt64ToTimestamp(tableInfo.Body.Data.LastDDLTime),
}

Expand All @@ -201,6 +204,32 @@ func (e *Extractor) buildAsset(tableInfo *maxcomputeclient.GetTableInfoResponse)
return asset, nil
}

func buildAttributesData(tableInfo *maxcomputeclient.GetTableInfoResponse) map[string]interface{} {
attributesData := map[string]interface{}{}

if tableInfo == nil || tableInfo.Body == nil || tableInfo.Body.Data == nil {
return attributesData
}

if tableInfo.Body.Data.ProjectName != nil {
attributesData["project_name"] = *tableInfo.Body.Data.ProjectName
}

if tableInfo.Body.Data.Schema != nil {
attributesData["schema"] = *tableInfo.Body.Data.Schema
}

if tableInfo.Body.Data.OdpsPropertiesRolearn != nil {
attributesData["resource_url"] = *tableInfo.Body.Data.OdpsPropertiesRolearn
}

if tableInfo.Body.Data.CreationTime != nil {
attributesData["partition_field"] = *tableInfo.Body.Data.CreationTime
}

return attributesData
}

func convertInt64ToTimestamp(unixTimeMs *int64) *timestamppb.Timestamp {
if unixTimeMs == nil {
return nil
Expand Down

0 comments on commit 44f63cb

Please sign in to comment.