Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Linux: Address limitations in determining KASLR shifts by introducing VMCoreInfo support #1332

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 104 additions & 3 deletions volatility3/framework/automagic/linux.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,12 @@ def stack(
isf_url=isf_path,
)
context.symbol_space.append(table)

kaslr_shift, aslr_shift = cls.find_aslr(
context, table_name, layer_name, progress_callback=progress_callback
context,
table_name,
layer_name,
progress_callback=progress_callback,
)

layer_class: Type = intel.Intel
Expand Down Expand Up @@ -118,15 +122,25 @@ def stack(
return None

@classmethod
def find_aslr(
def find_aslr_classic(
cls,
context: interfaces.context.ContextInterface,
symbol_table: str,
layer_name: str,
progress_callback: constants.ProgressCallback = None,
) -> Tuple[int, int]:
"""Determines the offset of the actual DTB in physical space and its
symbol offset."""
symbol offset.

Args:
context: The context to retrieve required elements (layers, symbol tables) from
symbol_table: The name of the kernel module on which to operate
layer_name: The layer within the context in which the module exists
progress_callback: A function that takes a percentage (and an optional description) that will be called periodically

Returns:
kaslr_shirt and aslr_shift
"""
init_task_symbol = symbol_table + constants.BANG + "init_task"
init_task_json_address = context.symbol_space.get_symbol(
init_task_symbol
Expand Down Expand Up @@ -184,6 +198,58 @@ def find_aslr(
vollog.debug("Scanners could not determine any ASLR shifts, using 0 for both")
return 0, 0

@classmethod
def find_aslr_vmcoreinfo(
cls,
context: interfaces.context.ContextInterface,
layer_name: str,
progress_callback: constants.ProgressCallback = None,
) -> Optional[Tuple[int, int]]:
"""Determines the ASLR offsets using the VMCOREINFO ELF note

Args:
context: The context to retrieve required elements (layers, symbol tables) from
layer_name: The layer within the context in which the module exists
progress_callback: A function that takes a percentage (and an optional description) that will be called periodically

Returns:
kaslr_shirt and aslr_shift
"""

for (
_vmcoreinfo_offset,
vmcoreinfo,
) in linux.VMCoreInfo.search_vmcoreinfo_elf_note(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SInce this is versioned separately, we should check the version above (and bail gracefully) if it's different to what we expected. That'll allow addition only bumps and major breaking bumps just fine.

context=context,
layer_name=layer_name,
progress_callback=progress_callback,
):

phys_base_str = vmcoreinfo.get("NUMBER(phys_base)")
if phys_base_str is None:
# We are in kernel (x86) < 4.10 401721ecd1dcb0a428aa5d6832ee05ffbdbffbbe where it was SYMBOL(phys_base)
# It's the symbol address instead of the value itself, which is useless for calculating the physical address.
continue

kerneloffset_str = vmcoreinfo.get("KERNELOFFSET")
if kerneloffset_str is None:
# KERNELOFFSET: (x86) kernels < 3.13 b6085a865762236bb84934161273cdac6dd11c2d
continue

aslr_shift = int(kerneloffset_str, 16)
kaslr_shift = int(phys_base_str) + aslr_shift

vollog.debug(
"Linux ASLR shift values found in VMCOREINFO ELF note: physical 0x%x virtual 0x%x",
kaslr_shift,
aslr_shift,
)

return kaslr_shift, aslr_shift

vollog.debug("The vmcoreinfo scanner could not determine any ASLR shifts")
return None

@classmethod
def virtual_to_physical_address(cls, addr: int) -> int:
"""Converts a virtual linux address to a physical one (does not account
Expand All @@ -192,6 +258,41 @@ def virtual_to_physical_address(cls, addr: int) -> int:
return addr - 0xFFFFFFFF80000000
return addr - 0xC0000000

@classmethod
def find_aslr(
cls,
context: interfaces.context.ContextInterface,
symbol_table: str,
layer_name: str,
progress_callback: constants.ProgressCallback = None,
) -> Tuple[int, int]:
"""Determines the offset of the actual DTB in physical space and its
symbol offset.
Args:
context: The context to retrieve required elements (layers, symbol tables) from
symbol_table: The name of the kernel module on which to operate
layer_name: The layer within the context in which the module exists
progress_callback: A function that takes a percentage (and an optional description) that will be called periodically

Returns:
kaslr_shirt and aslr_shift
"""

aslr_shifts = cls.find_aslr_vmcoreinfo(
context, layer_name, progress_callback=progress_callback
)
if aslr_shifts:
kaslr_shift, aslr_shift = aslr_shifts
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the rest of the automagic ever validate these values in any way? If not, perhaps they should (checking for an ELF signature or mapping the virtual kernel to the physical one and checking a number of bytes match, just something to make sure the map works correctly)?

else:
# Fallback to the traditional scanner method
kaslr_shift, aslr_shift = cls.find_aslr_classic(
context,
symbol_table,
layer_name,
progress_callback=progress_callback,
)
return kaslr_shift, aslr_shift


class LinuxSymbolFinder(symbol_finder.SymbolFinder):
"""Linux symbol loader based on uname signature strings."""
Expand Down
6 changes: 6 additions & 0 deletions volatility3/framework/constants/linux/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,3 +347,9 @@ def flags(self) -> str:
MODULE_MAXIMUM_CORE_SIZE = 20000000
MODULE_MAXIMUM_CORE_TEXT_SIZE = 20000000
MODULE_MINIMUM_SIZE = 4096

# VMCOREINFO
VMCOREINFO_MAGIC = b"VMCOREINFO\x00"
# Aligned to 4 bytes. See storenote() in kernels < 4.19 or append_kcore_note() in kernels >= 4.19
VMCOREINFO_MAGIC_ALIGNED = VMCOREINFO_MAGIC + b"\x00"
OSRELEASE_TAG = b"OSRELEASE="
49 changes: 49 additions & 0 deletions volatility3/framework/plugins/linux/vmcoreinfo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# This file is Copyright 2024 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#

from typing import List

from volatility3.framework import renderers, interfaces
from volatility3.framework.configuration import requirements
from volatility3.framework.interfaces import plugins
from volatility3.framework.symbols import linux
from volatility3.framework.renderers import format_hints


class VMCoreInfo(plugins.PluginInterface):
"""Enumerate VMCoreInfo tables"""

_required_framework_version = (2, 11, 0)
_version = (1, 0, 0)

@classmethod
def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
return [
requirements.TranslationLayerRequirement(
name="primary", description="Memory layer to scan"
),
requirements.VersionRequirement(
name="VMCoreInfo", component=linux.VMCoreInfo, version=(1, 0, 0)
),
]

def _generator(self):
layer_name = self.config["primary"]
for (
vmcoreinfo_offset,
vmcoreinfo,
) in linux.VMCoreInfo.search_vmcoreinfo_elf_note(
context=self.context,
layer_name=layer_name,
):
for key, value in vmcoreinfo.items():
yield 0, (format_hints.Hex(vmcoreinfo_offset), key, value)

def run(self):
headers = [
("Offset", format_hints.Hex),
("Key", str),
("Value", str),
]
return renderers.TreeGrid(headers, self._generator())
102 changes: 101 additions & 1 deletion volatility3/framework/symbols/linux/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import math
import string
import contextlib
from abc import ABC, abstractmethod
from typing import Iterator, List, Tuple, Optional, Union
from typing import Iterator, List, Tuple, Optional, Union, Dict

from volatility3 import framework
from volatility3.framework import constants, exceptions, interfaces, objects
from volatility3.framework.objects import utility
from volatility3.framework.symbols import intermed
from volatility3.framework.symbols.linux import extensions
from volatility3.framework.layers import scanners
from volatility3.framework.constants import linux as linux_constants


class LinuxKernelIntermedSymbols(intermed.IntermediateSymbolTable):
Expand Down Expand Up @@ -832,3 +835,100 @@ def get_cached_pages(self) -> Iterator[interfaces.objects.ObjectInterface]:
page = self.vmlinux.object("page", offset=page_addr, absolute=True)
if page:
yield page


class VMCoreInfo(interfaces.configuration.VersionableInterface):
_required_framework_version = (2, 11, 0)

_version = (1, 0, 0)

@staticmethod
def _vmcoreinfo_data_to_dict(
vmcoreinfo_data,
) -> Optional[Dict[str, str]]:
"""Converts the input VMCoreInfo data buffer into a dictionary"""

# Ensure the whole buffer is printable
if not all(c in string.printable.encode() for c in vmcoreinfo_data):
# Abort, we are in the wrong place
return None

vmcoreinfo_dict = dict()
for line in vmcoreinfo_data.decode().splitlines():
if not line:
break

key, value = line.split("=", 1)
vmcoreinfo_dict[key] = value

return vmcoreinfo_dict

@classmethod
def search_vmcoreinfo_elf_note(
cls,
context: interfaces.context.ContextInterface,
layer_name: str,
progress_callback: constants.ProgressCallback = None,
) -> Iterator[Tuple[int, Dict[str, str]]]:
"""Enumerates each VMCoreInfo ELF note table found in memory along with its offset.

This approach is independent of any external ISF symbol or type, requiring only the
Elf64_Note found in 'elf.json', which is already included in the framework.

Args:
context: The context to retrieve required elements (layers, symbol tables) from
layer_name: The layer within the context in which the module exists
progress_callback: A function that takes a percentage (and an optional description) that will be called periodically

Yields:
Tuples with the VMCoreInfo ELF note offset and the VMCoreInfo table parsed in a dictionary.
"""

elf_table_name = intermed.IntermediateSymbolTable.create(
context, "elf_symbol_table", "linux", "elf"
)
module = context.module(elf_table_name, layer_name, 0)
layer = context.layers[layer_name]

# Both Elf32_Note and Elf64_Note are of the same size
elf_note_size = context.symbol_space[elf_table_name].get_type("Elf64_Note").size

for vmcoreinfo_offset in layer.scan(
scanner=scanners.BytesScanner(linux_constants.VMCOREINFO_MAGIC_ALIGNED),
context=context,
progress_callback=progress_callback,
):
# vmcoreinfo_note kernels >= 2.6.24 fd59d231f81cb02870b9cf15f456a897f3669b4e
vmcoreinfo_elf_note_offset = vmcoreinfo_offset - elf_note_size

# Elf32_Note and Elf64_Note are identical, so either can be used interchangeably here
elf_note = module.object(
object_type="Elf64_Note",
offset=vmcoreinfo_elf_note_offset,
absolute=True,
)

# Ensure that we are within an ELF note
if (
elf_note.n_namesz != len(linux_constants.VMCOREINFO_MAGIC)
or elf_note.n_type != 0
or elf_note.n_descsz == 0
):
continue

vmcoreinfo_data_offset = vmcoreinfo_offset + len(
linux_constants.VMCOREINFO_MAGIC_ALIGNED
)

# Also, confirm this with the first tag, which has consistently been OSRELEASE
vmcoreinfo_data = layer.read(vmcoreinfo_data_offset, elf_note.n_descsz)
if not vmcoreinfo_data.startswith(linux_constants.OSRELEASE_TAG):
continue

table = cls._vmcoreinfo_data_to_dict(vmcoreinfo_data)
if not table:
# Wrong VMCoreInfo note offset, keep trying
continue

# A valid VMCoreInfo ELF note exists at 'vmcoreinfo_elf_note_offset'
yield vmcoreinfo_elf_note_offset, table
Loading