From 707a184444f2894277ddaca63c183148fdb3b879 Mon Sep 17 00:00:00 2001
From: driazati
Date: Thu, 15 Sep 2022 13:08:38 -0700
Subject: [PATCH] fix lint 1

---
 .github/workflows/lint.yml |   2 +-
 dev/determine_shards.py    |  42 +++++--
 dev/utils/__init__.py      |   0
 dev/utils/db.py            |   2 +-
 dev/utils/forward.py       | 238 ++++++-------------------------
 dev/utils/net.py           |   6 +-
 dev/utils/schema.py        |  15 +--
 dev/utils/utils.py         |   5 +-
 8 files changed, 82 insertions(+), 228 deletions(-)
 create mode 100644 dev/utils/__init__.py

diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml
index 13a00e3..dde32f5 100644
--- a/.github/workflows/lint.yml
+++ b/.github/workflows/lint.yml
@@ -56,4 +56,4 @@ jobs:
           VALIDATE_BASH: false
           VALIDATE_TERRAFORM_TERRASCAN: false
           VALIDATE_MARKDOWN: false
-          FILTER_REGEX_EXCLUDE: monitoring/.*
+          FILTER_REGEX_EXCLUDE: (monitoring/.*)|(dev/.*)
diff --git a/dev/determine_shards.py b/dev/determine_shards.py
index d1ea82c..7ffc51e 100644
--- a/dev/determine_shards.py
+++ b/dev/determine_shards.py
@@ -1,14 +1,17 @@
 import argparse
 import asyncio
+import math
 import re
 import statistics
-import math
-import rich
-
 from typing import *
 
+import rich
+
 from utils import forward
 from utils.forward import *
+from utils.net import init
+from utils.schema import Build, Stage
+from utils.utils import init_log
 
 
 def is_parallelizable(name: str, desc: str) -> bool:
@@ -31,7 +34,21 @@ def is_parallelizable(name: str, desc: str) -> bool:
     return False
 
 
-def analyze_stages(stage_name: str, stages: List[Stage], goal_runtime_m: float):
+def find_existing_shards(stage_name: str, template: str) -> int:
+    with open(template) as f:
+        content = f.read()
+
+    m = re.search(rf'name="{stage_name}"(.*\n)+?.*num_shards=(\d+)', content, flags=re.MULTILINE)
+    if m is None:
+        print(f"Could not find {stage_name} in {template}, is that the right file?")
+        exit(1)
+    # print("match", m)
+    start, end = m.span()
+    # print(content[start:end])
+    return int(m.groups()[1])
+
+
+def analyze_stages(stage_name: str, stages: List[Stage], goal_runtime_m: float, jenkins_template: str):
     steps_across_shards = {}
     for stage in stages:
         for step in stage.steps:
@@ -63,9 +80,14 @@ def analyze_stages(stage_name: str, stages: List[Stage], goal_runtime_m: float)
     num_shards = parallelizable_runtime_m / parallel_part
     num_shards = math.ceil(num_shards)
 
+    existing_shards = find_existing_shards(stage_name, jenkins_template)
+
     print(f"    fixed runtime (m): {round(fixed_runtime_m, 2)}")
     print(f"    parallel runtime (m): {round(parallelizable_runtime_m, 2)}")
-    print(f"    required shards: {num_shards}")
+    if existing_shards == num_shards:
+        print(f"    required shards: {num_shards} (no action required)")
+    else:
+        print(f"    required shards: change from {existing_shards} to {num_shards} in {jenkins_template}")
 
 
 def list_steps(build: Build):
@@ -101,7 +123,7 @@ def total_rt(stage: Stage):
     )
 
 
-def analyze(build: Build, goal_runtime_m: float):
+def analyze(build: Build, goal_runtime_m: float, jenkins_template: str):
     test_stages: List[Stage] = []
     should_add = False
     for stage in build.stages:
@@ -128,13 +150,13 @@
         merged_shards[stage.name] = [stage]
 
     for name, stages in merged_shards.items():
-        analyze_stages(name, stages, goal_runtime_m)
+        analyze_stages(name, stages, goal_runtime_m, jenkins_template)
 
 
 async def main(args):
     async with aiohttp.ClientSession() as s:
         forward.SESSION = s
-        data = await fetch_branch(name=args.branch)
+        data = await fetch_branch(job_name=args.job, name=args.branch)
 
     return data
 
