Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Develop script to highlight features unused during matching #1683

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### New Features
- ELF: implement file import and export name extractor #1607 #1608 @Aayush-Goel-04
- bump pydantic from 1.10.9 to 2.1.1 #1582 @Aayush-Goel-04
- develop script to highlight the features that are not used during matching #331 @Aayush-Goel-04

### Breaking Changes

Expand Down
27 changes: 27 additions & 0 deletions capa/rules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,33 @@ def extract_subscope_rules(self):

yield from self._extract_subscope_rules_rec(self.statement)

def _extract_all_features_rec(self, statement) -> Set[Feature]:
feature_set: Set[Feature] = set()

for child in statement.get_children():
if isinstance(child, Statement):
feature_set.update(self._extract_all_features_rec(child))
else:
feature_set.add(child)
return feature_set

def extract_all_features(self) -> Set[Feature]:
"""
recursively extracts all feature statements in this rule.
returns:
set: A set of all feature statements contained within this rule.
"""
if not isinstance(self.statement, ceng.Statement):
# For rules with single feature like
# anti-analysis\obfuscation\obfuscated-with-advobfuscator.yml
# contains a single feature - substring , which is of type String
return {
self.statement,
}

return self._extract_all_features_rec(self.statement)

def evaluate(self, features: FeatureSet, short_circuit=True):
capa.perf.counters["evaluate.feature"] += 1
capa.perf.counters["evaluate.feature.rule"] += 1
Expand Down
36 changes: 6 additions & 30 deletions scripts/detect_duplicate_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,56 +8,33 @@
import sys
import logging
import argparse
from typing import Set
from pathlib import Path

import capa.main
import capa.rules
import capa.engine as ceng
from capa.features.common import Feature

logger = logging.getLogger("detect_duplicate_features")


def get_child_features(feature: ceng.Statement) -> list:
"""
Recursively extracts all feature statements from a given rule statement.
Args:
feature (capa.engine.Statement): The feature statement to extract features from.
Returns:
list: A list of all feature statements contained within the given feature statement.
"""
children = []

if isinstance(feature, (ceng.And, ceng.Or, ceng.Some)):
for child in feature.children:
children.extend(get_child_features(child))
elif isinstance(feature, (ceng.Subscope, ceng.Range, ceng.Not)):
children.extend(get_child_features(feature.child))
else:
children.append(feature)
return children


def get_features(rule_path: str) -> list:
def get_features(rule_path: str) -> Set[Feature]:
"""
Extracts all features from a given rule file.
Args:
rule_path (str): The path to the rule file to extract features from.
Returns:
list: A list of all feature statements contained within the rule file.
set: A set of all feature statements contained within the rule file.
"""
feature_list = []
with Path(rule_path).open("r", encoding="utf-8") as f:
try:
new_rule = capa.rules.Rule.from_yaml(f.read())
feature_list = get_child_features(new_rule.statement)
return new_rule.extract_all_features()
except Exception as e:
logger.error("Error: New rule %s %s %s", rule_path, str(type(e)), str(e))
sys.exit(-1)
return feature_list


def find_overlapping_rules(new_rule_path, rules_path):
Expand All @@ -67,15 +44,14 @@ def find_overlapping_rules(new_rule_path, rules_path):

# Loads features of new rule in a list.
new_rule_features = get_features(new_rule_path)

count = 0
overlapping_rules = []

# capa.rules.RuleSet stores all rules in given paths
ruleset = capa.main.get_rules(rules_path)

for rule_name, rule in ruleset.rules.items():
rule_features = get_child_features(rule.statement)
rule_features = rule.extract_all_features()

