diff --git a/cloudinit/analyze/__init__.py b/cloudinit/analyze/__init__.py index a18141d845e..cc187391908 100644 --- a/cloudinit/analyze/__init__.py +++ b/cloudinit/analyze/__init__.py @@ -6,13 +6,15 @@ import re import sys from datetime import datetime -from typing import IO +from typing import IO, Dict, List, Tuple, Union from cloudinit.analyze import dump, show from cloudinit.atomic_helper import json_dumps -def get_parser(parser=None): +def get_parser( + parser: argparse.ArgumentParser = None, +) -> argparse.ArgumentParser: if not parser: parser = argparse.ArgumentParser( prog="cloudinit-analyze", @@ -113,7 +115,7 @@ def get_parser(parser=None): return parser -def analyze_boot(name, args): +def analyze_boot(name: str, args: argparse.Namespace) -> int: """Report a list of how long different boot operations took. For Example: @@ -196,7 +198,7 @@ def analyze_boot(name, args): return status_code -def analyze_blame(name, args): +def analyze_blame(name: str, args: argparse.Namespace) -> None: """Report a list of records sorted by largest time delta. For example: @@ -223,7 +225,7 @@ def analyze_blame(name, args): clean_io(infh, outfh) -def analyze_show(name, args): +def analyze_show(name: str, args: argparse.Namespace) -> None: """Generate output records using the 'standard' format to printing events. Example output follows: @@ -260,14 +262,14 @@ def analyze_show(name, args): clean_io(infh, outfh) -def analyze_dump(name, args): +def analyze_dump(name: str, args: argparse.Namespace) -> None: """Dump cloud-init events in json format""" infh, outfh = configure_io(args) outfh.write(json_dumps(_get_events(infh)) + "\n") clean_io(infh, outfh) -def _get_events(infile): +def _get_events(infile: IO) -> List[Dict[str, Union[str, float]]]: rawdata = None events, rawdata = show.load_events_infile(infile) if not events: @@ -275,7 +277,7 @@ def _get_events(infile): return events -def configure_io(args): +def configure_io(args: argparse.Namespace) -> Tuple[IO, IO]: """Common parsing and setup of input/output files""" if args.infile == "-": infh = sys.stdin diff --git a/cloudinit/analyze/dump.py b/cloudinit/analyze/dump.py index 55d149c4432..6e94fb49375 100644 --- a/cloudinit/analyze/dump.py +++ b/cloudinit/analyze/dump.py @@ -3,10 +3,11 @@ import calendar import sys from datetime import datetime, timezone +from typing import Dict, List, Optional, Tuple, Union from cloudinit import atomic_helper, subp, util -stage_to_description = { +stage_to_description: Dict[str, str] = { "finished": "finished running cloud-init", "init-local": "starting search for local datasources", "init-network": "searching for network datasources", @@ -27,7 +28,7 @@ DEFAULT_FMT = "%b %d %H:%M:%S %Y" -def parse_timestamp(timestampstr): +def parse_timestamp(timestampstr: str) -> float: # default syslog time does not include the current year months = [calendar.month_abbr[m] for m in range(1, 13)] if timestampstr.split()[0] in months: @@ -54,7 +55,7 @@ def parse_timestamp(timestampstr): return float(timestamp) -def has_gnu_date(): +def has_gnu_date() -> bool: """GNU date includes a string containing the word GNU in it in help output. Posix date does not. Use this to indicate on Linux systems without GNU date that the extended parsing is not @@ -63,7 +64,7 @@ def has_gnu_date(): return "GNU" in subp.subp(["date", "--help"]).stdout -def parse_timestamp_from_date(timestampstr): +def parse_timestamp_from_date(timestampstr: str) -> float: if not util.is_Linux() and subp.which("gdate"): date = "gdate" elif has_gnu_date(): @@ -77,7 +78,7 @@ def parse_timestamp_from_date(timestampstr): ) -def parse_ci_logline(line): +def parse_ci_logline(line: str) -> Optional[Dict[str, Union[str, float]]]: # Stage Starts: # Cloud-init v. 0.7.7 running 'init-local' at \ # Fri, 02 Sep 2016 19:28:07 +0000. Up 1.0 seconds. @@ -163,7 +164,9 @@ def parse_ci_logline(line): return event -def dump_events(cisource=None, rawdata=None): +def dump_events( + cisource: Optional[object] = None, rawdata: Optional[str] = None +) -> Tuple[List[Dict[str, Union[str, float]]], List[str]]: events = [] event = None CI_EVENT_MATCHES = ["start:", "finish:", "Cloud-init v."] @@ -189,7 +192,7 @@ def dump_events(cisource=None, rawdata=None): return events, data -def main(): +def main() -> str: if len(sys.argv) > 1: cisource = open(sys.argv[1]) else: diff --git a/cloudinit/analyze/show.py b/cloudinit/analyze/show.py index b3814c646fb..002d98e0a84 100644 --- a/cloudinit/analyze/show.py +++ b/cloudinit/analyze/show.py @@ -8,6 +8,7 @@ import json import sys import time +from typing import Dict, List, Optional, TextIO, Tuple, Union from cloudinit import subp, util from cloudinit.distros import uses_systemd @@ -31,7 +32,7 @@ # "timestamp": 1461164249.1590767 # } -format_key = { +format_key: Dict[str, str] = { "%d": "delta", "%D": "description", "%E": "elapsed", @@ -51,7 +52,7 @@ TIMESTAMP_UNKNOWN = (FAIL_CODE, -1, -1, -1) -def format_record(msg, event): +def format_record(msg: str, event: Dict[str, Union[str, float]]) -> str: for i, j in format_key.items(): if i in msg: # ensure consistent formatting of time values @@ -62,41 +63,47 @@ def format_record(msg, event): return msg.format(**event) -def event_name(event): +def event_name(event: Dict[str, Union[str, float]]) -> Optional[str]: if event: return event.get("name") return None -def event_type(event): +def event_type(event: Dict[str, Union[str, float]]) -> Optional[str]: if event: return event.get("event_type") return None -def event_parent(event): +def event_parent(event: Dict[str, Union[str, float]]) -> Optional[str]: if event: return event_name(event).split("/")[0] return None -def event_timestamp(event): +def event_timestamp(event: Dict[str, Union[str, float]]) -> float: return float(event.get("timestamp")) -def event_datetime(event): +def event_datetime(event: Dict[str, Union[str, float]]) -> datetime.datetime: return datetime.datetime.utcfromtimestamp(event_timestamp(event)) -def delta_seconds(t1, t2): +def delta_seconds(t1: datetime.datetime, t2: datetime.datetime) -> float: return (t2 - t1).total_seconds() -def event_duration(start, finish): +def event_duration( + start: Dict[str, Union[str, float]], finish: Dict[str, Union[str, float]] +) -> float: return delta_seconds(event_datetime(start), event_datetime(finish)) -def event_record(start_time, start, finish): +def event_record( + start_time: datetime.datetime, + start: Dict[str, Union[str, float]], + finish: Dict[str, Union[str, float]], +) -> Dict[str, Union[str, float]]: record = finish.copy() record.update( { @@ -109,7 +116,7 @@ def event_record(start_time, start, finish): return record -def total_time_record(total_time): +def total_time_record(total_time: float) -> str: return "Total Time: %3.5f seconds\n" % total_time @@ -118,7 +125,7 @@ class SystemctlReader: Class for dealing with all systemctl subp calls in a consistent manner. """ - def __init__(self, property, parameter=None): + def __init__(self, property: str, parameter: Optional[str] = None) -> None: self.epoch = None self.args = [subp.which("systemctl"), "show"] if parameter: @@ -129,7 +136,7 @@ def __init__(self, property, parameter=None): # requested from the object self.failure = self.subp() - def subp(self): + def subp(self) -> Optional[Union[str, Exception]]: """ Make a subp call based on set args and handle errors by setting failure code @@ -145,7 +152,7 @@ def subp(self): except Exception as systemctl_fail: return systemctl_fail - def parse_epoch_as_float(self): + def parse_epoch_as_float(self) -> float: """ If subp call succeeded, return the timestamp from subp as a float. @@ -166,7 +173,7 @@ def parse_epoch_as_float(self): return float(timestamp) / 1000000 -def dist_check_timestamp(): +def dist_check_timestamp() -> Tuple[str, float, float, float]: """ Determine which init system a particular linux distro is using. Each init system (systemd, etc) has a different way of @@ -188,7 +195,7 @@ def dist_check_timestamp(): return TIMESTAMP_UNKNOWN -def gather_timestamps_using_dmesg(): +def gather_timestamps_using_dmesg() -> Tuple[str, float, float, float]: """ Gather timestamps that corresponds to kernel begin initialization, kernel finish initialization using dmesg as opposed to systemctl @@ -219,7 +226,7 @@ def gather_timestamps_using_dmesg(): return TIMESTAMP_UNKNOWN -def gather_timestamps_using_systemd(): +def gather_timestamps_using_systemd() -> Tuple[str, float, float, float]: """ Gather timestamps that corresponds to kernel begin initialization, kernel finish initialization. and cloud-init systemd unit activation @@ -253,9 +260,9 @@ def gather_timestamps_using_systemd(): def generate_records( - events, - print_format="(%n) %d seconds in %I%D", -): + events: List[Dict[str, Union[str, float]]], + print_format: str = "(%n) %d seconds in %I%D", +) -> List[List[str]]: """ Take in raw events and create parent-child dependencies between events in order to order events in chronological order. @@ -326,7 +333,9 @@ def generate_records( return boot_records -def show_events(events, print_format): +def show_events( + events: List[Dict[str, Union[str, float]]], print_format: str +) -> List[List[str]]: """ A passthrough method that makes it easier to call generate_records() @@ -339,7 +348,9 @@ def show_events(events, print_format): return generate_records(events, print_format=print_format) -def load_events_infile(infile): +def load_events_infile( + infile: TextIO, +) -> Tuple[Optional[List[Dict[str, Union[str, float]]]], str]: """ Takes in a log file, read it, and convert to json. diff --git a/tools/.github-cla-signers b/tools/.github-cla-signers index 82dedc071f5..e1d24ed9d50 100644 --- a/tools/.github-cla-signers +++ b/tools/.github-cla-signers @@ -1,4 +1,5 @@ a-dubs +abdulganiyy aciba90 acourdavAkamai ader1990