@@ -144,8 +166,10 @@ async def main(args):
     )
     parser.add_argument("--runtime-goal-m", required=True)
    parser.add_argument("--list-steps", action="store_true")
+    parser.add_argument("--job")
     parser.add_argument("--branch", default="main")
     parser.add_argument("--build", default="4082")
+    parser.add_argument("--jenkins-template")
     args = parser.parse_args()
     init(dir=".httpcache")
     init_log()
@@ -157,4 +181,4 @@ async def main(args):
         list_steps(build)
     else:
         print(f"To reach goal runtime of {args.runtime_goal_m} for tests:")
-        analyze(build, goal_runtime_m=float(args.runtime_goal_m))
+        analyze(build, goal_runtime_m=float(args.runtime_goal_m), jenkins_template=args.jenkins_template)
diff --git a/dev/utils/__init__.py b/dev/utils/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/dev/utils/db.py b/dev/utils/db.py
index c993002..c30178d 100644
--- a/dev/utils/db.py
+++ b/dev/utils/db.py
@@ -1,6 +1,6 @@
 import os
 
-from sqlalchemy import create_engine 
+from sqlalchemy import create_engine
 from sqlalchemy.dialects.postgresql import insert
 
 
diff --git a/dev/utils/forward.py b/dev/utils/forward.py
index f0dc36d..65b9ee6 100644
--- a/dev/utils/forward.py
+++ b/dev/utils/forward.py
@@ -1,26 +1,26 @@
 """
 Scrape Jenkins, send build data to Loki and Postgres
 """
-import dataclasses
-import aiohttp
+import argparse
 import asyncio
+import dataclasses
 import datetime
-import sys
-import os
-import time
 import json
-import argparse
+import logging
+import os
 import subprocess
+import sys
+import time
 from pathlib import Path
+from typing import *
 
-# from sqlalchemy import select
-from .utils import *
+import aiohttp
+
+from . import db, schema
 from .net import *
-from typing import *
-from . import db
-from . import schema
-import logging
 
+# from sqlalchemy import select
+from .utils import *
 
 SESSION = None
 DEBUG = os.getenv("DEBUG", "0") == "1"
@@ -38,7 +38,7 @@ def walk(o, visitor):
             walk(v, visitor)
 
 
-async def blue(url: str, use_cache: bool = True, no_slash: bool = False) -> Any:
+async def blue(job_name: str, url: str, use_cache: bool = True, no_slash: bool = False) -> Any:
     if DEBUG:
         use_cache = True
@@ -49,7 +49,7 @@ async def blue(url: str, use_cache: bool = True, no_slash: bool = False) -> Any
         raise RuntimeError("SESSION is None")
 
     r = await aioget(
-        f"https://ci.tlcpack.ai/blue/rest/organizations/jenkins/pipelines/tvm/branches/{url}",
+        f"https://ci.tlcpack.ai/blue/rest/organizations/jenkins/pipelines/{job_name}/branches/{url}",
         session=SESSION,
         use_cache=use_cache,
     )
