Source code for aws_spitzel

#!/usr/bin/env python3
"""Heuristic CloudTrail Event History Lookup for AWS IAM Forensics

Lightweight and flexible AWS DevOps command-line tool and Python 3.9 module for 
security operation duties (SOC) of AWS platform services.

This program extends the 
`native AWS CloudTrail API LookupEvents action <https://docs.aws.amazon.com/awscloudtrail/latest/APIReference/API_LookupEvents.html>`_ by being 
able to query against CloudTrail event objects with JSONPath expressions and a 
barebone implementation of comparison operations for Python built-in types, and
regular expressions. In addition, the UNIX filename pattern of AWS IAM policy 
statement actions is used for filtering events by service and action 
(e.g. ``s3:List*``), instead of the CloudTrail API schema attributes.
(``eventName``, ``eventSource``, etc.).

This program is licensed under the 
"Data licence Germany – attribution – Version 2.0".
`URL <http://www.govdata.de/dl-de/by-2-0>`_
"""
from dataclasses import dataclass, field, asdict
import json
from datetime import datetime
from typing import Dict, Optional, List, Callable, Generator, Any, Tuple
from queue import PriorityQueue
from threading import Thread
from ast import parse as astparse
from re import match as regexmatch
from fnmatch import fnmatch
from time import sleep
import sys

import boto3
from jsonpath import jsonpath


[docs]class DefaultAWSAPIClients: cloudtrail = boto3.client('cloudtrail')
[docs]@dataclass(order=True) class PrioritizedQueueItem: """ """ priority: int item: Any = field(compare=False)
[docs]@dataclass class Expression: """ """ left_operand: str right_operand: str operator: str
[docs]def get_expression_object( raw: str, operators: Optional[List[str]] = ['==', '!=', 'regex'] ) -> Expression: """ """ expression_size = len(raw) bfr = '' operator_index = -1 operator_size = -1 for index in range(expression_size): # this is groovy 🕺 opts = [op[len(bfr)] for op in operators if len(op) -1 >= len(bfr)] if raw[index] in opts: if bfr == '': operator_index = index bfr += raw[index] else: operator_index = -1 bfr = '' continue if bfr in operators: operator_size = len(bfr) break if operator_index == -1 or operator_size == -1: raise Exception('unable to parse operator') right_operand = astparse(raw[operator_index + operator_size:].lstrip(), mode='eval').body.n return Expression( left_operand = raw[0:operator_index].rstrip(), right_operand = right_operand, operator = raw[operator_index:operator_index + operator_size], )
[docs]@dataclass class Action: """AWS IAM action """ #: id of service (e.g. `s3`) service_id: str #: name of action (e.g. `ListBuckets`) action_name: str
[docs]@dataclass class DateRange: """ """ #: start of range start: datetime #: end of range end: datetime #: string format of datetime format: str = '%Y-%m-%d %H:%M:%S'
[docs]@dataclass class Filter: """ """ expression: Expression
[docs]@dataclass class ProgramContext: """ """ actions: List[Action] date_range: DateRange filters: Optional[List[Filter]] = field(default_factory=[])
[docs]def boto3_next_token( callable_: Callable, kwargs={}, next_token='', max_results=50, args = [] ) -> Generator[Tuple[dict, str], None, None]: """ """ failures_in_a_row = 0 while next_token != None: kwargs_proto = { 'MaxResults': max_results } if next_token != '': kwargs_proto['NextToken'] = next_token try: response = callable_( *args, **{ **kwargs_proto, **kwargs } ) except Exception as err: if failures_in_a_row >= 3: raise err from err failures_in_a_row += 1 print('API Throttle',file=sys.stderr) sleep(1) continue next_token = response.get('NextToken', None) yield response, next_token if next_token is None: break
[docs]def lookup_event_response( response: dict, context: ProgramContext, action: Action, callback: Callable[[Any], None], operations: Optional[Dict[str, Callable]] = { '==': lambda a, b: a == b, '!=': lambda a, b: a != b, 'regex': lambda a, b: False if regexmatch(b, a) is None else True } ) -> None: """retrieve matches of a boto3 CloudTrail API lookup through a callback :param response: a cloudtrail boto3 LookupEvents API response object :param action: the action (eventSource) the API request was made for :param context: the global request object of the program :param callback: callable for reporting a succesful operation :param operations: comparison operation callables """ for _event in response.get('Events', []): event = json.loads(_event['CloudTrailEvent']) if not fnmatch(event['eventName'], action.action_name): continue match = True for filter_ in context.filters: json_path_matches = jsonpath( event, filter_.expression.left_operand, 'VALUE' ) if isinstance(json_path_matches, list) and \ len(json_path_matches) == 0: match = False break elif isinstance(json_path_matches, list) \ and len(json_path_matches) > 1: print(f'JSONPath {filter_.expression.left_operand} returns ' 'more than one match', file=sys.stderr) break json_path_match = json_path_matches[0] if isinstance(json_path_matches, list) else json_path_matches match = operations[filter_.expression.operator]( json_path_match, filter_.expression.right_operand ) if match is not True: break if match is True: callback(event)
[docs]def handle_action_lookup( context: ProgramContext, action: Action, queue: PriorityQueue, worker_threads: Optional[list] = [], cloudtrail_api_client = DefaultAWSAPIClients.cloudtrail, ): """ :param context: the global request object of the program :param action: the action (eventSource) the API request will be made for :param queue: lookup match item queue for synchronising threads :param worker_threads: list instance of worker threads assigned to this handler thread :param cloudtrail_api_client: """ for response_page, next_token in boto3_next_token( cloudtrail_api_client.lookup_events, { 'LookupAttributes': [ { 'AttributeKey': 'EventSource', 'AttributeValue': f'{action.service_id}.amazonaws.com' }, ], 'StartTime': context.date_range.start, 'EndTime': context.date_range.end, } ): thread = Thread( name=next_token, target=lookup_event_response, args=[ response_page, context, action, lambda item: queue.put( PrioritizedQueueItem(priority=0, item=item) ), ] ) worker_threads.append(thread) thread.start() for thread in worker_threads: thread.join() queue.put(PrioritizedQueueItem(priority=2, item='WORKERS_JOINED'))
[docs]def main(context: ProgramContext) -> dict: """ :param range: date range to search in """ queue = PriorityQueue() handler_threads = [] for action in context.actions: thread = Thread( target = handle_action_lookup, args = [ context, action, queue, [], boto3.client('cloudtrail') ] ) handler_threads.append(thread) thread.start() done = 0 while True: item = queue.get(True) if item.item == 'WORKERS_JOINED': done += 1 else: yield item.item if done == len(handler_threads): break for thread in handler_threads: thread.join()