Skip to content

Commit

Permalink
Merge pull request #7 from markmckinnon/New-Plugin-Loader
Browse files Browse the repository at this point in the history
Update to new plugin loader
  • Loading branch information
markmckinnon authored Jul 16, 2022
2 parents fc781eb + 041ba7b commit 58cc722
Show file tree
Hide file tree
Showing 43 changed files with 430 additions and 206 deletions.
24 changes: 24 additions & 0 deletions hook-plugin_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import plugin_loader

# Hey PyInstaller? Yeah you! Take a look at these plugins! I know they're not actually imported anywhere but you
# better believe that they will be a runtime, so, if you wouldn't mind, it'd be fantastic if you pretended that
# they're imported normally and pick up *their* imports. OK? Great. Fantastic.

print("Hooking plugins for pyinstaller")

loader = plugin_loader.PluginLoader()

tmp = []

for py_file in plugin_loader.PLUGINPATH.glob("*.py"):
mod = plugin_loader.PluginLoader.load_module_lazy(py_file)
try:
mod_artifacts = mod.__artifacts__
except AttributeError:
pass # any unconverted plugins still get checked out so they don't break the loader during runtime

tmp.append("scripts.artifacts." + mod.__name__) # TODO this is a hack, if we ever move plugins this breaks

print(f"{len(tmp)} plugins loaded as hidden imports")

hiddenimports = list(tmp)
61 changes: 38 additions & 23 deletions lleapp.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import argparse
import io
import scripts.report as report
import typing
import plugin_loader

from scripts.search_files import *
from scripts.lleap_artifacts import *
from scripts.version_info import lleapp_version
from time import process_time, gmtime, strftime

Expand All @@ -16,16 +17,18 @@ def main():
parser.add_argument('-w', '--wrap_text', required=False, action="store_false", help='do not wrap text for output of data files')

args = parser.parse_args()


loader = plugin_loader.PluginLoader()

if args.artifact_paths == True:
print('Artifact path list generation started.')
print('')
for key, value in tosearch.items():
if type(value[1]) is tuple:
for x in value[1]:
for plugin in loader.plugins:
if isinstance(plugin.search, tuple):
for x in plugin.search:
print(x)
else:
print(value[1])
print(plugin.search)
print('')
print('Artifact path list generation completed')
return
Expand Down Expand Up @@ -73,9 +76,9 @@ def main():

out_params = OutputParameters(output_path)

crunch_artifacts(tosearch, extracttype, input_path, out_params, 1, wrap_text)
crunch_artifacts(list(loader.plugins), extracttype, input_path, out_params, 1, wrap_text)

def crunch_artifacts(search_list, extracttype, input_path, out_params, ratio, wrap_text):
def crunch_artifacts(plugins: typing.Sequence[plugin_loader.PluginSpec], extracttype, input_path, out_params, ratio, wrap_text):
start = process_time()

