AWS Spitzel#
Note
Repository: https://bitbucket.org/victorykit/py-aws-spitzel
Documentation: https://victorykit.bitbucket.io/py-aws-spitzel
Issue Tracker: https://bitbucket.org/victorykit/py-aws-spitzel/issues
Mailing List: https://groups.google.com/a/victory-k.it/g/py-aws-spitzel
Heuristic CloudTrail Event History Lookup for AWS IAM Forensics
Lightweight and flexible AWS DevOps command-line tool and Python 3.9 module for security operation duties (SOC) of AWS platform services.
This program extends the native AWS CloudTrail API LookupEvents action by querying CloudTrail event objects with JSONPath expressions, a barebones implementation of comparison operations for Python built-in types, and regular expressions. In addition, the UNIX filename pattern syntax of AWS IAM policy statement actions is used for filtering events by service and action (e.g. s3:List*), instead of the CloudTrail API schema attributes (eventName, eventSource, etc.).
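As a rough illustration of the IAM-style filtering described above, the following sketch matches an event against a pattern such as s3:List* with the standard library's fnmatch module. The function name event_matches_action is hypothetical, and deriving the service id from the first label of eventSource is an assumption that holds for many, but not all, AWS services. IAM itself treats action names case-insensitively, whereas this sketch compares case-sensitively.

```python
from fnmatch import fnmatchcase


def event_matches_action(event: dict, iam_action: str) -> bool:
    """Check a CloudTrail event against an IAM-style pattern like 's3:List*'."""
    service_pattern, _, action_pattern = iam_action.partition(":")
    # Assumption: the service id is the first label of the eventSource host
    # name, e.g. 's3' for 's3.amazonaws.com'.
    service_id = event.get("eventSource", "").split(".", 1)[0]
    action_name = event.get("eventName", "")
    return fnmatchcase(service_id, service_pattern) and fnmatchcase(
        action_name, action_pattern
    )


event = {"eventSource": "s3.amazonaws.com", "eventName": "ListBuckets"}
print(event_matches_action(event, "s3:List*"))  # True
print(event_matches_action(event, "s3:Get*"))   # False
```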
This program is licensed under the “Data licence Germany – attribution – Version 2.0”. URL: http://www.govdata.de/dl-de/by-2-0
Run the following to get additional information on using the command-line interface:
$ aws-spitzel --help
If you specify neither --from, nor --to, nor --last-minute, the entire available date range will be used.
usage: aws-spitzel [-h] [--match EXPRESSION] [--from DATETIME] [--to DATETIME] [--last-minute MINUTES] IAM_ACTION [IAM_ACTION ...]
Positional Arguments#
- IAM_ACTION
service id and action name in AWS IAM policy statement format ($service_id:$action_name)
Named Arguments#
- --match
JSONPath filter
Default: []
- --from
start of date range to find events in
Default: 90 days before now (2023-01-03 01:47:14.846958 when this page was generated)
- --to
end of date range to find events in
Default: now (2023-04-03 01:47:14.847040 when this page was generated)
- --last-minute
from now back to x minutes ago
Make sure to specify the correct AWS CLI profile through the AWS_PROFILE environment variable.
About#
Things currently change a lot, and sometimes the changes break features and previously established routines. It’s noisy, but one gets to a point of true system resilience much quicker. After the migration to a different IAM scheme, a project lost access to an AWS service. The project manager claimed that the service was no longer accessible, although it had been in the past. The new IAM scheme restricts access to more AWS services, so access to the service mentioned by the project manager may have been restricted by accident. All services accessed by projects were taken into account when the new IAM scheme was defined.
According to the compliance criteria of the German BSI C5 catalogue for operating certifiable cloud services in accordance with German data privacy protection regulations, events like these still fall under the security incident management requirements. These are just the nice kind of security incidents, where somebody accidentally gets locked out. Unpleasant for the principal affected, but to quote the great Elton John: “I’m still standing…”. However, the incident still has to be classified properly (BSI C5 SIM-02). Depending on the correctness of the project manager’s statements, remediation actions may be postponed.
It is obvious which actions and services are applicable as CloudTrail events
and when they should have occurred. However, finding them would mean joining
multiple queries against the AWS CloudTrail API LookupEvents action, since it
currently allows only 1 query attribute at a time. One needs some more advanced
query utility in order to do that. Amazon Athena is a perfect fit for that,
since it supports SQL and advanced JSON-oriented queries. However, it is a
giant and has rather extensive requirements, like an already existing S3 bucket
populated with CloudTrail trail log events, even though CloudTrail is storing
all events in the Nirvana for 90 days, regardless of somebody proactively
creating a CloudTrail trail. The effort of getting the Amazon Athena
functionality wasn’t worth it, because this program is what came up in the
meantime. Besides some JSON-oriented query, the only real operations required
are some basic comparisons and regular expression substring evaluations. There
is XPath for XML queries, and now there is JSONPath for JSON queries. The
Python standard library itself (ast - abstract syntax tree for parsing strings
as definitions of Python built-in types) and a custom basic tokenizer for
parsing the tokens of an operand-operation-operand expression built on top of
it can deliver the rest.
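To make the idea of an operand-operation-operand expression more concrete, here is a minimal sketch of how such a string could be split and its right-hand operand parsed with ast.literal_eval. The regular expression and the function name parse_expression are illustrative assumptions, not the tokenizer shipped with this program.

```python
import ast
import re

# Hypothetical tokenizer: a JSONPath, one of the supported operators, and a
# Python literal as the right-hand operand.
_EXPRESSION = re.compile(
    r"^\s*(?P<path>\$.*?)\s+(?P<op>==|!=|regex)\s+(?P<literal>.+?)\s*$"
)


def parse_expression(expression: str):
    """Split 'PATH OPERATOR LITERAL' into a (path, operator, value) tuple."""
    match = _EXPRESSION.match(expression)
    if match is None:
        raise ValueError(f"cannot parse expression: {expression!r}")
    # ast.literal_eval only evaluates literals of Python built-in types
    # (str, int, dict, list, tuple, bool, None) and never executes code.
    value = ast.literal_eval(match.group("literal"))
    return match.group("path"), match.group("op"), value


print(parse_expression('$.userIdentity.userName == "AliceIsNotBob"'))
# ('$.userIdentity.userName', '==', 'AliceIsNotBob')
```

Using ast.literal_eval instead of eval keeps the right-hand side restricted to plain literals, which matters for a tool that takes expressions from the command line.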
System interchange is possible through line-delimited JSON streaming via stdout. The program routine is parallelized through multi-threading, making it fast enough to keep up with the AWS API throttling threshold.
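A downstream consumer of the line-delimited JSON stream could look roughly like the following sketch. The function name read_events is hypothetical, and an in-memory stream stands in for sys.stdin so that the example is self-contained.

```python
import io
import json
from typing import Iterator, TextIO


def read_events(stream: TextIO) -> Iterator[dict]:
    """Yield one event per non-empty line of a line-delimited JSON stream."""
    for line in stream:
        line = line.strip()
        if line:
            yield json.loads(line)


# Stand-in for sys.stdin, e.g. when run as: aws-spitzel 's3:Get*' | python3 consumer.py
sample = io.StringIO('{"eventName": "GetObject"}\n{"eventName": "GetBucketAcl"}\n')
names = [event["eventName"] for event in read_events(sample)]
print(names)  # ['GetObject', 'GetBucketAcl']
```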
Due to the AWS CloudTrail API LookupEvents throttling threshold (100
events, across 2 requests per principal, per second) this program is optimized
for single-core execution. Multi-core execution makes sense, when more than two
access keys for the same AWS environment are being used, therefore doubling the
networking throughput. An implementation for that will be covered in the
future, when support for CloudTrail trails with S3 backends has been
established by this program. This is currently planned for the middle of Q2 in
2023.
How It Works#
A main thread spawns a handler thread. The handler executes
cloudtrail:LookupEvents requests in a loop indefinitely until a
pagination token is no longer provided. Meanwhile, each paginated API response
spawns a worker thread, which is registered inside the handler thread.
Each worker thread will loop through the list of events of the API response,
and match each list item against one or multiple JSONPath expressions. Any
matching item will then be compared against a specified Python built-in type,
or regular expression.
Warning
This program uses a quasi-port of the original JavaScript JSONPath reference implementation. Expect resolution as described in IETF draft-goessner-dispatch-jsonpath-00.
Note
Supported filter expression operators:
==: equal comparison to int, str, dict, bool, None, tuple, or list values
!=: not equal comparison to int, str, dict, bool, None, tuple, or list values
regex: compare by matching against a regular expression (only supported for str built-in types)
Note
The ECMAScript behaviour of non-existing object properties being of type undefined is emulated through the get() method on dictionaries, so that JSONPath expressions not matching any items can be compared to None (e.g. $.errorCode != None).
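The operators and the None emulation described in the notes above can be sketched as follows. This is a simplification under stated assumptions: resolve only handles plain dotted member access rather than full JSONPath, both function names are hypothetical, and the regex operator is assumed to behave like re.search.

```python
import re


def resolve(event: dict, path: str):
    """Resolve a dotted path such as '$.userIdentity.userName'.

    Simplification: only plain member access is handled, not full JSONPath.
    Missing properties resolve to None, mimicking ECMAScript's undefined.
    """
    value = event
    for key in path.lstrip("$").strip(".").split("."):
        if not isinstance(value, dict):
            return None
        value = value.get(key)  # dict.get() returns None for missing keys
    return value


def compare(value, op: str, operand) -> bool:
    """Apply one of the supported operators to a resolved value."""
    if op == "==":
        return value == operand
    if op == "!=":
        return value != operand
    if op == "regex":
        # Only str values can be searched; anything else never matches.
        return isinstance(value, str) and re.search(operand, value) is not None
    raise ValueError(f"unsupported operator: {op}")


event = {"userIdentity": {"userName": "AliceIsNotBob"}}
print(compare(resolve(event, "$.errorCode"), "!=", None))  # False
print(compare(resolve(event, "$.userIdentity.userName"), "regex", "AliceIsNot.*"))  # True
```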
Should the item match, it will be pushed onto a priority queue as a queue item. After the thread has looped over the entire event list, it will return.
The main thread loops over the priority queue indefinitely. Each time it retrieves a lookup match item from the queue, it will yield the item. Should it receive a stop signal, it will set the queue item retrieval timeout, so that the main thread’s loop will be broken, should there be no more items to be expected coming from the queue.
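The thread layout described in this section can be approximated with the standard library alone. In the sketch below, the PAGES dictionary is a hypothetical stand-in for paginated cloudtrail:LookupEvents responses, the function name lookup is made up, and the stop signal is modelled with a threading.Event. It illustrates the pattern and is not the program's actual implementation.

```python
import itertools
import queue
import threading

# Hypothetical stand-in for paginated API responses: token -> (events, next token)
PAGES = {
    None: ([{"eventTime": "2023-04-01T10:00:02Z", "eventName": "GetObject"}], "t1"),
    "t1": ([{"eventTime": "2023-04-01T10:00:01Z", "eventName": "PutObject"},
            {"eventTime": "2023-04-01T10:00:00Z", "eventName": "GetBucketAcl"}], None),
}


def lookup(predicate):
    """Yield matching events produced by one worker thread per page."""
    results = queue.PriorityQueue()
    done = threading.Event()
    sequence = itertools.count()  # tie-breaker so event dicts are never compared

    def worker(events):
        for event in events:
            if predicate(event):
                results.put((event["eventTime"], next(sequence), event))

    def handler():
        workers, token = [], None
        while True:
            events, token = PAGES[token]
            thread = threading.Thread(target=worker, args=(events,))
            thread.start()
            workers.append(thread)
            if token is None:  # no pagination token left
                break
        for thread in workers:
            thread.join()
        done.set()  # stop signal: no further items will be queued

    threading.Thread(target=handler, daemon=True).start()
    while True:
        try:
            yield results.get(timeout=0.1)[2]
        except queue.Empty:
            # Leave the loop only once the handler is done and nothing is queued.
            if done.is_set() and results.empty():
                break


names = sorted(e["eventName"] for e in lookup(lambda e: e["eventName"].startswith("Get")))
print(names)  # ['GetBucketAcl', 'GetObject']
```

The order in which items are yielded depends on thread timing, because the priority queue only orders the items present at retrieval time. That is why the usage example sorts the result before printing.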
Getting Started#
Get familiar with the CloudTrail event format and configure API access to the AWS environment in question (https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html).
The following is a sample AWS CloudTrail event:
{
"eventVersion": "1.04",
"userIdentity": {
"type": "IAMUser",
"principalId": "EX_PRINCIPAL_ID",
"arn": "arn:aws:iam::123456789012:user/Alice",
"accountId": "123456789012",
"accessKeyId": "EXAMPLE_KEY_ID",
"userName": "AliceIsNotBob"
},
"eventTime": "2016-07-14T19:15:45Z",
"eventSource": "cloudtrail.amazonaws.com",
"eventName": "UpdateTrail",
"awsRegion": "us-east-2",
"sourceIPAddress": "205.251.233.182",
"userAgent": "aws-cli/1.10.32 Python/2.7.9 Windows/7 botocore/1.4.22",
"errorCode": "TrailNotFoundException",
"errorMessage": "Unknown trail: myTrail2 for the user: 123456789012",
"requestParameters": {"name": "myTrail2"},
"responseElements": null,
"requestID": "5d40662a-49f7-11e6-97e4-d9cb6ff7d6a3",
"eventID": "b7d4398e-b2f0-4faa-9c76-e2d316a8d67f",
"eventType": "AwsApiCall",
"recipientAccountId": "123456789012"
}
With aws-spitzel, it is possible to query any attribute with a JSONPath query (e.g. $.userIdentity.userName) and compare it to a string or number:
$ aws-spitzel 'cloudtrail:Update*' \
--match '$.userIdentity.userName == "AliceIsNotBob"'
$ aws-spitzel 'cloudtrail:Update*' \
--match '$.userIdentity.userName != "AliceIsNotBob"'
You can also execute a regular expression substring search
$ aws-spitzel 'cloudtrail:Update*' \
--match '$.userIdentity.userName regex "AliceIsNot.*"'
Note
Operations occur on each single CloudTrail event; any JSON container objects (e.g. Records arrays) will not be available.
The following commands are required:
python3
pip
pipenv (development)
Next, install and make sure the command is available.
$ python3 -m pip install victorykit-aws-spitzel
$ aws-spitzel --help
Alternatively, you can clone the repository
$ mkdir py-aws-spitzel && cd $_ && git clone https://bitbucket.org/victorykit/py-aws-spitzel.git .
install via pipenv (development)
$ python3 -m pipenv install -d
$ python3 -m pipenv run aws-spitzel --help
or pip
$ python3 -m pip install .
$ aws-spitzel --help
More information in the Contribution Guidelines
Usage Examples#
Make sure to configure the AWS API through setting the well-known AWS CLI environment variables.
By default, all events within the last 90 days are returned:
$ aws-spitzel 's3:Get*' 'dynamodb:Get*'
There is a shorthand for the last x minutes
$ aws-spitzel 's3:Get*' 'dynamodb:*' --last-minute 300
Also, date ranges can be explicitly specified and will default to now and 90 days before now:
$ aws-spitzel \
--from '2023-03-31 14:00:12' \
--to '2023-04-01 00:00:00' \
's3:Get*' \
'dynamodb:*'
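The date range defaults can be sketched in a few lines of Python. The function name resolve_range is hypothetical, and letting the minutes shorthand take precedence over explicit dates is an assumption, since the interaction of --last-minute with --from and --to is not specified here.

```python
from datetime import datetime, timedelta
from typing import Optional, Tuple


def resolve_range(
    now: datetime,
    start: Optional[str] = None,
    end: Optional[str] = None,
    last_minutes: Optional[int] = None,
) -> Tuple[datetime, datetime]:
    """Return the (start, end) of the lookup window."""
    if last_minutes is not None:
        # Assumption: the shorthand wins over explicit dates.
        return now - timedelta(minutes=last_minutes), now
    return (
        datetime.fromisoformat(start) if start else now - timedelta(days=90),
        datetime.fromisoformat(end) if end else now,
    )


now = datetime(2023, 4, 3, 1, 47, 14)
print(resolve_range(now))
# (datetime.datetime(2023, 1, 3, 1, 47, 14), datetime.datetime(2023, 4, 3, 1, 47, 14))
print(resolve_range(now, start="2023-03-31 14:00:12", end="2023-04-01 00:00:00"))
# (datetime.datetime(2023, 3, 31, 14, 0, 12), datetime.datetime(2023, 4, 1, 0, 0))
```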
The following example finds all CloudTrail events of the AWS Transfer Family API, not made by the AWS IAM user Alice existing in AWS account 060862059283, that were not denied and came from the host 147.161.171.112. Strange query, but hopefully the point comes across.
$ aws-spitzel \
--match '$.errorCode != "AccessDenied"' \
--match '$.userIdentity.principalId regex ".*:^((?!Alice).)"' \
--match '$.userIdentity.accountId == "060862059283"' \
--match '$.sourceIPAddress == "147.161.171.112"' \
"transfer:List*"
The next example gets all Get events on S3 and DynamoDB API calls in the last 3 hours that were denied for an IAM user MyUser from the principal account 060862059283, who assumed the role MyRole in the target account.
$ aws-spitzel \
--match '$.errorCode == "AccessDenied"' \
--match '$.userIdentity.arn regex ".*/MyRole/MyUser"' \
--match '$.userIdentity.accountId == "060862059283"' \
--last-minute 180 \
's3:Get*' \
'dynamodb:Get*'
Piping is supported (warnings and errors are written to stderr)
while [ 1 -eq 1 ]; do
echo "getting CloudTrail"
aws-spitzel \
--match '$.errorCode != "AccessDenied"' \
--last-minute 300 \
"s3:*Acl" \
"ssm:List*" \
| \
jq '.'
echo "waiting for CloudTrail (3000 seconds)"
sleep 3000
done
License#
DL-DE->BY-2.0
Datenlizenz Deutschland – Namensnennung – Version 2.0
(1) Jede Nutzung ist unter den Bedingungen dieser „Datenlizenz Deutschland – Namensnennung – Version 2.0" zulässig.
Die bereitgestellten Daten und Metadaten dürfen für die kommerzielle und nicht kommerzielle Nutzung insbesondere
vervielfältigt, ausgedruckt, präsentiert, verändert, bearbeitet sowie an Dritte übermittelt werden;
mit eigenen Daten und Daten Anderer zusammengeführt und zu selbständigen neuen Datensätzen verbunden werden;
in interne und externe Geschäftsprozesse, Produkte und Anwendungen in öffentlichen und nicht öffentlichen elektronischen Netzwerken eingebunden werden.
(2) Bei der Nutzung ist sicherzustellen, dass folgende Angaben als Quellenvermerk enthalten sind:
Bezeichnung des Bereitstellers nach dessen Maßgabe,
der Vermerk „Datenlizenz Deutschland – Namensnennung – Version 2.0" oder „dl-de/by-2-0" mit Verweis auf den Lizenztext unter www.govdata.de/dl-de/by-2-0 sowie
einen Verweis auf den Datensatz (URI).
Dies gilt nur soweit die datenhaltende Stelle die Angaben 1. bis 3. zum Quellenvermerk bereitstellt.
(3) Veränderungen, Bearbeitungen, neue Gestaltungen oder sonstige Abwandlungen sind im Quellenvermerk mit dem Hinweis zu versehen, dass die Daten geändert wurden.
Data licence Germany – attribution – version 2.0
(1) Any use will be permitted provided it fulfils the requirements of this "Data licence Germany – attribution – Version 2.0".
The data and meta-data provided may, for commercial and non-commercial use, in particular
be copied, printed, presented, altered, processed and transmitted to third parties;
be merged with own data and with the data of others and be combined to form new and independent datasets;
be integrated in internal and external business processes, products and applications in public and non-public electronic networks.
(2) The user must ensure that the source note contains the following information:
the name of the provider,
the annotation "Data licence Germany – attribution – Version 2.0" or "dl-de/by-2-0" referring to the licence text available at www.govdata.de/dl-de/by-2-0, and
a reference to the dataset (URI).
This applies only if the entity keeping the data provides the pieces of information 1-3 for the source note.
(3) Changes, editing, new designs or other amendments must be marked as such in the source note.
URL: http://www.govdata.de/dl-de/by-2-0