diff --git a/plugins/extractors/maxcompute/maxcompute.go b/plugins/extractors/maxcompute/maxcompute.go index b291fd47..0b9221a4 100644 --- a/plugins/extractors/maxcompute/maxcompute.go +++ b/plugins/extractors/maxcompute/maxcompute.go @@ -112,12 +112,9 @@ func (e *Extractor) Extract(_ context.Context, emit plugins.Emit) error { var counter int for { e.logger.Info("fetching tables", "marker", marker) - resp, err := apiClient.ListTables(&e.config.ProjectName, &maxcomputeclient.ListTablesRequest{ - MaxItem: &e.config.MaxPageSize, - Marker: &marker, - }) + resp, err := e.fetchTables(apiClient, marker) if err != nil { - panic(err) + return err } counter += len(resp.Body.Data.Tables) @@ -125,21 +122,7 @@ func (e *Extractor) Extract(_ context.Context, emit plugins.Emit) error { wg.Add(len(resp.Body.Data.Tables)) for _, table := range resp.Body.Data.Tables { - go func(table *maxcomputeclient.ListTablesResponseBodyDataTables) { - defer wg.Done() - tableInfo, err := apiClient.GetTableInfo(&e.config.ProjectName, table.Name, &maxcomputeclient.GetTableInfoRequest{}) - if err != nil { - panic(err) - } - - asset, err := e.buildAsset(tableInfo) - if err != nil { - e.logger.Error("failed to build asset", "table", *table.Name, "error", err) - return - } - - emit(models.NewRecord(asset)) - }(table) + go e.processTable(apiClient, table, emit, &wg) } wg.Wait() if len(resp.Body.Data.Tables) == 0 || len(resp.Body.Data.Tables) < int(e.config.MaxPageSize) || *resp.Body.Data.Marker == "" { @@ -151,6 +134,32 @@ func (e *Extractor) Extract(_ context.Context, emit plugins.Emit) error { return nil } +func (e *Extractor) fetchTables(apiClient *maxcomputeclient.Client, marker string) (*maxcomputeclient.ListTablesResponse, error) { + return apiClient.ListTables(&e.config.ProjectName, &maxcomputeclient.ListTablesRequest{ + MaxItem: &e.config.MaxPageSize, + Marker: &marker, + }) +} + +func (e *Extractor) processTable(apiClient *maxcomputeclient.Client, table *maxcomputeclient.ListTablesResponseBodyDataTables, + emit plugins.Emit, wg *sync.WaitGroup) { + defer wg.Done() + + tableInfo, err := apiClient.GetTableInfo(&e.config.ProjectName, table.Name, &maxcomputeclient.GetTableInfoRequest{}) + if err != nil { + e.logger.Error("failed to get table info", "table", *table.Name, "error", err) + return + } + + asset, err := e.buildAsset(tableInfo) + if err != nil { + e.logger.Error("failed to build asset", "table", *table.Name, "error", err) + return + } + + emit(models.NewRecord(asset)) +} + func (e *Extractor) buildAsset(tableInfo *maxcomputeclient.GetTableInfoResponse) (*v1beta2.Asset, error) { defaultSchema := "default" if tableInfo.Body.Data.Schema == nil {