Skip to content

Commit

Permalink
remote: WIP
Browse files Browse the repository at this point in the history
update to execnet 2.0.2
  • Loading branch information
kammoh committed Jul 16, 2023
1 parent 3aad818 commit a739d9b
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 39 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ dependencies = [
"pyvcd ~= 0.3",
"GitPython >= 3.1.30",
"fabric >= 3.1.0",
"execnet @ git+https://github.com/pytest-dev/execnet.git@6dabb0f8bb498d0007c82bee011be36313849bef#egg=execnet",
"execnet >= 2.0.2",
"devtools >= 0.11",
]

Expand Down
106 changes: 69 additions & 37 deletions src/xeda/flow_runner/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@ def get_env_var(conn, var):
return result.stdout.strip()


def get_login_env(conn) -> dict[str, str]:
result = conn.run("$SHELL -l -c env", hide=True, pty=False)
assert result.ok
lines_split = [line.split("=") for line in result.stdout.strip().split("\n")]
return {kv[0]: kv[1] for kv in lines_split if len(kv) == 2}


class RemoteRunner(FlowLauncher):
class Settings(FlowLauncher.Settings):
clean: bool = True
Expand Down Expand Up @@ -147,8 +154,19 @@ def run_remote(
)
)
conn = Connection(host=host, user=user, port=port)
remote_home = get_env_var(conn, "HOME")
remote_path = f"{remote_home}/.xeda/{design.name}_{design_hash[:DIR_NAME_HASH_LEN]}"
log.info(
"Connecting to %s%s%s...", f"{user}@" if user else "", host, f":{port}" if port else ""
)

remote_env = get_login_env(conn)
remote_env_path = remote_env.get("PATH", "")
remote_home = remote_env.get("HOME")
log.info("remote PATH=%s HOME=%s", remote_env_path, remote_home)

remote_xeda = f"{remote_home}/.xeda"
if not Transfer(conn).is_remote_dir(remote_xeda):
conn.sftp().mkdir(remote_xeda)
remote_path = f"{remote_xeda}/{design.name}_{design_hash[:DIR_NAME_HASH_LEN]}"
if not Transfer(conn).is_remote_dir(remote_path):
conn.sftp().mkdir(remote_path)
assert Transfer(conn).is_remote_dir(remote_path)
Expand All @@ -164,6 +182,34 @@ def run_remote(
),
)

ssh_opt = f"{host}"
if user:
ssh_opt = f"{user}@{ssh_opt}"
if port:
ssh_opt += f"-p {port}"

python_exec = "python3"

gw = execnet.makegateway(
f"ssh={ssh_opt}//chdir={remote_path}//env:PATH={remote_env_path}//python={python_exec}"
)
channel = gw.remote_exec(
"""
import sys, os
channel.send((sys.platform, tuple(sys.version_info), os.getpid()))
"""
)
platform, version_info, _ = channel.receive()
version_info_str = ".".join(str(v) for v in version_info)
log.info(
f"Hostname:{host} Platform:{platform} Python:{version_info_str} PATH={remote_env_path}"
)
PY_MIN_VERSION = (3, 8, 0)
assert version_info[0] == PY_MIN_VERSION[0] and (
version_info[1] > PY_MIN_VERSION[1]
or (version_info[1] == PY_MIN_VERSION[1] and version_info[2] >= PY_MIN_VERSION[2])
), f"Python {'.'.join(str(d) for d in PY_MIN_VERSION)} or newer is required to be installed on the remote but found version {version_info_str}"

run_path = self.get_flow_run_path(
design.name,
flow_name,
Expand All @@ -187,42 +233,24 @@ def run_remote(
flowrun_hash=flowrun_hash,
)
dump_json(all_settings, settings_json, backup=self.settings.backups)
results = None
try:
results_channel = gw.remote_exec(
remote_runner,
remote_path=remote_path,
zip_file=zip_file,
flow=flow_name,
design_file=design_file,
flow_settings=flow_settings,
)

ssh_opt = f"{host}"
if user:
ssh_opt = f"{user}@{ssh_opt}"
if port:
ssh_opt = f"-p {port} {ssh_opt}"

gw = execnet.makegateway(f"ssh={ssh_opt}//chdir={remote_path}")
channel = gw.remote_exec(
"""
import sys, os
channel.send((sys.platform, tuple(sys.version_info), os.getpid()))
"""
)
platform, version_info, _ = channel.receive()
version_info_str = ".".join(str(v) for v in version_info)
log.info(
f"Hostname:{host} Patform:{platform} Python:{version_info_str} PATH:{get_env_var(conn, 'PATH')} SHELL:{get_env_var(conn, 'SHELL')}"
)
PY_MIN_VERSION = (3, 8, 0)
assert version_info[0] == PY_MIN_VERSION[0] and (
version_info[1] > PY_MIN_VERSION[1]
or (version_info[1] == PY_MIN_VERSION[1] and version_info[2] >= PY_MIN_VERSION[2])
), f"Python {'.'.join(str(d) for d in PY_MIN_VERSION)} or newer is required to be installed on the remote but found version {version_info_str}"

results_channel = gw.remote_exec(
remote_runner,
remote_path=remote_path,
zip_file=zip_file,
flow=flow_name,
design_file=design_file,
flow_settings=flow_settings,
)
if not results_channel.isclosed():
results_str = results_channel.receive()
if results_str:
results = json.loads(results_str)
except execnet.gateway_base.RemoteError as e:
log.critical("Remote exception: %s", e.formatted)

results_str = results_channel.receive()
results = json.loads(results_str)
if results:
print_results(
results=results,
Expand All @@ -240,6 +268,7 @@ def run_remote(
if isinstance(artifacts, (dict)):
artifacts = list(artifacts.values())
artifacts_dir.mkdir(exist_ok=True, parents=True)
num_transferred = 0
for f in artifacts:
remote_path = f if os.path.isabs(f) else remote_run_path + "/" + f
rel_path = os.path.relpath(f, remote_run_path) if os.path.isabs(f) else f
Expand All @@ -251,7 +280,10 @@ def run_remote(
local_path.parent.mkdir(parents=True)
assert local_path.is_relative_to(artifacts_dir)
result = conn.get(remote_path, str(local_path))
log.info("Transferred artifact %s to %s", f, result.local)
log.debug("Transferred artifact %s to %s", f, result.local)
num_transferred += 1
if num_transferred > 0:
log.info("Transferred %d artifact(s) to %s", num_transferred, artifacts_dir)

dump_json(results, results_json_path, backup=True)
log.info("Results written to %s", results_json_path)
Expand Down
5 changes: 4 additions & 1 deletion src/xeda/tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ def __init__(self, executable: str, tool: str, path: str, *args: Any, **kwargs:
self.path = path

def __str__(self) -> str:
return f"{self.__class__.__name__}: Executable '{self.exec}' (for {self.tool}) was not found (PATH={self.path})"
return f"Executable '{self.exec}' (for {self.tool}) was not found (PATH={self.path})"

def __repr__(self) -> str:
return f"{self.__class__.__name__}: {self.__str__()}"


class RemoteSettings(XedaBaseModel):
Expand Down

0 comments on commit a739d9b

Please sign in to comment.