diff --git a/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/acs/session/session.go b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/acs/session/session.go index 7cc815321ca..6a3f733cf99 100644 --- a/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/acs/session/session.go +++ b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/acs/session/session.go @@ -178,7 +178,10 @@ func (s *session) Start(ctx context.Context) error { for { select { case <-connectToACS: - logger.Debug("Received connect to ACS message. Attempting connect to ACS") + logger.Debug("Received connect to ACS message. Attempting connect to ACS", + logger.Fields{ + "containerInstanceARN": s.containerInstanceARN, + }) // Start a session with ACS. acsError := s.startSessionOnce(ctx) @@ -188,29 +191,40 @@ func (s *session) Start(ctx context.Context) error { if ok { logger.Info("Waiting before reconnecting to ACS", logger.Fields{ - "reconnectDelay": reconnectDelay.String(), + "reconnectDelay": reconnectDelay.String(), + "containerInstanceARN": s.containerInstanceARN, }) waitComplete := waitForDuration(ctx, reconnectDelay) if waitComplete { // If the context was not canceled and we've waited for the // wait duration without any errors, send the message to the channel // to reconnect to ACS. - logger.Info("Done waiting; reconnecting to ACS") + logger.Info("Done waiting; reconnecting to ACS", + logger.Fields{ + "containerInstanceARN": s.containerInstanceARN, + }) sendEmptyMessageOnChannel(connectToACS) } else { // Wait was interrupted. We expect the session to close as canceling // the session context is the only way to end up here. Print a message // to indicate the same. - logger.Info("Interrupted waiting for reconnect delay to elapse; Expect session to close") + logger.Info("Interrupted waiting for reconnect delay to elapse; Expect session to close", + logger.Fields{ + "containerInstanceARN": s.containerInstanceARN, + }) } } else { // No need to delay reconnect - reconnect immediately. - logger.Info("Reconnecting to ACS immediately without waiting") + logger.Info("Reconnecting to ACS immediately without waiting", + logger.Fields{ + "containerInstanceARN": s.containerInstanceARN, + }) sendEmptyMessageOnChannel(connectToACS) } case <-ctx.Done(): logger.Info("ACS session ended (context closed)", logger.Fields{ - field.Reason: ctx.Err(), + field.Reason: ctx.Err(), + "containerInstanceARN": s.containerInstanceARN, }) return ctx.Err() } @@ -223,7 +237,8 @@ func (s *session) startSessionOnce(ctx context.Context) error { acsEndpoint, err := s.ecsClient.DiscoverPollEndpoint(s.containerInstanceARN) if err != nil { logger.Error("ACS: Unable to discover poll endpoint", logger.Fields{ - field.Error: err, + "containerInstanceARN": s.containerInstanceARN, + field.Error: err, }) return err } @@ -242,7 +257,8 @@ func (s *session) startSessionOnce(ctx context.Context) error { s.disconnectJitter) if err != nil { logger.Error("Failed to connect to ACS", logger.Fields{ - field.Error: err, + "containerInstanceARN": s.containerInstanceARN, + field.Error: err, }) return err } @@ -253,7 +269,11 @@ func (s *session) startSessionOnce(ctx context.Context) error { // Connection to ACS was successful. Moving forward, rely on ACS to send credentials to Agent at its own cadence // and make sure Agent does not force ACS to send credentials for any subsequent reconnects to ACS. - logger.Info("Connected to ACS endpoint") + logger.Info("Connected to ACS endpoint", + logger.Fields{ + "containerInstanceARN": s.containerInstanceARN, + "lastConnectedTime": s.lastConnectedTime, + }) s.sendCredentials = false return s.startACSSession(ctx, client) @@ -264,7 +284,13 @@ func (s *session) startSessionOnce(ctx context.Context) error { // the context is canceled. func (s *session) startACSSession(ctx context.Context, client wsclient.ClientServer) error { ctx, cancel := context.WithCancel(ctx) - defer cancel() + defer func() { + logger.Info("Current ACS session is done and a new one might be created.", + logger.Fields{ + "containerInstanceARN": s.containerInstanceARN, + }) + cancel() + }() responseSender := func(response interface{}) error { return client.MakeRequest(response) @@ -314,18 +340,31 @@ func (s *session) startACSSession(ctx context.Context, client wsclient.ClientSer func (s *session) reconnectDelay(acsError error) (time.Duration, bool) { if isInactiveInstanceError(acsError) { - logger.Info("Container instance is deregistered") + logger.Info("Container instance is deregistered", + logger.Fields{ + "containerInstanceARN": s.containerInstanceARN, + }) s.inactiveInstanceCB() return s.inactiveInstanceReconnectDelay, true } + if shouldReconnectWithoutBackoff(acsError) { // ACS has closed the connection for valid reasons. Example: periodic disconnect. // No need to wait/backoff to reconnect. - logger.Info("ACS WebSocket connection closed for a valid reason") + logger.Info("ACS WebSocket connection closed for a valid reason", + logger.Fields{ + "containerInstanceARN": s.containerInstanceARN, + }) s.backoff.Reset() return 0, false } + + logger.Warn("ACS WebSocket connection closed", + logger.Fields{ + "containerInstanceARN": s.containerInstanceARN, + field.Error: acsError, + }) // Disconnected unexpectedly from ACS, compute backoff duration to reconnect. return s.backoff.Duration(), true } diff --git a/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/wsclient/client.go b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/wsclient/client.go index bf4750776c7..aa4e587d736 100644 --- a/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/wsclient/client.go +++ b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/wsclient/client.go @@ -290,7 +290,10 @@ func (cs *ClientServerImpl) Connect(disconnectMetricName string, } } logger.Warn(fmt.Sprintf( - "Error creating a websocket client: %v", err)) + "Error creating a websocket client: %v", err), + logger.Fields{ + "URL": cs.URL, + }) return nil, errors.Wrapf(err, "websocket client: unable to dial %s response: %s", parsedURL.Host, string(resp)) } @@ -486,20 +489,29 @@ func (cs *ClientServerImpl) ConsumeMessages(ctx context.Context) error { case err == nil: if messageType != websocket.TextMessage { // maybe not fatal though, we'll try to process it anyways - logger.Error(fmt.Sprintf("Unexpected messageType: %v", messageType)) + logger.Error(fmt.Sprintf("Unexpected messageType: %v", messageType), + logger.Fields{ + "URL": cs.URL, + }) } cs.handleMessage(message) case permissibleCloseCode(err): - logger.Debug(fmt.Sprintf("Connection closed for a valid reason: %s", err)) + logger.Debug(fmt.Sprintf("Connection closed for a valid reason: %s", err), + logger.Fields{ + "URL": cs.URL, + }) errChan <- io.EOF return default: // Unexpected error occurred logger.Debug(fmt.Sprintf("Error getting message from ws backend: error: [%v], messageType: [%v] ", - err, messageType)) + err, messageType), + logger.Fields{ + "URL": cs.URL, + }) errChan <- err return } @@ -553,11 +565,17 @@ func (cs *ClientServerImpl) CreateRequestMessage(input interface{}) ([]byte, err func (cs *ClientServerImpl) handleMessage(data []byte) { typedMessage, typeStr, err := DecodeData(data, cs.TypeDecoder) if err != nil { - logger.Warn(fmt.Sprintf("Unable to handle message from backend: %v", err)) + logger.Warn(fmt.Sprintf("Unable to handle message from backend: %v", err), + logger.Fields{ + "URL": cs.URL, + }) return } - logger.Debug(fmt.Sprintf("Received message of type: %s", typeStr)) + logger.Debug(fmt.Sprintf("Received message of type: %s", typeStr), + logger.Fields{ + "URL": cs.URL, + }) if cs.AnyRequestHandler != nil { reflect.ValueOf(cs.AnyRequestHandler).Call([]reflect.Value{reflect.ValueOf(typedMessage)}) @@ -566,7 +584,9 @@ func (cs *ClientServerImpl) handleMessage(data []byte) { if handler, ok := cs.RequestHandlers[typeStr]; ok { reflect.ValueOf(handler).Call([]reflect.Value{reflect.ValueOf(typedMessage)}) } else { - logger.Info(fmt.Sprintf("No handler for message type: %s %s", typeStr, typedMessage)) + logger.Info(fmt.Sprintf("No handler for message type: %s %s", typeStr, typedMessage), logger.Fields{ + "URL": cs.URL, + }) } } @@ -604,6 +624,7 @@ func (cs *ClientServerImpl) newDisconnectTimeoutHandler(startTime time.Time, logger.Info(("Websocket connection established."), logger.Fields{ "URL": cs.URL, "ConnectTime": startTime.Format(dateTimeFormat), + "maxConnectionDuration": maxConnectionDuration, "ExpectedDisconnectTime": startTime.Add(disconnectTimeout).Format(dateTimeFormat), }) diff --git a/ecs-agent/acs/session/session.go b/ecs-agent/acs/session/session.go index 7cc815321ca..6a3f733cf99 100644 --- a/ecs-agent/acs/session/session.go +++ b/ecs-agent/acs/session/session.go @@ -178,7 +178,10 @@ func (s *session) Start(ctx context.Context) error { for { select { case <-connectToACS: - logger.Debug("Received connect to ACS message. Attempting connect to ACS") + logger.Debug("Received connect to ACS message. Attempting connect to ACS", + logger.Fields{ + "containerInstanceARN": s.containerInstanceARN, + }) // Start a session with ACS. acsError := s.startSessionOnce(ctx) @@ -188,29 +191,40 @@ func (s *session) Start(ctx context.Context) error { if ok { logger.Info("Waiting before reconnecting to ACS", logger.Fields{ - "reconnectDelay": reconnectDelay.String(), + "reconnectDelay": reconnectDelay.String(), + "containerInstanceARN": s.containerInstanceARN, }) waitComplete := waitForDuration(ctx, reconnectDelay) if waitComplete { // If the context was not canceled and we've waited for the // wait duration without any errors, send the message to the channel // to reconnect to ACS. - logger.Info("Done waiting; reconnecting to ACS") + logger.Info("Done waiting; reconnecting to ACS", + logger.Fields{ + "containerInstanceARN": s.containerInstanceARN, + }) sendEmptyMessageOnChannel(connectToACS) } else { // Wait was interrupted. We expect the session to close as canceling // the session context is the only way to end up here. Print a message // to indicate the same. - logger.Info("Interrupted waiting for reconnect delay to elapse; Expect session to close") + logger.Info("Interrupted waiting for reconnect delay to elapse; Expect session to close", + logger.Fields{ + "containerInstanceARN": s.containerInstanceARN, + }) } } else { // No need to delay reconnect - reconnect immediately. - logger.Info("Reconnecting to ACS immediately without waiting") + logger.Info("Reconnecting to ACS immediately without waiting", + logger.Fields{ + "containerInstanceARN": s.containerInstanceARN, + }) sendEmptyMessageOnChannel(connectToACS) } case <-ctx.Done(): logger.Info("ACS session ended (context closed)", logger.Fields{ - field.Reason: ctx.Err(), + field.Reason: ctx.Err(), + "containerInstanceARN": s.containerInstanceARN, }) return ctx.Err() } @@ -223,7 +237,8 @@ func (s *session) startSessionOnce(ctx context.Context) error { acsEndpoint, err := s.ecsClient.DiscoverPollEndpoint(s.containerInstanceARN) if err != nil { logger.Error("ACS: Unable to discover poll endpoint", logger.Fields{ - field.Error: err, + "containerInstanceARN": s.containerInstanceARN, + field.Error: err, }) return err } @@ -242,7 +257,8 @@ func (s *session) startSessionOnce(ctx context.Context) error { s.disconnectJitter) if err != nil { logger.Error("Failed to connect to ACS", logger.Fields{ - field.Error: err, + "containerInstanceARN": s.containerInstanceARN, + field.Error: err, }) return err } @@ -253,7 +269,11 @@ func (s *session) startSessionOnce(ctx context.Context) error { // Connection to ACS was successful. Moving forward, rely on ACS to send credentials to Agent at its own cadence // and make sure Agent does not force ACS to send credentials for any subsequent reconnects to ACS. - logger.Info("Connected to ACS endpoint") + logger.Info("Connected to ACS endpoint", + logger.Fields{ + "containerInstanceARN": s.containerInstanceARN, + "lastConnectedTime": s.lastConnectedTime, + }) s.sendCredentials = false return s.startACSSession(ctx, client) @@ -264,7 +284,13 @@ func (s *session) startSessionOnce(ctx context.Context) error { // the context is canceled. func (s *session) startACSSession(ctx context.Context, client wsclient.ClientServer) error { ctx, cancel := context.WithCancel(ctx) - defer cancel() + defer func() { + logger.Info("Current ACS session is done and a new one might be created.", + logger.Fields{ + "containerInstanceARN": s.containerInstanceARN, + }) + cancel() + }() responseSender := func(response interface{}) error { return client.MakeRequest(response) @@ -314,18 +340,31 @@ func (s *session) startACSSession(ctx context.Context, client wsclient.ClientSer func (s *session) reconnectDelay(acsError error) (time.Duration, bool) { if isInactiveInstanceError(acsError) { - logger.Info("Container instance is deregistered") + logger.Info("Container instance is deregistered", + logger.Fields{ + "containerInstanceARN": s.containerInstanceARN, + }) s.inactiveInstanceCB() return s.inactiveInstanceReconnectDelay, true } + if shouldReconnectWithoutBackoff(acsError) { // ACS has closed the connection for valid reasons. Example: periodic disconnect. // No need to wait/backoff to reconnect. - logger.Info("ACS WebSocket connection closed for a valid reason") + logger.Info("ACS WebSocket connection closed for a valid reason", + logger.Fields{ + "containerInstanceARN": s.containerInstanceARN, + }) s.backoff.Reset() return 0, false } + + logger.Warn("ACS WebSocket connection closed", + logger.Fields{ + "containerInstanceARN": s.containerInstanceARN, + field.Error: acsError, + }) // Disconnected unexpectedly from ACS, compute backoff duration to reconnect. return s.backoff.Duration(), true } diff --git a/ecs-agent/wsclient/client.go b/ecs-agent/wsclient/client.go index bf4750776c7..aa4e587d736 100644 --- a/ecs-agent/wsclient/client.go +++ b/ecs-agent/wsclient/client.go @@ -290,7 +290,10 @@ func (cs *ClientServerImpl) Connect(disconnectMetricName string, } } logger.Warn(fmt.Sprintf( - "Error creating a websocket client: %v", err)) + "Error creating a websocket client: %v", err), + logger.Fields{ + "URL": cs.URL, + }) return nil, errors.Wrapf(err, "websocket client: unable to dial %s response: %s", parsedURL.Host, string(resp)) } @@ -486,20 +489,29 @@ func (cs *ClientServerImpl) ConsumeMessages(ctx context.Context) error { case err == nil: if messageType != websocket.TextMessage { // maybe not fatal though, we'll try to process it anyways - logger.Error(fmt.Sprintf("Unexpected messageType: %v", messageType)) + logger.Error(fmt.Sprintf("Unexpected messageType: %v", messageType), + logger.Fields{ + "URL": cs.URL, + }) } cs.handleMessage(message) case permissibleCloseCode(err): - logger.Debug(fmt.Sprintf("Connection closed for a valid reason: %s", err)) + logger.Debug(fmt.Sprintf("Connection closed for a valid reason: %s", err), + logger.Fields{ + "URL": cs.URL, + }) errChan <- io.EOF return default: // Unexpected error occurred logger.Debug(fmt.Sprintf("Error getting message from ws backend: error: [%v], messageType: [%v] ", - err, messageType)) + err, messageType), + logger.Fields{ + "URL": cs.URL, + }) errChan <- err return } @@ -553,11 +565,17 @@ func (cs *ClientServerImpl) CreateRequestMessage(input interface{}) ([]byte, err func (cs *ClientServerImpl) handleMessage(data []byte) { typedMessage, typeStr, err := DecodeData(data, cs.TypeDecoder) if err != nil { - logger.Warn(fmt.Sprintf("Unable to handle message from backend: %v", err)) + logger.Warn(fmt.Sprintf("Unable to handle message from backend: %v", err), + logger.Fields{ + "URL": cs.URL, + }) return } - logger.Debug(fmt.Sprintf("Received message of type: %s", typeStr)) + logger.Debug(fmt.Sprintf("Received message of type: %s", typeStr), + logger.Fields{ + "URL": cs.URL, + }) if cs.AnyRequestHandler != nil { reflect.ValueOf(cs.AnyRequestHandler).Call([]reflect.Value{reflect.ValueOf(typedMessage)}) @@ -566,7 +584,9 @@ func (cs *ClientServerImpl) handleMessage(data []byte) { if handler, ok := cs.RequestHandlers[typeStr]; ok { reflect.ValueOf(handler).Call([]reflect.Value{reflect.ValueOf(typedMessage)}) } else { - logger.Info(fmt.Sprintf("No handler for message type: %s %s", typeStr, typedMessage)) + logger.Info(fmt.Sprintf("No handler for message type: %s %s", typeStr, typedMessage), logger.Fields{ + "URL": cs.URL, + }) } } @@ -604,6 +624,7 @@ func (cs *ClientServerImpl) newDisconnectTimeoutHandler(startTime time.Time, logger.Info(("Websocket connection established."), logger.Fields{ "URL": cs.URL, "ConnectTime": startTime.Format(dateTimeFormat), + "maxConnectionDuration": maxConnectionDuration, "ExpectedDisconnectTime": startTime.Add(disconnectTimeout).Format(dateTimeFormat), })