Created new window pipe for sliding window aggregations #3

Open · wants to merge 16 commits into master
27 changes: 27 additions & 0 deletions docs/query-guide/pipes.rst
@@ -126,3 +126,30 @@ Get the top five network connections that transmitted the most data
| sort total_out_bytes
| tail 5

``window``
----------
The ``window`` pipe buffers events over the specified timespan, allowing the pipes that follow it to operate on a
sliding window when streaming data continuously.

Find suspicious recon commands that were executed within a 5 minute window

.. code-block:: eql

process where process_name in ("whoami.exe", "netstat.exe", "hostname.exe", "net.exe", "sc.exe", "systeminfo.exe")
| window 5m
| unique process_name
| unique_count
| filter count >= 3

Find processes that make network connections to a single host on more than 100 unique ports within a 10 second window

.. code-block:: eql

network where wildcard(destination_address, "10.*", "172.*", "192.*")
| window 10s
| unique_count process_name, destination_port
| filter count >= 100

.. note::

The ``window`` pipe re-emits every event still in the buffer each time a new event arrives, so output grows
cumulatively: [[1], [1, 2], [1, 2, 3], ...]. It is therefore recommended to combine ``unique_count`` with
``filter`` so that only events over a certain threshold are shown.
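
To make the note above concrete, here is a minimal standalone sketch of the buffer-and-re-emit behavior (illustrative only; `sliding_window` is not part of this PR):

```python
from collections import deque

def sliding_window(timed_events, timespan):
    """Evict expired events, then re-emit the whole buffer, as the window pipe does."""
    buf = deque()  # (timestamp, event) pairs, oldest first
    for ts, event in timed_events:
        # Drop events that no longer sit within the window ending at this event
        while buf and buf[0][0] < ts - timespan:
            buf.popleft()
        buf.append((ts, event))
        yield [e for _, e in buf]

# Events arriving at t=0, 2, 4, 11 with a 10-unit window print
# [1], [1, 2], [1, 2, 3], then [2, 3, 4] once event 1 has expired.
for window in sliding_window([(0, 1), (2, 2), (4, 3), (11, 4)], timespan=10):
    print(window)
```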
100 changes: 82 additions & 18 deletions eql/engine.py
@@ -463,20 +463,32 @@ def _convert_count_pipe(self, node, next_pipe):  # type: (CountPipe, callable) -> callable
if len(node.arguments) == 0:
# Counting only the total
summary = {'key': 'totals', 'count': 0}
hosts = set()

# mutable scoped variable
hosts = [set()]

def count_total_callback(events):
if events is PIPE_EOF:
if len(hosts):
summary['total_hosts'] = len(hosts)
summary['hosts'] = list(sorted(hosts))
# immutable version of summary
event = summary.copy()

if len(hosts[0]):
event['total_hosts'] = len(hosts[0])
event['hosts'] = list(sorted(hosts[0]))

next_pipe([Event(EVENT_TYPE_GENERIC, 0, summary)])
next_pipe([Event(EVENT_TYPE_GENERIC, 0, event)])
next_pipe(PIPE_EOF)

# reset state
summary['count'] = 0
hosts[0] = set()
else:
summary['count'] += 1
if host_key in events[0].data:
hosts.add(events[0].data[host_key])
hosts[0].add(events[0].data[host_key])

return count_total_callback

@@ -506,6 +518,9 @@ def count_tuple_callback(events):  # type: (list[Event]) -> None
details['percent'] = float(details['count']) / total
next_pipe([Event(EVENT_TYPE_GENERIC, 0, details)])
next_pipe(PIPE_EOF)

# reset state
count_table.clear()
else:
key = get_key(events)
insensitive_key = remove_case(key)
@@ -529,18 +544,20 @@ def filter_callback(events):  # type: (list[Event]) -> None
return filter_callback

def _convert_head_pipe(self, node, next_pipe): # type: (HeadPipe, callable) -> callable
totals = [0] # has to be mutable because of python scoping
output_buffer = []
max_count = node.count

def head_callback(events):
if totals[0] < max_count:
if events is PIPE_EOF:
next_pipe(PIPE_EOF)
else:
totals[0] += 1
next_pipe(events)
if totals[0] == max_count:
next_pipe(PIPE_EOF)
if events is PIPE_EOF:
for output in output_buffer:
next_pipe(output)
next_pipe(PIPE_EOF)

# reset state
output_buffer.clear()
else:
if len(output_buffer) < max_count:
output_buffer.append(events)

return head_callback

@@ -552,6 +569,9 @@ def tail_callback(events):
for output in output_buffer:
next_pipe(output)
next_pipe(PIPE_EOF)

# reset state
output_buffer.clear()
else:
output_buffer.append(events)

@@ -572,6 +592,9 @@ def get_converted_key(buffer_events):
for output in output_buffer:
next_pipe(output)
next_pipe(PIPE_EOF)

# reset state
output_buffer.clear()
else:
output_buffer.append(events)

@@ -584,6 +607,9 @@ def _convert_unique_pipe(self, node, next_pipe):  # type: (UniquePipe, callable) -> callable
def unique_callback(events):
if events is PIPE_EOF:
next_pipe(PIPE_EOF)

# reset state
seen.clear()
else:
key = get_unique_key(events)
if key not in seen:
Expand All @@ -592,6 +618,32 @@ def unique_callback(events):

return unique_callback

def _convert_window_pipe(self, node, next_pipe):  # type: (WindowPipe, callable) -> callable
"""Aggregate events over a sliding window using a buffer."""
window_buf = deque()  # (timestamp, events) tuples
timespan = self.convert(node.timespan)

def time_window_callback(events): # type: (list[Event]) -> None
if events is PIPE_EOF:
next_pipe(PIPE_EOF)

# reset state
window_buf.clear()
else:
minimum_start = events[0].time - timespan

# Remove any events that no longer sit within the time window
while len(window_buf) > 0 and window_buf[0][0] < minimum_start:
window_buf.popleft()

window_buf.append((events[0].time, events))

for result in window_buf:
next_pipe(result[1])
next_pipe(PIPE_EOF)

return time_window_callback

def _convert_unique_count_pipe(self, node, next_pipe): # type: (CountPipe) -> callable
"""Aggregate counts coming into the pipe."""
host_key = self.host_key
@@ -613,6 +665,8 @@ def count_unique_callback(events):  # type: (list[Event]) -> None
next_pipe(result)
next_pipe(PIPE_EOF)

# reset state
results.clear()
else:
# Create a copy of these, because they can be modified
events = [events[0].copy()] + events[1:]
@@ -648,12 +702,19 @@ def _reduce_count_pipe(self, node, next_pipe):  # type: (CountPipe) -> callable
def count_total_aggregates(events): # type: (list[Event]) -> None
if events is PIPE_EOF:
hosts = result.pop('hosts') # type: set

# immutable version of result
event = result.copy()
if len(hosts) > 0:
result['hosts'] = list(sorted(hosts))
result['total_hosts'] = len(hosts)
event['hosts'] = list(sorted(hosts))
event['total_hosts'] = len(hosts)

next_pipe([Event(EVENT_TYPE_GENERIC, 0, result)])
next_pipe([Event(EVENT_TYPE_GENERIC, 0, event)])
next_pipe(PIPE_EOF)

# reset state
result['count'] = 0
result['hosts'] = set()
else:
piece = events[0].data
result['count'] += piece['count']
@@ -684,6 +745,9 @@ def count_tuple_callback(events):  # type: (list[Event]) -> None
result['percent'] = float(result['count']) / total
next_pipe([Event(EVENT_TYPE_GENERIC, 0, result)])
next_pipe(PIPE_EOF)

# reset state
results.clear()
else:
piece = events[0].data
key = events[0].data['key']
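The recurring `# reset state` blocks in this file share one goal: after a pipe flushes its results on `PIPE_EOF`, it must return to a clean slate so the same engine can process another stream (see `test_pipes_reset_state` below). A minimal sketch of the failure mode, with illustrative names rather than the engine's actual API:

```python
PIPE_EOF = object()  # sentinel standing in for the engine's end-of-stream marker

def make_count_pipe(next_pipe):
    """Count events and emit the total on PIPE_EOF, then reset for the next stream."""
    state = {'count': 0}

    def callback(event):
        if event is PIPE_EOF:
            next_pipe(state['count'])
            next_pipe(PIPE_EOF)
            state['count'] = 0  # without this reset, a reused engine reports stale totals
        else:
            state['count'] += 1

    return callback

results = []
pipe = make_count_pipe(lambda item: results.append(item) if item is not PIPE_EOF else None)
for stream in (range(3), range(3)):
    for event in stream:
        pipe(event)
    pipe(PIPE_EOF)
print(results)  # [3, 3] with the reset; [3, 6] without it
```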
2 changes: 1 addition & 1 deletion eql/etc/eql.g
@@ -15,7 +15,7 @@ sequence: "sequence" [join_values with_params? | with_params join_values?] subqu
join: "join" join_values? subquery_by subquery_by+ until_subquery_by?
until_subquery_by.2: "until" subquery_by
pipes: pipe+
pipe: "|" name [single_atom single_atom+ | expressions]
pipe: "|" name [single_atom single_atom+ | time_range | expressions]

join_values.2: "by" expressions
?with_params.2: "with" named_params
23 changes: 22 additions & 1 deletion eql/etc/test_queries.toml
@@ -375,7 +375,6 @@ sequence
'''
expected_event_ids = [1, 2, 2, 3]


[[queries]]
query = '''
sequence
@@ -1296,3 +1295,25 @@ expected_event_ids = []
query = '''
process where length(between(process_name, 'g', 'z')) > 0
'''

[[queries]]
expected_event_ids = [11, 50]
description = "test window pipe"
query = '''
process where subtype == "create" |
window 5m |
unique parent_process_name, process_name |
unique_count parent_process_name |
filter count == 5
'''

[[queries]]
expected_event_ids = [55]
description = "test window pipe with descendant"
query = '''
file where event_subtype_full == "file_create_event"
and descendant of [process where process_name == "cmd.exe"] |
window 5m |
unique_count process_name |
filter count == 5
'''
27 changes: 26 additions & 1 deletion eql/pipes.py
@@ -1,5 +1,5 @@
"""EQL Pipes."""
from .ast import PipeCommand
from .ast import PipeCommand, TimeRange
from .schema import Schema, EVENT_TYPE_GENERIC, MIXED_TYPES
from .types import dynamic, NUMBER, literal, PRIMITIVES, EXPRESSION, get_type, BASE_STRING
from .utils import is_string
@@ -14,6 +14,7 @@
"CountPipe",
"FilterPipe",
"UniqueCountPipe",
"WindowPipe"
)


@@ -154,3 +155,27 @@ class FilterPipe(PipeCommand):
def expression(self):
"""Get the filter expression."""
return self.arguments[0]


@PipeCommand.register('window')
class WindowPipe(PipeCommand):
"""Maintains a time window buffer for streaming events."""

argument_types = [literal(NUMBER)]

minimum_args = 1
maximum_args = 1

@property
def timespan(self):
"""Get timespan as a TimeRange object."""
return TimeRange.convert(self.arguments[0])

@classmethod
def validate(cls, arguments, type_hints=None):
"""After performing type checks, validate that the timespan is greater than zero."""
index, arguments, type_hints = super(WindowPipe, cls).validate(arguments, type_hints)
ts = cls(arguments).timespan
if index is None and (ts is None or ts.delta.total_seconds() <= 0):
index = 0
return index, arguments, type_hints
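
Together with the grammar change above, `WindowPipe.validate` should reject both malformed time ranges and non-positive timespans at parse time. A rough sketch of what that looks like to a caller, assuming the package's top-level `parse_query` and an `EqlError` base class cover these failures:

```python
from eql import parse_query          # assumed top-level parser entry point
from eql.errors import EqlError      # assumed common base for parse/validation errors

queries = [
    'process where true | window 5m | unique_count process_name',  # should parse with this PR
    'process where true | window 0s',   # should fail WindowPipe.validate (timespan <= 0)
    'process where true | window 10g',  # should fail in the grammar (unknown time unit)
]

for query in queries:
    try:
        parse_query(query)
        print('parsed:  ', query)
    except EqlError as exc:
        print('rejected:', query, '->', type(exc).__name__)
```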
2 changes: 1 addition & 1 deletion setup.py
@@ -59,7 +59,7 @@ class Test(TestCommand):
def initialize_options(self):
"""Need to ensure pytest_args exists."""
TestCommand.initialize_options(self)
self.pytest_args = []
self.pytest_args = ["--disable-warnings"]

def run_tests(self):
"""Run pytest."""
4 changes: 4 additions & 0 deletions tests/test_parser.py
@@ -206,6 +206,7 @@ def test_valid_queries(self):
'any where true | unique a b c | sort a b c | count',
'any where true | unique a, b, c | sort a b c | count',
'any where true | unique a, b, c | sort a,b,c | count',
'any where true | window 5s | unique a, b | unique_count a | filter count > 5',
'file where child of [registry where true]',
'file where event of [registry where true]',
'file where event of [registry where true]',
@@ -282,9 +283,12 @@ def test_invalid_queries(self):
'process where process_name == "abc.exe" | head abc',
'process where process_name == "abc.exe" | head abc()',
'process where process_name == "abc.exe" | head abc(def, ghi)',
'process where process_name == "abc.exe" | window abc',
'process where process_name == "abc.exe" | window 10g',
'sequence [process where pid == pid]',
'sequence [process where pid == pid] []',
'sequence with maxspan=false [process where true] [process where true]',
'sequence with maxspan=10g [process where true] [process where true]',
'sequence with badparam=100 [process where true] [process where true]',
# check that the same number of BYs are in every subquery
'sequence [file where true] [process where true] by field1',
39 changes: 39 additions & 0 deletions tests/test_python_engine.py
@@ -494,3 +494,42 @@ def test_relationship_pid_collision(self):
output = self.get_output(queries=[parse_query(query)], config=config, events=events)
event_ids = [event.data['unique_pid'] for event in output]
self.validate_results(event_ids, ['host1-1003'], "Relationships failed due to pid collision")

def test_pipes_reset_state(self):
"""Test that the pipes are clearing their state after receiving PIPE_EOF"""
events = self.get_events()

queries = [
'process where true | unique opcode',
'process where true | unique_count opcode',
'process where true | count',
'process where true | count opcode',
'process where true | head 1',
'process where true | tail',
'process where true | sort opcode',
'process where true | window 10s',
'process where true | window 5m | head 1',
]

for query in queries:
engine = PythonEngine()

results = [] # type: list[Event]
engine.add_output_hook(results.append)
engine.add_queries([parse_query(query)])

engine.stream_events(events)
engine.finalize()
expected_len = len(results)

results.clear()

engine.stream_events(events)
engine.finalize()
actual_len = len(results)

self.assertEqual(
expected_len,
actual_len,
f"Expected results to be the same when streaming events multiple times: {query}"
)