From 004ffc53a0f6114f49145846aa7132ca2822884d Mon Sep 17 00:00:00 2001 From: Jerry Fan Date: Wed, 23 Oct 2024 11:00:22 -0400 Subject: [PATCH] propagate bazooka failure in auxo --- indexer/services/auxo/__tests__/index.test.ts | 75 ++- indexer/services/auxo/src/helpers.ts | 476 ++++++++++++++++++ indexer/services/auxo/src/index.ts | 461 +---------------- 3 files changed, 554 insertions(+), 458 deletions(-) create mode 100644 indexer/services/auxo/src/helpers.ts diff --git a/indexer/services/auxo/__tests__/index.test.ts b/indexer/services/auxo/__tests__/index.test.ts index 7f7d0c304e..9021760e50 100644 --- a/indexer/services/auxo/__tests__/index.test.ts +++ b/indexer/services/auxo/__tests__/index.test.ts @@ -1,5 +1,74 @@ -describe('index', () => { - it('true is true', () => { - expect(true).toEqual(true); +// handler.test.ts +import { handler } from '../src/index'; +import * as helpers from '../src/helpers'; +import { InvokeCommandOutput, LambdaClient } from '@aws-sdk/client-lambda'; +import { APIGatewayEvent, Context } from 'aws-lambda'; +import { AuxoEventJson } from 'src/types'; + +// Mock logger and startBugsnag from @dydxprotocol-indexer/base +jest.mock('@dydxprotocol-indexer/base', () => { + const originalModule = jest.requireActual('@dydxprotocol-indexer/base'); + return { + ...originalModule, + logger: { + info: jest.fn(), + error: jest.fn(), + crit: jest.fn(), + }, + startBugsnag: jest.fn(), + }; +}); + +describe('Auxo Handler', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + it('should return 500 when Bazooka Lambda errors', async () => { + // Mock upgradeBazooka to do nothing + jest.spyOn(helpers, 'upgradeBazooka').mockResolvedValue(undefined); + + // Mock LambdaClient.send to return response with FunctionError + jest.spyOn(LambdaClient.prototype, 'send').mockImplementation(() => { + return { + StatusCode: 500, + FunctionError: 'Some bazooka error', + $metadata: { + httpStatusCode: 200, // api returns 200 even if lambda runtime error + requestId: 'mock-request-id-invoke', + extendedRequestId: 'mock-extended-request-id-invoke', + cfId: 'mock-cf-id-invoke', + attempts: 1, + totalRetryDelay: 0, + }, + } as InvokeCommandOutput; + }); + + const mockEvent: APIGatewayEvent & AuxoEventJson = { + // APIGatewayEvent properties + body: null, + headers: {}, + multiValueHeaders: {}, + httpMethod: 'POST', + isBase64Encoded: false, + path: '/deploy', + pathParameters: null, + queryStringParameters: null, + multiValueQueryStringParameters: null, + stageVariables: null, + resource: '', + requestContext: {} as any, + // AuxoEventJson properties + upgrade_tag: 'some_tag', + prefix: 'some_prefix', + region: 'us-east-1', + regionAbbrev: 'us-east-1', + addNewKafkaTopics: false, + onlyRunDbMigrationAndCreateKafkaTopics: false, + }; + + const mockContext: Context = {} as any; + + await expect(handler(mockEvent, mockContext)).rejects.toThrow('bazooka failure: Some bazooka error'); }); }); diff --git a/indexer/services/auxo/src/helpers.ts b/indexer/services/auxo/src/helpers.ts new file mode 100644 index 0000000000..18f835c4c6 --- /dev/null +++ b/indexer/services/auxo/src/helpers.ts @@ -0,0 +1,476 @@ +// define support functions for index.ts here instead of directly in index.ts to make mocking +// simpler with jest.spyOn + +import { + DescribeImagesCommand, DescribeImagesCommandOutput, ECRClient, ImageDetail, +} from '@aws-sdk/client-ecr'; +import { + ContainerDefinition, + DescribeServicesCommand, + DescribeServicesCommandOutput, + DescribeTaskDefinitionCommand, + DescribeTaskDefinitionCommandOutput, + ECSClient, + RegisterTaskDefinitionCommand, + RegisterTaskDefinitionCommandOutput, + Service, + TaskDefinition, + UpdateServiceCommand, + UpdateServiceCommandOutput, +} from '@aws-sdk/client-ecs'; +import { + GetFunctionCommand, + GetFunctionCommandOutput, + InvokeCommand, + InvokeCommandOutput, + LambdaClient, + LastUpdateStatus, + UpdateFunctionCodeCommand, +} from '@aws-sdk/client-lambda'; +import { logger } from '@dydxprotocol-indexer/base'; +import { APIGatewayEvent } from 'aws-lambda'; +import _ from 'lodash'; + +import config from './config'; +import { + BAZOOKA_DB_MIGRATION_AND_CREATE_KAFKA_PAYLOAD, + BAZOOKA_DB_MIGRATION_PAYLOAD, + BAZOOKA_LAMBDA_FUNCTION_NAME, + ECS_SERVICE_NAMES, + SERVICE_NAME_SUFFIX, +} from './constants'; +import { AuxoEventJson, EcsServiceNames, TaskDefinitionArnMap } from './types'; + +export async function upgradeBazooka( + lambda: LambdaClient, + ecr: ECRClient, + event: APIGatewayEvent & AuxoEventJson, +): Promise { + const imageDetail: ImageDetail = await getImageDetail(ecr, `${event.prefix}-indexer-bazooka`, event); + const imageUri = `${imageDetail.registryId}.dkr.ecr.${event.region}.amazonaws.com/${imageDetail.repositoryName}@${imageDetail.imageDigest}`; + logger.info({ + at: 'index#upgradeBazooka', + message: `Upgrading bazooka to ${imageUri}`, + }); + + // Update Lambda function with the new image + await lambda.send( + new UpdateFunctionCodeCommand({ + FunctionName: BAZOOKA_LAMBDA_FUNCTION_NAME, + ImageUri: imageUri, + }), + ); + + // Wait for the lambda to be updated, with a timeout of 120s. + await Promise.race([ + checkLambdaStatus(lambda), + sleep(120000), + ]); +} + +async function checkLambdaStatus( + lambda: LambdaClient, +): Promise { + let updateStatus: LastUpdateStatus | string = LastUpdateStatus.InProgress; + + while (updateStatus === LastUpdateStatus.InProgress) { + const statusResponse: GetFunctionCommandOutput = await lambda.send( + new GetFunctionCommand({ + FunctionName: BAZOOKA_LAMBDA_FUNCTION_NAME, + }), + ); + + updateStatus = statusResponse.Configuration!.LastUpdateStatus!; + if (updateStatus === LastUpdateStatus.Successful) { + logger.info({ + at: 'index#upgradeBazooka', + message: 'Successfully upgraded bazooka', + response: statusResponse, + }); + return; + } else if (updateStatus === LastUpdateStatus.Failed) { + throw new Error('Failed to upgrade bazooka'); + } + + // Wait for 5s before checking again + await new Promise((resolve) => setTimeout(resolve, 5000)); + } +} + +async function getImageDetail( + ecr: ECRClient, + repositoryName: string, + event: APIGatewayEvent & AuxoEventJson, +): Promise { + logger.info({ + at: 'index#getImageDetail', + message: 'Getting ecr images', + repositoryName, + event, + }); + const images: DescribeImagesCommandOutput = await ecr.send(new DescribeImagesCommand({ + repositoryName, + imageIds: [ + { + imageTag: event.upgrade_tag, + }, + ], + })); + logger.info({ + at: 'index#getImageDetail', + message: 'Successfully got ecr images', + images, + repositoryName, + event, + }); + + if (!images.imageDetails || images.imageDetails.length === 0) { + logger.error({ + at: 'index#getImageDetail', + message: 'Unable to find ecr image', + imageTag: event.upgrade_tag, + repositoryName, + event, + }); + throw new Error('Unable to find ecr image'); + } + return images.imageDetails[0]; + +} + +export async function runDbAndKafkaMigration( + createNewKafkaTopics: boolean, + lambda: LambdaClient, +): Promise { + logger.info({ + at: 'index#runDbMigration', + message: 'Running db migration', + }); + const payload = createNewKafkaTopics + ? BAZOOKA_DB_MIGRATION_AND_CREATE_KAFKA_PAYLOAD + : BAZOOKA_DB_MIGRATION_PAYLOAD; + + const response: InvokeCommandOutput = await lambda.send(new InvokeCommand({ + FunctionName: BAZOOKA_LAMBDA_FUNCTION_NAME, + Payload: payload, + // RequestResponse means that the lambda is synchronously invoked + InvocationType: 'RequestResponse', + })); + + // lambda.send() response 200 refers to successful invocation of lambda, not the lambda's response + // Check for FunctionError to determine if the lambda failed + if (response.FunctionError) { + logger.error({ + at: 'index#runDbMigration', + message: `Failed to run db migration due to bazooka failure: ${response.FunctionError}`, + }); + throw new Error(`bazooka failure: ${response.FunctionError}`); + } + + if (response.StatusCode === 200) { + logger.info({ + at: 'index#runDbMigration', + message: 'Successfully ran db migration', + response, + }); + } +} + +export async function createNewEcsTaskDefinitions( + ecs: ECSClient, + ecr: ECRClient, + event: APIGatewayEvent & AuxoEventJson, +): Promise { + logger.info({ + at: 'index#createNewEcsTaskDefinitions', + message: 'Creating new ECS Task Definitions', + }); + const taskDefinitionArns: string[] = await Promise.all(_.map( + ECS_SERVICE_NAMES, + (serviceName: EcsServiceNames) => createNewEcsTaskDefinition(ecs, ecr, event, serviceName), + )); + logger.info({ + at: 'index#createNewEcsTaskDefinitions', + message: 'Created new ECS Task Definition', + }); + return _.zipObject(ECS_SERVICE_NAMES, taskDefinitionArns); +} + +/** + * @returns The revision number of the new task definition + */ +async function createNewEcsTaskDefinition( + ecs: ECSClient, + ecr: ECRClient, + event: APIGatewayEvent & AuxoEventJson, + serviceName: EcsServiceNames, +): Promise { + // Check that the ECR image exists, will throw error here if it does not + await getImageDetail(ecr, `${event.prefix}-indexer-${serviceName}`, event); + + const taskDefinitionName = `${event.prefix}-indexer-${event.regionAbbrev}-${serviceName}-task`; + logger.info({ + at: 'index#createNewEcsTaskDefinition', + message: 'Get existing ECS Task Definition', + taskDefinitionName, + }); + const describeResult: DescribeTaskDefinitionCommandOutput = await ecs.send( + new DescribeTaskDefinitionCommand({ + taskDefinition: taskDefinitionName, + }), + ); + logger.info({ + at: 'index#createNewEcsTaskDefinition', + message: 'Got existing ECS Task Definition', + taskDefinitionName, + describeResult, + }); + + if (describeResult.taskDefinition === undefined) { + logger.error({ + at: 'index#createNewEcsTaskDefinition', + message: 'Unable to find existing ECS Task Definition', + taskDefinitionName, + }); + throw new Error('Unable to find existing ECS Task Definition'); + } + + // All ECS Task Definitions should have two container definitions, the service container + // , and the datadog agent + const taskDefinition: TaskDefinition = describeResult.taskDefinition; + const serviceContainerDefinitionIndex: number = getServiceContainerDefinitionIndex( + taskDefinition, + ); + + const serviceContainerDefinition: + ContainerDefinition = taskDefinition.containerDefinitions![serviceContainerDefinitionIndex]; + if (serviceContainerDefinition.image === undefined) { + logger.error({ + at: 'index#createNewEcsTaskDefinition', + message: 'No image found in the container definition', + taskDefinitionName, + }); + throw new Error('No image found in the container definition'); + } + const originalImage: string = serviceContainerDefinition.image; + const updatedContainerDefinitions: ContainerDefinition[] = _.cloneDeep( + taskDefinition.containerDefinitions!, + ); + const newImage: string = `${_.split(originalImage, ':')[0]}:${event.upgrade_tag}`; + updatedContainerDefinitions[serviceContainerDefinitionIndex].image = newImage; + + logger.info({ + at: 'index#createNewEcsTaskDefinition', + message: 'Registering new task definition', + taskDefinitionName, + }); + const registerResult: + RegisterTaskDefinitionCommandOutput = await ecs.send(new RegisterTaskDefinitionCommand({ + family: taskDefinition.family, + taskRoleArn: taskDefinition.taskRoleArn, + executionRoleArn: taskDefinition.executionRoleArn, + networkMode: taskDefinition.networkMode, + containerDefinitions: updatedContainerDefinitions, + volumes: taskDefinition.volumes, + placementConstraints: taskDefinition.placementConstraints, + requiresCompatibilities: taskDefinition.requiresCompatibilities, + cpu: taskDefinition.cpu, + memory: taskDefinition.memory, + ipcMode: taskDefinition.ipcMode, + proxyConfiguration: taskDefinition.proxyConfiguration, + inferenceAccelerators: taskDefinition.inferenceAccelerators, + runtimePlatform: taskDefinition.runtimePlatform, + })); + + if (registerResult.taskDefinition === undefined || + registerResult.taskDefinition.taskDefinitionArn === undefined + ) { + logger.error({ + at: 'index#createNewEcsTaskDefinition', + message: 'Failed to register new task definition', + }); + throw new Error('Failed to register new task definition'); + } + + await waitForTaskDefinitionToRegister(ecs, registerResult); + return registerResult.taskDefinition.taskDefinitionArn; +} + +function getServiceContainerDefinitionIndex( + taskDefinition: TaskDefinition, +): number { + const containerDefinitions: + ContainerDefinition[] | undefined = taskDefinition.containerDefinitions; + if (containerDefinitions === undefined || containerDefinitions.length === 0) { + logger.error({ + at: 'index#getServiceTaskDefinition', + message: 'No container definitions found in the task definition', + taskDefinition, + }); + throw new Error('No container definitions found in the task definition'); + } + + const index: number = containerDefinitions.findIndex( + (containerDefinition: ContainerDefinition) => { + return _.endsWith(containerDefinition.name, SERVICE_NAME_SUFFIX); + }, + ); + if (index >= 0) { + return index; + } + + logger.error({ + at: 'index#getServiceTaskDefinition', + message: 'No service container definition found in the task definition', + containerDefinitions, + }); + throw new Error('No service container definition found in the task definition'); +} + +/** + * Registering a task definition is asynchronous, and this step ensures that the task definition + * is usable in the ECS service before we attempt to update the ECS service. + */ +async function waitForTaskDefinitionToRegister( + ecs: ECSClient, + registerResult: RegisterTaskDefinitionCommandOutput, +): Promise { + const taskDefinition: + string = `${registerResult.taskDefinition!.family}:${registerResult.taskDefinition!.revision}`; + for (let i = 0; i <= config.MAX_TASK_DEFINITION_WAIT_TIME_MS; i += config.SLEEP_TIME_MS) { + const describeResult: DescribeTaskDefinitionCommandOutput = await ecs.send( + new DescribeTaskDefinitionCommand({ + taskDefinition, + }), + ); + + if (describeResult.taskDefinition !== undefined) { + logger.info({ + at: 'index#waitForTaskDefinitionToRegister', + message: 'Task definition registered', + taskDefinition, + describeResult, + }); + return; + } + logger.info({ + at: 'index#waitForTaskDefinitionToRegister', + message: `Task definition is undefined, sleeping ${config.SLEEP_TIME_MS}ms`, + }); + + await sleep(config.SLEEP_TIME_MS); + } + logger.error({ + at: 'index#waitForTaskDefinitionToRegister', + message: 'Timed out waiting for task definition to register', + taskDefinition, + }); + throw new Error('Timed out waiting for task definition to register'); +} + +async function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +export async function upgradeEcsServices( + ecs: ECSClient, + event: APIGatewayEvent & AuxoEventJson, + taskDefinitionArnMap: TaskDefinitionArnMap, +): Promise { + logger.info({ + at: 'index#upgradeEcsServices', + message: 'Describe Services', + }); + const ecsPrefix: string = `${event.prefix}-indexer-${event.regionAbbrev}`; + const response: DescribeServicesCommandOutput = await ecs.send(new DescribeServicesCommand({ + cluster: `${ecsPrefix}-cluster`, + services: _.map( + ECS_SERVICE_NAMES, + (serviceName: EcsServiceNames) => { + return `${ecsPrefix}-${serviceName}`; + }, + ), + include: [], + })); + + logger.info({ + at: 'index#upgradeEcsServices', + message: 'Described Services', + response, + }); + + if (response.services === undefined) { + logger.error({ + at: 'index#upgradeEcsServices', + message: 'No services found', + }); + throw new Error('No services found'); + } else if (response.services.length !== ECS_SERVICE_NAMES.length) { + logger.error({ + at: 'index#upgradeEcsServices', + message: 'Not all services found', + numServicesFound: response.services.length, + services: response.services, + numServicesExpected: ECS_SERVICE_NAMES.length, + }); + throw new Error('Not all services found'); + } + + logger.info({ + at: 'index#upgradeEcsServices', + message: 'Upgrading ECS Services', + }); + const services: Service[] = response.services; + await Promise.all(_.map( + ECS_SERVICE_NAMES, + (serviceName: string, index: number) => upgradeEcsService( + ecs, + services[index], + taskDefinitionArnMap[serviceName], + ), + )); + + logger.info({ + at: 'index#upgradeEcsServices', + message: 'Upgraded ECS Services', + }); +} + +async function upgradeEcsService( + ecs: ECSClient, + service: Service, + taskDefinitionArn: string, +): Promise { + logger.info({ + at: 'index#upgradeEcsService', + message: 'Upgrading ECS Service', + service, + taskDefinitionArn, + }); + const response: UpdateServiceCommandOutput = await ecs.send(new UpdateServiceCommand({ + cluster: service.clusterArn, + service: service.serviceName, + desiredCount: service.desiredCount, + taskDefinition: taskDefinitionArn, + capacityProviderStrategy: service.capacityProviderStrategy, + deploymentConfiguration: service.deploymentConfiguration, + networkConfiguration: service.networkConfiguration, + placementConstraints: service.placementConstraints, + placementStrategy: service.placementStrategy, + platformVersion: service.platformVersion, + healthCheckGracePeriodSeconds: service.healthCheckGracePeriodSeconds, + enableExecuteCommand: service.enableExecuteCommand, + enableECSManagedTags: service.enableECSManagedTags, + loadBalancers: service.loadBalancers, + propagateTags: service.propagateTags, + serviceRegistries: service.serviceRegistries, + })); + + logger.info({ + at: 'index#upgradeEcsService', + message: 'Upgraded ECS Service', + serviceName: service.serviceName, + taskDefinitionArn, + response, + }); +} diff --git a/indexer/services/auxo/src/index.ts b/indexer/services/auxo/src/index.ts index f350fa56f3..34b08806c2 100644 --- a/indexer/services/auxo/src/index.ts +++ b/indexer/services/auxo/src/index.ts @@ -1,42 +1,13 @@ -import { - DescribeImagesCommand, DescribeImagesCommandOutput, ECRClient, ImageDetail, -} from '@aws-sdk/client-ecr'; -import { - ContainerDefinition, - DescribeServicesCommand, - DescribeServicesCommandOutput, - DescribeTaskDefinitionCommand, - DescribeTaskDefinitionCommandOutput, - ECSClient, - RegisterTaskDefinitionCommand, - RegisterTaskDefinitionCommandOutput, - Service, - TaskDefinition, - UpdateServiceCommand, - UpdateServiceCommandOutput, -} from '@aws-sdk/client-ecs'; -import { - GetFunctionCommand, - GetFunctionCommandOutput, - InvokeCommand, - InvokeCommandOutput, - LambdaClient, - LastUpdateStatus, - UpdateFunctionCodeCommand, -} from '@aws-sdk/client-lambda'; +import { ECRClient } from '@aws-sdk/client-ecr'; +import { ECSClient } from '@aws-sdk/client-ecs'; +import { LambdaClient } from '@aws-sdk/client-lambda'; import { logger, startBugsnag } from '@dydxprotocol-indexer/base'; import { APIGatewayEvent, APIGatewayProxyResult, Context } from 'aws-lambda'; -import _ from 'lodash'; -import config from './config'; import { - BAZOOKA_DB_MIGRATION_AND_CREATE_KAFKA_PAYLOAD, - BAZOOKA_DB_MIGRATION_PAYLOAD, - BAZOOKA_LAMBDA_FUNCTION_NAME, - ECS_SERVICE_NAMES, - SERVICE_NAME_SUFFIX, -} from './constants'; -import { AuxoEventJson, EcsServiceNames, TaskDefinitionArnMap } from './types'; + upgradeBazooka, runDbAndKafkaMigration, createNewEcsTaskDefinitions, upgradeEcsServices, +} from './helpers'; +import { AuxoEventJson, TaskDefinitionArnMap } from './types'; /** * Upgrades all services and run migrations @@ -105,423 +76,3 @@ export async function handler( }), }; } - -async function upgradeBazooka( - lambda: LambdaClient, - ecr: ECRClient, - event: APIGatewayEvent & AuxoEventJson, -): Promise { - const imageDetail: ImageDetail = await getImageDetail(ecr, `${event.prefix}-indexer-bazooka`, event); - const imageUri = `${imageDetail.registryId}.dkr.ecr.${event.region}.amazonaws.com/${imageDetail.repositoryName}@${imageDetail.imageDigest}`; - logger.info({ - at: 'index#upgradeBazooka', - message: `Upgrading bazooka to ${imageUri}`, - }); - - // Update Lambda function with the new image - await lambda.send( - new UpdateFunctionCodeCommand({ - FunctionName: BAZOOKA_LAMBDA_FUNCTION_NAME, - ImageUri: imageUri, - }), - ); - - // Wait for the lambda to be updated, with a timeout of 120s. - await Promise.race([ - checkLambdaStatus(lambda), - sleep(120000), - ]); -} - -async function checkLambdaStatus( - lambda: LambdaClient, -): Promise { - let updateStatus: LastUpdateStatus | string = LastUpdateStatus.InProgress; - - while (updateStatus === LastUpdateStatus.InProgress) { - const statusResponse: GetFunctionCommandOutput = await lambda.send( - new GetFunctionCommand({ - FunctionName: BAZOOKA_LAMBDA_FUNCTION_NAME, - }), - ); - - updateStatus = statusResponse.Configuration!.LastUpdateStatus!; - if (updateStatus === LastUpdateStatus.Successful) { - logger.info({ - at: 'index#upgradeBazooka', - message: 'Successfully upgraded bazooka', - response: statusResponse, - }); - return; - } else if (updateStatus === LastUpdateStatus.Failed) { - throw new Error('Failed to upgrade bazooka'); - } - - // Wait for 5s before checking again - await new Promise((resolve) => setTimeout(resolve, 5000)); - } -} - -async function getImageDetail( - ecr: ECRClient, - repositoryName: string, - event: APIGatewayEvent & AuxoEventJson, -): Promise { - logger.info({ - at: 'index#getImageDetail', - message: 'Getting ecr images', - repositoryName, - event, - }); - const images: DescribeImagesCommandOutput = await ecr.send(new DescribeImagesCommand({ - repositoryName, - imageIds: [ - { - imageTag: event.upgrade_tag, - }, - ], - })); - logger.info({ - at: 'index#getImageDetail', - message: 'Successfully got ecr images', - images, - repositoryName, - event, - }); - - if (!images.imageDetails || images.imageDetails.length === 0) { - logger.error({ - at: 'index#getImageDetail', - message: 'Unable to find ecr image', - imageTag: event.upgrade_tag, - repositoryName, - event, - }); - throw new Error('Unable to find ecr image'); - } - return images.imageDetails[0]; - -} - -async function runDbAndKafkaMigration( - createNewKafkaTopics: boolean, - lambda: ECRClient, -): Promise { - logger.info({ - at: 'index#runDbMigration', - message: 'Running db migration', - }); - const payload = createNewKafkaTopics - ? BAZOOKA_DB_MIGRATION_AND_CREATE_KAFKA_PAYLOAD - : BAZOOKA_DB_MIGRATION_PAYLOAD; - const response: InvokeCommandOutput = await lambda.send(new InvokeCommand({ - FunctionName: BAZOOKA_LAMBDA_FUNCTION_NAME, - Payload: payload, - // RequestResponse means that the lambda is synchronously invoked - InvocationType: 'RequestResponse', - })); - logger.info({ - at: 'index#runDbMigration', - message: 'Successfully ran db migration', - response, - }); -} - -async function createNewEcsTaskDefinitions( - ecs: ECSClient, - ecr: ECRClient, - event: APIGatewayEvent & AuxoEventJson, -): Promise { - logger.info({ - at: 'index#createNewEcsTaskDefinitions', - message: 'Creating new ECS Task Definitions', - }); - const taskDefinitionArns: string[] = await Promise.all(_.map( - ECS_SERVICE_NAMES, - (serviceName: EcsServiceNames) => createNewEcsTaskDefinition(ecs, ecr, event, serviceName), - )); - logger.info({ - at: 'index#createNewEcsTaskDefinitions', - message: 'Created new ECS Task Definition', - }); - return _.zipObject(ECS_SERVICE_NAMES, taskDefinitionArns); -} - -/** - * @returns The revision number of the new task definition - */ -async function createNewEcsTaskDefinition( - ecs: ECSClient, - ecr: ECRClient, - event: APIGatewayEvent & AuxoEventJson, - serviceName: EcsServiceNames, -): Promise { - // Check that the ECR image exists, will throw error here if it does not - await getImageDetail(ecr, `${event.prefix}-indexer-${serviceName}`, event); - - const taskDefinitionName = `${event.prefix}-indexer-${event.regionAbbrev}-${serviceName}-task`; - logger.info({ - at: 'index#createNewEcsTaskDefinition', - message: 'Get existing ECS Task Definition', - taskDefinitionName, - }); - const describeResult: DescribeTaskDefinitionCommandOutput = await ecs.send( - new DescribeTaskDefinitionCommand({ - taskDefinition: taskDefinitionName, - }), - ); - logger.info({ - at: 'index#createNewEcsTaskDefinition', - message: 'Got existing ECS Task Definition', - taskDefinitionName, - describeResult, - }); - - if (describeResult.taskDefinition === undefined) { - logger.error({ - at: 'index#createNewEcsTaskDefinition', - message: 'Unable to find existing ECS Task Definition', - taskDefinitionName, - }); - throw new Error('Unable to find existing ECS Task Definition'); - } - - // All ECS Task Definitions should have two container definitions, the service container - // , and the datadog agent - const taskDefinition: TaskDefinition = describeResult.taskDefinition; - const serviceContainerDefinitionIndex: number = getServiceContainerDefinitionIndex( - taskDefinition, - ); - - const serviceContainerDefinition: - ContainerDefinition = taskDefinition.containerDefinitions![serviceContainerDefinitionIndex]; - if (serviceContainerDefinition.image === undefined) { - logger.error({ - at: 'index#createNewEcsTaskDefinition', - message: 'No image found in the container definition', - taskDefinitionName, - }); - throw new Error('No image found in the container definition'); - } - const originalImage: string = serviceContainerDefinition.image; - const updatedContainerDefinitions: ContainerDefinition[] = _.cloneDeep( - taskDefinition.containerDefinitions!, - ); - const newImage: string = `${_.split(originalImage, ':')[0]}:${event.upgrade_tag}`; - updatedContainerDefinitions[serviceContainerDefinitionIndex].image = newImage; - - logger.info({ - at: 'index#createNewEcsTaskDefinition', - message: 'Registering new task definition', - taskDefinitionName, - }); - const registerResult: - RegisterTaskDefinitionCommandOutput = await ecs.send(new RegisterTaskDefinitionCommand({ - family: taskDefinition.family, - taskRoleArn: taskDefinition.taskRoleArn, - executionRoleArn: taskDefinition.executionRoleArn, - networkMode: taskDefinition.networkMode, - containerDefinitions: updatedContainerDefinitions, - volumes: taskDefinition.volumes, - placementConstraints: taskDefinition.placementConstraints, - requiresCompatibilities: taskDefinition.requiresCompatibilities, - cpu: taskDefinition.cpu, - memory: taskDefinition.memory, - ipcMode: taskDefinition.ipcMode, - proxyConfiguration: taskDefinition.proxyConfiguration, - inferenceAccelerators: taskDefinition.inferenceAccelerators, - runtimePlatform: taskDefinition.runtimePlatform, - })); - - if (registerResult.taskDefinition === undefined || - registerResult.taskDefinition.taskDefinitionArn === undefined - ) { - logger.error({ - at: 'index#createNewEcsTaskDefinition', - message: 'Failed to register new task definition', - }); - throw new Error('Failed to register new task definition'); - } - - await waitForTaskDefinitionToRegister(ecs, registerResult); - return registerResult.taskDefinition.taskDefinitionArn; -} - -function getServiceContainerDefinitionIndex( - taskDefinition: TaskDefinition, -): number { - const containerDefinitions: - ContainerDefinition[] | undefined = taskDefinition.containerDefinitions; - if (containerDefinitions === undefined || containerDefinitions.length === 0) { - logger.error({ - at: 'index#getServiceTaskDefinition', - message: 'No container definitions found in the task definition', - taskDefinition, - }); - throw new Error('No container definitions found in the task definition'); - } - - const index: number = containerDefinitions.findIndex( - (containerDefinition: ContainerDefinition) => { - return _.endsWith(containerDefinition.name, SERVICE_NAME_SUFFIX); - }, - ); - if (index >= 0) { - return index; - } - - logger.error({ - at: 'index#getServiceTaskDefinition', - message: 'No service container definition found in the task definition', - containerDefinitions, - }); - throw new Error('No service container definition found in the task definition'); -} - -/** - * Registering a task definition is asynchronous, and this step ensures that the task definition - * is usable in the ECS service before we attempt to update the ECS service. - */ -async function waitForTaskDefinitionToRegister( - ecs: ECSClient, - registerResult: RegisterTaskDefinitionCommandOutput, -): Promise { - const taskDefinition: - string = `${registerResult.taskDefinition!.family}:${registerResult.taskDefinition!.revision}`; - for (let i = 0; i <= config.MAX_TASK_DEFINITION_WAIT_TIME_MS; i += config.SLEEP_TIME_MS) { - const describeResult: DescribeTaskDefinitionCommandOutput = await ecs.send( - new DescribeTaskDefinitionCommand({ - taskDefinition, - }), - ); - - if (describeResult.taskDefinition !== undefined) { - logger.info({ - at: 'index#waitForTaskDefinitionToRegister', - message: 'Task definition registered', - taskDefinition, - describeResult, - }); - return; - } - logger.info({ - at: 'index#waitForTaskDefinitionToRegister', - message: `Task definition is undefined, sleeping ${config.SLEEP_TIME_MS}ms`, - }); - - await sleep(config.SLEEP_TIME_MS); - } - logger.error({ - at: 'index#waitForTaskDefinitionToRegister', - message: 'Timed out waiting for task definition to register', - taskDefinition, - }); - throw new Error('Timed out waiting for task definition to register'); -} - -async function sleep(ms: number): Promise { - return new Promise((resolve) => setTimeout(resolve, ms)); -} - -async function upgradeEcsServices( - ecs: ECSClient, - event: APIGatewayEvent & AuxoEventJson, - taskDefinitionArnMap: TaskDefinitionArnMap, -): Promise { - logger.info({ - at: 'index#upgradeEcsServices', - message: 'Describe Services', - }); - const ecsPrefix: string = `${event.prefix}-indexer-${event.regionAbbrev}`; - const response: DescribeServicesCommandOutput = await ecs.send(new DescribeServicesCommand({ - cluster: `${ecsPrefix}-cluster`, - services: _.map( - ECS_SERVICE_NAMES, - (serviceName: EcsServiceNames) => { - return `${ecsPrefix}-${serviceName}`; - }, - ), - include: [], - })); - - logger.info({ - at: 'index#upgradeEcsServices', - message: 'Described Services', - response, - }); - - if (response.services === undefined) { - logger.error({ - at: 'index#upgradeEcsServices', - message: 'No services found', - }); - throw new Error('No services found'); - } else if (response.services.length !== ECS_SERVICE_NAMES.length) { - logger.error({ - at: 'index#upgradeEcsServices', - message: 'Not all services found', - numServicesFound: response.services.length, - services: response.services, - numServicesExpected: ECS_SERVICE_NAMES.length, - }); - throw new Error('Not all services found'); - } - - logger.info({ - at: 'index#upgradeEcsServices', - message: 'Upgrading ECS Services', - }); - const services: Service[] = response.services; - await Promise.all(_.map( - ECS_SERVICE_NAMES, - (serviceName: string, index: number) => upgradeEcsService( - ecs, - services[index], - taskDefinitionArnMap[serviceName], - ), - )); - - logger.info({ - at: 'index#upgradeEcsServices', - message: 'Upgraded ECS Services', - }); -} - -async function upgradeEcsService( - ecs: ECSClient, - service: Service, - taskDefinitionArn: string, -): Promise { - logger.info({ - at: 'index#upgradeEcsService', - message: 'Upgrading ECS Service', - service, - taskDefinitionArn, - }); - const response: UpdateServiceCommandOutput = await ecs.send(new UpdateServiceCommand({ - cluster: service.clusterArn, - service: service.serviceName, - desiredCount: service.desiredCount, - taskDefinition: taskDefinitionArn, - capacityProviderStrategy: service.capacityProviderStrategy, - deploymentConfiguration: service.deploymentConfiguration, - networkConfiguration: service.networkConfiguration, - placementConstraints: service.placementConstraints, - placementStrategy: service.placementStrategy, - platformVersion: service.platformVersion, - healthCheckGracePeriodSeconds: service.healthCheckGracePeriodSeconds, - enableExecuteCommand: service.enableExecuteCommand, - enableECSManagedTags: service.enableECSManagedTags, - loadBalancers: service.loadBalancers, - propagateTags: service.propagateTags, - serviceRegistries: service.serviceRegistries, - })); - - logger.info({ - at: 'index#upgradeEcsService', - message: 'Upgraded ECS Service', - serviceName: service.serviceName, - taskDefinitionArn, - response, - }); -}