Skip to content

Commit 2dbc06f

Browse files
authored
Merge pull request #1825 from SolitudePy/pebmasq
pebmasquerade plugin
2 parents 78e05f9 + 0e17057 commit 2dbc06f

File tree

1 file changed

+234
-0
lines changed

1 file changed

+234
-0
lines changed
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
import logging
2+
from typing import List, Union, Tuple
3+
4+
from volatility3.framework import interfaces, renderers, exceptions
5+
from volatility3.framework.configuration import requirements
6+
from volatility3.framework.objects import utility
7+
from volatility3.plugins.windows import pslist
8+
9+
vollog = logging.getLogger(__name__)
10+
11+
12+
# https://www.ired.team/offensive-security/defense-evasion/masquerading-processes-in-userland-through-_peb
13+
# https://github.com/FuzzySecurity/PowerShell-Suite/blob/master/Masquerade-PEB.ps1
14+
class PebMasquerade(interfaces.plugins.PluginInterface):
15+
"""Detects potential process name spoofing by comparing EPROCESS and PEB data."""
16+
17+
_version = (1, 0, 0)
18+
_required_framework_version = (2, 27, 0)
19+
20+
@classmethod
21+
def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
22+
return [
23+
requirements.ModuleRequirement(
24+
name="kernel",
25+
description="Windows kernel",
26+
architectures=["Intel32", "Intel64"],
27+
),
28+
requirements.VersionRequirement(
29+
name="pslist", component=pslist.PsList, version=(3, 0, 0)
30+
),
31+
requirements.ListRequirement(
32+
name="pid",
33+
element_type=int,
34+
description="Process ID to include (all other processes are excluded)",
35+
optional=True,
36+
),
37+
]
38+
39+
@classmethod
40+
def get_process_names(cls, proc: interfaces.objects.ObjectInterface) -> Tuple[
41+
Union[str, renderers.NotAvailableValue],
42+
Union[str, renderers.NotAvailableValue],
43+
Union[str, renderers.NotAvailableValue],
44+
Union[str, renderers.NotAvailableValue],
45+
]:
46+
"""Extract process names and related information from various sources (EPROCESS and PEB).
47+
48+
Args:
49+
proc: The process object
50+
51+
Returns:
52+
tuple: (eprocess_imagefilename, eprocess_seaudit_imagefilename, peb_imagefilepath, peb_cmdline)
53+
"""
54+
eprocess_imagefilename = renderers.NotAvailableValue()
55+
eprocess_seaudit_imagefilename = renderers.NotAvailableValue()
56+
peb_imagefilepath = renderers.NotAvailableValue()
57+
peb_cmdline = renderers.NotAvailableValue()
58+
59+
try:
60+
eprocess_imagefilename = utility.array_to_string(proc.ImageFileName)
61+
except (AttributeError, exceptions.InvalidAddressException):
62+
vollog.debug(
63+
"Unable to read EPROCESS.ImageFileName for PID %d", proc.UniqueProcessId
64+
)
65+
except Exception as e:
66+
vollog.warning(
67+
"Error reading EPROCESS.ImageFileName for PID %d: %s",
68+
proc.UniqueProcessId,
69+
str(e),
70+
)
71+
72+
try:
73+
audit = proc.SeAuditProcessCreationInfo.ImageFileName.Name
74+
audit_string = audit.get_string()
75+
if audit_string:
76+
eprocess_seaudit_imagefilename = audit_string
77+
except exceptions.InvalidAddressException:
78+
vollog.debug(
79+
"Unable to read SeAuditProcessCreationInfo.ImageFileName for PID %d",
80+
proc.UniqueProcessId,
81+
)
82+
except AttributeError:
83+
vollog.debug(
84+
"SeAuditProcessCreationInfo structure not available for PID %d",
85+
proc.UniqueProcessId,
86+
)
87+
except Exception as e:
88+
vollog.warning(
89+
"Error reading SeAuditProcessCreationInfo for PID %d: %s",
90+
proc.UniqueProcessId,
91+
str(e),
92+
)
93+
94+
try:
95+
peb = proc.get_peb()
96+
if peb and peb.ProcessParameters:
97+
# Get ImagePathName
98+
try:
99+
image_path_str = peb.ProcessParameters.ImagePathName.get_string()
100+
if image_path_str:
101+
peb_imagefilepath = image_path_str
102+
except (AttributeError, exceptions.InvalidAddressException):
103+
vollog.debug(
104+
"Unable to read PEB.ImagePathName for PID %d",
105+
proc.UniqueProcessId,
106+
)
107+
except Exception as e:
108+
vollog.warning(
109+
"Error reading PEB.ImagePathName for PID %d: %s",
110+
proc.UniqueProcessId,
111+
str(e),
112+
)
113+
114+
try:
115+
cmdline_str = peb.ProcessParameters.CommandLine.get_string()
116+
if cmdline_str:
117+
peb_cmdline = cmdline_str
118+
except (AttributeError, exceptions.InvalidAddressException):
119+
vollog.debug(
120+
"Unable to read PEB.ProcessParameters.CommandLine for PID %d",
121+
proc.UniqueProcessId,
122+
)
123+
except Exception as e:
124+
vollog.warning(
125+
"Error reading PEB.ProcessParameters.CommandLine for PID %d: %s",
126+
proc.UniqueProcessId,
127+
str(e),
128+
)
129+
except (AttributeError, exceptions.InvalidAddressException):
130+
# Important for cases where PEB does not exist or is inaccessible (e.g SYSTEM process)
131+
vollog.debug("Unable to access PEB for PID %d", proc.UniqueProcessId)
132+
except Exception as e:
133+
vollog.warning(
134+
"Error accessing PEB for PID %d: %s", proc.UniqueProcessId, str(e)
135+
)
136+
137+
return (
138+
eprocess_imagefilename,
139+
eprocess_seaudit_imagefilename,
140+
peb_imagefilepath,
141+
peb_cmdline,
142+
)
143+
144+
def _generator(self, pids, context, kernel_module_name):
145+
pid_filter = pslist.PsList.create_pid_filter(pids)
146+
147+
for proc in pslist.PsList.list_processes(
148+
context=context,
149+
kernel_module_name=kernel_module_name,
150+
filter_func=pid_filter,
151+
):
152+
proc_id = proc.UniqueProcessId
153+
try:
154+
peb = proc.get_peb()
155+
except (exceptions.InvalidAddressException, AttributeError):
156+
vollog.debug(
157+
"Unable to access PEB for PID %d, skipping process", proc_id
158+
)
159+
peb_imagefilepath_length_check = False
160+
peb_cmdline_length_check = False
161+
(
162+
eprocess_imagefilename,
163+
eprocess_seaudit_imagefilename,
164+
peb_imagefilepath,
165+
peb_cmdline,
166+
) = PebMasquerade.get_process_names(proc)
167+
168+
if isinstance(peb_imagefilepath, str) and peb:
169+
try:
170+
171+
# Length values are of type USHORT
172+
peb_imagefilepath_length = (
173+
peb.ProcessParameters.ImagePathName.Length // 2
174+
)
175+
peb_imagefilepath_maxlength = (
176+
peb.ProcessParameters.ImagePathName.MaximumLength // 2 - 1
177+
)
178+
179+
if (peb_imagefilepath_length != len(peb_imagefilepath)) or (
180+
peb_imagefilepath_maxlength != len(peb_imagefilepath)
181+
):
182+
peb_imagefilepath_length_check = True
183+
except Exception as e:
184+
vollog.warning(
185+
"PEB.ImagePathName Length comparison error for PID %d: %s",
186+
proc_id,
187+
str(e),
188+
)
189+
190+
if isinstance(peb_cmdline, str) and peb:
191+
try:
192+
# Length values are of type USHORT
193+
peb_cmdline_length = peb.ProcessParameters.CommandLine.Length // 2
194+
peb_cmdline_maxlength = (
195+
peb.ProcessParameters.CommandLine.MaximumLength // 2 - 1
196+
)
197+
198+
if (peb_cmdline_length != len(peb_cmdline)) or (
199+
peb_cmdline_maxlength != len(peb_cmdline)
200+
):
201+
peb_cmdline_length_check = True
202+
except Exception as e:
203+
vollog.warning(
204+
"PEB.CommandLine Length comparison error for PID %d: %s",
205+
proc_id,
206+
str(e),
207+
)
208+
yield (
209+
0,
210+
(
211+
proc_id,
212+
eprocess_imagefilename,
213+
eprocess_seaudit_imagefilename,
214+
peb_imagefilepath,
215+
peb_cmdline_length_check,
216+
peb_imagefilepath_length_check,
217+
),
218+
)
219+
220+
def run(self):
221+
pids = self.config.get("pid", None)
222+
context = self.context
223+
kernel_module_name = self.config["kernel"]
224+
return renderers.TreeGrid(
225+
[
226+
("PID", int),
227+
("EPROCESS_ImageFileName", str),
228+
("EPROCESS_SeAudit_ImageFileName", str),
229+
("PEB_ImageFilePath", str),
230+
("PEB_ImageFilePath_Spoofed", bool),
231+
("PEB_CommandLine_Spoofed", bool),
232+
],
233+
self._generator(pids, context, kernel_module_name),
234+
)

0 commit comments

Comments
 (0)