Skip to content

Commit

Permalink
Refactor setting files (#144)
Browse files Browse the repository at this point in the history
  • Loading branch information
tsutaj authored May 14, 2024
1 parent 08fc1eb commit 852bd06
Show file tree
Hide file tree
Showing 9 changed files with 435 additions and 428 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,4 @@ ss-out/
constraints.hpp
virtualenv/
problemset/
.idea/
155 changes: 68 additions & 87 deletions statements_manager/src/convert_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,11 @@
import pdfkit
from googleapiclient.discovery import build

from statements_manager.src.execute_config import ProblemSetConfig
from statements_manager.src.params_maker.lang_to_class import lang_to_class
from statements_manager.src.renderer import Renderer
from statements_manager.src.statement_location_mode import StatementLocationMode
from statements_manager.src.utils import create_token, dict_merge
from statements_manager.template import (
default_sample_template_html,
default_template_html,
template_pdf_options,
)

logger: Logger = getLogger(__name__)

Expand All @@ -40,75 +37,60 @@ def need_to_save(


class ConvertTaskRunner:
def __init__(
self,
problem_attr: dict[str, Any],
template_attr: dict[str, Any],
pdf_attr_raw: dict[str, Any],
):
def __init__(self, problemset_config: ProblemSetConfig):
self._cwd = Path.cwd()
self.problem_attr: dict[str, Any] = problem_attr
self.pdf_attr_raw: dict[str, Any] = pdf_attr_raw
self.problemset_config = problemset_config
self.renderer = Renderer(
template_attr.get("template_html", default_template_html),
template_attr.get("sample_template_html", default_sample_template_html),
template_attr.get("preprocess_path", None),
template_attr.get("postprocess_path", None),
problemset_config.template_html,
problemset_config.sample_template_html,
problemset_config.template.preprocess_path,
problemset_config.template.postprocess_path,
)
self.problemset_dir = template_attr["output_path"]
self.problemset_dir = problemset_config.output_path

def get_local_contents(self, problem_id: str) -> Tuple[ContentsStatus, str]:
try:
with open(self.problem_attr[problem_id]["statement_path"]) as f:
with open(
self.problemset_config.get_problem(problem_id).statement.path
) as f:
return (ContentsStatus.OK, f.read())
except EnvironmentError:
logger.error(
f"{self.problem_attr[problem_id]['statement_path']} does not exist"
)
logger.error(f"problem_id {problem_id}: statement path does not exist")
return (ContentsStatus.NG, "")

def get_docs_contents(self, problem_id: str) -> Tuple[ContentsStatus, str]:
statement_path = self.problem_attr[problem_id]["statement_path"]
setting_dir_candidates = [
pathlib.Path.home() / ".ss-manager",
pathlib.Path(self.problem_attr[problem_id]["token_path"]).parent,
]
for setting_dir in setting_dir_candidates:
if not setting_dir.exists():
continue
statement_path = self.problemset_config.get_problem(problem_id).statement.path
setting_dir = pathlib.Path.home() / ".ss-manager"
try:
token = create_token(
creds_path=str(setting_dir / "credentials.json"),
token_path=str(setting_dir / "token.pickle"),
)
if token is None:
raise FileNotFoundError("token not found")

try:
token = create_token(
creds_path=str(setting_dir / "credentials.json"),
token_path=str(setting_dir / "token.pickle"),
)
if token is None:
logger.info(
f"trying to get docs file using token ({setting_dir / 'token.pickle'})"
)
service = build("docs", "v1", credentials=token)
document = service.documents().get(documentId=statement_path).execute()
contents = ""
for content in document.get("body")["content"]:
if "paragraph" not in content:
continue

logger.info(
f"trying to get docs file using token ({setting_dir / 'token.pickle'})"
)
service = build("docs", "v1", credentials=token)
document = service.documents().get(documentId=statement_path).execute()
contents = ""
for content in document.get("body")["content"]:
if "paragraph" not in content:
continue
for element in content["paragraph"]["elements"]:
statement = element["textRun"]["content"]
if "suggestedInsertionIds" not in element["textRun"]:
contents += statement
else:
logger.warning(
f"proposed element for addition (ignored in rendering): {statement}"
)
if "suggestedDeletionIds" in element["textRun"]:
logger.warning(
f"proposed element for deletion: {statement}"
)
return (ContentsStatus.OK, contents)
except Exception as e:
logger.error(f"error occured! ({setting_dir}): {e}")
for element in content["paragraph"]["elements"]:
statement = element["textRun"]["content"]
if "suggestedInsertionIds" not in element["textRun"]:
contents += statement
else:
logger.warning(
f"proposed element for addition (ignored in rendering): {statement}"
)
if "suggestedDeletionIds" in element["textRun"]:
logger.warning(f"proposed element for deletion: {statement}")
return (ContentsStatus.OK, contents)
except Exception as e:
logger.error(f"error occured! ({setting_dir}): {e}")

# どのパスでも生成できなかったらエラー
logger.error("cannot get docs contents")
Expand All @@ -121,13 +103,14 @@ def get_docs_contents(self, problem_id: str) -> Tuple[ContentsStatus, str]:

# ローカルまたは Google Docs から問題文のテキストファイルを取得
def get_contents(self, problem_id: str) -> Tuple[ContentsStatus, str]:
if self.problem_attr[problem_id]["mode"] == "local":
mode = self.problemset_config.get_problem(problem_id).statement.mode
if mode == StatementLocationMode.LOCAL:
return self.get_local_contents(problem_id)
elif self.problem_attr[problem_id]["mode"] == "docs":
elif mode == StatementLocationMode.DOCS:
return self.get_docs_contents(problem_id)
else:
logger.error(f"unknown mode: {self.problem_attr[problem_id]['mode']}")
raise ValueError(f"unknown mode: {self.problem_attr[problem_id]['mode']}")
logger.error(f"unknown mode: {mode}")
raise ValueError(f"unknown mode: {mode}")

def save_file(self, text: str, output_path: str):
with open(output_path, "w") as f:
Expand All @@ -136,22 +119,23 @@ def save_file(self, text: str, output_path: str):
# 制約パラメータファイル (e.g. constraints.hpp) の作成
def create_params_file(self, problem_id: str) -> None:
logger.info("create params file")
problem_config = self.problemset_config.get_problem(problem_id)
if (
"params_path" in self.problem_attr[problem_id]
and "constraints" in self.problem_attr[problem_id]
problem_config.params_path is not None
and problem_config.constraints is not None
):
ext: str = Path(self.problem_attr[problem_id]["params_path"]).suffix
ext: str = Path(problem_config.params_path).suffix
if ext in lang_to_class:
params_maker = lang_to_class[ext](
self.problem_attr[problem_id]["constraints"],
self.problem_attr[problem_id]["params_path"],
problem_config.constraints,
problem_config.params_path,
)
params_maker.run()
else:
logger.warning(
f"skip creating params: no language config which matches '{ext}'"
)
elif "constraints" not in self.problem_attr[problem_id]:
elif problem_config.constraints is None:
logger.warning("skip creating params: constraints are not set")
else:
logger.warning("skip creating params: params_path is not set")
Expand All @@ -163,8 +147,9 @@ def copy_assets(
output_path: Path,
) -> dict[str, str]:
cache = {}
if "assets_path" in self.problem_attr[problem_id]:
assets_src_path = Path(self.problem_attr[problem_id]["assets_path"])
problem_config = self.problemset_config.get_problem(problem_id)
if problem_config.assets_path is not None:
assets_src_path = Path(problem_config.assets_path)
assets_dst_path = output_path
if assets_src_path.exists():
logger.info("copy assets file")
Expand All @@ -182,20 +167,16 @@ def copy_assets(

else:
logger.warning(
f"assets_path '{self.problem_attr[problem_id]['assets_path']}' does not exist."
f"assets_path '{problem_config.assets_path}' does not exist."
)
return cache

def make_pdf_attr(self, is_problemset: bool) -> dict[str, Any]:
if "common" in self.pdf_attr_raw:
pdf_attr = self.pdf_attr_raw["common"]
else:
pdf_attr = template_pdf_options

pdf_attr = self.problemset_config.pdf_config.common
if is_problemset:
pdf_attr.update(self.pdf_attr_raw.get("problemset", {}))
pdf_attr.update(self.problemset_config.pdf_config.problemset)
else:
pdf_attr.update(self.pdf_attr_raw.get("problem", {}))
pdf_attr.update(self.problemset_config.pdf_config.problem)
return pdf_attr

def run_rendering(
Expand All @@ -215,7 +196,7 @@ def run_rendering(
logger.info(f"saving replaced {output_ext}")
if output_ext == "html":
html = self.renderer.generate_html(
problem_attr=self.problem_attr,
problemset_config=self.problemset_config,
problem_ids=problem_ids,
is_problemset=is_problemset,
)
Expand All @@ -232,7 +213,7 @@ def run_rendering(
elif output_ext == "pdf":
pdf_attr = self.make_pdf_attr(is_problemset)
html = self.renderer.generate_html_for_pdf(
problem_attr=self.problem_attr,
problemset_config=self.problemset_config,
problem_ids=problem_ids,
is_problemset=is_problemset,
pdf_path=output_path,
Expand All @@ -252,7 +233,7 @@ def run_rendering(
logger.warning("skip dumping pdf: same result as before")
elif output_ext == "md":
md = self.renderer.generate_markdown(
problem_attr=self.problem_attr,
problemset_config=self.problemset_config,
problem_ids=problem_ids,
is_problemset=is_problemset,
)
Expand Down Expand Up @@ -284,7 +265,7 @@ def run(
problemset_cache: dict[str, Any] = {}
for problem_id in problem_ids:
logger.info(f"rendering [problem id: {problem_id}]")

problem_config = self.problemset_config.get_problem(problem_id)
self.create_params_file(problem_id)
if constraints_only:
continue
Expand All @@ -296,10 +277,10 @@ def run(
continue

valid_problem_ids.append(problem_id)
self.problem_attr[problem_id]["raw_statement"] = raw_statement
problem_config.statement.raw_text = raw_statement

# 問題文ファイル出力先
output_dir = self.problem_attr[problem_id]["output_path"]
output_dir = pathlib.Path(problem_config.output_path)
if output_dir.exists():
logger.warning(f"output directory '{output_dir}' already exists.")
else:
Expand Down
Loading

0 comments on commit 852bd06

Please sign in to comment.