Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more event logging and timeout for cloud account event handler #361

Merged
merged 2 commits into from
Apr 2, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 117 additions & 112 deletions fixbackend/cloud_accounts/service_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,132 +324,137 @@ async def handle_stack_deleted(msg: Json) -> Optional[CloudAccount]:
return None

async def process_domain_event(self, message: Json, context: MessageContext) -> None:
log.info(f"Received domain event: {message}")

async def send_pub_sub_message(
e: Union[AwsAccountDegraded, AwsAccountDiscovered, AwsAccountDeleted, AwsAccountConfigured]
) -> None:
msg = e.to_json()
msg.pop("tenant_id", None)
await self.pubsub_publisher.publish(kind=e.kind, message=msg, channel=f"tenant-events::{e.tenant_id}")

match context.kind:
case TenantAccountsCollected.kind:
event = TenantAccountsCollected.from_json(message)

accounts = await self.cloud_account_repository.list(list(event.cloud_accounts.keys()))
collected_accounts = [account for account in accounts if account.id in event.cloud_accounts]
first_workspace_collect = all(account.last_scan_started_at is None for account in accounts)
first_account_collect = any(account.last_scan_started_at is None for account in collected_accounts)

set_workspace_id(event.tenant_id)
for account_id, account in event.cloud_accounts.items():
set_fix_cloud_account_id(account_id)
set_cloud_account_id(account.account_id)

def compute_failed_scan_count(acc: CloudAccount) -> int:
if account.scanned_resources < 50:
return acc.failed_scan_count + 1
else:
return 0

updated = await self.cloud_account_repository.update(
account_id,
lambda acc: evolve(
acc,
last_scan_duration_seconds=account.duration_seconds,
last_scan_resources_scanned=account.scanned_resources,
last_scan_started_at=account.started_at,
next_scan=event.next_run,
failed_scan_count=compute_failed_scan_count(acc),
),
async with asyncio.timeout(10):
match context.kind:
case TenantAccountsCollected.kind:
event = TenantAccountsCollected.from_json(message)

accounts = await self.cloud_account_repository.list(list(event.cloud_accounts.keys()))
collected_accounts = [account for account in accounts if account.id in event.cloud_accounts]
first_workspace_collect = all(account.last_scan_started_at is None for account in accounts)
first_account_collect = any(account.last_scan_started_at is None for account in collected_accounts)

set_workspace_id(event.tenant_id)
for account_id, account in event.cloud_accounts.items():
set_fix_cloud_account_id(account_id)
set_cloud_account_id(account.account_id)

def compute_failed_scan_count(acc: CloudAccount) -> int:
if account.scanned_resources < 50:
return acc.failed_scan_count + 1
else:
return 0

updated = await self.cloud_account_repository.update(
account_id,
lambda acc: evolve(
acc,
last_scan_duration_seconds=account.duration_seconds,
last_scan_resources_scanned=account.scanned_resources,
last_scan_started_at=account.started_at,
next_scan=event.next_run,
failed_scan_count=compute_failed_scan_count(acc),
),
)

if updated.failed_scan_count > 3:
await self.__degrade_account(updated.id, "Too many consecutive failed scans")

user_id = await self.analytics_event_sender.user_id_from_workspace(event.tenant_id)
if first_workspace_collect:
await self.analytics_event_sender.send(
AEFirstWorkspaceCollectFinished(user_id, event.tenant_id)
)
# inform workspace users about the first successful collect
await self.notification_service.send_message_to_workspace(
workspace_id=event.tenant_id, message=email.SecurityScanFinished()
)
if first_account_collect:
await self.analytics_event_sender.send(AEFirstAccountCollectFinished(user_id, event.tenant_id))

await self.analytics_event_sender.send(
AEWorkspaceCollectFinished(
user_id,
event.tenant_id,
len(collected_accounts),
sum(a.scanned_resources for a in event.cloud_accounts.values()),
)
)

if updated.failed_scan_count > 3:
await self.__degrade_account(updated.id, "Too many consecutive failed scans")
case AwsAccountDiscovered.kind:
discovered_event = AwsAccountDiscovered.from_json(message)
set_cloud_account_id(discovered_event.aws_account_id)
set_fix_cloud_account_id(discovered_event.cloud_account_id)
set_workspace_id(discovered_event.tenant_id)
await self.process_discovered_event(discovered_event)
await send_pub_sub_message(discovered_event)

case AwsAccountConfigured.kind:
configured_event = AwsAccountConfigured.from_json(message)
await send_pub_sub_message(configured_event)

user_id = await self.analytics_event_sender.user_id_from_workspace(event.tenant_id)
if first_workspace_collect:
await self.analytics_event_sender.send(AEFirstWorkspaceCollectFinished(user_id, event.tenant_id))
# inform workspace users about the first successful collect
case AwsAccountDeleted.kind:
deleted_event = AwsAccountDeleted.from_json(message)
await send_pub_sub_message(deleted_event)

case AwsAccountDegraded.kind:
degraded_event = AwsAccountDegraded.from_json(message)
await self.notification_service.send_message_to_workspace(
workspace_id=event.tenant_id, message=email.SecurityScanFinished()
)
if first_account_collect:
await self.analytics_event_sender.send(AEFirstAccountCollectFinished(user_id, event.tenant_id))

await self.analytics_event_sender.send(
AEWorkspaceCollectFinished(
user_id,
event.tenant_id,
len(collected_accounts),
sum(a.scanned_resources for a in event.cloud_accounts.values()),
workspace_id=degraded_event.tenant_id,
message=email.AccountDegraded(
cloud_account_id=degraded_event.aws_account_id,
tenant_id=degraded_event.tenant_id,
account_name=degraded_event.aws_account_name,
),
)
)

case AwsAccountDiscovered.kind:
discovered_event = AwsAccountDiscovered.from_json(message)
set_cloud_account_id(discovered_event.aws_account_id)
set_fix_cloud_account_id(discovered_event.cloud_account_id)
set_workspace_id(discovered_event.tenant_id)
await self.process_discovered_event(discovered_event)
await send_pub_sub_message(discovered_event)

case AwsAccountConfigured.kind:
configured_event = AwsAccountConfigured.from_json(message)
await send_pub_sub_message(configured_event)

case AwsAccountDeleted.kind:
deleted_event = AwsAccountDeleted.from_json(message)
await send_pub_sub_message(deleted_event)

case AwsAccountDegraded.kind:
degraded_event = AwsAccountDegraded.from_json(message)
await self.notification_service.send_message_to_workspace(
workspace_id=degraded_event.tenant_id,
message=email.AccountDegraded(
cloud_account_id=degraded_event.aws_account_id,
tenant_id=degraded_event.tenant_id,
account_name=degraded_event.aws_account_name,
),
)
await send_pub_sub_message(degraded_event)

case ProductTierChanged.kind:
ptc_evt = ProductTierChanged.from_json(message)
new_account_limit = ProductTierSettings[ptc_evt.product_tier].account_limit or math.inf
old_account_limit = ProductTierSettings[ptc_evt.previous_tier].account_limit or math.inf
if new_account_limit < old_account_limit:
# we should not have infinity here
new_account_limit = round(new_account_limit)
# tier changed, time to delete accounts
all_accounts = await self.list_accounts(ptc_evt.workspace_id)
# keep the last new_account_limit accounts
to_delete = all_accounts[:-new_account_limit]
# delete them all in parallel
await send_pub_sub_message(degraded_event)

case ProductTierChanged.kind:
ptc_evt = ProductTierChanged.from_json(message)
new_account_limit = ProductTierSettings[ptc_evt.product_tier].account_limit or math.inf
old_account_limit = ProductTierSettings[ptc_evt.previous_tier].account_limit or math.inf
if new_account_limit < old_account_limit:
# we should not have infinity here
new_account_limit = round(new_account_limit)
# tier changed, time to delete accounts
all_accounts = await self.list_accounts(ptc_evt.workspace_id)
# keep the last new_account_limit accounts
to_delete = all_accounts[:-new_account_limit]
# delete them all in parallel
async with asyncio.TaskGroup() as tg:
for cloud_account in to_delete:
tg.create_task(
self.delete_cloud_account(ptc_evt.user_id, cloud_account.id, ptc_evt.workspace_id)
)

case AwsMarketplaceSubscriptionCancelled.kind:
evt = AwsMarketplaceSubscriptionCancelled.from_json(message)
workspaces = await self.workspace_repository.list_workspaces_by_subscription_id(evt.subscription_id)
async with asyncio.TaskGroup() as tg:
for cloud_account in to_delete:
tg.create_task(
self.delete_cloud_account(ptc_evt.user_id, cloud_account.id, ptc_evt.workspace_id)
)

case AwsMarketplaceSubscriptionCancelled.kind:
evt = AwsMarketplaceSubscriptionCancelled.from_json(message)
workspaces = await self.workspace_repository.list_workspaces_by_subscription_id(evt.subscription_id)
async with asyncio.TaskGroup() as tg:
for ws in workspaces:
# first move the tier to free
await self.workspace_repository.update_payment_on_hold(ws.id, utc())
# second remove the subscription from the workspace
await self.workspace_repository.update_subscription(ws.id, None)
# third disable all accounts
account_limit = Free.account_limit or 1
all_accounts = await self.list_accounts(ws.id)
# keep the last account_limit accounts
to_disable = all_accounts[:-account_limit]
for cloud_account in to_disable:
tg.create_task(self.update_cloud_account_enabled(ws.id, cloud_account.id, False))
for ws in workspaces:
# first move the tier to free
await self.workspace_repository.update_payment_on_hold(ws.id, utc())
# second remove the subscription from the workspace
await self.workspace_repository.update_subscription(ws.id, None)
# third disable all accounts
account_limit = Free.account_limit or 1
all_accounts = await self.list_accounts(ws.id)
# keep the last account_limit accounts
to_disable = all_accounts[:-account_limit]
for cloud_account in to_disable:
tg.create_task(self.update_cloud_account_enabled(ws.id, cloud_account.id, False))

case _:
pass # ignore other domain events
case _:
pass # ignore other domain events

async def process_discovered_event(self, discovered: AwsAccountDiscovered) -> None:
account = await self.cloud_account_repository.get(discovered.cloud_account_id)
Expand Down
Loading