Implement operations status polling in backend pod #749
Conversation
At a glance, this appears to perform what we need. This feels like code that should get some tests because it is foundational - can we add them to solidify our requirements as code? e.g. happy paths / error paths / "no more than one call is made to Items since it is not concurrency safe", etc.
func (iter operationCacheIterator) Items(ctx context.Context) iter.Seq[[]byte] {
	return func(yield func([]byte) bool) {
		for _, doc := range iter.operation {
			// Marshalling the document struct only to immediately unmarshal
			// it back to a document struct is a little silly but this is to
			// conform to the DBClientIterator interface.
			item, err := json.Marshal(doc)
			if err != nil {
				iter.err = err
				return
			}

			if !yield(item) {
				return
			}
		}
	}
}

func (iter operationCacheIterator) GetError() error {
	return iter.err
}
this doesn't look concurrency safe - not sure if that's a requirement. Should we at least log err as we set it? How would we know which call to Items failed?
It's not meant to be concurrency safe, it's meant to be used as part of a range-over-function pattern, which is new in Go 1.23. (Think generator functions in Python, if you're familiar with that.)
The pattern as described in the blog post doesn't address the possibility of iteration failing (such as when paging over results from a remote source) so I came up with this iterator interface as a way to stash an iterator error.
The idiom looks like:
for item := range iterator.Items(ctx) {
...
}
// Find out if iteration completed or aborted early.
err := iterator.GetError()
An iterator error would immediately break the "for" loop.
Also maybe worth mentioning, since I didn't realize you were highlighting the in-memory cache...
I don't believe this particular function is currently used. I wrote it for the in-memory cache in order to fulfill the DBClient interface, but the in-memory cache is nowadays only used to mock database operations in unit tests.
Are you referring to the new database functions or to the backend code paths... or both? The backend might be difficult to write unit tests for at the moment since parts of it rely on database locking (see #680), which I don't have a way to mock at the moment. (Mocking should be doable, it's just not done.)
Converts the result of azcosmos.ContainerItem.NewQueryItemsPager to a failable push iterator (push iterators are new in Go 1.23).
This also defines a DBClientIterator interface so the in-memory cache can mimic QueryItemsIterator.
Will add something similar for node pools once the new "aro_hcp" API is available from Cluster Service.
Periodically scans the "Operations" Cosmos DB container.
What this PR does
This adds functionality to the backend pod which updates Cosmos DB items with the status of active (non-terminal) asynchronous operations for clusters. Status polling for node pools is not currently possible; this part will be completed when the new /api/aro_hcp/v1 OCM endpoint becomes available.
This runs on two independent polling intervals, each of which can be overridden through environment variables.
The first polling, which by default runs every 30s, scans the Cosmos DB "Operations" container for items with a non-terminal status and stores an internal list.
The second polling, which by default runs every 10s, iterates over that list and queries Cluster Service for the status of each resource. It then translates the Cluster Service status to a ProvisioningState value, records an error message for failed operations, and, with the subscription locked, updates the appropriate "Operations" item and "Resources" item in Cosmos DB.
Additionally, if a deletion operation has completed successfully, it deletes the corresponding "Resources" item from Cosmos DB.
Jira: ARO-8598 - Prototype async op status notification mechanism for RP
Link to demo recording:
Special notes for your reviewer
For the record, I'm not a fan of this design and consider it a "Mark I" iteration. Polling introduces a lot of latency in the statuses reported by the Microsoft.RedHatOpenShift/hcpOpenShiftClusters API. But we're stuck with polling for now for two reasons: