Skip to content

Commit

Permalink
[feat] Allow filtering descendant summary
Browse files Browse the repository at this point in the history
  • Loading branch information
aquamatthias committed Oct 9, 2024
1 parent a3a6a8b commit 95408c7
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 20 deletions.
15 changes: 12 additions & 3 deletions fixbackend/inventory/inventory_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
TimeseriesRequest,
Scatters,
KindUsage,
KindUsageRequest,
)
from fixbackend.streaming_response import streaming_response, StreamOnSuccessResponse
from fixbackend.workspaces.dependencies import UserWorkspaceDependency
Expand Down Expand Up @@ -413,8 +414,16 @@ async def timeseries(graph_db: CurrentGraphDbDependency, ts: TimeseriesRequest)
aggregation=ts.aggregation,
)

@router.get("/descendant/summary", tags=["search"])
async def descendant_summary_account(graph_db: CurrentGraphDbDependency) -> Dict[str, KindUsage]:
return await inventory().descendant_summary(graph_db)
@router.post("/descendant/summary", tags=["search"])
async def descendant_summary_account(
graph_db: CurrentGraphDbDependency, request: KindUsageRequest
) -> Dict[str, KindUsage]:
return await inventory().descendant_summary(
graph_db,
request.cloud_ids or [],
request.account_ids or [],
request.region_ids or [],
request.kinds or [],
)

return router
7 changes: 7 additions & 0 deletions fixbackend/inventory/inventory_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,13 @@ class TimeseriesRequest(BaseModel):
aggregation: Optional[str] = None


class KindUsageRequest(BaseModel):
cloud_ids: Optional[List[str]] = Field(default=None, description="The cloud ids to filter by.")
account_ids: Optional[List[str]] = Field(default=None, description="The account ids to filter by.")
region_ids: Optional[List[str]] = Field(default=None, description="The region ids to filter by.")
kinds: Optional[List[str]] = Field(default=None, description="The kinds to filter by.")


class KindUsage(BaseModel):
accounts: int = Field(default=0, description="The number of accounts using this kind.")
regions: int = Field(default=0, description="The number of regions using this kind.")
Expand Down
107 changes: 90 additions & 17 deletions fixbackend/inventory/inventory_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations
import asyncio
import json
import logging
Expand All @@ -43,7 +44,8 @@

from arq import func
from arq.connections import RedisSettings
from attr import frozen
from attr import frozen, evolve
from attrs import define, field
from fixcloudutils.redis.cache import RedisCache
from fixcloudutils.redis.worker_queue import WorkDispatcher, WorkerInstance
from fixcloudutils.service import Service
Expand Down Expand Up @@ -155,6 +157,24 @@ class InventorySummary:
buckets_size_bytes_progress: Tuple[int, int]


@define
class AccountRegionSummary:
account_id: str
cloud_id: str
count: Dict[str, int] = field(factory=lambda: defaultdict(int))
regions: Dict[str, Dict[str, int]] = field(factory=lambda: defaultdict(lambda: defaultdict(int)))

def for_regions(self, regions: List[str]) -> AccountRegionSummary:
return evolve(self, regions={r: count for r, count in self.regions.items() if r in regions})

def for_kinds(self, kinds: List[str]) -> AccountRegionSummary:
return evolve(
self,
count={k: count for k, count in self.count.items() if k in kinds},
regions={r: {k: v for k, v in kc.items() if k in kinds} for r, kc in self.regions.items()},
)


class InventoryService(Service):
def __init__(
self,
Expand Down Expand Up @@ -861,29 +881,82 @@ async def overall_score() -> Tuple[int, int]:

return await self.cache.call(compute_inventory_info, key=str(dba.workspace_id))(duration)

async def descendant_summary(self, dba: GraphDatabaseAccess) -> Dict[str, KindUsage]:
async def descendant_summary(
self,
dba: GraphDatabaseAccess,
cloud_ids: List[str],
account_ids: List[str],
region_ids: List[str],
kinds: List[str],
) -> Dict[str, KindUsage]:

async def account_region_summary() -> Dict[str, AccountRegionSummary]:
account_usage: Dict[str, AccountRegionSummary] = {}

def get_account_usage(account_id: str, cloud_id: str) -> AccountRegionSummary:
au = account_usage.get(account_id)
if not au:
au = AccountRegionSummary(account_id=account_id, cloud_id=cloud_id)
account_usage[account_id] = au
return au

async def compute_descendant_summary() -> Dict[str, KindUsage]:
account_usage: Dict[str, Dict[str, int]] = defaultdict(lambda: defaultdict(int))
region_usage: Dict[str, Dict[str, int]] = defaultdict(lambda: defaultdict(int))
async with self.client.search(dba, "is(account) and /metadata.descendant_count>0") as response:
async for acc in response:
if account_id := value_in_path(acc, "reported.id"):
if (cloud_id := value_in_path(acc, "ancestors.cloud.reported.id")) and (
account_id := value_in_path(acc, "reported.id")
):
descendant_summary = value_in_path(acc, "metadata.descendant_summary") or {}
account_summary = get_account_usage(account_id, cloud_id)
for descendant_kind, count in descendant_summary.items():
account_usage[descendant_kind][account_id] += count
account_summary.count[descendant_kind] += count

async with self.client.search(dba, "is(region) and /metadata.descendant_count>0") as response:
async for acc in response:
if region_id := value_in_path(acc, "reported.id"):
if (
(cloud_id := value_in_path(acc, "ancestors.cloud.reported.id"))
and (account_id := value_in_path(acc, "ancestors.account.reported.id"))
and (region_id := value_in_path(acc, "reported.id"))
):
account_summary = get_account_usage(account_id, cloud_id)
descendant_summary = value_in_path(acc, "metadata.descendant_summary") or {}
for descendant_kind, count in descendant_summary.items():
region_usage[descendant_kind][region_id] += count
kind_usage: Dict[str, KindUsage] = defaultdict(KindUsage)
for kind, accounts in account_usage.items():
kind_usage[kind].accounts = len(accounts)
kind_usage[kind].resources = sum(accounts.values())
account_summary.regions[region_id][descendant_kind] += count
return account_usage

summary = await self.cache.call(account_region_summary, key=str(dba.workspace_id))()
if cloud_ids:
summary = {k: v for k, v in summary.items() if v.cloud_id in cloud_ids}
if account_ids:
summary = {k: v for k, v in summary.items() if v.account_id in account_ids}
if region_ids:
summary = {k: v.for_regions(region_ids) for k, v in summary.items()}
if kinds:
summary = {k: v.for_kinds(kinds) for k, v in summary.items()}

account_usage: Dict[str, int] = defaultdict(int)
region_usage: Dict[str, Dict[str, int]] = defaultdict(lambda: defaultdict(int))
account_region_usage: Dict[str, Set[str]] = defaultdict(set)
for account in summary.values():
for kind, count in account.count.items():
account_usage[kind] += count
for region, region_kinds in account.regions.items():
for kind, count in region_kinds.items():
account_region_usage[kind].add(account.account_id)
region_usage[kind][region] += count
kind_usage: Dict[str, KindUsage] = defaultdict(KindUsage)
if region_ids:
for kind, regions in region_usage.items():
kind_usage[kind].regions = len(regions)
return kind_usage

return await self.cache.call(compute_descendant_summary, key=str(dba.workspace_id))()
kd = kind_usage[kind]
kd.accounts = len(account_region_usage[kind])
kd.regions = len(regions)
kd.resources = sum(regions.values())
else:
for kind, count in account_usage.items():
kd = kind_usage[kind]
kd.accounts += 1
kd.regions = len(region_usage[kind])
kd.resources += count

for kind, regions in region_usage.items():
kind_usage[kind].regions = len(regions)
return kind_usage

0 comments on commit 95408c7

Please sign in to comment.