AWS Spitzel

Heuristic CloudTrail Event History Lookup for AWS IAM Forensics

Lightweight and flexible AWS DevOps command-line tool and Python 3.9 module for security operation duties (SOC) of AWS platform services.

This program extends the native AWS CloudTrail API LookupEvents action: CloudTrail event objects can be queried with JSONPath expressions, a barebones implementation of comparison operations for Python built-in types, and regular expressions. In addition, events are filtered by service and action using the UNIX filename pattern syntax of AWS IAM policy statement actions (e.g. s3:List*) instead of the CloudTrail API schema attributes (eventName, eventSource, etc.).
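As a rough illustration of the action filtering idea, the following sketch matches a single CloudTrail event against an IAM-style action pattern with Python's fnmatch module. The function name matches_iam_action and the mapping of a service id to the event source "<service_id>.amazonaws.com" are assumptions made for this example; aws-spitzel may resolve services differently.

```python
from fnmatch import fnmatchcase


def matches_iam_action(event, iam_action):
    """Return True if a CloudTrail event matches a pattern such as 's3:List*'."""
    service_id, _, action_pattern = iam_action.partition(":")
    # Assumption: the event source is '<service_id>.amazonaws.com'.
    if event.get("eventSource", "") != f"{service_id}.amazonaws.com":
        return False
    # fnmatchcase applies UNIX filename pattern rules, case-sensitively.
    return fnmatchcase(event.get("eventName", ""), action_pattern)


event = {"eventSource": "cloudtrail.amazonaws.com", "eventName": "UpdateTrail"}
print(matches_iam_action(event, "cloudtrail:Update*"))  # True
print(matches_iam_action(event, "cloudtrail:List*"))    # False
```

The comparison is case-sensitive here for simplicity; a real implementation may want to normalize case first.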

This program is licensed under the “Data licence Germany – attribution – Version 2.0” (http://www.govdata.de/dl-de/by-2-0).

Run the following to get additional information on using the command-line interface:

$ aws-spitzel --help

If you specify neither --from, nor --to, nor --last-minute, the entire available date range will be used.

usage: aws-spitzel [-h] [--match EXPRESSION] [--from DATETIME] [--to DATETIME] [--last-minute MINUTES] IAM_ACTION [IAM_ACTION ...]

Positional Arguments

IAM_ACTION

service id and action name in AWS IAM policy statement format ($service_id:$action_name)

Named Arguments

--match

JSONPath filter

Default: []

--from

start of date range to find events in

Default: 90 days before now (e.g. 2023-01-03 01:47:14.846958)

--to

end of date range to find events in

Default: now (e.g. 2023-04-03 01:47:14.847040)

--last-minute

from now back to x minutes ago
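The interplay of the three date options can be sketched as follows. This is a minimal sketch, not the program's actual argument handling: the function name resolve_date_range, the rule that --last-minute takes precedence over --from and --to, and the use of naive local timestamps are all assumptions.

```python
from datetime import datetime, timedelta

MAX_LOOKBACK = timedelta(days=90)  # default range described in this document


def resolve_date_range(start=None, end=None, last_minute=None, now=None):
    """Return the (start, end) tuple to look events up in.

    Assumption: last_minute, when given, overrides start and end.
    """
    now = now or datetime.now()
    if last_minute is not None:
        return now - timedelta(minutes=last_minute), now
    return (start or (now - MAX_LOOKBACK)), (end or now)


now = datetime(2023, 4, 3, 1, 47, 14)
print(resolve_date_range(now=now))                   # full 90-day default range
print(resolve_date_range(last_minute=300, now=now))  # the last 300 minutes
```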

Make sure to select the correct AWS CLI profile through the AWS_PROFILE environment variable.

About

Things currently change a lot, and sometimes the changes break features and previously established routines. It’s noisy, but one reaches true system resilience much quicker that way. After the migration to a different IAM scheme, a project lost access to an AWS service. The project manager claimed that the service was no longer accessible, although it had been in the past. The new IAM scheme restricts access to more AWS services than the old one, so access to the service in question might have been restricted by accident, even though all services accessed by projects were taken into account when the new scheme was defined.

According to the compliance criteria of the German BSI C5 catalogue for operating certifiable cloud services in accordance with German data privacy protection regulations, events like these still fall under the security incident management requirements. These are just the nice kind of security incidents, where somebody accidentally gets locked out. Unpleasant for the principal affected, but to quote the great Elton John: “I’m still standing…”. However, the incident still has to be classified properly (BSI C5 SIM-02). Depending on the correctness of the project manager’s statements, remediation actions may be postponed.

It is obvious which actions and services apply as CloudTrail events and when they should have occurred. Finding them, however, means joining multiple queries against the AWS CloudTrail API LookupEvents action, since it currently allows only one lookup attribute per query. A more advanced query utility is needed for that. Amazon Athena is a perfect fit, since it supports SQL and advanced JSON-oriented queries. However, it is a giant and has rather extensive requirements, such as an existing S3 bucket populated with CloudTrail trail log events, even though CloudTrail keeps an event history for 90 days anyway, regardless of whether somebody proactively created a CloudTrail trail. The effort of setting up Amazon Athena wasn’t worth it, because this program is what came up in the meantime.

Besides some JSON-oriented querying, the only real operations required are basic comparisons and regular expression substring evaluations. There is XPath for XML queries, and now there is JSONPath for JSON queries. The Python standard library (ast, the abstract syntax tree module, for parsing strings as definitions of Python built-in types) and a custom basic tokenizer built on top of it for parsing operand-operation-operand expressions deliver the rest.

System interchange is possible through line-delimited JSON streaming via stdout. The program routine is parallelized through multi-threading, making it fast enough to keep up with the AWS API throttling threshold.

Due to the AWS CloudTrail API LookupEvents throttling threshold (100 events across 2 requests per principal per second), this program is optimized for single-core execution. Multi-core execution only makes sense when more than one access key for the same AWS environment is used, which multiplies the available request throughput. An implementation of that will follow once this program supports CloudTrail trails with S3 backends, which is currently planned for the middle of Q2 2023.
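To make the effect of the threshold concrete, here is a minimal sliding-window rate limiter sketch that caps a caller at 2 requests per second. It is not taken from aws-spitzel; the class name RateLimiter and its parameters are hypothetical, and the sketch is not thread-safe.

```python
import time


class RateLimiter:
    """Allow at most max_calls calls within any window of `period` seconds."""

    def __init__(self, max_calls=2, period=1.0, clock=time.monotonic, sleep=time.sleep):
        self._max_calls = max_calls
        self._period = period
        self._clock = clock
        self._sleep = sleep
        self._calls = []  # timestamps of recent calls, oldest first

    def _prune(self, now):
        # Forget calls that have left the sliding window.
        self._calls = [t for t in self._calls if now - t < self._period]

    def wait(self):
        """Block until another request may be sent, then record it."""
        now = self._clock()
        self._prune(now)
        if len(self._calls) >= self._max_calls:
            # Sleep until the oldest recorded call drops out of the window.
            self._sleep(self._period - (now - self._calls[0]))
            now = self._clock()
            self._prune(now)
        self._calls.append(now)


limiter = RateLimiter(max_calls=2, period=1.0)
for page in range(3):
    limiter.wait()  # the third call pauses for roughly one second
    print("request", page)
```

The clock and sleep callables are injectable so that the behaviour can be checked without real waiting.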

How It Works

A main thread spawns a handler thread. The handler executes cloudtrail:LookupEvents requests in a loop until a pagination token is no longer provided. Meanwhile, each paginated API response spawns a worker thread, which is registered inside the handler thread. Each worker thread loops through the list of events in the API response and matches each list item against one or more JSONPath expressions. Any matching item is then compared against a specified Python built-in type or regular expression.

Note

Supported filter expression operators:

  • ==: equal comparison to int, str, dict, bool, None, tuple, or list values

  • !=: not equal comparison to int, str, dict, bool, None, tuple, or list values

  • regex: compare by matching against a regular expression (only supported for str built-in types)
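The three operators listed above can be sketched with the standard library alone. This is only an illustration under stated assumptions: the names parse_expression and compare are hypothetical, the JSONPath part is assumed to contain no whitespace, and the regex operator is assumed to be a substring search (re.search).

```python
import ast
import re

# '<jsonpath> <operator> <python literal>'; the path must not contain whitespace.
EXPRESSION = re.compile(r"^\s*(?P<path>\S+)\s+(?P<op>==|!=|regex)\s+(?P<literal>.+?)\s*$")


def parse_expression(expression):
    """Split a filter expression into (path, operator, operand)."""
    match = EXPRESSION.match(expression)
    if match is None:
        raise ValueError(f"unsupported filter expression: {expression!r}")
    # ast.literal_eval only accepts literals of Python built-in types.
    return match["path"], match["op"], ast.literal_eval(match["literal"])


def compare(value, op, operand):
    """Apply one of the supported operators to an already resolved value."""
    if op == "==":
        return value == operand
    if op == "!=":
        return value != operand
    if op == "regex":
        # Regular expressions are only applied to str values.
        return isinstance(value, str) and re.search(operand, value) is not None
    raise ValueError(f"unsupported operator: {op!r}")


path, op, operand = parse_expression('$.userIdentity.userName regex "AliceIsNot.*"')
print(path, op, operand)
print(compare("AliceIsNotBob", op, operand))  # True
```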

Note

ECMAScript’s behaviour of non-existing object properties being of type undefined is emulated through the get() method on dictionaries, so that JSONPath expressions that do not match any item can be compared to None (e.g. $.errorCode != None).
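The get()-based emulation can be pictured with a deliberately tiny resolver. It handles only dotted child access such as $.a.b, which is a small subset of JSONPath, and the function name resolve is hypothetical; the program itself relies on a JSONPath implementation.

```python
def resolve(event, path):
    """Resolve a simple '$.a.b' path; missing properties yield None."""
    if not path.startswith("$"):
        raise ValueError(f"path must start with '$': {path!r}")
    current = event
    for key in (part for part in path[1:].split(".") if part):
        if not isinstance(current, dict):
            return None  # cannot descend any further
        current = current.get(key)  # dict.get() returns None for missing keys
    return current


event = {"userIdentity": {"userName": "AliceIsNotBob"}, "eventName": "UpdateTrail"}
print(resolve(event, "$.userIdentity.userName"))  # AliceIsNotBob
print(resolve(event, "$.errorCode"))              # None
print(resolve(event, "$.errorCode") != None)      # False, the event has no errorCode
```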

Should the item match, it is pushed onto a priority queue as a queue item. After the thread has looped over the entire event list, it returns.

The main thread loops over the priority queue indefinitely. Each time it retrieves a lookup match item from the queue, it yields the item. Once it receives a stop signal, it sets the queue item retrieval timeout, so that the loop ends when no more items can be expected from the queue.
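The threading model described in this section can be approximated in a few lines. In this sketch, pages stands in for the paginated LookupEvents responses and predicate for the filter expressions. The function name lookup, the (page, position) priority key and the polling timeout are assumptions made for illustration and may differ from the actual implementation.

```python
import queue
import threading


def lookup(pages, predicate, poll_timeout=0.1):
    """Yield events from `pages` (an iterable of event lists) that satisfy `predicate`."""
    matches = queue.PriorityQueue()
    stop = threading.Event()

    def worker(page_index, events):
        for position, event in enumerate(events):
            if predicate(event):
                # The unique (page, position) key orders the items, so the
                # event dicts themselves are never compared.
                matches.put(((page_index, position), event))

    def handler():
        workers = []
        for page_index, events in enumerate(pages):  # one pass per paginated response
            thread = threading.Thread(target=worker, args=(page_index, events))
            thread.start()
            workers.append(thread)
        for thread in workers:
            thread.join()
        stop.set()  # every match is queued before the stop signal is raised

    threading.Thread(target=handler, daemon=True).start()

    while True:
        try:
            _, event = matches.get(timeout=poll_timeout)
        except queue.Empty:
            if stop.is_set() and matches.empty():
                break  # nothing left and nothing more to come
            continue
        yield event


pages = [[{"n": 1}, {"n": 2}], [{"n": 3}], [{"n": 4}]]
for event in lookup(pages, lambda e: e["n"] % 2 == 0):
    print(event)
```

Because workers run concurrently, the order in which matches are yielded is not guaranteed to follow the page order.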

Getting Started

Get familiar with the CloudTrail event format and configure API access to the AWS environment in question (https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html).

The following is a sample AWS CloudTrail event:

{
    "eventVersion": "1.04",
    "userIdentity": {
        "type": "IAMUser",
        "principalId": "EX_PRINCIPAL_ID",
        "arn": "arn:aws:iam::123456789012:user/Alice",
        "accountId": "123456789012",
        "accessKeyId": "EXAMPLE_KEY_ID",
        "userName": "AliceIsNotBob"
    },
    "eventTime": "2016-07-14T19:15:45Z",
    "eventSource": "cloudtrail.amazonaws.com",
    "eventName": "UpdateTrail",
    "awsRegion": "us-east-2",
    "sourceIPAddress": "205.251.233.182",
    "userAgent": "aws-cli/1.10.32 Python/2.7.9 Windows/7 botocore/1.4.22",
    "errorCode": "TrailNotFoundException",
    "errorMessage": "Unknown trail: myTrail2 for the user: 123456789012",
    "requestParameters": {"name": "myTrail2"},
    "responseElements": null,
    "requestID": "5d40662a-49f7-11e6-97e4-d9cb6ff7d6a3",
    "eventID": "b7d4398e-b2f0-4faa-9c76-e2d316a8d67f",
    "eventType": "AwsApiCall",
    "recipientAccountId": "123456789012"
}

With aws-spitzel, it is possible to query any attribute with a JSONPath query (e.g. $.userIdentity.userName) and compare it to a string or number:

$ aws-spitzel 'cloudtrail:Update*' \
    --match '$.userIdentity.userName == "AliceIsNotBob"'
$ aws-spitzel 'cloudtrail:Update*' \
    --match '$.userIdentity.userName != "AliceIsNotBob"'

You can also execute a regular expression substring search:

$ aws-spitzel 'cloudtrail:Update*' \
    --match '$.userIdentity.userName regex "AliceIsNot.*"'

Note

Operations are applied to each single CloudTrail event; JSON container objects (e.g. Records arrays) are not available.

The following commands are required:

  • python3

  • pip

  • pipenv (Development)

Next, install and make sure the command is available.

$ python3 -m pip install victorykit-aws-spitzel
$ aws-spitzel --help

Alternatively, you can clone the repository

$ mkdir py-aws-spitzel && cd $_ && git clone https://bitbucket.org/victorykit/py-aws-spitzel.git .

install via pipenv (development)

$ python3 -m pipenv install -d
$ python3 -m pipenv run aws-spitzel --help

or via pip

$ python3 -m pip install .
$ aws-spitzel --help

More information is available in the Contribution Guidelines.

Usage Examples

Make sure to configure the AWS API through setting the well-known AWS CLI environment variables.

By default, all events within the last 90 days are returned:

$ aws-spitzel 's3:Get*' 'dynamodb:Get*'

There is a shorthand for the last x minutes:

$ aws-spitzel 's3:Get*' 'dynamodb:*' --last-minute 300

Date ranges can also be specified explicitly; --to defaults to now and --from to 90 days before now:

$ aws-spitzel \
    --from '2023-03-31 14:00:12' \
    --to '2023-04-01 00:00:00' \
    's3:Get*' \
    'dynamodb:*'

The following example finds all List events of the AWS Transfer Family API that were denied, came from the host 147.161.171.112, and were not made by AWS IAM user Alice in AWS account 060862059283. A strange query, but hopefully the point comes across.

$ aws-spitzel \
    --match '$.errorCode == "AccessDenied"' \
    --match '$.userIdentity.principalId regex "^(?!.*Alice).*$"' \
    --match '$.userIdentity.accountId == "060862059283"' \
    --match '$.sourceIPAddress == "147.161.171.112"' \
    "transfer:List*"

The next example gets all Get events on S3 and DynamoDB API calls in the last 5 hours (300 minutes) that were denied for an IAM user MyUser from the principal account 060862059283 who assumed the role MyRole in the target account.

$ aws-spitzel \
    --match '$.errorCode == "AccessDenied"' \
    --match '$.userIdentity.arn regex ".*/MyRole/MyUser"' \
    --match '$.userIdentity.accountId == "060862059283"' \
    --last-minute 300 \
    's3:Get*' \
    'dynamodb:Get*'

Piping is supported (warnings and errors are written to stderr):

while [ 1 -eq 1 ]; do

    echo "getting CloudTrail"

    aws-spitzel \
        --match '$.errorCode != "AccessDenied"' \
        --last-minute 300 \
        "s3:*Acl" \
        "ssm:List*" \
    | \
    jq '.'

    echo "waiting for CloudTrail (3000 seconds)"

    sleep 3000
done
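Instead of jq, the line-delimited JSON output can also be consumed from Python. The following sketch assumes that every non-empty line on stdout is exactly one JSON-encoded CloudTrail event; the function name read_events is hypothetical.

```python
import json


def read_events(lines):
    """Yield one dict per non-empty line of line-delimited JSON."""
    for line in lines:
        line = line.strip()
        if line:
            yield json.loads(line)


sample_output = [
    '{"eventName": "PutBucketAcl", "awsRegion": "us-east-2"}\n',
    "\n",
    '{"eventName": "ListDocuments", "awsRegion": "us-east-2"}\n',
]
for event in read_events(sample_output):
    print(event["eventName"])
```

In a pipeline, the same generator can be fed with sys.stdin instead of the sample list, for example by saving the function in a script and running the aws-spitzel command piped into it.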

License

DL-DE->BY-2.0

Datenlizenz Deutschland – Namensnennung – Version 2.0

(1) Jede Nutzung ist unter den Bedingungen dieser „Datenlizenz Deutschland – Namensnennung – Version 2.0" zulässig.

Die bereitgestellten Daten und Metadaten dürfen für die kommerzielle und nicht kommerzielle Nutzung insbesondere

vervielfältigt, ausgedruckt, präsentiert, verändert, bearbeitet sowie an Dritte übermittelt werden;
mit eigenen Daten und Daten Anderer zusammengeführt und zu selbständigen neuen Datensätzen verbunden werden;
in interne und externe Geschäftsprozesse, Produkte und Anwendungen in öffentlichen und nicht öffentlichen elektronischen Netzwerken eingebunden werden.

(2) Bei der Nutzung ist sicherzustellen, dass folgende Angaben als Quellenvermerk enthalten sind:

Bezeichnung des Bereitstellers nach dessen Maßgabe,
der Vermerk „Datenlizenz Deutschland – Namensnennung – Version 2.0" oder „dl-de/by-2-0" mit Verweis auf den Lizenztext unter www.govdata.de/dl-de/by-2-0 sowie
einen Verweis auf den Datensatz (URI).
Dies gilt nur soweit die datenhaltende Stelle die Angaben 1. bis 3. zum Quellenvermerk bereitstellt.

(3) Veränderungen, Bearbeitungen, neue Gestaltungen oder sonstige Abwandlungen sind im Quellenvermerk mit dem Hinweis zu versehen, dass die Daten geändert wurden.

Data licence Germany – attribution – version 2.0

(1) Any use will be permitted provided it fulfils the requirements of this "Data licence Germany – attribution – Version 2.0".

The data and meta-data provided may, for commercial and non-commercial use, in particular

be copied, printed, presented, altered, processed and transmitted to third parties;
be merged with own data and with the data of others and be combined to form new and independent datasets;
be integrated in internal and external business processes, products and applications in public and non-public electronic networks.

(2) The user must ensure that the source note contains the following information:

the name of the provider,
the annotation "Data licence Germany – attribution – Version 2.0" or "dl-de/by-2-0" referring to the licence text available at www.govdata.de/dl-de/by-2-0, and
a reference to the dataset (URI).
This applies only if the entity keeping the data provides the pieces of information 1-3 for the source note.

(3) Changes, editing, new designs or other amendments must be marked as such in the source note.

URL: http://www.govdata.de/dl-de/by-2-0