diff --git a/materialize-snowflake/snowflake.go b/materialize-snowflake/snowflake.go
index 6fc62bdaa..d3d1a74b0 100644
--- a/materialize-snowflake/snowflake.go
+++ b/materialize-snowflake/snowflake.go
@@ -723,104 +723,108 @@ func (d *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error
 	// until they have all been successful, or an error has been thrown
 	// If we see no results from the REST API for `maxTries` iterations, then we
-	// fallback to asking the `COPY_HISTORY` table. We allow up to 5 minutes for results to show up in the REST API.
-	var maxTries = 40
-	var retryDelay = 3 * time.Second
-	for tries := 0; tries < maxTries; tries++ {
-		for pipeName, pipe := range pipes {
-			// if the pipe has no files to begin with, just skip it
-			// we might have processed all files of this pipe previously
-			if len(pipe.files) == 0 {
-				continue
-			}
-
-			// We first try to check the status of pipes using the REST API's insertReport
-			// The REST API does not wake up the warehouse, hence our preference
-			// however this API is not the most ergonomic and sometimes does not yield
-			// results as expected
-			report, err := d.pipeClient.InsertReport(pipeName)
-			if err != nil {
-				return nil, fmt.Errorf("snowpipe: insertReports: %w", err)
-			}
-
-			// If the files have already been loaded, when we submit a request to
-			// load those files again, our request will not show up in reports.
-			// Moreover, insertReport only retains events for 10 minutes.
-			// One way to find out whether the files were successfully loaded is to check
-			// the COPY_HISTORY to make sure they are there. The COPY_HISTORY is much more
-			// reliable. If they are not there, then something is wrong.
-			if len(report.Files) == 0 {
-				// We try `maxTries` times since it may take some time for the REST API
-				// to reflect the new pipe requests
-				if tries < maxTries-1 {
-					time.Sleep(retryDelay)
-					continue
-				}
-
-				log.WithFields(log.Fields{
-					"tries": tries,
-					"pipe":  pipeName,
-				}).Info("snowpipe: no files in report, fetching copy history from warehouse")
-
-				var fileNames = make([]string, len(pipe.files))
-				for i, f := range pipe.files {
-					fileNames[i] = f.Path
-				}
-
-				rows, err := d.copyHistory(ctx, pipe.tableName, fileNames)
-				if err != nil {
-					return nil, err
-				}
-
-				// If there are items still in progress, we continue retrying until those items
-				// resolve to another status
-				var hasItemsInProgress = false
-
-				for _, row := range rows {
-					if row.status == "Loaded" {
-						pipe.fileLoaded(row.fileName)
-					} else if row.status == "Load in progress" {
-						hasItemsInProgress = true
-					} else {
-						return nil, fmt.Errorf("unexpected status %q for files in pipe %q: %s", row.status, pipeName, row.firstErrorMessage)
-					}
-				}
-
-				// If items are still in progress, we continue trying to fetch their results
-				if hasItemsInProgress {
-					tries--
-					time.Sleep(retryDelay)
-					continue
-				}
-
-				if len(pipe.files) > 0 {
-					return nil, fmt.Errorf("snowpipe: could not find reports of successful processing for all files of pipe %v", pipe)
-				}
-
-				// All files have been processed for this pipe, we can skip to the next pipe
-				d.deleteFiles(ctx, []string{pipe.dir})
-				continue
-			}
-
-			// So long as we are able to get some results from the REST API, we do not
-			// want to ask COPY_HISTORY. So we reset the counter if we see some results from the REST API
-			tries = 0
-
-			for _, reportFile := range report.Files {
-				if reportFile.Status == "LOADED" {
-					pipe.fileLoaded(reportFile.Path)
-				} else if reportFile.Status == "LOAD_FAILED" || reportFile.Status == "PARTIALLY_LOADED" {
-					return nil, fmt.Errorf("failed to load files in pipe %q: %s, %s", pipeName, reportFile.FirstError, reportFile.SystemError)
-				} else if reportFile.Status == "LOAD_IN_PROGRESS" {
-					continue
-				}
-			}
-
-			if len(pipe.files) == 0 {
-				d.deleteFiles(ctx, []string{pipe.dir})
-				continue
-			} else {
-				time.Sleep(retryDelay)
-			}
-		}
-	}
+	// fallback to asking the `COPY_HISTORY` table. We allow up to 5 minutes for results to show up in the REST API
+	// by calculating how many tries per pipe gives us 5 minutes
+	if len(pipes) > 0 {
+		// 5 minutes divided by three seconds for each pipe
+		var maxTries = (5 * 60) / (3 * len(pipes))
+		var retryDelay = 3 * time.Second
+		for tries := 0; tries < maxTries; tries++ {
+			for pipeName, pipe := range pipes {
+				// if the pipe has no files to begin with, just skip it
+				// we might have processed all files of this pipe previously
+				if len(pipe.files) == 0 {
+					continue
+				}
+
+				// We first try to check the status of pipes using the REST API's insertReport
+				// The REST API does not wake up the warehouse, hence our preference
+				// however this API is not the most ergonomic and sometimes does not yield
+				// results as expected
+				report, err := d.pipeClient.InsertReport(pipeName)
+				if err != nil {
+					return nil, fmt.Errorf("snowpipe: insertReports: %w", err)
+				}
+
+				// If the files have already been loaded, when we submit a request to
+				// load those files again, our request will not show up in reports.
+				// Moreover, insertReport only retains events for 10 minutes.
+				// One way to find out whether the files were successfully loaded is to check
+				// the COPY_HISTORY to make sure they are there. The COPY_HISTORY is much more
+				// reliable. If they are not there, then something is wrong.
+				if len(report.Files) == 0 {
+					// We try `maxTries` times since it may take some time for the REST API
+					// to reflect the new pipe requests
+					if tries < maxTries-1 {
+						time.Sleep(retryDelay)
+						continue
+					}
+
+					log.WithFields(log.Fields{
+						"tries": tries,
+						"pipe":  pipeName,
+					}).Info("snowpipe: no files in report, fetching copy history from warehouse")
+
+					var fileNames = make([]string, len(pipe.files))
+					for i, f := range pipe.files {
+						fileNames[i] = f.Path
+					}
+
+					rows, err := d.copyHistory(ctx, pipe.tableName, fileNames)
+					if err != nil {
+						return nil, err
+					}
+
+					// If there are items still in progress, we continue retrying until those items
+					// resolve to another status
+					var hasItemsInProgress = false
+
+					for _, row := range rows {
+						if row.status == "Loaded" {
+							pipe.fileLoaded(row.fileName)
+						} else if row.status == "Load in progress" {
+							hasItemsInProgress = true
+						} else {
+							return nil, fmt.Errorf("unexpected status %q for files in pipe %q: %s", row.status, pipeName, row.firstErrorMessage)
+						}
+					}
+
+					// If items are still in progress, we continue trying to fetch their results
+					if hasItemsInProgress {
+						tries--
+						time.Sleep(retryDelay)
+						continue
+					}
+
+					if len(pipe.files) > 0 {
+						return nil, fmt.Errorf("snowpipe: could not find reports of successful processing for all files of pipe %v", pipe)
+					}
+
+					// All files have been processed for this pipe, we can skip to the next pipe
+					d.deleteFiles(ctx, []string{pipe.dir})
+					continue
+				}
+
+				// So long as we are able to get some results from the REST API, we do not
+				// want to ask COPY_HISTORY. So we reset the counter if we see some results from the REST API
+				tries = 0
+
+				for _, reportFile := range report.Files {
+					if reportFile.Status == "LOADED" {
+						pipe.fileLoaded(reportFile.Path)
+					} else if reportFile.Status == "LOAD_FAILED" || reportFile.Status == "PARTIALLY_LOADED" {
+						return nil, fmt.Errorf("failed to load files in pipe %q: %s, %s", pipeName, reportFile.FirstError, reportFile.SystemError)
+					} else if reportFile.Status == "LOAD_IN_PROGRESS" {
+						continue
+					}
+				}
+
+				if len(pipe.files) == 0 {
+					d.deleteFiles(ctx, []string{pipe.dir})
+					continue
+				} else {
+					time.Sleep(retryDelay)
+				}
+			}
+		}
+	}
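
A quick note on the arithmetic this change introduces: instead of the fixed `maxTries = 40`, the number of polling passes is now derived from the number of pipes so that the worst-case total sleep stays near the five-minute budget named in the comment. Below is a minimal, self-contained sketch of that calculation; the `retryBudget` helper and the `main` driver are illustrative only and are not part of the connector code.

```go
package main

import (
	"fmt"
	"time"
)

// retryBudget mirrors the diff's `(5 * 60) / (3 * len(pipes))` expression:
// given a per-pipe delay and an overall time budget, it returns how many
// polling passes keep the worst-case total sleep within that budget.
func retryBudget(numPipes int, perPipeDelay, budget time.Duration) int {
	if numPipes == 0 {
		return 0
	}
	return int(budget / (perPipeDelay * time.Duration(numPipes)))
}

func main() {
	// 5 pipes: 300s / (3s * 5) = 20 passes; 20 * 5 * 3s = 300s, i.e. ~5 minutes.
	fmt.Println(retryBudget(5, 3*time.Second, 5*time.Minute))
	// 1 pipe: 300s / 3s = 100 passes, a longer leash than the old fixed 40.
	fmt.Println(retryBudget(1, 3*time.Second, 5*time.Minute))
}
```

In other words, the worst case of one `retryDelay` sleep per pipe per pass is bounded at roughly `maxTries * len(pipes) * 3s`, about five minutes. Passes that do see activity can extend this, since the loop decrements `tries` while loads are still in progress and resets it to zero whenever the REST API returns results.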