From 50b5750017a0ff45647e0244aaff11a197a9c793 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Wed, 9 Aug 2023 01:23:17 +0100 Subject: [PATCH 01/24] initial draft commit --- scripts/upgrage-legacy-rules.py | 145 ++++++++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) create mode 100755 scripts/upgrage-legacy-rules.py diff --git a/scripts/upgrage-legacy-rules.py b/scripts/upgrage-legacy-rules.py new file mode 100755 index 000000000..a9509efa9 --- /dev/null +++ b/scripts/upgrage-legacy-rules.py @@ -0,0 +1,145 @@ +#!/usr/bin/env python3 +# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. 
+ + +import sys +import argparse +import textwrap +from typing import List, Optional +from pathlib import Path + +import yaml + +from capa.main import collect_rule_file_paths +from capa.rules import Rule +from capa.features.address import NO_ADDRESS + +dynamic_features = ("api", "string", "substring", "number", "description", "regex", "match", "os") +engine_words = ("and", "or", "optional", "not") +static_scopes = ("function", "basic block", "instruction") +dynamic_scopes = ("thread",) + + +def rec_features_list(static, context=False): + """ + takes in a list of static features, and returns it alongside a list of dynamic-only features + """ + dynamic = [] + for node in static: + for key, value in node.items(): + pass + if isinstance(value, list): + # is either subscope or ceng + if key in (*static_scopes, *dynamic_scopes): + # is subscope + stat, dyn = rec_scope(key, value, context) + if not context and dyn: + dynamic.append({"or": [stat, dyn]}) + elif context == "d" and dyn: + dynamic.append(dyn) + elif key in engine_words or key.endswith("or more"): + # is ceng + stat, dyn = rec_bool(key, value, context) + if dyn: + dynamic.append(dyn) + else: + raise ValueError(f"key: {key}, value: {value}") + if key.startswith("count"): + key = key.split("(")[1].split(")")[0] + if key in dynamic_features: + dynamic.append(node) + return static, dynamic + + +def rec_scope(key, value, context=False): + """ + takes in a static subscope, and returns it alongside its dynamic counterpart. + """ + if len(value) > 1 or (key == "instruction" and key not in engine_words): + static, _ = rec_bool("and", value, "s") + _, dynamic = rec_bool("and", value, "d") + else: + static, _ = rec_features_list(value, "s") + _, dynamic = rec_features_list(value, "d") + return {key: static}, {"thread": dynamic} + + +def rec_bool(key, value, context=False): + """ + takes in a capa logical statement and returns a static and dynamic variation of it. 
+ """ + stat, dyn = rec_features_list(value, context) + if key == "and" and sorted(map(lambda s: s.keys(), stat)) != sorted(map(lambda s: s.keys(), dyn)): + return {key: value}, {} + if dyn: + return {key: value}, {key: dyn} + return {key: value}, {} + + +def upgrade_rule(content): + features = content["rule"]["features"] + print(f"original: {features[0]}\n") + for key, value in features[0].items(): + pass + if key in static_scopes: + print(f"modified: {rec_scope(key, value)[1]}") + elif key in engine_words: + print(f"modified: {rec_bool(key, value)[1]}") + else: + print(f"modified: {rec_features_list([{key: value}])[1]}") + + print("\n\n") + + +def main(argv: Optional[List[str]] = None): + desc = ( + "Upgrade legacy-format rulesets into the new rules format which supports static and dynamic analysis flavors." + ) + parser = argparse.ArgumentParser(description=desc, formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--old-rules-path", default="../rules", help="path to the legacy ruleset") + parser.add_argument("--new-rules-save-path", default="../upgraded-rules/", help="where to save the upgraded rules") + args = parser.parse_args(args=argv) + + # check args + old_rules_path = Path(args.old_rules_path) + new_rules_save_path = Path(args.new_rules_save_path) + if old_rules_path == new_rules_save_path: + print( + textwrap.dedent( + """ + WARNING: you've specified the same directory as the old-rules' path and the new rules' save path, + which will cause this script to overwrite your old rules with the new upgraded ones. 
+ Are you sure you want proceed with overwritting the old rules [O]verwrite/[E]xit: + """ + ) + ) + response = "" + while response not in ("o", "e"): + response = input().lower() + if response == "o": + print("Old rules' folder will be overwritten.") + elif response == "e": + print("The ruleset will not been upgraded.") + sys.exit(0) + else: + print("Please provide a valid answer [O]verwrite/[E]xit: ") + + # Get rules + rule_file_paths: List[Path] = collect_rule_file_paths([old_rules_path]) + rule_contents = [rule_path.read_bytes() for rule_path in rule_file_paths] + + rules = [] # type: List[Rule] + for path, content in zip(rule_file_paths, rule_contents): + content = content.decode("utf-8") + content = yaml.load(content, Loader=yaml.Loader) + upgrade_rule(content) + + +if __name__ == "__main__": + sys.exit(main(sys.argv[1:])) From 7c8f0aea89488ee3aad9450d0da966ef35a6a7c9 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Wed, 9 Aug 2023 01:24:03 +0100 Subject: [PATCH 02/24] update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 92bec467f..a469f8f4c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ - Add dynamic capability extraction @yelhamer - Add support for mixed-scopes rules @yelhamer - Add a call scope @yelhamer +- Add a rule migration script @yelhamer ### Breaking Changes From cd776eaee38a778397ecb79780beeba764d2fe38 Mon Sep 17 00:00:00 2001 From: Yacine <16624109+yelhamer@users.noreply.github.com> Date: Wed, 9 Aug 2023 08:42:11 +0100 Subject: [PATCH 03/24] use uppercase for constants Co-authored-by: Willi Ballenthin --- scripts/upgrage-legacy-rules.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/scripts/upgrage-legacy-rules.py b/scripts/upgrage-legacy-rules.py index a9509efa9..b195c0067 100755 --- a/scripts/upgrage-legacy-rules.py +++ b/scripts/upgrage-legacy-rules.py @@ -20,10 +20,10 @@ from capa.rules import Rule from capa.features.address import 
NO_ADDRESS -dynamic_features = ("api", "string", "substring", "number", "description", "regex", "match", "os") -engine_words = ("and", "or", "optional", "not") -static_scopes = ("function", "basic block", "instruction") -dynamic_scopes = ("thread",) +DYNAMIC_FEATURES = ("api", "string", "substring", "number", "description", "regex", "match", "os") +ENGINE_STATEMENTS = ("and", "or", "optional", "not") +STATIC_SCOPES = ("function", "basic block", "instruction") +DYNAMIC_SCOPES = ("thread",) def rec_features_list(static, context=False): From dce17f2fba0009f22dda7c4a8536ed36255bf6d8 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Wed, 9 Aug 2023 09:39:53 +0100 Subject: [PATCH 04/24] fix typo in script naming --- scripts/upgrade-legacy-rules.py | 146 ++++++++++++++++++++++++++++++++ 1 file changed, 146 insertions(+) create mode 100755 scripts/upgrade-legacy-rules.py diff --git a/scripts/upgrade-legacy-rules.py b/scripts/upgrade-legacy-rules.py new file mode 100755 index 000000000..8d475b407 --- /dev/null +++ b/scripts/upgrade-legacy-rules.py @@ -0,0 +1,146 @@ +#!/usr/bin/env python3 +# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. 
+ + +import sys +import argparse +import textwrap +from typing import List, Optional +from pathlib import Path + +import yaml + +from capa.main import collect_rule_file_paths +from capa.rules import Rule +from capa.features.address import NO_ADDRESS + +DYNAMIC_FEATURES = ("api", "string", "substring", "number", "description", "regex", "match", "os") +ENGINE_STATEMENTS = ("and", "or", "optional", "not") +STATIC_SCOPES = ("function", "basic block", "instruction") +DYNAMIC_SCOPES = ("thread",) + + +def rec_features_list(static, context=False): + """ + takes in a list of static features, and returns it alongside a list of dynamic-only features + """ + dynamic = [] + for node in static: + for key, value in node.items(): + pass + if isinstance(value, list): + # is either subscope or ceng + if key in (*static_scopes, *dynamic_scopes): + # is subscope + stat, dyn = rec_scope(key, value, context) + if not context and dyn: + dynamic.append({"or": [stat, dyn]}) + elif context == "d" and dyn: + dynamic.append(dyn) + elif key in engine_words or key.endswith("or more"): + # is ceng + stat, dyn = rec_bool(key, value, context) + if dyn: + dynamic.append(dyn) + else: + raise ValueError(f"key: {key}, value: {value}") + if key.startswith("count"): + key = key.split("(")[1].split(")")[0] + if key in dynamic_features: + dynamic.append(node) + return static, dynamic + + +def rec_scope(key, value, context=False): + """ + takes in a static subscope, and returns it alongside its dynamic counterpart. + """ + if len(value) > 1 or (key == "instruction" and key not in engine_words): + static, _ = rec_bool("and", value, "s") + _, dynamic = rec_bool("and", value, "d") + else: + static, _ = rec_features_list(value, "s") + _, dynamic = rec_features_list(value, "d") + return {key: static}, {"thread": dynamic} + + +def rec_bool(key, value, context=False): + """ + takes in a capa logical statement and returns a static and dynamic variation of it. 
+ """ + stat, dyn = rec_features_list(value, context) + if key == "and" and len(stat) != len(dyn): + print(sorted(map(lambda s: s.keys(), dyn))) + return {key: value}, {} + if dyn: + return {key: value}, {key: dyn} + return {key: value}, {} + + +def upgrade_rule(content): + features = content["rule"]["features"] + print(f"original: {features[0]}\n") + for key, value in features[0].items(): + pass + if key in static_scopes: + print(f"modified: {rec_scope(key, value)[1]}") + elif key in engine_words: + print(f"modified: {rec_bool(key, value)[1]}") + else: + print(f"modified: {rec_features_list([{key: value}])[1]}") + + print("\n\n") + + +def main(argv: Optional[List[str]] = None): + desc = ( + "Upgrade legacy-format rulesets into the new rules format which supports static and dynamic analysis flavors." + ) + parser = argparse.ArgumentParser(description=desc, formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--old-rules-path", default="../rules", help="path to the legacy ruleset") + parser.add_argument("--new-rules-save-path", default="../upgraded-rules/", help="where to save the upgraded rules") + args = parser.parse_args(args=argv) + + # check args + old_rules_path = Path(args.old_rules_path) + new_rules_save_path = Path(args.new_rules_save_path) + if old_rules_path == new_rules_save_path: + print( + textwrap.dedent( + """ + WARNING: you've specified the same directory as the old-rules' path and the new rules' save path, + which will cause this script to overwrite your old rules with the new upgraded ones. 
+ Are you sure you want proceed with overwritting the old rules [O]verwrite/[E]xit: + """ + ) + ) + response = "" + while response not in ("o", "e"): + response = input().lower() + if response == "o": + print("Old rules' folder will be overwritten.") + elif response == "e": + print("The ruleset will not been upgraded.") + sys.exit(0) + else: + print("Please provide a valid answer [O]verwrite/[E]xit: ") + + # Get rules + rule_file_paths: List[Path] = collect_rule_file_paths([old_rules_path]) + rule_contents = [rule_path.read_bytes() for rule_path in rule_file_paths] + + rules = [] # type: List[Rule] + for path, content in zip(rule_file_paths, rule_contents): + content = content.decode("utf-8") + content = yaml.load(content, Loader=yaml.Loader) + upgrade_rule(content) + + +if __name__ == "__main__": + sys.exit(main(sys.argv[1:])) From 4392c2a5e1b4b341b12469bce9db7b3a261e009d Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 10 Aug 2023 00:46:12 +0100 Subject: [PATCH 05/24] finished-up script --- scripts/upgrade-legacy-rules.py | 114 +++++++++++++++++++++++--------- 1 file changed, 83 insertions(+), 31 deletions(-) diff --git a/scripts/upgrade-legacy-rules.py b/scripts/upgrade-legacy-rules.py index 8d475b407..932541d11 100755 --- a/scripts/upgrade-legacy-rules.py +++ b/scripts/upgrade-legacy-rules.py @@ -11,22 +11,32 @@ import sys import argparse import textwrap -from typing import List, Optional +from typing import List, Union, Literal, Optional from pathlib import Path import yaml +from typing_extensions import TypeAlias from capa.main import collect_rule_file_paths -from capa.rules import Rule from capa.features.address import NO_ADDRESS -DYNAMIC_FEATURES = ("api", "string", "substring", "number", "description", "regex", "match", "os") +DYNAMIC_FEATURES = ("api", "string", "substring", "number", "description", "regex", "match", "os") +DYNAMIC_CHARACTERISTICS = ("embedded-pe",) ENGINE_STATEMENTS = ("and", "or", "optional", "not") STATIC_SCOPES = ("function", 
"basic block", "instruction") DYNAMIC_SCOPES = ("thread",) +GET_DYNAMIC_EQUIV = { + "instruction": "call", + "basic block": "thread", + "function": "process", + "file": "file", +} -def rec_features_list(static, context=False): +context: TypeAlias = Union[Literal["static"], Literal["dynamic"]] + + +def rec_features_list(static: List[dict], context=False): """ takes in a list of static features, and returns it alongside a list of dynamic-only features """ @@ -36,14 +46,14 @@ def rec_features_list(static, context=False): pass if isinstance(value, list): # is either subscope or ceng - if key in (*static_scopes, *dynamic_scopes): + if key in (*STATIC_SCOPES, *DYNAMIC_SCOPES): # is subscope - stat, dyn = rec_scope(key, value, context) + stat, dyn = rec_scope(key, value) if not context and dyn: dynamic.append({"or": [stat, dyn]}) - elif context == "d" and dyn: + elif context == "dynamic" and dyn: dynamic.append(dyn) - elif key in engine_words or key.endswith("or more"): + elif key in ENGINE_STATEMENTS or key.endswith("or more"): # is ceng stat, dyn = rec_bool(key, value, context) if dyn: @@ -52,22 +62,25 @@ def rec_features_list(static, context=False): raise ValueError(f"key: {key}, value: {value}") if key.startswith("count"): key = key.split("(")[1].split(")")[0] - if key in dynamic_features: + if key.startswith("characteristic"): + if value in DYNAMIC_CHARACTERISTICS: + dynamic.append(node) + if key in DYNAMIC_FEATURES: dynamic.append(node) return static, dynamic -def rec_scope(key, value, context=False): +def rec_scope(key, value): """ takes in a static subscope, and returns it alongside its dynamic counterpart. 
""" - if len(value) > 1 or (key == "instruction" and key not in engine_words): - static, _ = rec_bool("and", value, "s") - _, dynamic = rec_bool("and", value, "d") + if len(value) > 1 or (key == "instruction" and key not in ENGINE_STATEMENTS): + _, dynamic = rec_features_list([{"and": value}], context="dynamic") else: - static, _ = rec_features_list(value, "s") - _, dynamic = rec_features_list(value, "d") - return {key: static}, {"thread": dynamic} + _, dynamic = rec_features_list(value, context="dynamic") + if dynamic: + return {key: value}, {GET_DYNAMIC_EQUIV[key]: dynamic} + return {key: value}, {} def rec_bool(key, value, context=False): @@ -76,26 +89,54 @@ def rec_bool(key, value, context=False): """ stat, dyn = rec_features_list(value, context) if key == "and" and len(stat) != len(dyn): - print(sorted(map(lambda s: s.keys(), dyn))) return {key: value}, {} if dyn: return {key: value}, {key: dyn} return {key: value}, {} +class NoAliasDumper(yaml.SafeDumper): + def ignore_aliases(self, data): + return True + + def increase_indent(self, flow=False, indentless=False): + return super(NoAliasDumper, self).increase_indent(flow, indentless) + + +def update_meta(meta, has_dyn=True): + new_meta = {} + for key, value in meta.items(): + if key != "scope": + if isinstance(value, list): + new_meta[key] = {"~": value} + else: + new_meta[key] = value + continue + if has_dyn: + new_meta["scopes"] = {"static": value, "dynamic": GET_DYNAMIC_EQUIV[value]} + else: + new_meta["scopes"] = {"static": value} + return new_meta + + def upgrade_rule(content): features = content["rule"]["features"] - print(f"original: {features[0]}\n") + for key, value in features[0].items(): pass - if key in static_scopes: - print(f"modified: {rec_scope(key, value)[1]}") - elif key in engine_words: - print(f"modified: {rec_bool(key, value)[1]}") + stat, dyn = rec_features_list([{key: value}]) + + meta = update_meta(content["rule"]["meta"], has_dyn=dyn) + if dyn: + features = dyn else: - print(f"modified: 
{rec_features_list([{key: value}])[1]}") + features = stat - print("\n\n") + content["rule"] = {"meta": meta, "features": {"~": features}} + + upgraded_rule = yaml.dump(content, Dumper=NoAliasDumper, sort_keys=False).split("\n") + upgraded_rule = "\n".join(list(filter(lambda line: "~" not in line, upgraded_rule))) + print(upgraded_rule) def main(argv: Optional[List[str]] = None): @@ -103,19 +144,26 @@ def main(argv: Optional[List[str]] = None): "Upgrade legacy-format rulesets into the new rules format which supports static and dynamic analysis flavors." ) parser = argparse.ArgumentParser(description=desc, formatter_class=argparse.RawDescriptionHelpFormatter) - parser.add_argument("--old-rules-path", default="../rules", help="path to the legacy ruleset") - parser.add_argument("--new-rules-save-path", default="../upgraded-rules/", help="where to save the upgraded rules") + parser.add_argument( + "--old-rules-path", default=Path(__file__).parents[1].joinpath("rules"), help="path to the legacy ruleset" + ) + parser.add_argument( + "--save-path", + default=Path(__file__).parents[1].joinpath("upgraded-rules"), + help="where to save the upgraded rules", + ) args = parser.parse_args(args=argv) # check args old_rules_path = Path(args.old_rules_path) - new_rules_save_path = Path(args.new_rules_save_path) + new_rules_save_path = Path(args.save_path) + if old_rules_path == new_rules_save_path: print( textwrap.dedent( """ - WARNING: you've specified the same directory as the old-rules' path and the new rules' save path, - which will cause this script to overwrite your old rules with the new upgraded ones. + WARNING: you've specified the same directory for the old-rules' path and the new rules' save path. + This will cause this script to overwrite your old rules with the new upgraded ones. 
Are you sure you want proceed with overwritting the old rules [O]verwrite/[E]xit: """ ) @@ -135,11 +183,15 @@ def main(argv: Optional[List[str]] = None): rule_file_paths: List[Path] = collect_rule_file_paths([old_rules_path]) rule_contents = [rule_path.read_bytes() for rule_path in rule_file_paths] - rules = [] # type: List[Rule] for path, content in zip(rule_file_paths, rule_contents): content = content.decode("utf-8") content = yaml.load(content, Loader=yaml.Loader) - upgrade_rule(content) + new_rule = upgrade_rule(content) + save_path = Path(new_rules_save_path) + save_path = save_path.joinpath(path.relative_to(old_rules_path)) + save_path.parents[0].mkdir(parents=True, exist_ok=True) + with save_path.open("w", encoding="utf-8") as f: + f.write(new_rule) if __name__ == "__main__": From 1be70355d820b346273cf225106ff1f56f9ef7b8 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 10 Aug 2023 00:47:10 +0100 Subject: [PATCH 06/24] removed old script with wrong name from the PR --- scripts/upgrage-legacy-rules.py | 145 -------------------------------- 1 file changed, 145 deletions(-) delete mode 100755 scripts/upgrage-legacy-rules.py diff --git a/scripts/upgrage-legacy-rules.py b/scripts/upgrage-legacy-rules.py deleted file mode 100755 index b195c0067..000000000 --- a/scripts/upgrage-legacy-rules.py +++ /dev/null @@ -1,145 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: [package root]/LICENSE.txt -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and limitations under the License. 
- - -import sys -import argparse -import textwrap -from typing import List, Optional -from pathlib import Path - -import yaml - -from capa.main import collect_rule_file_paths -from capa.rules import Rule -from capa.features.address import NO_ADDRESS - -DYNAMIC_FEATURES = ("api", "string", "substring", "number", "description", "regex", "match", "os") -ENGINE_STATEMENTS = ("and", "or", "optional", "not") -STATIC_SCOPES = ("function", "basic block", "instruction") -DYNAMIC_SCOPES = ("thread",) - - -def rec_features_list(static, context=False): - """ - takes in a list of static features, and returns it alongside a list of dynamic-only features - """ - dynamic = [] - for node in static: - for key, value in node.items(): - pass - if isinstance(value, list): - # is either subscope or ceng - if key in (*static_scopes, *dynamic_scopes): - # is subscope - stat, dyn = rec_scope(key, value, context) - if not context and dyn: - dynamic.append({"or": [stat, dyn]}) - elif context == "d" and dyn: - dynamic.append(dyn) - elif key in engine_words or key.endswith("or more"): - # is ceng - stat, dyn = rec_bool(key, value, context) - if dyn: - dynamic.append(dyn) - else: - raise ValueError(f"key: {key}, value: {value}") - if key.startswith("count"): - key = key.split("(")[1].split(")")[0] - if key in dynamic_features: - dynamic.append(node) - return static, dynamic - - -def rec_scope(key, value, context=False): - """ - takes in a static subscope, and returns it alongside its dynamic counterpart. - """ - if len(value) > 1 or (key == "instruction" and key not in engine_words): - static, _ = rec_bool("and", value, "s") - _, dynamic = rec_bool("and", value, "d") - else: - static, _ = rec_features_list(value, "s") - _, dynamic = rec_features_list(value, "d") - return {key: static}, {"thread": dynamic} - - -def rec_bool(key, value, context=False): - """ - takes in a capa logical statement and returns a static and dynamic variation of it. 
- """ - stat, dyn = rec_features_list(value, context) - if key == "and" and sorted(map(lambda s: s.keys(), stat)) != sorted(map(lambda s: s.keys(), dyn)): - return {key: value}, {} - if dyn: - return {key: value}, {key: dyn} - return {key: value}, {} - - -def upgrade_rule(content): - features = content["rule"]["features"] - print(f"original: {features[0]}\n") - for key, value in features[0].items(): - pass - if key in static_scopes: - print(f"modified: {rec_scope(key, value)[1]}") - elif key in engine_words: - print(f"modified: {rec_bool(key, value)[1]}") - else: - print(f"modified: {rec_features_list([{key: value}])[1]}") - - print("\n\n") - - -def main(argv: Optional[List[str]] = None): - desc = ( - "Upgrade legacy-format rulesets into the new rules format which supports static and dynamic analysis flavors." - ) - parser = argparse.ArgumentParser(description=desc, formatter_class=argparse.RawDescriptionHelpFormatter) - parser.add_argument("--old-rules-path", default="../rules", help="path to the legacy ruleset") - parser.add_argument("--new-rules-save-path", default="../upgraded-rules/", help="where to save the upgraded rules") - args = parser.parse_args(args=argv) - - # check args - old_rules_path = Path(args.old_rules_path) - new_rules_save_path = Path(args.new_rules_save_path) - if old_rules_path == new_rules_save_path: - print( - textwrap.dedent( - """ - WARNING: you've specified the same directory as the old-rules' path and the new rules' save path, - which will cause this script to overwrite your old rules with the new upgraded ones. 
- Are you sure you want proceed with overwritting the old rules [O]verwrite/[E]xit: - """ - ) - ) - response = "" - while response not in ("o", "e"): - response = input().lower() - if response == "o": - print("Old rules' folder will be overwritten.") - elif response == "e": - print("The ruleset will not been upgraded.") - sys.exit(0) - else: - print("Please provide a valid answer [O]verwrite/[E]xit: ") - - # Get rules - rule_file_paths: List[Path] = collect_rule_file_paths([old_rules_path]) - rule_contents = [rule_path.read_bytes() for rule_path in rule_file_paths] - - rules = [] # type: List[Rule] - for path, content in zip(rule_file_paths, rule_contents): - content = content.decode("utf-8") - content = yaml.load(content, Loader=yaml.Loader) - upgrade_rule(content) - - -if __name__ == "__main__": - sys.exit(main(sys.argv[1:])) From 3368e1edb9c9bc1a852a7d0d1170d29c95d0a471 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Mon, 14 Aug 2023 00:50:55 +0300 Subject: [PATCH 07/24] added type annotation and logging --- scripts/upgrade-legacy-rules.py | 65 ++++++++++++++++++++++----------- 1 file changed, 44 insertions(+), 21 deletions(-) diff --git a/scripts/upgrade-legacy-rules.py b/scripts/upgrade-legacy-rules.py index 932541d11..faa7a3466 100755 --- a/scripts/upgrade-legacy-rules.py +++ b/scripts/upgrade-legacy-rules.py @@ -9,16 +9,16 @@ import sys +import logging import argparse import textwrap -from typing import List, Union, Literal, Optional +from typing import Any, Dict, List, Tuple, Union, Literal, Optional # noqa: F401 from pathlib import Path import yaml from typing_extensions import TypeAlias from capa.main import collect_rule_file_paths -from capa.features.address import NO_ADDRESS DYNAMIC_FEATURES = ("api", "string", "substring", "number", "description", "regex", "match", "os") DYNAMIC_CHARACTERISTICS = ("embedded-pe",) @@ -35,12 +35,14 @@ context: TypeAlias = Union[Literal["static"], Literal["dynamic"]] +logger = logging.getLogger("capa.show-features") 
-def rec_features_list(static: List[dict], context=False): + +def rec_features_list(static: List[dict], context=False) -> tuple[List[Dict], List[Dict]]: """ takes in a list of static features, and returns it alongside a list of dynamic-only features """ - dynamic = [] + dynamic = [] # type: List[Dict] for node in static: for key, value in node.items(): pass @@ -70,7 +72,7 @@ def rec_features_list(static: List[dict], context=False): return static, dynamic -def rec_scope(key, value): +def rec_scope(key: str, value: List) -> Tuple[Dict[str, List], Dict[str, Optional[List]]]: """ takes in a static subscope, and returns it alongside its dynamic counterpart. """ @@ -83,7 +85,7 @@ def rec_scope(key, value): return {key: value}, {} -def rec_bool(key, value, context=False): +def rec_bool(key, value, context=False) -> Tuple[Dict[str, List], Dict[str, Optional[List]]]: """ takes in a capa logical statement and returns a static and dynamic variation of it. """ @@ -96,6 +98,7 @@ def rec_bool(key, value, context=False): class NoAliasDumper(yaml.SafeDumper): + # This is used to get rid of aliases in yaml.dump()'s output def ignore_aliases(self, data): return True @@ -103,8 +106,12 @@ def increase_indent(self, flow=False, indentless=False): return super(NoAliasDumper, self).increase_indent(flow, indentless) -def update_meta(meta, has_dyn=True): - new_meta = {} +def update_meta(meta, has_dyn=True) -> Dict[str, Union[List, Dict, str]]: + """ + Takes in a meta field with the old `scope` keyword, + and replaces it with the `scopes` keyword while maintaining meta's keys order. + """ + new_meta = {} # type: Dict[str, Union[List, Dict, str]] for key, value in meta.items(): if key != "scope": if isinstance(value, list): @@ -119,7 +126,10 @@ def update_meta(meta, has_dyn=True): return new_meta -def upgrade_rule(content): +def upgrade_rule(content) -> str: + """ + Takes in an old rule and returns its equivalent in the new rule format. 
+ """ features = content["rule"]["features"] for key, value in features[0].items(): @@ -136,10 +146,10 @@ def upgrade_rule(content): upgraded_rule = yaml.dump(content, Dumper=NoAliasDumper, sort_keys=False).split("\n") upgraded_rule = "\n".join(list(filter(lambda line: "~" not in line, upgraded_rule))) - print(upgraded_rule) + return upgraded_rule -def main(argv: Optional[List[str]] = None): +def main(argv: Optional[List[str]] = None) -> int: desc = ( "Upgrade legacy-format rulesets into the new rules format which supports static and dynamic analysis flavors." ) @@ -162,9 +172,9 @@ def main(argv: Optional[List[str]] = None): print( textwrap.dedent( """ - WARNING: you've specified the same directory for the old-rules' path and the new rules' save path. - This will cause this script to overwrite your old rules with the new upgraded ones. - Are you sure you want proceed with overwritting the old rules [O]verwrite/[E]xit: + WARNING: you've specified the same directory for the old-rules' path and the new rules' save path. + This will cause this script to overwrite your old rules with the new upgraded ones. + Are you sure you want proceed with overwritting the old rules [O]verwrite/[E]xit: """ ) ) @@ -175,7 +185,7 @@ def main(argv: Optional[List[str]] = None): print("Old rules' folder will be overwritten.") elif response == "e": print("The ruleset will not been upgraded.") - sys.exit(0) + return 0 else: print("Please provide a valid answer [O]verwrite/[E]xit: ") @@ -184,14 +194,27 @@ def main(argv: Optional[List[str]] = None): rule_contents = [rule_path.read_bytes() for rule_path in rule_file_paths] for path, content in zip(rule_file_paths, rule_contents): - content = content.decode("utf-8") - content = yaml.load(content, Loader=yaml.Loader) + """ + This loop goes through the list of rules and does the following: + 1. Get the current rule's content. + 2. Get its dynamic-format equivalent. + 3. Compute its save path and save it there. 
+ """ + content = yaml.load(content.decode("utf-8"), Loader=yaml.Loader) new_rule = upgrade_rule(content) - save_path = Path(new_rules_save_path) - save_path = save_path.joinpath(path.relative_to(old_rules_path)) + save_path = Path(new_rules_save_path).joinpath(path.relative_to(old_rules_path)) save_path.parents[0].mkdir(parents=True, exist_ok=True) - with save_path.open("w", encoding="utf-8") as f: - f.write(new_rule) + try: + with save_path.open("w", encoding="utf-8") as f: + f.write(new_rule) + except IOError as e: + logger.error(f"{e}") + return -1 + else: + logger.error(f"updated rule: {path}") + + print(f"Successfully updated {len(rule_file_paths)} rules.") + return 0 if __name__ == "__main__": From f9946df567b12a1e22e02219a95c293da16fb263 Mon Sep 17 00:00:00 2001 From: Yacine <16624109+yelhamer@users.noreply.github.com> Date: Mon, 14 Aug 2023 11:27:05 +0200 Subject: [PATCH 08/24] Update scripts/upgrade-legacy-rules.py Co-authored-by: Moritz --- scripts/upgrade-legacy-rules.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/upgrade-legacy-rules.py b/scripts/upgrade-legacy-rules.py index faa7a3466..d8ee65ba0 100755 --- a/scripts/upgrade-legacy-rules.py +++ b/scripts/upgrade-legacy-rules.py @@ -35,7 +35,7 @@ context: TypeAlias = Union[Literal["static"], Literal["dynamic"]] -logger = logging.getLogger("capa.show-features") +logger = logging.getLogger(__name__) def rec_features_list(static: List[dict], context=False) -> tuple[List[Dict], List[Dict]]: From 6428de2f2a316e07e9b34ae6a0fd7885b78bbb7d Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Wed, 16 Aug 2023 00:45:16 +0200 Subject: [PATCH 09/24] upgrade-legacy-rules.py: refactor code and fix missing features --- scripts/upgrade-legacy-rules.py | 76 +++++++++++++++++++++++++-------- 1 file changed, 58 insertions(+), 18 deletions(-) diff --git a/scripts/upgrade-legacy-rules.py b/scripts/upgrade-legacy-rules.py index faa7a3466..44350ccda 100755 --- a/scripts/upgrade-legacy-rules.py +++ 
b/scripts/upgrade-legacy-rules.py @@ -19,6 +19,7 @@ from typing_extensions import TypeAlias from capa.main import collect_rule_file_paths +from capa.rules import Rule DYNAMIC_FEATURES = ("api", "string", "substring", "number", "description", "regex", "match", "os") DYNAMIC_CHARACTERISTICS = ("embedded-pe",) @@ -50,15 +51,23 @@ def rec_features_list(static: List[dict], context=False) -> tuple[List[Dict], Li # is either subscope or ceng if key in (*STATIC_SCOPES, *DYNAMIC_SCOPES): # is subscope - stat, dyn = rec_scope(key, value) - if not context and dyn: - dynamic.append({"or": [stat, dyn]}) + stat, dyn = rec_scope(key, value, context=context) + if not context: + if dyn: + dynamic.append({"or": [stat, dyn]}) + else: + dynamic.append(stat) elif context == "dynamic" and dyn: dynamic.append(dyn) elif key in ENGINE_STATEMENTS or key.endswith("or more"): # is ceng - stat, dyn = rec_bool(key, value, context) - if dyn: + stat, dyn = rec_bool(key, value, context=context) + if not context: + if dyn: + dynamic.append(dyn) + else: + dynamic.append(stat) + elif context == "dynamic" and dyn: dynamic.append(dyn) else: raise ValueError(f"key: {key}, value: {value}") @@ -72,17 +81,38 @@ def rec_features_list(static: List[dict], context=False) -> tuple[List[Dict], Li return static, dynamic -def rec_scope(key: str, value: List) -> Tuple[Dict[str, List], Dict[str, Optional[List]]]: +def rec_scope(key: str, value: List, context=False) -> Tuple[Dict[str, List], Dict[str, Optional[List]]]: """ takes in a static subscope, and returns it alongside its dynamic counterpart. 
""" - if len(value) > 1 or (key == "instruction" and key not in ENGINE_STATEMENTS): - _, dynamic = rec_features_list([{"and": value}], context="dynamic") + if context == "static": + if key == "instruction": + stat, _ = rec_features_list([{"and": value}], context=context) + stat = stat[0]["and"] + else: + stat, _ = rec_bool(key, value, context=context) + return {key: stat}, {} + elif context == "dynamic": + if key == "instruction": + _, dyn = rec_features_list([{"and": value}], context=context) + else: + _, dyn = rec_bool(key, value, context=context) + if dyn: + return {}, {GET_DYNAMIC_EQUIV[key]: dyn} + else: + return {}, {} else: - _, dynamic = rec_features_list(value, context="dynamic") - if dynamic: - return {key: value}, {GET_DYNAMIC_EQUIV[key]: dynamic} - return {key: value}, {} + if key == "instruction": + stat, _ = rec_features_list([{"and": value}], context="static") + _, dyn = rec_features_list([{"and": value}], context="dynamic") + stat = stat[0]["and"] + else: + stat, _ = rec_features_list(value, context="static") + _, dyn = rec_features_list(value, context="dynamic") + if dyn: + return {key: stat}, {GET_DYNAMIC_EQUIV[key]: dyn} + else: + return {key: stat}, {} def rec_bool(key, value, context=False) -> Tuple[Dict[str, List], Dict[str, Optional[List]]]: @@ -90,11 +120,20 @@ def rec_bool(key, value, context=False) -> Tuple[Dict[str, List], Dict[str, Opti takes in a capa logical statement and returns a static and dynamic variation of it. 
""" stat, dyn = rec_features_list(value, context) - if key == "and" and len(stat) != len(dyn): - return {key: value}, {} - if dyn: - return {key: value}, {key: dyn} - return {key: value}, {} + if context == "static": + return {key: stat}, {} + elif context == "dynamic": + if key == "and" and len(stat) != len(dyn): + return {}, {} + elif dyn: + return {}, {key: dyn} + else: + return {}, {} + else: + if dyn: + return {key: stat}, {key: dyn} + else: + return {key: stat}, {} class NoAliasDumper(yaml.SafeDumper): @@ -146,7 +185,8 @@ def upgrade_rule(content) -> str: upgraded_rule = yaml.dump(content, Dumper=NoAliasDumper, sort_keys=False).split("\n") upgraded_rule = "\n".join(list(filter(lambda line: "~" not in line, upgraded_rule))) - return upgraded_rule + if Rule.from_yaml(upgraded_rule): + return upgraded_rule def main(argv: Optional[List[str]] = None) -> int: From dabef680c1794a172dd163a5684e5910904f384e Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Wed, 16 Aug 2023 10:36:31 +0200 Subject: [PATCH 10/24] removed newline indentation --- scripts/upgrade-legacy-rules.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/scripts/upgrade-legacy-rules.py b/scripts/upgrade-legacy-rules.py index a149d9d78..34d8d65a3 100755 --- a/scripts/upgrade-legacy-rules.py +++ b/scripts/upgrade-legacy-rules.py @@ -181,10 +181,11 @@ def upgrade_rule(content) -> str: else: features = stat + print(content) content["rule"] = {"meta": meta, "features": {"~": features}} - - upgraded_rule = yaml.dump(content, Dumper=NoAliasDumper, sort_keys=False).split("\n") + upgraded_rule = yaml.dump(content, Dumper=NoAliasDumper, sort_keys=False, width=float("inf")).split("\n") upgraded_rule = "\n".join(list(filter(lambda line: "~" not in line, upgraded_rule))) + print(upgraded_rule) if Rule.from_yaml(upgraded_rule): return upgraded_rule @@ -232,7 +233,6 @@ def main(argv: Optional[List[str]] = None) -> int: # Get rules rule_file_paths: List[Path] = 
collect_rule_file_paths([old_rules_path]) rule_contents = [rule_path.read_bytes() for rule_path in rule_file_paths] - for path, content in zip(rule_file_paths, rule_contents): """ This loop goes through the list of rules and does the following: From fb62d88c18ccde24ce605d0f2a6255bb54df7c09 Mon Sep 17 00:00:00 2001 From: Yacine <16624109+yelhamer@users.noreply.github.com> Date: Wed, 16 Aug 2023 11:20:56 +0200 Subject: [PATCH 11/24] Update scripts/upgrade-legacy-rules.py Co-authored-by: Moritz --- scripts/upgrade-legacy-rules.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/upgrade-legacy-rules.py b/scripts/upgrade-legacy-rules.py index 912aa549b..6e9724501 100755 --- a/scripts/upgrade-legacy-rules.py +++ b/scripts/upgrade-legacy-rules.py @@ -250,10 +250,10 @@ def main(argv: Optional[List[str]] = None) -> int: with save_path.open("w", encoding="utf-8") as f: f.write(new_rule) except IOError as e: - logger.error("%s" % e) + logger.error("%s", e) return -1 else: - logger.error("updated rule: %s" % path) + logger.error("updated rule: %s", path) print(f"Successfully updated {len(rule_file_paths)} rules.") return 0 From c0f7ac87b56335dad7676aa03ed13024d746d999 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Wed, 16 Aug 2023 11:53:00 +0200 Subject: [PATCH 12/24] added corner-case for `or` statements --- scripts/upgrade-legacy-rules.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/scripts/upgrade-legacy-rules.py b/scripts/upgrade-legacy-rules.py index 34d8d65a3..bae8dc857 100755 --- a/scripts/upgrade-legacy-rules.py +++ b/scripts/upgrade-legacy-rules.py @@ -65,8 +65,6 @@ def rec_features_list(static: List[dict], context=False) -> tuple[List[Dict], Li if not context: if dyn: dynamic.append(dyn) - else: - dynamic.append(stat) elif context == "dynamic" and dyn: dynamic.append(dyn) else: @@ -125,12 +123,18 @@ def rec_bool(key, value, context=False) -> Tuple[Dict[str, List], Dict[str, Opti elif context == 
"dynamic": if key == "and" and len(stat) != len(dyn): return {}, {} + elif key == "or" and len(dyn) == len(list(filter(lambda x: x.get("description"), dyn))): + return {}, {} elif dyn: return {}, {key: dyn} else: return {}, {} else: - if dyn: + if key == "and" and len(stat) != len(dyn): + return {key: stat}, {} + elif key == "or" and len(dyn) == len(list(filter(lambda x: x.get("description"), dyn))): + return {}, {} + elif dyn: return {key: stat}, {key: dyn} else: return {key: stat}, {} @@ -181,11 +185,9 @@ def upgrade_rule(content) -> str: else: features = stat - print(content) content["rule"] = {"meta": meta, "features": {"~": features}} upgraded_rule = yaml.dump(content, Dumper=NoAliasDumper, sort_keys=False, width=float("inf")).split("\n") upgraded_rule = "\n".join(list(filter(lambda line: "~" not in line, upgraded_rule))) - print(upgraded_rule) if Rule.from_yaml(upgraded_rule): return upgraded_rule From 81a592d700edfbe848872697759f6bfecc311b2e Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Wed, 16 Aug 2023 12:01:09 +0200 Subject: [PATCH 13/24] use %s instead of f-string in logging --- scripts/upgrade-legacy-rules.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/scripts/upgrade-legacy-rules.py b/scripts/upgrade-legacy-rules.py index bae8dc857..912aa549b 100755 --- a/scripts/upgrade-legacy-rules.py +++ b/scripts/upgrade-legacy-rules.py @@ -45,13 +45,13 @@ def rec_features_list(static: List[dict], context=False) -> tuple[List[Dict], Li """ dynamic = [] # type: List[Dict] for node in static: - for key, value in node.items(): + for _key, _value in node.items(): pass - if isinstance(value, list): + if isinstance(_value, list): # is either subscope or ceng - if key in (*STATIC_SCOPES, *DYNAMIC_SCOPES): + if _key in (*STATIC_SCOPES, *DYNAMIC_SCOPES): # is subscope - stat, dyn = rec_scope(key, value, context=context) + stat, dyn = rec_scope(_key, _value, context=context) if not context: if dyn: 
dynamic.append({"or": [stat, dyn]}) @@ -59,22 +59,22 @@ def rec_features_list(static: List[dict], context=False) -> tuple[List[Dict], Li dynamic.append(stat) elif context == "dynamic" and dyn: dynamic.append(dyn) - elif key in ENGINE_STATEMENTS or key.endswith("or more"): + elif _key in ENGINE_STATEMENTS or _key.endswith("or more"): # is ceng - stat, dyn = rec_bool(key, value, context=context) + stat, dyn = rec_bool(_key, _value, context=context) if not context: if dyn: dynamic.append(dyn) elif context == "dynamic" and dyn: dynamic.append(dyn) else: - raise ValueError(f"key: {key}, value: {value}") - if key.startswith("count"): - key = key.split("(")[1].split(")")[0] - if key.startswith("characteristic"): - if value in DYNAMIC_CHARACTERISTICS: + raise ValueError(f"key: {_key}, value: {_value}") + if _key.startswith("count"): + _key = _key.split("(")[1].split(")")[0] + if _key.startswith("characteristic"): + if _value in DYNAMIC_CHARACTERISTICS: dynamic.append(node) - if key in DYNAMIC_FEATURES: + if _key in DYNAMIC_FEATURES: dynamic.append(node) return static, dynamic @@ -250,10 +250,10 @@ def main(argv: Optional[List[str]] = None) -> int: with save_path.open("w", encoding="utf-8") as f: f.write(new_rule) except IOError as e: - logger.error(f"{e}") + logger.error("%s" % e) return -1 else: - logger.error(f"updated rule: {path}") + logger.error("updated rule: %s" % path) print(f"Successfully updated {len(rule_file_paths)} rules.") return 0 From 8fd88cb89c671d598bd4317655249cdfaa2f5a18 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Wed, 16 Aug 2023 13:01:45 +0200 Subject: [PATCH 14/24] preserve numbers' hexadecimal representation --- scripts/upgrade-legacy-rules.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/scripts/upgrade-legacy-rules.py b/scripts/upgrade-legacy-rules.py index 912aa549b..d528051f9 100755 --- a/scripts/upgrade-legacy-rules.py +++ b/scripts/upgrade-legacy-rules.py @@ -8,6 +8,7 @@ # See the License for the specific 
language governing permissions and limitations under the License. +import re import sys import logging import argparse @@ -75,7 +76,7 @@ def rec_features_list(static: List[dict], context=False) -> tuple[List[Dict], Li if _value in DYNAMIC_CHARACTERISTICS: dynamic.append(node) if _key in DYNAMIC_FEATURES: - dynamic.append(node) + dynamic.append({_key: _value}) return static, dynamic @@ -188,6 +189,7 @@ def upgrade_rule(content) -> str: content["rule"] = {"meta": meta, "features": {"~": features}} upgraded_rule = yaml.dump(content, Dumper=NoAliasDumper, sort_keys=False, width=float("inf")).split("\n") upgraded_rule = "\n".join(list(filter(lambda line: "~" not in line, upgraded_rule))) + upgraded_rule = re.sub(r"number: '(\d+|0[xX][0-9a-fA-F]+)'", r"number: \1", upgraded_rule) if Rule.from_yaml(upgraded_rule): return upgraded_rule @@ -242,7 +244,7 @@ def main(argv: Optional[List[str]] = None) -> int: 2. Get its dynamic-format equivalent. 3. Compute its save path and save it there. """ - content = yaml.load(content.decode("utf-8"), Loader=yaml.Loader) + content = yaml.load(content.decode("utf-8"), Loader=yaml.BaseLoader) new_rule = upgrade_rule(content) save_path = Path(new_rules_save_path).joinpath(path.relative_to(old_rules_path)) save_path.parents[0].mkdir(parents=True, exist_ok=True) From 10d852d238d970a14c90c47693bd2d242282870f Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Wed, 16 Aug 2023 13:08:43 +0200 Subject: [PATCH 15/24] add double quotes around strings --- scripts/upgrade-legacy-rules.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/scripts/upgrade-legacy-rules.py b/scripts/upgrade-legacy-rules.py index 9cf5caaf0..4ad625428 100755 --- a/scripts/upgrade-legacy-rules.py +++ b/scripts/upgrade-legacy-rules.py @@ -190,6 +190,8 @@ def upgrade_rule(content) -> str: upgraded_rule = yaml.dump(content, Dumper=NoAliasDumper, sort_keys=False, width=float("inf")).split("\n") upgraded_rule = "\n".join(list(filter(lambda line: "~" not in line, upgraded_rule))) 
upgraded_rule = re.sub(r"number: '(\d+|0[xX][0-9a-fA-F]+)'", r"number: \1", upgraded_rule) + upgraded_rule = re.sub(r"string: (.*)", r'string: "\1"', upgraded_rule) + print(upgraded_rule) if Rule.from_yaml(upgraded_rule): return upgraded_rule From cc9975dac43171e9b7a9d77071f9e70e3230d40d Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 17 Aug 2023 10:09:58 +0200 Subject: [PATCH 16/24] handle escape characters --- scripts/upgrade-legacy-rules.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/scripts/upgrade-legacy-rules.py b/scripts/upgrade-legacy-rules.py index 4ad625428..e3e61897d 100755 --- a/scripts/upgrade-legacy-rules.py +++ b/scripts/upgrade-legacy-rules.py @@ -170,15 +170,19 @@ def update_meta(meta, has_dyn=True) -> Dict[str, Union[List, Dict, str]]: return new_meta +def format_escapes(s): + return s.replace("\\", "\\\\") + + def upgrade_rule(content) -> str: """ Takes in an old rule and returns its equivalent in the new rule format. """ features = content["rule"]["features"] - for key, value in features[0].items(): + for _key, _value in features[0].items(): pass - stat, dyn = rec_features_list([{key: value}]) + stat, dyn = rec_features_list([{_key: _value}]) meta = update_meta(content["rule"]["meta"], has_dyn=dyn) if dyn: @@ -190,8 +194,11 @@ def upgrade_rule(content) -> str: upgraded_rule = yaml.dump(content, Dumper=NoAliasDumper, sort_keys=False, width=float("inf")).split("\n") upgraded_rule = "\n".join(list(filter(lambda line: "~" not in line, upgraded_rule))) upgraded_rule = re.sub(r"number: '(\d+|0[xX][0-9a-fA-F]+)'", r"number: \1", upgraded_rule) - upgraded_rule = re.sub(r"string: (.*)", r'string: "\1"', upgraded_rule) - print(upgraded_rule) + upgraded_rule = re.sub( + r"(string|substring|regex): (.*)", + lambda x: f"{x.group(1)}: " + (f'"{format_escapes(x.group(2))}"' if '"' not in x.group(2) else x.group(2)), + upgraded_rule, + ) if Rule.from_yaml(upgraded_rule): return upgraded_rule From 
dd14824742eaae09289ae3e0ffac2f46f38e2b34 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 17 Aug 2023 10:38:16 +0200 Subject: [PATCH 17/24] fixed several issues --- scripts/upgrade-legacy-rules.py | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/scripts/upgrade-legacy-rules.py b/scripts/upgrade-legacy-rules.py index e3e61897d..83433feec 100755 --- a/scripts/upgrade-legacy-rules.py +++ b/scripts/upgrade-legacy-rules.py @@ -70,13 +70,29 @@ def rec_features_list(static: List[dict], context=False) -> tuple[List[Dict], Li dynamic.append(dyn) else: raise ValueError(f"key: {_key}, value: {_value}") - if _key.startswith("count"): - _key = _key.split("(")[1].split(")")[0] + if _key == "offset": + if isinstance(_value, str) and "=" not in _value: + try: + node[_key] = int(node[_key]) + except: + node[_key] = int(node[_key], 16) if _key.startswith("characteristic"): if _value in DYNAMIC_CHARACTERISTICS: dynamic.append(node) + if _key == "string": + node[_key] = node[_key].replace("\n", "\\n") + if _key.startswith("count"): + if isinstance(node[_key], str) and "or more" not in node[_key]: + try: + node[_key] = int(node[_key]) + except: + try: + node[_key] = int(node[_key], 16) + except: + pass + _key = _key.split("(")[1].split(")")[0] if _key in DYNAMIC_FEATURES: - dynamic.append({_key: _value}) + dynamic.append(node) return static, dynamic @@ -170,7 +186,10 @@ def update_meta(meta, has_dyn=True) -> Dict[str, Union[List, Dict, str]]: return new_meta -def format_escapes(s): +def format_escapes(s: str): + s = s.replace("\n", "\\n") + if s.startswith("'") and s.endswith("'"): + s = s[1:-1] return s.replace("\\", "\\\\") @@ -199,6 +218,7 @@ def upgrade_rule(content) -> str: lambda x: f"{x.group(1)}: " + (f'"{format_escapes(x.group(2))}"' if '"' not in x.group(2) else x.group(2)), upgraded_rule, ) + print(upgraded_rule) if Rule.from_yaml(upgraded_rule): return upgraded_rule @@ -254,6 +274,7 @@ def main(argv: 
Optional[List[str]] = None) -> int: 3. Compute its save path and save it there. """ content = yaml.load(content.decode("utf-8"), Loader=yaml.BaseLoader) + print(path) new_rule = upgrade_rule(content) save_path = Path(new_rules_save_path).joinpath(path.relative_to(old_rules_path)) save_path.parents[0].mkdir(parents=True, exist_ok=True) From 1170c557ab727fa9f9db224305d9e1e27d29e1e7 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 17 Aug 2023 10:40:21 +0200 Subject: [PATCH 18/24] remove debugging statements --- scripts/upgrade-legacy-rules.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/scripts/upgrade-legacy-rules.py b/scripts/upgrade-legacy-rules.py index 83433feec..a40adbf43 100755 --- a/scripts/upgrade-legacy-rules.py +++ b/scripts/upgrade-legacy-rules.py @@ -186,7 +186,7 @@ def update_meta(meta, has_dyn=True) -> Dict[str, Union[List, Dict, str]]: return new_meta -def format_escapes(s: str): +def format_string(s: str): s = s.replace("\n", "\\n") if s.startswith("'") and s.endswith("'"): s = s[1:-1] @@ -215,10 +215,9 @@ def upgrade_rule(content) -> str: upgraded_rule = re.sub(r"number: '(\d+|0[xX][0-9a-fA-F]+)'", r"number: \1", upgraded_rule) upgraded_rule = re.sub( r"(string|substring|regex): (.*)", - lambda x: f"{x.group(1)}: " + (f'"{format_escapes(x.group(2))}"' if '"' not in x.group(2) else x.group(2)), + lambda x: f"{x.group(1)}: " + (f'"{format_string(x.group(2))}"' if '"' not in x.group(2) else x.group(2)), upgraded_rule, ) - print(upgraded_rule) if Rule.from_yaml(upgraded_rule): return upgraded_rule @@ -274,7 +273,6 @@ def main(argv: Optional[List[str]] = None) -> int: 3. Compute its save path and save it there. 
""" content = yaml.load(content.decode("utf-8"), Loader=yaml.BaseLoader) - print(path) new_rule = upgrade_rule(content) save_path = Path(new_rules_save_path).joinpath(path.relative_to(old_rules_path)) save_path.parents[0].mkdir(parents=True, exist_ok=True) From dc151c0dcba2db303a21f9c3a22c8ee7de1dc68a Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 17 Aug 2023 10:58:23 +0200 Subject: [PATCH 19/24] bugfix for or ceng statements --- scripts/upgrade-legacy-rules.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/scripts/upgrade-legacy-rules.py b/scripts/upgrade-legacy-rules.py index a40adbf43..9d0ad2bf7 100755 --- a/scripts/upgrade-legacy-rules.py +++ b/scripts/upgrade-legacy-rules.py @@ -151,6 +151,8 @@ def rec_bool(key, value, context=False) -> Tuple[Dict[str, List], Dict[str, Opti return {key: stat}, {} elif key == "or" and len(dyn) == len(list(filter(lambda x: x.get("description"), dyn))): return {}, {} + elif key == "or" and len(dyn) != len(stat): + return {}, {key: dyn + [x for x in stat if x not in dyn]} elif dyn: return {key: stat}, {key: dyn} else: @@ -215,9 +217,11 @@ def upgrade_rule(content) -> str: upgraded_rule = re.sub(r"number: '(\d+|0[xX][0-9a-fA-F]+)'", r"number: \1", upgraded_rule) upgraded_rule = re.sub( r"(string|substring|regex): (.*)", - lambda x: f"{x.group(1)}: " + (f'"{format_string(x.group(2))}"' if '"' not in x.group(2) else x.group(2)), + lambda x: f"{x.group(1)}: " + + (f'"{format_string(x.group(2))}"' if ('"' not in x.group(2) and "\\" not in x.group(2)) else x.group(2)), upgraded_rule, ) + print(upgraded_rule) if Rule.from_yaml(upgraded_rule): return upgraded_rule @@ -273,6 +277,7 @@ def main(argv: Optional[List[str]] = None) -> int: 3. Compute its save path and save it there. 
""" content = yaml.load(content.decode("utf-8"), Loader=yaml.BaseLoader) + print(path) new_rule = upgrade_rule(content) save_path = Path(new_rules_save_path).joinpath(path.relative_to(old_rules_path)) save_path.parents[0].mkdir(parents=True, exist_ok=True) From 3175725c96490bc53a0f9a1b779e3071c0f51517 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 17 Aug 2023 21:02:13 +0200 Subject: [PATCH 20/24] update --- scripts/upgrade-legacy-rules.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/scripts/upgrade-legacy-rules.py b/scripts/upgrade-legacy-rules.py index 9d0ad2bf7..0d3794e92 100755 --- a/scripts/upgrade-legacy-rules.py +++ b/scripts/upgrade-legacy-rules.py @@ -22,7 +22,7 @@ from capa.main import collect_rule_file_paths from capa.rules import Rule -DYNAMIC_FEATURES = ("api", "string", "substring", "number", "description", "regex", "match", "os") +DYNAMIC_FEATURES = ("api", "string", "substring", "number", "description", "regex", "match", "os", "arch") DYNAMIC_CHARACTERISTICS = ("embedded-pe",) ENGINE_STATEMENTS = ("and", "or", "optional", "not") STATIC_SCOPES = ("function", "basic block", "instruction") @@ -139,7 +139,7 @@ def rec_bool(key, value, context=False) -> Tuple[Dict[str, List], Dict[str, Opti return {key: stat}, {} elif context == "dynamic": if key == "and" and len(stat) != len(dyn): - return {}, {} + return {key: stat}, {} elif key == "or" and len(dyn) == len(list(filter(lambda x: x.get("description"), dyn))): return {}, {} elif dyn: @@ -218,7 +218,7 @@ def upgrade_rule(content) -> str: upgraded_rule = re.sub( r"(string|substring|regex): (.*)", lambda x: f"{x.group(1)}: " - + (f'"{format_string(x.group(2))}"' if ('"' not in x.group(2) and "\\" not in x.group(2)) else x.group(2)), + + (x.group(2) if ('"' not in x.group(2) and "\\" not in x.group(2)) else f'"{format_string(x.group(2))}"'), upgraded_rule, ) print(upgraded_rule) From 2ac44235db0482a0bb38bbcbeb209d982890cc9d Mon Sep 17 00:00:00 2001 From: Yacine Elhamer 
Date: Sun, 20 Aug 2023 15:10:50 +0200 Subject: [PATCH 21/24] fix author's quoting --- scripts/upgrade-legacy-rules.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/scripts/upgrade-legacy-rules.py b/scripts/upgrade-legacy-rules.py index 0d3794e92..5d1a0f3c5 100755 --- a/scripts/upgrade-legacy-rules.py +++ b/scripts/upgrade-legacy-rules.py @@ -175,6 +175,8 @@ def update_meta(meta, has_dyn=True) -> Dict[str, Union[List, Dict, str]]: """ new_meta = {} # type: Dict[str, Union[List, Dict, str]] for key, value in meta.items(): + if key == "authors": + value = list(map(lambda author: f'"{author}"', value)) if key != "scope": if isinstance(value, list): new_meta[key] = {"~": value} @@ -218,10 +220,18 @@ def upgrade_rule(content) -> str: upgraded_rule = re.sub( r"(string|substring|regex): (.*)", lambda x: f"{x.group(1)}: " - + (x.group(2) if ('"' not in x.group(2) and "\\" not in x.group(2)) else f'"{format_string(x.group(2))}"'), + + ( + f'"{format_string(x.group(2))}"' + if ((x.group(2).startswith("'") and x.group(2).endswith("'")) or "\\" in x.group(2)) + else x.group(2) + ), + upgraded_rule, + ) + upgraded_rule = re.sub( + r" meta:\n([\S\s]*) features:", + lambda x: " meta:\n" + x.group(1).replace("'", "") + " features:", upgraded_rule, ) - print(upgraded_rule) if Rule.from_yaml(upgraded_rule): return upgraded_rule @@ -277,7 +287,6 @@ def main(argv: Optional[List[str]] = None) -> int: 3. Compute its save path and save it there. 
""" content = yaml.load(content.decode("utf-8"), Loader=yaml.BaseLoader) - print(path) new_rule = upgrade_rule(content) save_path = Path(new_rules_save_path).joinpath(path.relative_to(old_rules_path)) save_path.parents[0].mkdir(parents=True, exist_ok=True) From a9c049e91a26a3da648ca8c828b52aefdcfd1f57 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Sun, 20 Aug 2023 15:32:22 +0200 Subject: [PATCH 22/24] fix strings issue --- scripts/upgrade-legacy-rules.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/scripts/upgrade-legacy-rules.py b/scripts/upgrade-legacy-rules.py index 5d1a0f3c5..bbb122d4d 100755 --- a/scripts/upgrade-legacy-rules.py +++ b/scripts/upgrade-legacy-rules.py @@ -192,9 +192,10 @@ def update_meta(meta, has_dyn=True) -> Dict[str, Union[List, Dict, str]]: def format_string(s: str): s = s.replace("\n", "\\n") + s = s.replace("\\", "\\\\") if s.startswith("'") and s.endswith("'"): s = s[1:-1] - return s.replace("\\", "\\\\") + return s.replace('"', '\\"') def upgrade_rule(content) -> str: @@ -221,9 +222,9 @@ def upgrade_rule(content) -> str: r"(string|substring|regex): (.*)", lambda x: f"{x.group(1)}: " + ( - f'"{format_string(x.group(2))}"' - if ((x.group(2).startswith("'") and x.group(2).endswith("'")) or "\\" in x.group(2)) - else x.group(2) + x.group(2) + if (x.group(2).startswith("/") and (x.group(2).endswith("/") or x.group(2).endswith("/i"))) + else f'"{format_string(x.group(2))}"' ), upgraded_rule, ) From 7a68a180775401aee47e1645024a133f3048dd5f Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Mon, 21 Aug 2023 09:08:23 +0200 Subject: [PATCH 23/24] fix author's quotes --- scripts/upgrade-legacy-rules.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/scripts/upgrade-legacy-rules.py b/scripts/upgrade-legacy-rules.py index bbb122d4d..16e1f085f 100755 --- a/scripts/upgrade-legacy-rules.py +++ b/scripts/upgrade-legacy-rules.py @@ -175,8 +175,6 @@ def update_meta(meta, 
has_dyn=True) -> Dict[str, Union[List, Dict, str]]: """ new_meta = {} # type: Dict[str, Union[List, Dict, str]] for key, value in meta.items(): - if key == "authors": - value = list(map(lambda author: f'"{author}"', value)) if key != "scope": if isinstance(value, list): new_meta[key] = {"~": value} @@ -229,8 +227,15 @@ def upgrade_rule(content) -> str: upgraded_rule, ) upgraded_rule = re.sub( - r" meta:\n([\S\s]*) features:", - lambda x: " meta:\n" + x.group(1).replace("'", "") + " features:", + r"( authors:\n)([\S\s]*)(\n\s*\w)", + lambda x: x.group(1) + + re.sub( + r"(^\s*- )(.*)", + lambda y: y.group(1) + (f'"{y.group(2)[1:-1]}"' if y.group(2).startswith("'@") else y.group(2)), + x.group(2), + flags=re.M, + ) + + x.group(3), upgraded_rule, ) if Rule.from_yaml(upgraded_rule): @@ -262,9 +267,9 @@ def main(argv: Optional[List[str]] = None) -> int: """ WARNING: you've specified the same directory for the old-rules' path and the new rules' save path. This will cause this script to overwrite your old rules with the new upgraded ones. 
- Are you sure you want proceed with overwritting the old rules [O]verwrite/[E]xit: - """ - ) + Are you sure you want proceed with overwritting the old rules [O]verwrite/[E]xit: """ + ), + end="", ) response = "" while response not in ("o", "e"): @@ -275,7 +280,7 @@ def main(argv: Optional[List[str]] = None) -> int: print("The ruleset will not been upgraded.") return 0 else: - print("Please provide a valid answer [O]verwrite/[E]xit: ") + print("Please provide a valid answer [O]verwrite/[E]xit: ", end="") # Get rules rule_file_paths: List[Path] = collect_rule_file_paths([old_rules_path]) From 06eaafd20b432e563e78bef429d63454b24d5752 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Mon, 21 Aug 2023 19:14:17 +0200 Subject: [PATCH 24/24] other fixes --- scripts/upgrade-legacy-rules.py | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/scripts/upgrade-legacy-rules.py b/scripts/upgrade-legacy-rules.py index 16e1f085f..910a4bd3d 100755 --- a/scripts/upgrade-legacy-rules.py +++ b/scripts/upgrade-legacy-rules.py @@ -22,7 +22,7 @@ from capa.main import collect_rule_file_paths from capa.rules import Rule -DYNAMIC_FEATURES = ("api", "string", "substring", "number", "description", "regex", "match", "os", "arch") +DYNAMIC_FEATURES = ("api", "string", "substring", "number", "description", "regex", "match", "os", "arch", "format") DYNAMIC_CHARACTERISTICS = ("embedded-pe",) ENGINE_STATEMENTS = ("and", "or", "optional", "not") STATIC_SCOPES = ("function", "basic block", "instruction") @@ -147,7 +147,8 @@ def rec_bool(key, value, context=False) -> Tuple[Dict[str, List], Dict[str, Opti else: return {}, {} else: - if key == "and" and len(stat) != len(dyn): + _, dyn_ = rec_features_list(value, context="dynamic") + if key == "and" and len(stat) != len(dyn_): return {key: stat}, {} elif key == "or" and len(dyn) == len(list(filter(lambda x: x.get("description"), dyn))): return {}, {} @@ -238,6 +239,7 @@ def upgrade_rule(content) -> str: + 
x.group(3), upgraded_rule, ) + upgraded_rule = re.sub(r" comment (.*) ", r"# \1", upgraded_rule) if Rule.from_yaml(upgraded_rule): return upgraded_rule @@ -292,7 +294,23 @@ def main(argv: Optional[List[str]] = None) -> int: 2. Get its dynamic-format equivalent. 3. Compute its save path and save it there. """ - content = yaml.load(content.decode("utf-8"), Loader=yaml.BaseLoader) + content = content.decode("utf-8") + content = re.sub( + r"([\s]*-.*) # (.*)\n$", + lambda x: x.group(1) + + ( + "" + if ( + x.group(1).endswith("or:") + or not re.search(r"\s*- (string|substring|regex): /.*/i?", x.group(1), flags=re.M) + ) + else f" comment {x.group(2)}" + ), + content, + flags=re.M, + ) + content = re.sub(r"^\s*description: .*\n$", "", content, flags=re.M) + content = yaml.load(content, Loader=yaml.BaseLoader) new_rule = upgrade_rule(content) save_path = Path(new_rules_save_path).joinpath(path.relative_to(old_rules_path)) save_path.parents[0].mkdir(parents=True, exist_ok=True)