Skip to content

Commit

Permalink
Log acs endpoint for connection issues (#4401)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxx0624 authored Oct 23, 2024
1 parent ec1598e commit 1e6b153
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 38 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 51 additions & 12 deletions ecs-agent/acs/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,10 @@ func (s *session) Start(ctx context.Context) error {
for {
select {
case <-connectToACS:
logger.Debug("Received connect to ACS message. Attempting connect to ACS")
logger.Debug("Received connect to ACS message. Attempting connect to ACS",
logger.Fields{
"containerInstanceARN": s.containerInstanceARN,
})

// Start a session with ACS.
acsError := s.startSessionOnce(ctx)
Expand All @@ -188,29 +191,40 @@ func (s *session) Start(ctx context.Context) error {

if ok {
logger.Info("Waiting before reconnecting to ACS", logger.Fields{
"reconnectDelay": reconnectDelay.String(),
"reconnectDelay": reconnectDelay.String(),
"containerInstanceARN": s.containerInstanceARN,
})
waitComplete := waitForDuration(ctx, reconnectDelay)
if waitComplete {
// If the context was not canceled and we've waited for the
// wait duration without any errors, send the message to the channel
// to reconnect to ACS.
logger.Info("Done waiting; reconnecting to ACS")
logger.Info("Done waiting; reconnecting to ACS",
logger.Fields{
"containerInstanceARN": s.containerInstanceARN,
})
sendEmptyMessageOnChannel(connectToACS)
} else {
// Wait was interrupted. We expect the session to close as canceling
// the session context is the only way to end up here. Print a message
// to indicate the same.
logger.Info("Interrupted waiting for reconnect delay to elapse; Expect session to close")
logger.Info("Interrupted waiting for reconnect delay to elapse; Expect session to close",
logger.Fields{
"containerInstanceARN": s.containerInstanceARN,
})
}
} else {
// No need to delay reconnect - reconnect immediately.
logger.Info("Reconnecting to ACS immediately without waiting")
logger.Info("Reconnecting to ACS immediately without waiting",
logger.Fields{
"containerInstanceARN": s.containerInstanceARN,
})
sendEmptyMessageOnChannel(connectToACS)
}
case <-ctx.Done():
logger.Info("ACS session ended (context closed)", logger.Fields{
field.Reason: ctx.Err(),
field.Reason: ctx.Err(),
"containerInstanceARN": s.containerInstanceARN,
})
return ctx.Err()
}
Expand All @@ -223,7 +237,8 @@ func (s *session) startSessionOnce(ctx context.Context) error {
acsEndpoint, err := s.ecsClient.DiscoverPollEndpoint(s.containerInstanceARN)
if err != nil {
logger.Error("ACS: Unable to discover poll endpoint", logger.Fields{
field.Error: err,
"containerInstanceARN": s.containerInstanceARN,
field.Error: err,
})
return err
}
Expand All @@ -242,7 +257,8 @@ func (s *session) startSessionOnce(ctx context.Context) error {
s.disconnectJitter)
if err != nil {
logger.Error("Failed to connect to ACS", logger.Fields{
field.Error: err,
"containerInstanceARN": s.containerInstanceARN,
field.Error: err,
})
return err
}
Expand All @@ -253,7 +269,11 @@ func (s *session) startSessionOnce(ctx context.Context) error {

// Connection to ACS was successful. Moving forward, rely on ACS to send credentials to Agent at its own cadence
// and make sure Agent does not force ACS to send credentials for any subsequent reconnects to ACS.
logger.Info("Connected to ACS endpoint")
logger.Info("Connected to ACS endpoint",
logger.Fields{
"containerInstanceARN": s.containerInstanceARN,
"lastConnectedTime": s.lastConnectedTime,
})
s.sendCredentials = false

return s.startACSSession(ctx, client)
Expand All @@ -264,7 +284,13 @@ func (s *session) startSessionOnce(ctx context.Context) error {
// the context is canceled.
func (s *session) startACSSession(ctx context.Context, client wsclient.ClientServer) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer func() {
logger.Info("Current ACS session is done and a new one might be created.",
logger.Fields{
"containerInstanceARN": s.containerInstanceARN,
})
cancel()
}()

responseSender := func(response interface{}) error {
return client.MakeRequest(response)
Expand Down Expand Up @@ -314,18 +340,31 @@ func (s *session) startACSSession(ctx context.Context, client wsclient.ClientSer

func (s *session) reconnectDelay(acsError error) (time.Duration, bool) {
if isInactiveInstanceError(acsError) {
logger.Info("Container instance is deregistered")
logger.Info("Container instance is deregistered",
logger.Fields{
"containerInstanceARN": s.containerInstanceARN,
})
s.inactiveInstanceCB()
return s.inactiveInstanceReconnectDelay, true
}

if shouldReconnectWithoutBackoff(acsError) {
// ACS has closed the connection for valid reasons. Example: periodic disconnect.
// No need to wait/backoff to reconnect.
logger.Info("ACS WebSocket connection closed for a valid reason")
logger.Info("ACS WebSocket connection closed for a valid reason",
logger.Fields{
"containerInstanceARN": s.containerInstanceARN,
})
s.backoff.Reset()
return 0, false

}

logger.Warn("ACS WebSocket connection closed",
logger.Fields{
"containerInstanceARN": s.containerInstanceARN,
field.Error: acsError,
})
// Disconnected unexpectedly from ACS, compute backoff duration to reconnect.
return s.backoff.Duration(), true
}
Expand Down
Loading

0 comments on commit 1e6b153

Please sign in to comment.