@@ -135,7 +135,7 @@ def parse_date(d: str) -> datetime.datetime:
 
 
 async def fetch_stage(
-    branch_name: str, build: Build, stage_data: Dict[str, Any]
+    job_name: str, branch_name: str, build: Build, stage_data: Dict[str, Any]
 ) -> Stage:
     stage = Stage(
         name=stage_data["displayName"],
@@ -147,24 +147,24 @@ async def fetch_stage(
         result=stage_data["result"],
         id=stage_data["id"],
         parent=stage_data["firstParent"],
-        url=f"https://ci.tlcpack.ai/blue/organizations/jenkins/tvm/detail/{branch_name}/{build.id}/pipeline/{stage_data['id']}",
+        url=f"https://ci.tlcpack.ai/blue/organizations/jenkins/{job_name}/detail/{branch_name}/{build.id}/pipeline/{stage_data['id']}",
         steps=[],
     )
 
-    steps_data = await blue(f"{branch_name}/runs/{build.id}/nodes/{stage.id}/steps")
+    steps_data = await blue(job_name, f"{branch_name}/runs/{build.id}/nodes/{stage.id}/steps")
     for step_data in steps_data:
-        stage.steps.append(await fetch_step(branch_name, build, stage, step_data))
+        stage.steps.append(await fetch_step(job_name, branch_name, build, stage, step_data))
 
     return stage
 
 
 async def fetch_step(
-    branch_name: str, build: Build, stage: Stage, step_data: Dict[str, Any]
+    job_name: str, branch_name: str, build: Build, stage: Stage, step_data: Dict[str, Any]
 ) -> Step:
     id = step_data["id"]
-    log_url = f"https://ci.tlcpack.ai/blue/rest/organizations/jenkins/pipelines/tvm/branches/{branch_name}/runs/{build.id}/nodes/{stage.id}/steps/{id}/log/"
-    # log_url = f"https://ci.tlcpack.ai/blue/rest/organizations/jenkins/pipelines/tvm/branches/{branch_name}/runs/{build.id}/steps/{id}/log/"
+    log_url = f"https://ci.tlcpack.ai/blue/rest/organizations/jenkins/pipelines/{job_name}/branches/{branch_name}/runs/{build.id}/nodes/{stage.id}/steps/{id}/log/"
+    # log_url = f"https://ci.tlcpack.ai/blue/rest/organizations/jenkins/pipelines/{job_name}/branches/{branch_name}/runs/{build.id}/steps/{id}/log/"
     # log = await aioget(log_url, session=SESSION)
     log = "dog"
 
     return Step(
@@ -176,12 +176,12 @@ async def fetch_step(
         description=step_data["displayDescription"],
         log_url=log_url,
         log=log,
-        url=f"https://ci.tlcpack.ai/blue/organizations/jenkins/tvm/detail/{branch_name}/{build.id}/pipeline/{stage.id}#step-{step_data['id']}",
+        url=f"https://ci.tlcpack.ai/blue/organizations/jenkins/{job_name}/detail/{branch_name}/{build.id}/pipeline/{stage.id}#step-{step_data['id']}",
         duration_ms=int(step_data["durationInMillis"]),
     )
 
 
-async def fetch_build(branch_name: str, build_data: Dict[str, Any]) -> Build:
+async def fetch_build(job_name: str, branch_name: str, build_data: Dict[str, Any]) -> Build:
     queued_at = parse_date(build_data["enQueueTime"])
     started_at = parse_date(build_data["startTime"])
     ended_at = parse_date(build_data["endTime"])
@@ -192,13 +192,13 @@
     if causes is None:
         causes = []
 
-    test_summary = await blue(f"{branch_name}/runs/{build_data['id']}/blueTestSummary")
+    test_summary = await blue(job_name, f"{branch_name}/runs/{build_data['id']}/blueTestSummary")
 
     build = Build(
         causes=[c["shortDescription"] for c in causes],
         id=build_data["id"],
-        url=f"https://ci.tlcpack.ai/job/tvm/job/{branch_name}/{build_data['id']}/",
-        blue_url=f"https://ci.tlcpack.ai/blue/organizations/jenkins/tvm/detail/{branch_name}/{build_data['id']}/pipeline",
+        url=f"https://ci.tlcpack.ai/job/{job_name}/job/{branch_name}/{build_data['id']}/",
+        blue_url=f"https://ci.tlcpack.ai/blue/organizations/jenkins/{job_name}/detail/{branch_name}/{build_data['id']}/pipeline",
         state=build_data["state"],
         result=build_data["result"],
         queued_at=queued_at,
@@ -217,26 +217,26 @@
         total_tests=test_summary["total"],
     )
 
-    nodes_data = await blue(f"{branch_name}/runs/{build.id}/nodes")
+    nodes_data = await blue(job_name, f"{branch_name}/runs/{build.id}/nodes")
     for stage_data in nodes_data:
-        build.stages.append(await fetch_stage(branch_name, build, stage_data))
+        build.stages.append(await fetch_stage(job_name, branch_name, build, stage_data))
 
     return build
 
 
-async def fetch_branch(name):
+async def fetch_branch(job_name: str, name: str):
     logging.info(f"Fetching branch {name}")
-    branch_data = await blue(f"{name}", use_cache=False)
+    branch_data = await blue(job_name, f"{name}", use_cache=False)
 
     branch = Branch(
         name=name,
         full_name=branch_data["fullName"],
-        url=f"https://ci.tlcpack.ai/job/tvm/job/{name}/",
-        blue_url=f"https://ci.tlcpack.ai/blue/organizations/jenkins/tvm/activity?branch={name}",
+        url=f"https://ci.tlcpack.ai/job/{job_name}/job/{name}/",
+        blue_url=f"https://ci.tlcpack.ai/blue/organizations/jenkins/{job_name}/activity?branch={name}",
         builds=[],
     )
 
     # Jenkins only fetches the last 100 by default
-    builds = await blue(f"{name}/runs", use_cache=False)
+    builds = await blue(job_name, f"{name}/runs", use_cache=False)
     logging.info(f"Found {len(builds)} builds for branch {name}")
     builds = list(reversed(sorted(builds, key=lambda b: int(b["id"]))))
@@ -244,178 +244,8 @@ async def fetch_branch(name):
     for build_data in builds:
         if build_data["state"] != "FINISHED":
             # Only look at completed builds
             continue
 
-        branch.builds.append(await fetch_build(name, build_data))
+        branch.builds.append(await fetch_build(job_name, name, build_data))
         break
     return branch
-
-
-def upload_log(branch: Branch, build: Build, stage: Stage, step: Step) -> None:
-    LOG_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
-    lines = step.log.split("\n")
-    lines = [[str(time.time_ns()), line[27:]] for line in lines if line.strip() != ""]
-
-    url = f"http://{LOKI_HOST}/loki/api/v1/push"
-    headers = {"Content-type": "application/json"}
-    labels = {
-        "branch_name": branch.name,
-        "branch_full_name": branch.full_name,
-        "branch_url": branch.url,
-        "branch_blue_url": branch.blue_url,
-        "build_id": build.id,
-        "build_url": build.url,
-        "build_state": build.state,
-        "build_commit": build.commit,
-        "stage_name": stage.name,
-        "stage_id": stage.id,
-        "stage_state": stage.state,
-        "stage_result": stage.result,
-        "step_name": step.name,
-        "step_id": step.id,
-        "step_result": step.result,
-        "step_state": step.state,
-        "step_description": step.description,
-        "step_log_url": step.log_url,
-    }
-
-    payload = {"streams": [{"stream": labels, "values": lines}]}
-    payload = json.dumps(payload)
-    logging.info(f"Uploading logs for {stage.name} / {step.name} at {step.url}")
-    r = requests.post(url, data=payload, headers=headers)
-    logging.info(f"Loki responded {r}")
-    if r.status_code >= 200 and r.status_code < 300:
-        print("ok")
-    else:
-        r = json.loads(r.content.decode())
-        jprint(r)
-        raise RuntimeError(f"Failed to upload: {r['message']}")
-
-
-def upload_sql(branch: Branch, build: Build) -> None:
-    engine = db.get_engine(db.connection_string("tvm"))
-
-    def db_dict(table, obj, stage=None):
-        names = [c.name for c in table.columns]
-        r = {}
-        for n in names:
-            if hasattr(obj, n):
-                r[n] = getattr(obj, n)
-            else:
-                if n == "branch_name":
-                    v = branch.name
-                elif n == "build_id":
-                    v = build.id
-                elif n == "stage_id":
-                    v = stage.id
-                r[n] = v
-        return r
-
-    def write(conn, table, obj, stage=None):
-        db.upsert(conn, table, db_dict(table, obj, stage))
-
-    logging.info(
-        f"[db] Writing {len(build.stages)} stages on build {build.id} for {branch.name} ({build.blue_url})"
-    )
-    with engine.connect() as conn:
-        for stage in build.stages:
-            for step in stage.steps:
-                write(conn, schema.step, step, stage=stage)
-            write(conn, schema.stage, stage)
-        write(conn, schema.branch, branch)
-        write(conn, schema.build, build)
-
-
-async def fetch_and_store_branch(name: str) -> Branch:
-    logging.info(f"Fetching branch {name}")
-    branch_data = await blue(f"{name}", use_cache=False)
-    branch = Branch(
-        name=name,
-        full_name=branch_data["fullName"],
-        url=f"https://ci.tlcpack.ai/job/tvm/job/{name}/",
-        blue_url=f"https://ci.tlcpack.ai/blue/organizations/jenkins/tvm/activity?branch={name}",
-        builds=[],
-    )
-
-    logging.info("Querying existing builds")
-    engine = db.get_engine(db.connection_string("tvm"))
-
-    # Jenkins only fetches the last 100 by default
-    builds = await blue(f"{name}/runs", use_cache=False)
-    logging.info(f"Found {len(builds)} builds for branch {name}")
-    builds = list(reversed(sorted(builds, key=lambda b: int(b["id"]))))
-    # builds = [builds[10]]
-    for build_data in builds:
-        if build_data["state"] != "FINISHED":
-            # Only look at completed builds
-            logging.info(f"Build {build_data['id']} is incomplete, skipping")
-            continue
-
-        # If this build is in the DB already don't bother with it again
-        build_id = int(build_data["id"])
-        with engine.connect() as conn:
-            s = (
-                select(schema.build.c.branch_name, schema.build.c.id)
-                .where(schema.build.c.id == build_id)
-                .where(schema.build.c.branch_name == branch.name)
-            )
-            result = conn.execute(s)
-            if result.rowcount != 0:
-                logging.info(f"Found build {build_id} in DB, skipping")
-                continue
-            else:
-                logging.info(f"Fetching build {build_id}")
-
-        build = await fetch_build(name, build_data)
-        logging.info(
-            f"Uploading branch {branch.name} build {build.id} builds to DB and Loki"
-        )
-        for stage in build.stages:
-            for step in stage.steps:
-                upload_log(branch, build, stage, step)
-        upload_sql(branch, build)
-        branch.builds.append(await fetch_build(name, build_data))
-
-    return branch
-
-
-async def main(args):
-    global SESSION
-    async with aiohttp.ClientSession() as s:
-        SESSION = s
-
-        runs = await aioget(
-            "https://ci.tlcpack.ai/blue/rest/organizations/jenkins/pipelines/tvm/runs/",
-            session=s,
-            use_cache=False,
-        )
-        runs = json.loads(runs)
-        branch_names = set(r["pipeline"] for r in runs)
-        branch_names.add("main")
-        branch_names = list(sorted(branch_names, key=lambda a: 0 if a == "main" else 1))
-        # branch_names = ["main"]
-
-        for name in branch_names:
-            await fetch_and_store_branch(name)
-            time.sleep(1)
-
-
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser(
-        description="Fetch Jenkins data and store it in Postgres + Loki"
-    )
-    parser.add_argument("--forever", action="store_true", help="loop and re-fetch")
-    parser.add_argument(
-        "--wait-minutes", default=15, type=int, help="sleep time while looping"
-    )
-    args = parser.parse_args()
-    init(dir=".httpcache")
-    init_log()
-
-    if args.forever:
-        subprocess.run([sys.executable, str(SCHEMA_SCRIPT)], check=True)
-        while True:
-            asyncio.run(main(args))
-            logging.info(f"Sleeping for {args.wait_minutes} minutes")
-            time.sleep(args.wait_minutes * 60)
-    else:
-        asyncio.run(main(args))
diff --git a/dev/utils/net.py b/dev/utils/net.py
index 674f9a4..4616492 100644
--- a/dev/utils/net.py
+++ b/dev/utils/net.py
@@ -1,9 +1,9 @@
-import requests
-import subprocess
-import os
 import logging
+import os
+import subprocess
 from pathlib import Path
 
+import requests
 
 CACHE_DIR = None
 DEBUG = os.getenv("DEBUG", "0") == "1"
diff --git a/dev/utils/schema.py b/dev/utils/schema.py
index 7aa9e25..21ae5fc 100644
--- a/dev/utils/schema.py
+++ b/dev/utils/schema.py
@@ -1,20 +1,21 @@
+from typing import Any, Dict
+
 import sqlalchemy
-from sqlalchemy.orm import declarative_base
 from sqlalchemy import (
+    JSON,
+    Boolean,
     Column,
+    DateTime,
     Integer,
     String,
-    Boolean,
-    DateTime,
-    JSON,
     Text,
+    column,
+    table,
 )
+from sqlalchemy.orm import declarative_base
 from sqlalchemy.sql.sqltypes import Float
-from sqlalchemy import table, column
 
 from . import db
-from typing import Dict, Any
-
 
 Base = declarative_base()
 
diff --git a/dev/utils/utils.py b/dev/utils/utils.py
index 6f8b286..34eface 100644
--- a/dev/utils/utils.py
+++ b/dev/utils/utils.py
@@ -1,10 +1,9 @@
-import json
-import sys
 import asyncio
+import json
 import logging
+import sys
 from pathlib import Path
 
-
 REPO_ROOT = Path(__file__).resolve().parent