From 954a3d3acfc1e8e4e96b1734ea997076f800af76 Mon Sep 17 00:00:00 2001 From: driazati Date: Tue, 23 Aug 2022 13:01:47 -0700 Subject: [PATCH 1/2] Add script to determine ideal number of shards This re-uses some code from the `monitoring/` dir with random modifications (maybe in the future we could de-duplicate these) but for now the code is fairly spaghetti-esque. `determine_shards.py` helps generate content for PRs like https://github.com/apache/tvm/pull/12473 --- .gitignore | 1 + dev/.gitignore | 1 + dev/README.md | 15 ++ dev/determine_shards.py | 160 +++++++++++++++ dev/utils/db.py | 51 +++++ dev/utils/forward.py | 421 ++++++++++++++++++++++++++++++++++++++++ dev/utils/net.py | 80 ++++++++ dev/utils/schema.py | 157 +++++++++++++++ dev/utils/utils.py | 45 +++++ 9 files changed, 931 insertions(+) create mode 100644 .gitignore create mode 100644 dev/.gitignore create mode 100644 dev/README.md create mode 100644 dev/determine_shards.py create mode 100644 dev/utils/db.py create mode 100644 dev/utils/forward.py create mode 100644 dev/utils/net.py create mode 100644 dev/utils/schema.py create mode 100644 dev/utils/utils.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..c18dd8d8 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__/ diff --git a/dev/.gitignore b/dev/.gitignore new file mode 100644 index 00000000..8baa2d67 --- /dev/null +++ b/dev/.gitignore @@ -0,0 +1 @@ +.httpcache/ diff --git a/dev/README.md b/dev/README.md new file mode 100644 index 00000000..ab545fbf --- /dev/null +++ b/dev/README.md @@ -0,0 +1,15 @@ +# developer scripts + +This is a collection of random scripts that are helpful to developing on TVM's CI. As one-off scripts these do not conform to the usual TVM quality standards which is why they are stored out of tree. + +## `determine_shards.py` + +Given a goal runtime for each test shard and a Jenkins job, print out the number of shards that should be used for each step. 
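+ +For each test stage the script sums the runtimes of steps that cannot be split across shards (the fixed part), treats everything else as parallelizable, and recommends roughly `ceil(parallelizable_runtime / (goal_runtime - fixed_runtime))` shards.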
+ +```bash +# print out number of shards per test step +python determine_shards.py --runtime-goal-m 90 --branch PR-12473 + +# see bottleneck steps individually +python determine_shards.py --runtime-goal-m 90 --branch PR-12473 --list-steps +``` \ No newline at end of file diff --git a/dev/determine_shards.py b/dev/determine_shards.py new file mode 100644 index 00000000..d1ea82ca --- /dev/null +++ b/dev/determine_shards.py @@ -0,0 +1,160 @@ +import argparse +import asyncio +import re +import statistics +import math +import rich + +from typing import * + +from utils import forward +from utils.forward import * + + +def is_parallelizable(name: str, desc: str) -> bool: + descs = { + "Run CPU integration tests", + "Run Hexagon tests", + "Run Python GPU integration tests", + "Run Python GPU unit tests", + "Run Python frontend tests", + "Run Python unit tests", + "Run VTA tests in FSIM", + "Run VTA tests in TSIM", + "Run i386 integration tests", + "Run test_arm_compute_lib test", + "Run TOPI tests", + "Run microTVM tests", + } + if name in descs: + return True + return False + + +def analyze_stages(stage_name: str, stages: List[Stage], goal_runtime_m: float): + steps_across_shards = {} + for stage in stages: + for step in stage.steps: + if step.name not in steps_across_shards: + steps_across_shards[step.name] = [] + steps_across_shards[step.name].append(step) + + fixed_runtime_m = 0 + parallelizable_runtime_m = 0 + for name, steps in steps_across_shards.items(): + parallelizable = is_parallelizable(name, "") + median_runtime_m = ( + statistics.median([step.duration_ms for step in steps]) / 1000.0 / 60.0 + ) + total_runtime_m = sum([step.duration_ms for step in steps]) / 1000.0 / 60.0 + if parallelizable: + parallelizable_runtime_m += total_runtime_m + else: + fixed_runtime_m += median_runtime_m + + parallel_part = goal_runtime_m - fixed_runtime_m + print(stage_name) + if parallel_part <= 0: + print( + f" fixed runtime is too long ({round(fixed_runtime_m, 2)}), cannot reach goal time" + ) + return + + num_shards = parallelizable_runtime_m / parallel_part + num_shards = math.ceil(num_shards) + + print(f" fixed runtime (m): {round(fixed_runtime_m, 2)}") + print(f" parallel runtime (m): {round(parallelizable_runtime_m, 2)}") + print(f" required shards: {num_shards}") + + +def list_steps(build: Build): + def total_rt(stage: Stage): + return sum(step.duration_ms for step in stage.steps) + + build.stages = sorted(build.stages, key=total_rt) + print("For build at", build.blue_url) + for stage in build.stages: + if stage.name in {"Build", "Test", "Deploy"}: + continue + total = sum(step.duration_ms for step in stage.steps) + if len(stage.steps) == 0: + rich.print(f"{stage.name}: skipped") + continue + median = statistics.median([step.duration_ms for step in stage.steps]) + m75 = statistics.median( + [step.duration_ms for step in stage.steps if step.duration_ms > median] + ) + rich.print(f"{stage.name}: {round(total /1000.0/60.0)}m") + for step in stage.steps: + if step.duration_ms > m75: + rich.print( + f" [bold red]{step.name}[/bold red]: {round(step.duration_ms / 1000.0 / 60.0, 2)}" + ) + elif step.duration_ms > median: + rich.print( + f" [magenta]{step.name}[/magenta]: {round(step.duration_ms / 1000.0 / 60.0, 2)}" + ) + else: + rich.print( + f" {step.name}: {round(step.duration_ms / 1000.0 / 60.0, 2)}" + ) + + +def analyze(build: Build, goal_runtime_m: float): + test_stages: List[Stage] = [] + should_add = False + for stage in build.stages: + if stage.name == "Test": + should_add = True + elif stage.name 
== "Deploy": + should_add = False + elif should_add: + test_stages.append(stage) + + names_to_stages = {} + for stage in test_stages: + names_to_stages[stage.name] = stage + + merged_shards = {} + for stage in test_stages: + m = re.match(r"(.*) \d+ of \d+", stage.name) + if m: + base_name = m.groups()[0] + if base_name not in merged_shards: + merged_shards[base_name] = [] + merged_shards[base_name].append(stage) + else: + merged_shards[stage.name] = [stage] + + for name, stages in merged_shards.items(): + analyze_stages(name, stages, goal_runtime_m) + + +async def main(args): + async with aiohttp.ClientSession() as s: + forward.SESSION = s + data = await fetch_branch(name=args.branch) + return data + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Determine number of Jenkins shards to use" + ) + parser.add_argument("--runtime-goal-m", required=True) + parser.add_argument("--list-steps", action="store_true") + parser.add_argument("--branch", default="main") + parser.add_argument("--build", default="4082") + args = parser.parse_args() + init(dir=".httpcache") + init_log() + + branch = asyncio.run(main(args)) + build = branch.builds[0] + + if args.list_steps: + list_steps(build) + else: + print(f"To reach goal runtime of {args.runtime_goal_m} for tests:") + analyze(build, goal_runtime_m=float(args.runtime_goal_m)) diff --git a/dev/utils/db.py b/dev/utils/db.py new file mode 100644 index 00000000..c9930026 --- /dev/null +++ b/dev/utils/db.py @@ -0,0 +1,51 @@ +import os +from sqlalchemy import create_engine + +from sqlalchemy.dialects.postgresql import insert + + +def connection_string(db="tvm"): + host = os.environ["db_host"] + password = os.environ["db_password"] + user = os.environ["db_user"] + + if db is None: + return f"postgresql://{user}:{password}@{host}" + else: + return f"postgresql://{user}:{password}@{host}/{db}" + + +engine = None + + +def get_engine(connection_string: str): + global engine + if engine is None: + engine = create_engine(connection_string, echo=bool(os.getenv("ECHO", False))) + + return engine + + +def clear_engine(): + global engine + engine = None + + +def upsert(engine, model, insert_dict): + """ + Insert or update to an engine backed by MySQL + """ + inserted = insert(model).values(**insert_dict) + # MySQL version: + # upserted = inserted.on_duplicate_key_update( + # **{k: inserted.inserted[k] for k, v in insert_dict.items()} + # ) + + # Postgres version: + upserted = inserted.on_conflict_do_update( + index_elements=model._pks, + # index_where=my_table.c.user_email.like("%@gmail.com"), + set_=insert_dict, + ) + res = engine.execute(upserted) + return res.lastrowid diff --git a/dev/utils/forward.py b/dev/utils/forward.py new file mode 100644 index 00000000..f0dc36de --- /dev/null +++ b/dev/utils/forward.py @@ -0,0 +1,421 @@ +""" +Scrape Jenkins, send build data to Loki and Postgres +""" +import dataclasses +import aiohttp +import asyncio +import datetime +import sys +import os +import time +import json +import argparse +import subprocess +from pathlib import Path + +# from sqlalchemy import select +from .utils import * +from .net import * +from typing import * + +from . import db +from . 
import schema +import logging + +SESSION = None +DEBUG = os.getenv("DEBUG", "0") == "1" +SCHEMA_SCRIPT = Path(__file__).resolve().parent / "schema.py" +# LOKI_HOST = os.environ["loki_host"] + + +def walk(o, visitor): + visitor(o) + if isinstance(o, dict): + for k, v in o.items(): + walk(v, visitor) + elif isinstance(o, list): + for v in o: + walk(v, visitor) + + +async def blue(url: str, use_cache: bool = True, no_slash: bool = False) -> Any: + if DEBUG: + use_cache = True + + if not no_slash and not url.endswith("/"): + url = url + "/" + + if SESSION is None: + raise RuntimeError("SESSION is None") + + r = await aioget( + f"https://ci.tlcpack.ai/blue/rest/organizations/jenkins/pipelines/tvm/branches/{url}", + session=SESSION, + use_cache=use_cache, + ) + r = json.loads(r) + # These just clog up stuff for debugging + def cleaner(o): + if isinstance(o, dict): + if "_links" in o: + del o["_links"] + if "_class" in o: + del o["_class"] + + walk(r, cleaner) + return r + + +@dataclasses.dataclass +class Step: + name: str + id: int + result: str + started_at: datetime.datetime + state: str + description: str + log_url: str + duration_ms: int + url: str + log: str + + +@dataclasses.dataclass +class Stage: + name: str + id: int + duration_ms: int + state: str + result: str + started_at: datetime.datetime + parent: Optional["Stage"] + url: str + steps: List[Step] + + +@dataclasses.dataclass +class Build: + causes: List[str] + id: int + url: str + state: str + result: str + run_time_ms: int + queue_time_ms: int + queued_at: datetime.datetime + started_at: datetime.datetime + ended_at: datetime.datetime + duration_ms: int + commit: str + blue_url: str + failed_tests: int + fixed_tests: int + passed_tests: int + regressed_tests: int + skipped_tests: int + total_tests: int + stages: List[Stage] + + +@dataclasses.dataclass +class Branch: + name: str + full_name: str + url: str + blue_url: str + builds: List[Build] + + +# A branch has a number of builds which have nodes that are made of steps +DATE_FORMAT = "%Y-%m-%dT%H:%M:%S.%f%z" + + +def parse_date(d: str) -> datetime.datetime: + return datetime.datetime.strptime(d, DATE_FORMAT) + + +async def fetch_stage( + branch_name: str, build: Build, stage_data: Dict[str, Any] +) -> Stage: + stage = Stage( + name=stage_data["displayName"], + started_at=None + if stage_data["startTime"] is None + else parse_date(stage_data["startTime"]), + duration_ms=int(stage_data["durationInMillis"]), + state=stage_data["state"], + result=stage_data["result"], + id=stage_data["id"], + parent=stage_data["firstParent"], + url=f"https://ci.tlcpack.ai/blue/organizations/jenkins/tvm/detail/{branch_name}/{build.id}/pipeline/{stage_data['id']}", + steps=[], + ) + + steps_data = await blue(f"{branch_name}/runs/{build.id}/nodes/{stage.id}/steps") + + for step_data in steps_data: + stage.steps.append(await fetch_step(branch_name, build, stage, step_data)) + + return stage + + +async def fetch_step( + branch_name: str, build: Build, stage: Stage, step_data: Dict[str, Any] +) -> Step: + id = step_data["id"] + log_url = f"https://ci.tlcpack.ai/blue/rest/organizations/jenkins/pipelines/tvm/branches/{branch_name}/runs/{build.id}/nodes/{stage.id}/steps/{id}/log/" + # log_url = f"https://ci.tlcpack.ai/blue/rest/organizations/jenkins/pipelines/tvm/branches/{branch_name}/runs/{build.id}/steps/{id}/log/" + # log = await aioget(log_url, session=SESSION) + log = "dog" + return Step( + name=step_data["displayName"], + id=step_data["id"], + result=step_data["result"], + 
started_at=parse_date(step_data["startTime"]), + state=step_data["state"], + description=step_data["displayDescription"], + log_url=log_url, + log=log, + url=f"https://ci.tlcpack.ai/blue/organizations/jenkins/tvm/detail/{branch_name}/{build.id}/pipeline/{stage.id}#step-{step_data['id']}", + duration_ms=int(step_data["durationInMillis"]), + ) + + +async def fetch_build(branch_name: str, build_data: Dict[str, Any]) -> Build: + queued_at = parse_date(build_data["enQueueTime"]) + started_at = parse_date(build_data["startTime"]) + ended_at = parse_date(build_data["endTime"]) + + queue_time_ms = int((started_at - queued_at).total_seconds() * 1000) + run_time_ms = int((ended_at - started_at).total_seconds() * 1000) + causes = build_data["causes"] + if causes is None: + causes = [] + + test_summary = await blue(f"{branch_name}/runs/{build_data['id']}/blueTestSummary") + + build = Build( + causes=[c["shortDescription"] for c in causes], + id=build_data["id"], + url=f"https://ci.tlcpack.ai/job/tvm/job/{branch_name}/{build_data['id']}/", + blue_url=f"https://ci.tlcpack.ai/blue/organizations/jenkins/tvm/detail/{branch_name}/{build_data['id']}/pipeline", + state=build_data["state"], + result=build_data["result"], + queued_at=queued_at, + started_at=started_at, + ended_at=ended_at, + run_time_ms=run_time_ms, + queue_time_ms=queue_time_ms, + duration_ms=int(build_data["durationInMillis"]), + commit=build_data["commitId"], + stages=[], + failed_tests=test_summary["failed"], + fixed_tests=test_summary["fixed"], + passed_tests=test_summary["passed"], + regressed_tests=test_summary["regressions"], + skipped_tests=test_summary["skipped"], + total_tests=test_summary["total"], + ) + + nodes_data = await blue(f"{branch_name}/runs/{build.id}/nodes") + for stage_data in nodes_data: + build.stages.append(await fetch_stage(branch_name, build, stage_data)) + + return build + + +async def fetch_branch(name): + logging.info(f"Fetching branch {name}") + branch_data = await blue(f"{name}", use_cache=False) + branch = Branch( + name=name, + full_name=branch_data["fullName"], + url=f"https://ci.tlcpack.ai/job/tvm/job/{name}/", + blue_url=f"https://ci.tlcpack.ai/blue/organizations/jenkins/tvm/activity?branch={name}", + builds=[], + ) + + # Jenkins only fetches the last 100 by default + builds = await blue(f"{name}/runs", use_cache=False) + logging.info(f"Found {len(builds)} builds for branch {name}") + builds = list(reversed(sorted(builds, key=lambda b: int(b["id"])))) + for build_data in builds: + if build_data["state"] != "FINISHED": + # Only look at completed builds + continue + + branch.builds.append(await fetch_build(name, build_data)) + break + + return branch + + +def upload_log(branch: Branch, build: Build, stage: Stage, step: Step) -> None: + LOG_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" + lines = step.log.split("\n") + lines = [[str(time.time_ns()), line[27:]] for line in lines if line.strip() != ""] + + url = f"http://{LOKI_HOST}/loki/api/v1/push" + headers = {"Content-type": "application/json"} + labels = { + "branch_name": branch.name, + "branch_full_name": branch.full_name, + "branch_url": branch.url, + "branch_blue_url": branch.blue_url, + "build_id": build.id, + "build_url": build.url, + "build_state": build.state, + "build_commit": build.commit, + "stage_name": stage.name, + "stage_id": stage.id, + "stage_state": stage.state, + "stage_result": stage.result, + "step_name": step.name, + "step_id": step.id, + "step_result": step.result, + "step_state": step.state, + "step_description": step.description, + 
"step_log_url": step.log_url, + } + + payload = {"streams": [{"stream": labels, "values": lines}]} + payload = json.dumps(payload) + logging.info(f"Uploading logs for {stage.name} / {step.name} at {step.url}") + r = requests.post(url, data=payload, headers=headers) + logging.info(f"Loki responded {r}") + if r.status_code >= 200 and r.status_code < 300: + print("ok") + else: + r = json.loads(r.content.decode()) + jprint(r) + raise RuntimeError(f"Failed to upload: {r['message']}") + + +def upload_sql(branch: Branch, build: Build) -> None: + engine = db.get_engine(db.connection_string("tvm")) + + def db_dict(table, obj, stage=None): + names = [c.name for c in table.columns] + r = {} + for n in names: + if hasattr(obj, n): + r[n] = getattr(obj, n) + else: + if n == "branch_name": + v = branch.name + elif n == "build_id": + v = build.id + elif n == "stage_id": + v = stage.id + r[n] = v + return r + + def write(conn, table, obj, stage=None): + db.upsert(conn, table, db_dict(table, obj, stage)) + + logging.info( + f"[db] Writing {len(build.stages)} stages on build {build.id} for {branch.name} ({build.blue_url})" + ) + with engine.connect() as conn: + for stage in build.stages: + for step in stage.steps: + write(conn, schema.step, step, stage=stage) + write(conn, schema.stage, stage) + write(conn, schema.branch, branch) + write(conn, schema.build, build) + + +async def fetch_and_store_branch(name: str) -> Branch: + logging.info(f"Fetching branch {name}") + branch_data = await blue(f"{name}", use_cache=False) + branch = Branch( + name=name, + full_name=branch_data["fullName"], + url=f"https://ci.tlcpack.ai/job/tvm/job/{name}/", + blue_url=f"https://ci.tlcpack.ai/blue/organizations/jenkins/tvm/activity?branch={name}", + builds=[], + ) + + logging.info("Querying existing builds") + engine = db.get_engine(db.connection_string("tvm")) + + # Jenkins only fetches the last 100 by default + builds = await blue(f"{name}/runs", use_cache=False) + logging.info(f"Found {len(builds)} builds for branch {name}") + builds = list(reversed(sorted(builds, key=lambda b: int(b["id"])))) + # builds = [builds[10]] + for build_data in builds: + if build_data["state"] != "FINISHED": + # Only look at completed builds + logging.info(f"Build {build_data['id']} is incomplete, skipping") + continue + + # If this build is in the DB already don't bother with it again + build_id = int(build_data["id"]) + with engine.connect() as conn: + s = ( + select(schema.build.c.branch_name, schema.build.c.id) + .where(schema.build.c.id == build_id) + .where(schema.build.c.branch_name == branch.name) + ) + result = conn.execute(s) + if result.rowcount != 0: + logging.info(f"Found build {build_id} in DB, skipping") + continue + else: + logging.info(f"Fetching build {build_id}") + + build = await fetch_build(name, build_data) + logging.info( + f"Uploading branch {branch.name} build {build.id} builds to DB and Loki" + ) + for stage in build.stages: + for step in stage.steps: + upload_log(branch, build, stage, step) + upload_sql(branch, build) + branch.builds.append(await fetch_build(name, build_data)) + + return branch + + +async def main(args): + global SESSION + async with aiohttp.ClientSession() as s: + SESSION = s + + runs = await aioget( + "https://ci.tlcpack.ai/blue/rest/organizations/jenkins/pipelines/tvm/runs/", + session=s, + use_cache=False, + ) + runs = json.loads(runs) + branch_names = set(r["pipeline"] for r in runs) + branch_names.add("main") + branch_names = list(sorted(branch_names, key=lambda a: 0 if a == "main" else 1)) + # 
branch_names = ["main"] + + for name in branch_names: + await fetch_and_store_branch(name) + time.sleep(1) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Fetch Jenkins data and store it in Postgres + Loki" + ) + parser.add_argument("--forever", action="store_true", help="loop and re-fetch") + parser.add_argument( + "--wait-minutes", default=15, type=int, help="sleep time while looping" + ) + args = parser.parse_args() + init(dir=".httpcache") + init_log() + + if args.forever: + subprocess.run([sys.executable, str(SCHEMA_SCRIPT)], check=True) + while True: + asyncio.run(main(args)) + logging.info(f"Sleeping for {args.wait_minutes} minutes") + time.sleep(args.wait_minutes * 60) + else: + asyncio.run(main(args)) diff --git a/dev/utils/net.py b/dev/utils/net.py new file mode 100644 index 00000000..674f9a45 --- /dev/null +++ b/dev/utils/net.py @@ -0,0 +1,80 @@ +import requests +import subprocess +import os +import logging +from pathlib import Path + + +CACHE_DIR = None +DEBUG = os.getenv("DEBUG", "0") == "1" + + +def init(dir): + global CACHE_DIR + CACHE_DIR = Path(os.getcwd()).resolve() / dir + CACHE_DIR.mkdir(exist_ok=True, parents=True) + + +def get(url, *args, **kwargs): + use_cache = kwargs.pop("use_cache", True) + + cache_name = url.replace("/", "-") + cached = CACHE_DIR / cache_name + if cached.exists() and use_cache: + with open(cached, "rb") as f: + return f.read() + else: + checker = kwargs.pop("is_fresh", None) + result = requests.get(url, *args, **kwargs) + content = result.content.decode() + if checker is None or checker(url, content): + with open(cached, "w") as f: + f.write(content) + return content + + +def curl(url): + proc = subprocess.run(["curl", "-L", url], stdout=subprocess.PIPE, check=True) + return proc.stdout.decode() + + +async def aioget(url, session, use_cache=True): + if DEBUG: + use_cache = True + cache_name = url.replace("/", "-") + cached = CACHE_DIR / cache_name + if cached.exists() and use_cache: + logging.info(f"GET {url} [cached]") + with open(cached, "r") as f: + return f.read() + else: + if use_cache: + logging.info(f"GET {url} [cache miss]") + else: + logging.info(f"GET {url} [cache disabled]") + result = await session.get(url) + text = await result.text() + with open(cached, "w") as f: + f.write(text) + return text + + +async def aiogetc(url, session, use_cache=True): + if DEBUG: + use_cache = True + cache_name = url.replace("/", "-") + cached = CACHE_DIR / cache_name + if cached.exists() and use_cache: + logging.info(f"GET {url} [cached]") + with open(cached, "r") as f: + return f.read(), 200 + else: + if use_cache: + logging.info(f"GET {url} [cache miss]") + else: + logging.info(f"GET {url} [cache disabled]") + result = await session.get(url) + text = await result.text() + with open(cached, "w") as f: + f.write(text) + return text, result.status_code diff --git a/dev/utils/schema.py b/dev/utils/schema.py new file mode 100644 index 00000000..7aa9e25b --- /dev/null +++ b/dev/utils/schema.py @@ -0,0 +1,157 @@ +import sqlalchemy +from sqlalchemy.orm import declarative_base +from sqlalchemy import ( + Column, + Integer, + String, + Boolean, + DateTime, + JSON, + Text, +) +from sqlalchemy.sql.sqltypes import Float +from sqlalchemy import table, column + +from . 
import db +from typing import Dict, Any + + +Base = declarative_base() + + +def gen_table(name: str, columns: Dict[str, Any], base: Any) -> Any: + the_class = type( + name, + (base,), + { + "__tablename__": name, + "__table_args__": {"extend_existing": True}, + **columns, + }, + ) + name = the_class.__tablename__ + model_data = [name] + for k in [k for k in the_class.__dict__.keys() if not k.startswith("_")]: + model_data.append(column(k)) + model = table(*model_data) + model._pks = [k for k, v in columns.items() if v.primary_key] + return model, the_class + + +branch, Branch = gen_table( + "branch", + { + "name": Column(String(300), primary_key=True), + "full_name": Column(String(300)), + "url": Column(String(300)), + "blue_url": Column(String(300)), + }, + Base, +) + +build, Build = gen_table( + "build", + { + "causes": Column(Text), + "id": Column(Integer, primary_key=True), + "url": Column(String(300)), + "blue_url": Column(String(300)), + "state": Column(String(300)), + "result": Column(String(300)), + "queued_at": Column(DateTime), + "started_at": Column(DateTime), + "ended_at": Column(DateTime), + "duration_ms": Column(Integer), + "run_time_ms": Column(Integer), + "queue_time_ms": Column(Integer), + "failed_tests": Column(Integer), + "fixed_tests": Column(Integer), + "passed_tests": Column(Integer), + "regressed_tests": Column(Integer), + "skipped_tests": Column(Integer), + "total_tests": Column(Integer), + "commit": Column(String(300)), + "branch_name": Column(String(300), primary_key=True), + }, + Base, +) + +stage, Stage = gen_table( + "stage", + { + "name": Column(String(300)), + "id": Column(Integer, primary_key=True), + "duration_ms": Column(Integer), + "state": Column(String(300)), + "result": Column(String(300)), + "started_at": Column(DateTime), + "parent": Column(Integer), + "url": Column(String(300)), + "branch_name": Column(String(300), primary_key=True), + "build_id": Column(Integer, primary_key=True), + }, + Base, +) + + +step, Step = gen_table( + "step", + { + "name": Column(Text), + "id": Column(Integer, primary_key=True), + "result": Column(String(300)), + "started_at": Column(DateTime), + "state": Column(String(300)), + "description": Column(Text), + "log_url": Column(String(300)), + "duration_ms": Column(Integer), + "url": Column(String(300)), + "branch_name": Column(String(300), primary_key=True), + "build_id": Column(Integer, primary_key=True), + "stage_id": Column(Integer, primary_key=True), + }, + Base, +) + + +testcase, TestCase = gen_table( + "testcase", + { + "build_id": Column(Integer, primary_key=True), + "branch_name": Column(String(300), primary_key=True), + "blue_url": Column(String(300)), + "status": Column(String(300)), + "state": Column(String(300)), + "duration_ms": Column(Integer), + "stage": Column(String(500)), + "node_id": Column(String(500), primary_key=True), + "name": Column(String(500)), + "parameterless_name": Column(String(500)), + "file_name": Column(String(500)), + }, + Base, +) + + +def create(db_name): + connection = db.connection_string(db=None) + print(connection) + raw = db.get_engine(connection) + from sqlalchemy.orm import sessionmaker + + session = sessionmaker(bind=raw)() + session.connection().connection.set_isolation_level(0) + try: + session.execute(f"CREATE DATABASE {db_name}") + except sqlalchemy.exc.ProgrammingError as e: + if "already exists" not in str(e): + raise e + session.connection().connection.set_isolation_level(1) + db.clear_engine() + + Base.metadata.create_all(db.get_engine(db.connection_string(db_name))) + 
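    # create_all (checkfirst=True by default) only creates tables that are missing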
print("Done") + + +if __name__ == "__main__": + create("tvm") diff --git a/dev/utils/utils.py b/dev/utils/utils.py new file mode 100644 index 00000000..6f8b2867 --- /dev/null +++ b/dev/utils/utils.py @@ -0,0 +1,45 @@ +import json +import sys +import asyncio +import logging +from pathlib import Path + + +REPO_ROOT = Path(__file__).resolve().parent + + +class RelativePathFilter(logging.Filter): + def filter(self, record): + path = Path(record.pathname).resolve() + record.relativepath = str(path.relative_to(REPO_ROOT)) + return True + + +def jprint(o): + print(json.dumps(o, indent=2, default=str)) + + +def sprint(*args): + print(*args, file=sys.stderr) + + +async def gather_with_concurrency(n, *tasks): + semaphore = asyncio.Semaphore(n) + + async def sem_task(task): + async with semaphore: + return await task + + return await asyncio.gather(*(sem_task(task) for task in tasks)) + + +def init_log(): + logging.basicConfig( + format="[%(relativepath)s:%(lineno)d %(levelname)-1s] %(message)s", + level=logging.WARN, + ) + + # Flush on every log call (logging and then calling subprocess.run can make + # the output look confusing) + logging.root.handlers[0].addFilter(RelativePathFilter()) + logging.root.handlers[0].flush = sys.stderr.flush From 707a184444f2894277ddaca63c183148fdb3b879 Mon Sep 17 00:00:00 2001 From: driazati Date: Thu, 15 Sep 2022 13:08:38 -0700 Subject: [PATCH 2/2] fix lint 1 --- .github/workflows/lint.yml | 2 +- dev/determine_shards.py | 42 +++++-- dev/utils/__init__.py | 0 dev/utils/db.py | 2 +- dev/utils/forward.py | 238 ++++++------------------------------- dev/utils/net.py | 6 +- dev/utils/schema.py | 15 +-- dev/utils/utils.py | 5 +- 8 files changed, 82 insertions(+), 228 deletions(-) create mode 100644 dev/utils/__init__.py diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 13a00e3e..dde32f5e 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -56,4 +56,4 @@ jobs: VALIDATE_BASH: false VALIDATE_TERRAFORM_TERRASCAN: false VALIDATE_MARKDOWN: false - FILTER_REGEX_EXCLUDE: monitoring/.* + FILTER_REGEX_EXCLUDE: (monitoring/.*)|(dev/.*) diff --git a/dev/determine_shards.py b/dev/determine_shards.py index d1ea82ca..7ffc51eb 100644 --- a/dev/determine_shards.py +++ b/dev/determine_shards.py @@ -1,14 +1,17 @@ import argparse import asyncio +import math import re import statistics -import math -import rich - from typing import * +import rich + from utils import forward from utils.forward import * +from utils.net import init +from utils.schema import Build, Stage +from utils.utils import init_log def is_parallelizable(name: str, desc: str) -> bool: @@ -31,7 +34,21 @@ def is_parallelizable(name: str, desc: str) -> bool: return False -def analyze_stages(stage_name: str, stages: List[Stage], goal_runtime_m: float): +def find_existing_shards(stage_name: str, template: str): + with open(template) as f: + content = f.read() + + m = re.search(f'name="{stage_name}"(.*\n)+?.*num_shards=(\d+)', content, flags=re.MULTILINE) + if m is None: + print(f"Could not find {stage_name} in {template}, is that the right file?") + exit(1) + # print("match", m) + start, end = m.span() + # print(content[start:end]) + return int(m.groups()[1]) + + +def analyze_stages(stage_name: str, stages: List[Stage], goal_runtime_m: float, jenkins_dir: str): steps_across_shards = {} for stage in stages: for step in stage.steps: @@ -63,9 +80,14 @@ def analyze_stages(stage_name: str, stages: List[Stage], goal_runtime_m: float): num_shards = parallelizable_runtime_m / parallel_part 
num_shards = math.ceil(num_shards) + existing_shards = find_existing_shards(stage_name, jenkins_dir) + print(f" fixed runtime (m): {round(fixed_runtime_m, 2)}") print(f" parallel runtime (m): {round(parallelizable_runtime_m, 2)}") - print(f" required shards: {num_shards}") + if existing_shards == num_shards: + print(f" required shards: {num_shards} (no action required)") + else: + print(f" required shards: change from {existing_shards} to {num_shards} in {jenkins_dir}") def list_steps(build: Build): @@ -101,7 +123,7 @@ def total_rt(stage: Stage): ) -def analyze(build: Build, goal_runtime_m: float): +def analyze(build: Build, goal_runtime_m: float, jenkins_template): test_stages: List[Stage] = [] should_add = False for stage in build.stages: @@ -128,13 +150,13 @@ def analyze(build: Build, goal_runtime_m: float): merged_shards[stage.name] = [stage] for name, stages in merged_shards.items(): - analyze_stages(name, stages, goal_runtime_m) + analyze_stages(name, stages, goal_runtime_m, jenkins_template) async def main(args): async with aiohttp.ClientSession() as s: forward.SESSION = s - data = await fetch_branch(name=args.branch) + data = await fetch_branch(job_name=args.job, name=args.branch) return data @@ -144,8 +166,10 @@ async def main(args): ) parser.add_argument("--runtime-goal-m", required=True) parser.add_argument("--list-steps", action="store_true") + parser.add_argument("--job") parser.add_argument("--branch", default="main") parser.add_argument("--build", default="4082") + parser.add_argument("--jenkins-template") args = parser.parse_args() init(dir=".httpcache") init_log() @@ -157,4 +181,4 @@ async def main(args): list_steps(build) else: print(f"To reach goal runtime of {args.runtime_goal_m} for tests:") - analyze(build, goal_runtime_m=float(args.runtime_goal_m)) + analyze(build, goal_runtime_m=float(args.runtime_goal_m), jenkins_template=args.jenkins_template) diff --git a/dev/utils/__init__.py b/dev/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/dev/utils/db.py b/dev/utils/db.py index c9930026..c30178d6 100644 --- a/dev/utils/db.py +++ b/dev/utils/db.py @@ -1,6 +1,6 @@ import os -from sqlalchemy import create_engine +from sqlalchemy import create_engine from sqlalchemy.dialects.postgresql import insert diff --git a/dev/utils/forward.py b/dev/utils/forward.py index f0dc36de..65b9ee6a 100644 --- a/dev/utils/forward.py +++ b/dev/utils/forward.py @@ -1,26 +1,26 @@ """ Scrape Jenkins, send build data to Loki and Postgres """ -import dataclasses -import aiohttp +import argparse import asyncio +import dataclasses import datetime -import sys -import os -import time import json -import argparse +import logging +import os import subprocess +import sys +import time from pathlib import Path +from typing import * -# from sqlalchemy import select -from .utils import * +import aiohttp + +from . import db, schema from .net import * -from typing import * -from . import db -from . 
import schema -import logging +# from sqlalchemy import select +from .utils import * SESSION = None DEBUG = os.getenv("DEBUG", "0") == "1" @@ -38,7 +38,7 @@ def walk(o, visitor): walk(v, visitor) -async def blue(url: str, use_cache: bool = True, no_slash: bool = False) -> Any: +async def blue(job_name: str, url: str, use_cache: bool = True, no_slash: bool = False) -> Any: if DEBUG: use_cache = True @@ -49,7 +49,7 @@ async def blue(url: str, use_cache: bool = True, no_slash: bool = False) -> Any: raise RuntimeError("SESSION is None") r = await aioget( - f"https://ci.tlcpack.ai/blue/rest/organizations/jenkins/pipelines/tvm/branches/{url}", + f"https://ci.tlcpack.ai/blue/rest/organizations/jenkins/pipelines/{job_name}/branches/{url}", session=SESSION, use_cache=use_cache, ) @@ -135,7 +135,7 @@ def parse_date(d: str) -> datetime.datetime: async def fetch_stage( - branch_name: str, build: Build, stage_data: Dict[str, Any] + job_name: str, branch_name: str, build: Build, stage_data: Dict[str, Any] ) -> Stage: stage = Stage( name=stage_data["displayName"], @@ -147,24 +147,24 @@ async def fetch_stage( result=stage_data["result"], id=stage_data["id"], parent=stage_data["firstParent"], - url=f"https://ci.tlcpack.ai/blue/organizations/jenkins/tvm/detail/{branch_name}/{build.id}/pipeline/{stage_data['id']}", + url=f"https://ci.tlcpack.ai/blue/organizations/jenkins/{job_name}/detail/{branch_name}/{build.id}/pipeline/{stage_data['id']}", steps=[], ) - steps_data = await blue(f"{branch_name}/runs/{build.id}/nodes/{stage.id}/steps") + steps_data = await blue(job_name, f"{branch_name}/runs/{build.id}/nodes/{stage.id}/steps") for step_data in steps_data: - stage.steps.append(await fetch_step(branch_name, build, stage, step_data)) + stage.steps.append(await fetch_step(job_name, branch_name, build, stage, step_data)) return stage async def fetch_step( - branch_name: str, build: Build, stage: Stage, step_data: Dict[str, Any] + job_name: str, branch_name: str, build: Build, stage: Stage, step_data: Dict[str, Any] ) -> Step: id = step_data["id"] - log_url = f"https://ci.tlcpack.ai/blue/rest/organizations/jenkins/pipelines/tvm/branches/{branch_name}/runs/{build.id}/nodes/{stage.id}/steps/{id}/log/" - # log_url = f"https://ci.tlcpack.ai/blue/rest/organizations/jenkins/pipelines/tvm/branches/{branch_name}/runs/{build.id}/steps/{id}/log/" + log_url = f"https://ci.tlcpack.ai/blue/rest/organizations/jenkins/pipelines/{job_name}/branches/{branch_name}/runs/{build.id}/nodes/{stage.id}/steps/{id}/log/" + # log_url = f"https://ci.tlcpack.ai/blue/rest/organizations/jenkins/pipelines/{job_name}/branches/{branch_name}/runs/{build.id}/steps/{id}/log/" # log = await aioget(log_url, session=SESSION) log = "dog" return Step( @@ -176,12 +176,12 @@ async def fetch_step( description=step_data["displayDescription"], log_url=log_url, log=log, - url=f"https://ci.tlcpack.ai/blue/organizations/jenkins/tvm/detail/{branch_name}/{build.id}/pipeline/{stage.id}#step-{step_data['id']}", + url=f"https://ci.tlcpack.ai/blue/organizations/jenkins/{job_name}/detail/{branch_name}/{build.id}/pipeline/{stage.id}#step-{step_data['id']}", duration_ms=int(step_data["durationInMillis"]), ) -async def fetch_build(branch_name: str, build_data: Dict[str, Any]) -> Build: +async def fetch_build(job_name: str, branch_name: str, build_data: Dict[str, Any]) -> Build: queued_at = parse_date(build_data["enQueueTime"]) started_at = parse_date(build_data["startTime"]) ended_at = parse_date(build_data["endTime"]) @@ -192,13 +192,13 @@ async def 
fetch_build(branch_name: str, build_data: Dict[str, Any]) -> Build: if causes is None: causes = [] - test_summary = await blue(f"{branch_name}/runs/{build_data['id']}/blueTestSummary") + test_summary = await blue(job_name, f"{branch_name}/runs/{build_data['id']}/blueTestSummary") build = Build( causes=[c["shortDescription"] for c in causes], id=build_data["id"], - url=f"https://ci.tlcpack.ai/job/tvm/job/{branch_name}/{build_data['id']}/", - blue_url=f"https://ci.tlcpack.ai/blue/organizations/jenkins/tvm/detail/{branch_name}/{build_data['id']}/pipeline", + url=f"https://ci.tlcpack.ai/job/{job_name}/job/{branch_name}/{build_data['id']}/", + blue_url=f"https://ci.tlcpack.ai/blue/organizations/jenkins/{job_name}/detail/{branch_name}/{build_data['id']}/pipeline", state=build_data["state"], result=build_data["result"], queued_at=queued_at, @@ -217,26 +217,26 @@ async def fetch_build(branch_name: str, build_data: Dict[str, Any]) -> Build: total_tests=test_summary["total"], ) - nodes_data = await blue(f"{branch_name}/runs/{build.id}/nodes") + nodes_data = await blue(job_name, f"{branch_name}/runs/{build.id}/nodes") for stage_data in nodes_data: - build.stages.append(await fetch_stage(branch_name, build, stage_data)) + build.stages.append(await fetch_stage(job_name, branch_name, build, stage_data)) return build -async def fetch_branch(name): +async def fetch_branch(job_name: str, name: str): logging.info(f"Fetching branch {name}") - branch_data = await blue(f"{name}", use_cache=False) + branch_data = await blue(job_name, f"{name}", use_cache=False) branch = Branch( name=name, full_name=branch_data["fullName"], - url=f"https://ci.tlcpack.ai/job/tvm/job/{name}/", - blue_url=f"https://ci.tlcpack.ai/blue/organizations/jenkins/tvm/activity?branch={name}", + url=f"https://ci.tlcpack.ai/job/{job_name}/job/{name}/", + blue_url=f"https://ci.tlcpack.ai/blue/organizations/jenkins/{job_name}/activity?branch={name}", builds=[], ) # Jenkins only fetches the last 100 by default - builds = await blue(f"{name}/runs", use_cache=False) + builds = await blue(job_name, f"{name}/runs", use_cache=False) logging.info(f"Found {len(builds)} builds for branch {name}") builds = list(reversed(sorted(builds, key=lambda b: int(b["id"])))) for build_data in builds: @@ -244,178 +244,8 @@ async def fetch_branch(name): # Only look at completed builds continue - branch.builds.append(await fetch_build(name, build_data)) + branch.builds.append(await fetch_build(job_name, name, build_data)) break return branch - -def upload_log(branch: Branch, build: Build, stage: Stage, step: Step) -> None: - LOG_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" - lines = step.log.split("\n") - lines = [[str(time.time_ns()), line[27:]] for line in lines if line.strip() != ""] - - url = f"http://{LOKI_HOST}/loki/api/v1/push" - headers = {"Content-type": "application/json"} - labels = { - "branch_name": branch.name, - "branch_full_name": branch.full_name, - "branch_url": branch.url, - "branch_blue_url": branch.blue_url, - "build_id": build.id, - "build_url": build.url, - "build_state": build.state, - "build_commit": build.commit, - "stage_name": stage.name, - "stage_id": stage.id, - "stage_state": stage.state, - "stage_result": stage.result, - "step_name": step.name, - "step_id": step.id, - "step_result": step.result, - "step_state": step.state, - "step_description": step.description, - "step_log_url": step.log_url, - } - - payload = {"streams": [{"stream": labels, "values": lines}]} - payload = json.dumps(payload) - logging.info(f"Uploading logs for {stage.name} / 
{step.name} at {step.url}") - r = requests.post(url, data=payload, headers=headers) - logging.info(f"Loki responded {r}") - if r.status_code >= 200 and r.status_code < 300: - print("ok") - else: - r = json.loads(r.content.decode()) - jprint(r) - raise RuntimeError(f"Failed to upload: {r['message']}") - - -def upload_sql(branch: Branch, build: Build) -> None: - engine = db.get_engine(db.connection_string("tvm")) - - def db_dict(table, obj, stage=None): - names = [c.name for c in table.columns] - r = {} - for n in names: - if hasattr(obj, n): - r[n] = getattr(obj, n) - else: - if n == "branch_name": - v = branch.name - elif n == "build_id": - v = build.id - elif n == "stage_id": - v = stage.id - r[n] = v - return r - - def write(conn, table, obj, stage=None): - db.upsert(conn, table, db_dict(table, obj, stage)) - - logging.info( - f"[db] Writing {len(build.stages)} stages on build {build.id} for {branch.name} ({build.blue_url})" - ) - with engine.connect() as conn: - for stage in build.stages: - for step in stage.steps: - write(conn, schema.step, step, stage=stage) - write(conn, schema.stage, stage) - write(conn, schema.branch, branch) - write(conn, schema.build, build) - - -async def fetch_and_store_branch(name: str) -> Branch: - logging.info(f"Fetching branch {name}") - branch_data = await blue(f"{name}", use_cache=False) - branch = Branch( - name=name, - full_name=branch_data["fullName"], - url=f"https://ci.tlcpack.ai/job/tvm/job/{name}/", - blue_url=f"https://ci.tlcpack.ai/blue/organizations/jenkins/tvm/activity?branch={name}", - builds=[], - ) - - logging.info("Querying existing builds") - engine = db.get_engine(db.connection_string("tvm")) - - # Jenkins only fetches the last 100 by default - builds = await blue(f"{name}/runs", use_cache=False) - logging.info(f"Found {len(builds)} builds for branch {name}") - builds = list(reversed(sorted(builds, key=lambda b: int(b["id"])))) - # builds = [builds[10]] - for build_data in builds: - if build_data["state"] != "FINISHED": - # Only look at completed builds - logging.info(f"Build {build_data['id']} is incomplete, skipping") - continue - - # If this build is in the DB already don't bother with it again - build_id = int(build_data["id"]) - with engine.connect() as conn: - s = ( - select(schema.build.c.branch_name, schema.build.c.id) - .where(schema.build.c.id == build_id) - .where(schema.build.c.branch_name == branch.name) - ) - result = conn.execute(s) - if result.rowcount != 0: - logging.info(f"Found build {build_id} in DB, skipping") - continue - else: - logging.info(f"Fetching build {build_id}") - - build = await fetch_build(name, build_data) - logging.info( - f"Uploading branch {branch.name} build {build.id} builds to DB and Loki" - ) - for stage in build.stages: - for step in stage.steps: - upload_log(branch, build, stage, step) - upload_sql(branch, build) - branch.builds.append(await fetch_build(name, build_data)) - - return branch - - -async def main(args): - global SESSION - async with aiohttp.ClientSession() as s: - SESSION = s - - runs = await aioget( - "https://ci.tlcpack.ai/blue/rest/organizations/jenkins/pipelines/tvm/runs/", - session=s, - use_cache=False, - ) - runs = json.loads(runs) - branch_names = set(r["pipeline"] for r in runs) - branch_names.add("main") - branch_names = list(sorted(branch_names, key=lambda a: 0 if a == "main" else 1)) - # branch_names = ["main"] - - for name in branch_names: - await fetch_and_store_branch(name) - time.sleep(1) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - 
description="Fetch Jenkins data and store it in Postgres + Loki" - ) - parser.add_argument("--forever", action="store_true", help="loop and re-fetch") - parser.add_argument( - "--wait-minutes", default=15, type=int, help="sleep time while looping" - ) - args = parser.parse_args() - init(dir=".httpcache") - init_log() - - if args.forever: - subprocess.run([sys.executable, str(SCHEMA_SCRIPT)], check=True) - while True: - asyncio.run(main(args)) - logging.info(f"Sleeping for {args.wait_minutes} minutes") - time.sleep(args.wait_minutes * 60) - else: - asyncio.run(main(args)) diff --git a/dev/utils/net.py b/dev/utils/net.py index 674f9a45..46164924 100644 --- a/dev/utils/net.py +++ b/dev/utils/net.py @@ -1,9 +1,9 @@ -import requests -import subprocess -import os import logging +import os +import subprocess from pathlib import Path +import requests CACHE_DIR = None DEBUG = os.getenv("DEBUG", "0") == "1" diff --git a/dev/utils/schema.py b/dev/utils/schema.py index 7aa9e25b..21ae5fca 100644 --- a/dev/utils/schema.py +++ b/dev/utils/schema.py @@ -1,20 +1,21 @@ +from typing import Any, Dict + import sqlalchemy -from sqlalchemy.orm import declarative_base from sqlalchemy import ( + JSON, + Boolean, Column, + DateTime, Integer, String, - Boolean, - DateTime, - JSON, Text, + column, + table, ) +from sqlalchemy.orm import declarative_base from sqlalchemy.sql.sqltypes import Float -from sqlalchemy import table, column from . import db -from typing import Dict, Any - Base = declarative_base() diff --git a/dev/utils/utils.py b/dev/utils/utils.py index 6f8b2867..34efacef 100644 --- a/dev/utils/utils.py +++ b/dev/utils/utils.py @@ -1,10 +1,9 @@ -import json -import sys import asyncio +import json import logging +import sys from pathlib import Path - REPO_ROOT = Path(__file__).resolve().parent