Skip to content

Commit

Permalink
feat: creates consumers if not present in NATS
Browse files Browse the repository at this point in the history
The jetstream-controller its an operate that should ensure that a system
is in a given state. During our tests we found out that if a consumers
or stream are deleted via code or through `nats-box` they are not
created again by the controller.

From my understanding this is happening because we are checking if the
observed generation number and generation number match in the CRD, which
on this case (a manually delete consumer) will not change and thus not
trigger a recreation.

The following PR aims to resources (consumers and streams) being created
in NATS in case they are not found by the client but there is a CRD for it.
  • Loading branch information
danielcibrao-form3 committed Jan 13, 2023
1 parent 9c1be41 commit 5a92f01
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 2 deletions.
1 change: 1 addition & 0 deletions cmd/nats-boot-config/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func main() {
if os.Getenv("DEBUG") == "true" {
log.SetLevel(log.DebugLevel)
}

formatter := &log.TextFormatter{
FullTimestamp: true,
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (c *Controller) processConsumerObject(cns *apis.Consumer, jsmc jsmClient) (
return err
}
updateOK := (consumerOK && !deleteOK && newGeneration)
createOK := (!consumerOK && !deleteOK && newGeneration)
createOK := (!consumerOK && !deleteOK)

switch {
case createOK:
Expand Down
2 changes: 1 addition & 1 deletion controllers/jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (c *Controller) processStreamObject(str *apis.Stream, jsmc jsmClient) (err
return err
}
updateOK := (strOK && !deleteOK && newGeneration)
createOK := (!strOK && !deleteOK && newGeneration)
createOK := (!strOK && !deleteOK)

switch {
case createOK:
Expand Down

0 comments on commit 5a92f01

Please sign in to comment.