Skip to content

Commit

Permalink
materialize-snowflake: limit number of retries based on the number of pipes
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Oct 1, 2024
1 parent ccca69c commit a7e2d0c
Showing 1 changed file with 87 additions and 83 deletions.
170 changes: 87 additions & 83 deletions materialize-snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,104 +723,108 @@ func (d *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error
// until they have all been successful, or an error has been thrown

// If we see no results from the REST API for `maxTries` iterations, then we
// fallback to asking the `COPY_HISTORY` table. We allow up to 5 minutes for results to show up in the REST API.
var maxTries = 40
var retryDelay = 3 * time.Second
for tries := 0; tries < maxTries; tries++ {
for pipeName, pipe := range pipes {
// if the pipe has no files to begin with, just skip it
// we might have processed all files of this pipe previously
if len(pipe.files) == 0 {
continue
}

// We first try to check the status of pipes using the REST API's insertReport
// The REST API does not wake up the warehouse, hence our preference
// however this API is not the most ergonomic and sometimes does not yield
// results as expected
report, err := d.pipeClient.InsertReport(pipeName)
if err != nil {
return nil, fmt.Errorf("snowpipe: insertReports: %w", err)
}

// If the files have already been loaded, when we submit a request to
// load those files again, our request will not show up in reports.
// Moreover, insertReport only retains events for 10 minutes.
// One way to find out whether the files were successfully loaded is to check
// the COPY_HISTORY to make sure they are there. The COPY_HISTORY is much more
// reliable. If they are not there, then something is wrong.
if len(report.Files) == 0 {
// We try `maxTries` times since it may take some time for the REST API
// to reflect the new pipe requests
if tries < maxTries-1 {
time.Sleep(retryDelay)
// fallback to asking the `COPY_HISTORY` table. We allow up to 5 minutes for results to show up in the REST API,
// computing how many tries per pipe fit within that 5-minute budget
if len(pipes) > 0 {
// 5 minutes divided by three seconds for each pipe
var maxTries = (5 * 60) / (3 * len(pipes))
var retryDelay = 3 * time.Second
for tries := 0; tries < maxTries; tries++ {
for pipeName, pipe := range pipes {
// if the pipe has no files to begin with, just skip it
// we might have processed all files of this pipe previously
if len(pipe.files) == 0 {
continue
}

log.WithFields(log.Fields{
"tries": tries,
"pipe": pipeName,
}).Info("snowpipe: no files in report, fetching copy history from warehouse")

var fileNames = make([]string, len(pipe.files))
for i, f := range pipe.files {
fileNames[i] = f.Path
}

rows, err := d.copyHistory(ctx, pipe.tableName, fileNames)
// We first try to check the status of pipes using the REST API's insertReport
// The REST API does not wake up the warehouse, hence our preference
// however this API is not the most ergonomic and sometimes does not yield
// results as expected
report, err := d.pipeClient.InsertReport(pipeName)
if err != nil {
return nil, err
return nil, fmt.Errorf("snowpipe: insertReports: %w", err)
}

// If there are items still in progress, we continue retrying until those items
// resolve to another status
var hasItemsInProgress = false

for _, row := range rows {
if row.status == "Loaded" {
pipe.fileLoaded(row.fileName)
} else if row.status == "Load in progress" {
hasItemsInProgress = true
} else {
return nil, fmt.Errorf("unexpected status %q for files in pipe %q: %s", row.status, pipeName, row.firstErrorMessage)
// If the files have already been loaded, when we submit a request to
// load those files again, our request will not show up in reports.
// Moreover, insertReport only retains events for 10 minutes.
// One way to find out whether the files were successfully loaded is to check
// the COPY_HISTORY to make sure they are there. The COPY_HISTORY is much more
// reliable. If they are not there, then something is wrong.
if len(report.Files) == 0 {
// We try `maxTries` times since it may take some time for the REST API
// to reflect the new pipe requests
if tries < maxTries-1 {
time.Sleep(retryDelay)
continue
}
}

// If items are still in progress, we continue trying to fetch their results
if hasItemsInProgress {
tries--
time.Sleep(retryDelay)
continue
}
log.WithFields(log.Fields{
"tries": tries,
"pipe": pipeName,
}).Info("snowpipe: no files in report, fetching copy history from warehouse")

if len(pipe.files) > 0 {
return nil, fmt.Errorf("snowpipe: could not find reports of successful processing for all files of pipe %v", pipe)
}
var fileNames = make([]string, len(pipe.files))
for i, f := range pipe.files {
fileNames[i] = f.Path
}

// All files have been processed for this pipe, we can skip to the next pipe
d.deleteFiles(ctx, []string{pipe.dir})
continue
}
rows, err := d.copyHistory(ctx, pipe.tableName, fileNames)
if err != nil {
return nil, err
}

// So long as we are able to get some results from the REST API, we do not
// want to ask COPY_HISTORY. So we reset the counter if we see some results from the REST API
tries = 0
// If there are items still in progress, we continue retrying until those items
// resolve to another status
var hasItemsInProgress = false

for _, row := range rows {
if row.status == "Loaded" {
pipe.fileLoaded(row.fileName)
} else if row.status == "Load in progress" {
hasItemsInProgress = true
} else {
return nil, fmt.Errorf("unexpected status %q for files in pipe %q: %s", row.status, pipeName, row.firstErrorMessage)
}
}

for _, reportFile := range report.Files {
if reportFile.Status == "LOADED" {
pipe.fileLoaded(reportFile.Path)
} else if reportFile.Status == "LOAD_FAILED" || reportFile.Status == "PARTIALLY_LOADED" {
return nil, fmt.Errorf("failed to load files in pipe %q: %s, %s", pipeName, reportFile.FirstError, reportFile.SystemError)
} else if reportFile.Status == "LOAD_IN_PROGRESS" {
// If items are still in progress, we continue trying to fetch their results
if hasItemsInProgress {
tries--
time.Sleep(retryDelay)
continue
}

if len(pipe.files) > 0 {
return nil, fmt.Errorf("snowpipe: could not find reports of successful processing for all files of pipe %v", pipe)
}

// All files have been processed for this pipe, we can skip to the next pipe
d.deleteFiles(ctx, []string{pipe.dir})
continue
}
}

if len(pipe.files) == 0 {
d.deleteFiles(ctx, []string{pipe.dir})
continue
} else {
time.Sleep(retryDelay)
// So long as we are able to get some results from the REST API, we do not
// want to ask COPY_HISTORY. So we reset the counter if we see some results from the REST API
tries = 0

for _, reportFile := range report.Files {
if reportFile.Status == "LOADED" {
pipe.fileLoaded(reportFile.Path)
} else if reportFile.Status == "LOAD_FAILED" || reportFile.Status == "PARTIALLY_LOADED" {
return nil, fmt.Errorf("failed to load files in pipe %q: %s, %s", pipeName, reportFile.FirstError, reportFile.SystemError)
} else if reportFile.Status == "LOAD_IN_PROGRESS" {
continue
}
}

if len(pipe.files) == 0 {
d.deleteFiles(ctx, []string{pipe.dir})
continue
} else {
time.Sleep(retryDelay)
}
}
}
}
Expand Down

0 comments on commit a7e2d0c

Please sign in to comment.