if not len(rule_features):
continue
Expand Down
218 changes: 218 additions & 0 deletions scripts/show-unused-features.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
#!/usr/bin/env python3
"""
Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at: [package root]/LICENSE.txt
Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""
import os
import sys
import typing
import logging
import argparse
from typing import Set, Tuple
from pathlib import Path
from collections import Counter

import tabulate
from termcolor import colored

import capa.main
import capa.rules
import capa.helpers
import capa.features
import capa.exceptions
import capa.render.verbose as v
import capa.features.common
import capa.features.freeze
import capa.features.address
import capa.features.extractors.pefile
import capa.features.extractors.base_extractor
from capa.helpers import log_unsupported_runtime_error
from capa.features.common import Feature
from capa.features.extractors.base_extractor import FunctionHandle

logger = logging.getLogger("show-unused-features")


def format_address(addr: capa.features.address.Address) -> str:
return v.format_address(capa.features.freeze.Address.from_capa((addr)))


def get_rules_feature_set(rules_path) -> Set[Feature]:
ruleset = capa.main.get_rules(rules_path)
rules_feature_set: Set[Feature] = set()
for _, rule in ruleset.rules.items():
rules_feature_set.update(rule.extract_all_features())

return rules_feature_set


def get_file_features(
functions: Tuple[FunctionHandle, ...], extractor: capa.features.extractors.base_extractor.FeatureExtractor
) -> typing.Counter[Feature]:
feature_map: typing.Counter[Feature] = Counter()

for f in functions:
if extractor.is_library_function(f.address):
function_name = extractor.get_function_name(f.address)
logger.debug("skipping library function %s (%s)", format_address(f.address), function_name)
continue

for feature, _ in extractor.extract_function_features(f):
if capa.features.common.is_global_feature(feature):
continue
feature_map.update([feature])

for bb in extractor.get_basic_blocks(f):
for feature, _ in extractor.extract_basic_block_features(f, bb):
if capa.features.common.is_global_feature(feature):
continue
feature_map.update([feature])

for insn in extractor.get_instructions(f, bb):
for feature, _ in extractor.extract_insn_features(f, bb, insn):
if capa.features.common.is_global_feature(feature):
continue
feature_map.update([feature])
return feature_map


def get_colored(s: str):
if "(" in s and ")" in s:
s_split = s.split("(", 1)
s_color = colored(s_split[1][:-1], "cyan")
return f"{s_split[0]}({s_color})"
else:
return colored(s, "cyan")


def print_unused_features(feature_map: typing.Counter[Feature], rules_feature_set: Set[Feature]):
unused_features = []
for feature, count in reversed(feature_map.most_common()):
if feature in rules_feature_set:
continue
unused_features.append((str(count), get_colored(str(feature))))
print("\n")
print(tabulate.tabulate(unused_features, headers=["Count", "Feature"], tablefmt="plain"))
print("\n")


def main(argv=None):
if argv is None:
argv = sys.argv[1:]

parser = argparse.ArgumentParser(description="Show the features that capa doesn't have rules for yet")
capa.main.install_common_args(parser, wanted={"format", "os", "sample", "signatures", "backend", "rules"})

parser.add_argument("-F", "--function", type=str, help="Show features for specific function")
args = parser.parse_args(args=argv)
capa.main.handle_common_args(args)

if args.function and args.backend == "pefile":
print("pefile backend does not support extracting function features")
return -1

try:
taste = capa.helpers.get_file_taste(Path(args.sample))
except IOError as e:
logger.error("%s", str(e))
return -1

try:
sig_paths = capa.main.get_signatures(args.signatures)
except IOError as e:
logger.error("%s", str(e))
return -1

if (args.format == "freeze") or (
args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste)
):
extractor = capa.features.freeze.load(Path(args.sample).read_bytes())
else:
should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
try:
extractor = capa.main.get_extractor(
args.sample, args.format, args.os, args.backend, sig_paths, should_save_workspace
)
except capa.exceptions.UnsupportedFormatError:
capa.helpers.log_unsupported_format_error()
return -1
except capa.exceptions.UnsupportedRuntimeError:
log_unsupported_runtime_error()
return -1

feature_map: typing.Counter[Feature] = Counter()

feature_map.update([feature for feature, _ in extractor.extract_global_features()])

function_handles: Tuple[FunctionHandle, ...]
if isinstance(extractor, capa.features.extractors.pefile.PefileFeatureExtractor):
# pefile extractor doesn't extract function features
function_handles = ()
else:
function_handles = tuple(extractor.get_functions())

if args.function:
if args.format == "freeze":
function_handles = tuple(filter(lambda fh: fh.address == args.function, function_handles))
else:
function_handles = tuple(filter(lambda fh: format_address(fh.address) == args.function, function_handles))

if args.function not in [format_address(fh.address) for fh in function_handles]:
print(f"{args.function} not a function")
return -1

if len(function_handles) == 0:
print(f"{args.function} not a function")
return -1

Aayush-Goel-04 marked this conversation as resolved.
Show resolved Hide resolved
feature_map.update(get_file_features(function_handles, extractor))

rules_feature_set = get_rules_feature_set(args.rules)

print_unused_features(feature_map, rules_feature_set)
return 0


def ida_main():
import idc

import capa.main
import capa.features.extractors.ida.extractor

function = idc.get_func_attr(idc.here(), idc.FUNCATTR_START)
print(f"getting features for current function {hex(function)}")

extractor = capa.features.extractors.ida.extractor.IdaFeatureExtractor()
feature_map: typing.Counter[Feature] = Counter()

feature_map.update([feature for feature, _ in extractor.extract_file_features()])

function_handles = tuple(extractor.get_functions())

if function:
function_handles = tuple(filter(lambda fh: fh.inner.start_ea == function, function_handles))

if len(function_handles) == 0:
print(f"{hex(function)} not a function")
return -1

feature_map.update(get_file_features(function_handles, extractor))

rules_path = capa.main.get_default_root() / "rules"
rules_feature_set = get_rules_feature_set([rules_path])

print_unused_features(feature_map, rules_feature_set)

return 0


if __name__ == "__main__":
if capa.helpers.is_runtime_ida():
ida_main()
else:
sys.exit(main())
1 change: 1 addition & 0 deletions tests/test_scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def get_rule_path():
pytest.param("show-capabilities-by-function.py", [get_file_path()]),
pytest.param("show-features.py", [get_file_path()]),
pytest.param("show-features.py", ["-F", "0x407970", get_file_path()]),
pytest.param("show-unused-features.py", [get_file_path()]),
pytest.param("capa_as_library.py", [get_file_path()]),
],
)
Expand Down
Loading