logfunc('Processing started. Please wait. This may take a few minutes...')
Expand Down Expand Up @@ -108,7 +111,7 @@ def crunch_artifacts(search_list, extracttype, input_path, out_params, ratio, wr
return False

# Now ready to run
logfunc(f'Artifact categories to parse: {str(len(search_list))}')
logfunc(f'Artifact categories to parse: {str(len(plugins))}')
logfunc(f'File/Directory selected: {input_path}')
logfunc('\n--------------------------------------------------------------------------------------')

Expand All @@ -118,37 +121,49 @@ def crunch_artifacts(search_list, extracttype, input_path, out_params, ratio, wr

categories_searched = 0
# Search for the files per the arguments
for key, val in search_list.items():
for plugin in plugins:
search_regexes = []
artifact_pretty_name = val[0]
if isinstance(val[1], list) or isinstance(val[1], tuple):
search_regexes = val[1]
artifact_pretty_name = plugin.name
if isinstance(plugin.search, list) or isinstance(plugin.search, tuple):
search_regexes = plugin.search
else:
search_regexes.append(val[1])
search_regexes = [plugin.search]
files_found = []
for artifact_search_regex in search_regexes:
found = seeker.search(artifact_search_regex)
if not found:
logfunc()
logfunc(f'No files found for {key} -> {artifact_search_regex}')
log.write(f'No files found for {key} -> {artifact_search_regex}<br><br>')
logfunc(f'No files found for {plugin.name} -> {artifact_search_regex}')
log.write(f'No files found for {plugin.name} -> {artifact_search_regex}<br><br>')
else:
files_found.extend(found)
if files_found:
logfunc()
process_artifact(files_found, key, artifact_pretty_name, seeker, out_params.report_folder_base, wrap_text)
for pathh in files_found:
if pathh.startswith('\\\\?\\'):
pathh = pathh[4:]
log.write(f'Files for {artifact_search_regex} located at {pathh}<br><br>')
logfunc('{} [{}] artifact started'.format(plugin.name, plugin.module_name))
category_folder = os.path.join(out_params.report_folder_base, plugin.category)
if not os.path.exists(category_folder):
try:
os.mkdir(category_folder)
except (FileExistsError, FileNotFoundError) as ex:
logfunc('Error creating {} report directory at path {}'.format(plugin.name, category_folder))
logfunc('Error was {}'.format(str(ex)))
continue # cannot do work
try:
plugin.method(files_found, category_folder, seeker, wrap_text)
except Exception as ex:
logfunc('Reading {} artifact had errors!'.format(plugin.name))
logfunc('Error was {}'.format(str(ex)))
logfunc('Exception Traceback: {}'.format(traceback.format_exc()))
continue # nope

logfunc('{} [{}] artifact completed'.format(plugin.name, plugin.module_name))
categories_searched += 1
GuiWindow.SetProgressBar(categories_searched * ratio)
log.close()

logfunc('')
logfunc('Processes completed.')
end = process_time()
run_time_secs = end - start
run_time_secs = end - start
run_time_HMS = strftime('%H:%M:%S', gmtime(run_time_secs))
logfunc("Processing time = {}".format(run_time_HMS))

Expand Down
42 changes: 23 additions & 19 deletions lleappGUI.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import typing
import lleapp
import os
import PySimpleGUI as sg
import sys
import webbrowser
import plugin_loader

from scripts.lleapfuncs import *
from scripts.version_info import lleapp_version
from time import process_time, gmtime, strftime
from scripts.lleap_artifacts import *
from scripts.search_files import *

MODULE_START_INDEX = 1000

def ValidateInput(values, window):
'''Returns tuple (success, extraction_type)'''
global indx
Expand Down Expand Up @@ -60,25 +63,25 @@ def CheckList(mtxt, lkey, mdstring, disable=False):
def pickModules():
global indx
global mlist

script_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'scripts', 'artifacts')
global loader

# Create sorted dict from 'tosearch' dictionary based on plugin category
sorted_tosearch = {k: v for k, v in sorted(tosearch.items(), key=lambda item: item[1][0])}
loader = plugin_loader.PluginLoader()

indx = 1000 # arbitrary number to not interfere with other controls
for key, val in sorted_tosearch.items():
mlist.append( CheckList(val[0] + f' [{key}]', indx, key) )
indx = MODULE_START_INDEX # arbitrary number to not interfere with other controls
for plugin in sorted(loader.plugins, key=lambda p: p.category.upper()):
disabled = plugin.module_name == 'osinfo'
mlist.append(CheckList(f'{plugin.category} [{plugin.name} - {plugin.module_name}.py]', indx, plugin.name, disabled))
indx = indx + 1

sg.theme('DarkTeal9') # Add a touch of color
# All the stuff inside your window.

normal_font = ("Helvetica", 12)
loader: typing.Optional[plugin_loader.PluginLoader] = None
mlist = []
# go through list of available modules and confirm they exist on the disk
pickModules()
GuiWindow.progress_bar_total = len(lleapp.tosearch)
GuiWindow.progress_bar_total = len(loader)


layout = [ [sg.Text('Linux Logs, Events, Application, Program Parser', font=("Helvetica", 22))],
Expand Down Expand Up @@ -113,12 +116,12 @@ def pickModules():

if event == "SELECT ALL":
# mark all modules
for x in range(1000,indx):
for x in range(MODULE_START_INDEX, indx):
window[x].Update(True)
if event == "DESELECT ALL":
# none modules
for x in range(1000,indx):
window[x].Update(False)
for x in range(MODULE_START_INDEX, indx):
window[x].Update(False if window[x].metadata != 'osinfo' else True) # oosinfo.py is REQUIRED
if event == 'Process':
#check is selections made properly; if not we will return to input form without exiting app altogether
is_valid, extracttype = ValidateInput(values, window)
Expand All @@ -134,14 +137,15 @@ def pickModules():
if output_folder[1] == ':': output_folder = '\\\\?\\' + output_folder.replace('/', '\\')

# re-create modules list based on user selection
search_list = {}
search_list = [loader['osInfo']] # hardcode usagestatsVersion as first item
s_items = 0
for x in range(1000,indx):
for x in range(MODULE_START_INDEX, indx):
if window.FindElement(x).Get():
if window[x].metadata in tosearch:
search_list[window[x].metadata] = tosearch[window[x].metadata]
s_items = s_items + 1 #for progress bar

key = window[x].metadata
if key in loader and key != 'osInfo':
search_list.append(loader[key])
s_items = s_items + 1 # for progress bar

# no more selections allowed
window[x].Update(disabled = True)

Expand All @@ -151,7 +155,7 @@ def pickModules():
GuiWindow.window_handle = window
out_params = OutputParameters(output_folder)
wrap_text = True
crunch_successful = lleapp.crunch_artifacts(search_list, extracttype, input_path, out_params, len(lleapp.tosearch)/s_items, wrap_text)
crunch_successful = lleapp.crunch_artifacts(search_list, extracttype, input_path, out_params, len(loader)/s_items, wrap_text)
if crunch_successful:
report_path = os.path.join(out_params.report_folder_base, 'index.html')

Expand Down
65 changes: 65 additions & 0 deletions plugin_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import pathlib
import dataclasses
import typing
import importlib.util

#PLUGINPATH = pathlib.Path("./scripts/artifacts")
# a bit long-winded to make compatible with PyInstaller
PLUGINPATH = pathlib.Path(__file__).resolve().parent / pathlib.Path("scripts/artifacts")


@dataclasses.dataclass(frozen=True)
class PluginSpec:
name: str
module_name: str
category: str
search: str
method: typing.Callable # todo define callable signature


class PluginLoader:
def __init__(self, plugin_path: typing.Optional[pathlib.Path] = None):
self._plugin_path = plugin_path or PLUGINPATH
self._plugins: dict[str, PluginSpec] = {}
self._load_plugins()

@staticmethod
def load_module_lazy(path: pathlib.Path):
spec = importlib.util.spec_from_file_location(path.stem, path)
loader = importlib.util.LazyLoader(spec.loader)
spec.loader = loader
mod = importlib.util.module_from_spec(spec)
loader.exec_module(mod)
return mod

def _load_plugins(self):
for py_file in self._plugin_path.glob("*.py"):
mod = PluginLoader.load_module_lazy(py_file)
try:
mod_artifacts = mod.__artifacts__
except AttributeError:
continue # no artifacts defined in this plugin

for name, (category, search, func) in mod_artifacts.items():
#self._plugins.append(PluginSpec(name, search, func))
if name in self._plugins:
raise KeyError("Duplicate plugin {name} ")
self._plugins[name] = PluginSpec(name, py_file.stem, category, search, func)

@property
def plugins(self) -> typing.Iterable[PluginSpec]:
yield from self._plugins.values()

def __getitem__(self, item: str) -> PluginSpec:
return self._plugins[item]

def __contains__(self, item):
return item in self._plugins

def __len__(self):
return len(self._plugins)





7 changes: 7 additions & 0 deletions scripts/artifacts/apacheLogs.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,10 @@ def get_apache_logs(files_found, report_folder, seeker, wrap_text):
else:
logfunc(f'No apache_method_bytes data available')


__artifacts__ = {
"apache_logs": (
"Apache Logs",
('**/var/logs/apache2/access.log'),
get_apache_logs)
}
8 changes: 8 additions & 0 deletions scripts/artifacts/aptHistory.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,11 @@ def get_apt_history_log(files_found, report_folder, seeker, wrap_text):
timeline(report_folder, tlactivity, data_removed_programs_list, data_headers)
else:
logfunc(f'No apt_programs_removed_log data available')


__artifacts__ = {
"apt_history_log": (
"APT Logs",
('**/var/logs/apt/history.log'),
get_apt_history_log)
}
7 changes: 7 additions & 0 deletions scripts/artifacts/authLog.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,10 @@ def get_auth_log(files_found, report_folder, seeker, wrap_text):
timeline(report_folder, tlactivity, failed_data_list, data_headers)
else:
logfunc(f'No auth_log FAILED data available')

__artifacts__ = {
"auth_log": (
"Logs",
('**/var/logs/auth.log'),
get_auth_log)
}
7 changes: 7 additions & 0 deletions scripts/artifacts/bashHistory.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,10 @@ def get_bash_history(files_found, report_folder, seeker, wrap_text):

else:
logfunc(f'No bash history data for {user_name}available')

__artifacts__ = {
"bash_history": (
"Bash History",
('**/home/*/.bash_history'),
get_bash_history)
}
8 changes: 8 additions & 0 deletions scripts/artifacts/btmp.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,11 @@ def get_btmp(files_found, report_folder, seeker, wrap_text):
timeline(report_folder, tlactivity, data_list, data_headers)
else:
logfunc(f'No btmp data available')

__artifacts__ = {
"btmp": (
"Logs",
('**/var/logs/btmp'),
get_btmp)
}

9 changes: 8 additions & 1 deletion scripts/artifacts/chromium.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,11 @@ def get_chrome(files_found, report_folder, seeker, wrap_text):

logfunc(f'No {browser_name} {identifier} data available')

db.close()
db.close()

__artifacts__ = {
"chrome": (
"Browser",
('**/home/*/.config/google-chrome/default/History*'),
get_chrome)
}
7 changes: 7 additions & 0 deletions scripts/artifacts/chromiumAutofill.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,10 @@ def get_chromeAutofill(files_found, report_folder, seeker, wrap_text):
logfunc(f'No {browser_name} Autofill data available')

db.close()

__artifacts__ = {
"chromeAutofill": (
"Browser",
('**/home/*/.config/google-chrome/default/Web Data*'),
get_chromeAutofill)
}
Loading

0 comments on commit 58cc722

Please sign in to comment.