Hardening AWS security: a proactive approach with CloudTrail, Athena and Lambda
Large AWS organizations face many security challenges: granular permissions, encryption, networking, compliance… One of them is monitoring. This article provides a comprehensive solution for monitoring and detecting risky activities in an AWS environment.
Solution overview

This solution takes place in an AWS Organization with a master account and a dedicated security account. It leverages CloudTrail, Athena and Lambda.
Centralized CloudTrail log storage
The first step in this solution is to configure CloudTrail to collect activity logs. Centralizing logs in an S3 bucket is essential for easy and efficient management.
CloudTrail trail
In the master account, create an “Organization trail” with these settings:
- Apply trail to my organization: enabled for all accounts
- Multi-region: yes
- Trail log location: centralized S3 bucket name
- Log file validation: enabled
- Log file SSE-KMS encryption: disabled (encryption is handled by S3)
- Event type: Management events
- API activity: Read & Write
Be careful about enabling data events. If you have workloads that use S3 intensively, you may end up with a tremendous amount of logs, which would have a cost impact.
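The same trail can also be created programmatically. As a minimal sketch, the settings above translate into boto3 CreateTrail parameters like this (the helper name and placeholder values are ours):

```python
def org_trail_params(trail_name, bucket_name):
    """Translate the console settings above into CloudTrail CreateTrail parameters."""
    return {
        "Name": trail_name,
        "S3BucketName": bucket_name,
        "IsOrganizationTrail": True,      # apply trail to my organization
        "IsMultiRegionTrail": True,       # multi-region: yes
        "EnableLogFileValidation": True,  # log file validation: enabled
        # No KmsKeyId: SSE-KMS is disabled, encryption is handled by S3
    }

# Usage (requires trusted access for CloudTrail in AWS Organizations):
# client = boto3.client("cloudtrail")
# client.create_trail(**org_trail_params("[TRAIL_NAME]", "[BUCKET_NAME]"))
# client.start_logging(Name="[TRAIL_NAME]")  # trails do not record until started
```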
S3 Bucket
In the security account, create a bucket with SSE encryption enabled. Edit the bucket policy to allow CloudTrail to push log files. Template:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AWSCloudTrailAclCheck20150319",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "cloudtrail.amazonaws.com"
                ]
            },
            "Action": "s3:GetBucketAcl",
            "Resource": "arn:aws:s3:::[BUCKET_NAME]",
            "Condition": {
                "StringEquals": {
                    "aws:SourceArn": "arn:aws:cloudtrail:[REGION]:[MASTER_ACCOUNT_ID]:trail/[TRAIL_NAME]"
                }
            }
        },
        {
            "Sid": "AWSCloudTrailWrite20150319",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "cloudtrail.amazonaws.com"
                ]
            },
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::[BUCKET_NAME]/AWSLogs/[MASTER_ACCOUNT_ID]/*",
            "Condition": {
                "StringEquals": {
                    "s3:x-amz-acl": "bucket-owner-full-control",
                    "aws:SourceArn": "arn:aws:cloudtrail:[REGION]:[MASTER_ACCOUNT_ID]:trail/[TRAIL_NAME]"
                }
            }
        },
        {
            "Sid": "AWSCloudTrailOrganizationWrite20150319",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "cloudtrail.amazonaws.com"
                ]
            },
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::[BUCKET_NAME]/AWSLogs/[ORGANIZATION_ID]/*",
            "Condition": {
                "StringEquals": {
                    "s3:x-amz-acl": "bucket-owner-full-control",
                    "aws:SourceArn": "arn:aws:cloudtrail:[REGION]:[MASTER_ACCOUNT_ID]:trail/[TRAIL_NAME]"
                }
            }
        }
    ]
}
For FinOps and compliance, we recommend creating a lifecycle policy to move S3 objects to a cheaper storage class and delete logs that are too old.
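As a sketch, such a lifecycle policy could be built with boto3 (the 90-day and 365-day thresholds are arbitrary examples; tune them to your retention requirements):

```python
def log_lifecycle_configuration(transition_days=90, expiration_days=365):
    """Build a lifecycle configuration: tier logs to Glacier, then delete them."""
    return {
        "Rules": [
            {
                "ID": "cloudtrail-logs-tiering",
                "Status": "Enabled",
                "Filter": {"Prefix": "AWSLogs/"},  # only CloudTrail log objects
                "Transitions": [
                    {"Days": transition_days, "StorageClass": "GLACIER"}
                ],
                "Expiration": {"Days": expiration_days},
            }
        ]
    }

# Usage:
# boto3.client("s3").put_bucket_lifecycle_configuration(
#     Bucket="[BUCKET_NAME]",
#     LifecycleConfiguration=log_lifecycle_configuration(),
# )
```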
Log analysis
Athena plays a central role in this solution by allowing SQL querying of logs stored in S3.
Table creation
To optimize cost and performance, we leverage partitioning, as the logs are stored using this pattern: “[BUCKET_NAME]/AWSLogs/[ORGANIZATION_ID]/[ACCOUNT_ID]/CloudTrail/[REGION]/[YYYY]/[MM]/[DD]”.
This leads to the following query to create the table:
CREATE EXTERNAL TABLE cloudtrail_logs(
    eventVersion STRING,
    userIdentity STRUCT<
        type: STRING,
        principalId: STRING,
        arn: STRING,
        accountId: STRING,
        invokedBy: STRING,
        accessKeyId: STRING,
        userName: STRING,
        sessionContext: STRUCT<
            attributes: STRUCT<
                mfaAuthenticated: STRING,
                creationDate: STRING>,
            sessionIssuer: STRUCT<
                type: STRING,
                principalId: STRING,
                arn: STRING,
                accountId: STRING,
                userName: STRING>,
            ec2RoleDelivery: string,
            webIdFederationData: map<string,string>
        >
    >,
    eventTime STRING,
    eventSource STRING,
    eventName STRING,
    awsRegion STRING,
    sourceIpAddress STRING,
    userAgent STRING,
    errorCode STRING,
    errorMessage STRING,
    requestparameters STRING,
    responseelements STRING,
    additionaleventdata STRING,
    requestId STRING,
    eventId STRING,
    readOnly STRING,
    resources ARRAY<
        STRUCT<
            arn: STRING,
            accountId: STRING,
            type: STRING
        >
    >,
    eventType STRING,
    apiVersion STRING,
    recipientAccountId STRING,
    serviceEventDetails STRING,
    sharedEventID STRING,
    vpcendpointid STRING,
    tlsDetails STRUCT<
        tlsVersion: string,
        cipherSuite: string,
        clientProvidedHostHeader: string
    >
)
PARTITIONED BY (`account` string, `region` string, `timestamp` string)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://[BUCKET_NAME]/AWSLogs/[ORGANIZATION_ID]/'
TBLPROPERTIES (
    'projection.enabled'='true',
    'projection.region.type'='enum',
    'projection.region.values'='us-east-1,us-east-2,us-west-1,us-west-2,ca-central-1,sa-east-1,eu-central-1,eu-west-1,eu-west-2,eu-south-1,eu-west-3,eu-north-1,af-south-1,me-south-1,ap-east-1,ap-south-1,ap-northeast-2,ap-southeast-1,ap-southeast-2,ap-northeast-1',
    'projection.timestamp.type'='date',
    'projection.timestamp.format'='yyyy/MM/dd',
    'projection.timestamp.interval'='1',
    'projection.timestamp.interval.unit'='DAYS',
    'projection.timestamp.range'='2020/01/01,NOW',
    'projection.account.type'='enum',
    'projection.account.values'='[COMMA_SEPARATED_ACCOUNT_IDS_LIST]',
    'storage.location.template'='s3://[BUCKET_NAME]/AWSLogs/[ORGANIZATION_ID]/${account}/CloudTrail/${region}/${timestamp}/'
)
Example queries
Detect root logins during the last hour
SELECT
    eventtime,
    account,
    useridentity.type AS usertype,
    useridentity.arn AS identity
FROM "default"."cloudtrail_logs"
WHERE
    "timestamp" = format_datetime(current_timestamp, 'yyyy/MM/dd')
    AND from_iso8601_timestamp(eventtime) > date_trunc('second', current_timestamp - interval '1' hour)
    AND eventSource = 'signin.amazonaws.com'
    AND eventName = 'ConsoleLogin'
    AND useridentity.type = 'Root'
ORDER BY eventtime
Detect resource deletion today
SELECT
    eventtime,
    account,
    useridentity.type AS usertype,
    useridentity.arn AS identity,
    eventsource,
    eventname
FROM "default"."cloudtrail_logs"
WHERE
    "timestamp" = format_datetime(current_timestamp, 'yyyy/MM/dd')
    AND (lower(eventName) LIKE '%delete%' OR lower(eventName) LIKE '%remove%')
ORDER BY eventtime
Send notifications
Lambda
A Lambda function is triggered by a scheduled event rule. The schedule depends on the timeframe of the query (today, yesterday, last hour, last week…). Thanks to boto3, the AWS SDK for Python, the function runs the Athena query.
...
today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)

params = {
    "region": "eu-west-1",
    "database": "default",
    "bucket": "aws-athena-query-results-123456789-eu-west-1",
    "path": "temp/athena/output",
    "query": f'select eventtime, \
        account, \
        useridentity.arn as identity \
        FROM "default"."cloudtrail_logs" \
        WHERE "timestamp" = \'{yesterday.strftime("%Y/%m/%d")}\' \
        AND eventSource = \'signin.amazonaws.com\' \
        AND eventName = \'ConsoleLogin\' \
        AND useridentity.type = \'Root\' \
        order by eventtime',
}

client = boto3.client("athena")

# Start the query; the response contains the query execution ID
response_query_execution_id = client.start_query_execution(
    QueryString=params["query"],
    QueryExecutionContext={"Database": params["database"]},
    ResultConfiguration={
        "OutputLocation": "s3://" + params["bucket"] + "/" + params["path"]
    },
)
...
The results of the query are then downloaded and parsed to prepare the content of the notification. The notification destination will depend on your organization. Below are two examples.
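The download-and-parse step can be sketched as follows. Assuming you have polled get_query_execution until the state is SUCCEEDED, the rows returned by get_query_results can be flattened into a list of dicts (the helper name is ours):

```python
def parse_athena_result_set(result_set):
    """Flatten an Athena ResultSet (header row + data rows) into a list of dicts."""
    rows = result_set["Rows"]
    # The first row holds the column names
    headers = [col.get("VarCharValue", "") for col in rows[0]["Data"]]
    return [
        {h: col.get("VarCharValue", "") for h, col in zip(headers, row["Data"])}
        for row in rows[1:]  # skip the header row
    ]

# Usage with boto3, once the execution has succeeded:
# result = athena.get_query_results(QueryExecutionId=execution_id)
# data = parse_athena_result_set(result["ResultSet"])
```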
Email notification
Emails are sent via the Simple Email Service (SES). Again, we use boto3 to call its API.
import os

import boto3


def send_email(data):
    ses = boto3.client('ses')
    sender = os.environ["email_sender"]
    recipients = [os.environ["email_recipient"]]
    html, txt = build_email_content(data)
    print("--------------------------------------------------")
    print(f"HTML: {html}")
    print("--------------------------------------------------")
    print(f"TXT: {txt}")
    print("--------------------------------------------------")
    response = ses.send_email(
        Source=sender,
        Destination={'ToAddresses': recipients},
        Message={
            'Subject': {
                'Data': "AWS - Resource deletion report",
                'Charset': 'UTF-8'
            },
            'Body': {
                'Html': {
                    'Data': html,
                    'Charset': 'UTF-8'
                },
                'Text': {
                    'Data': txt,
                    'Charset': 'UTF-8'
                }
            }
        }
    )
    print(response)


def build_email_content(data):
    html = "<html><body><h1>AWS - Resource deletion report</h1><p>Hi,</p><p>Here are deletion actions done yesterday:</p><table border=\"1\" style=\"border-collapse: collapse; border: 2px solid;\"><thead><tr><th>Event time</th><th>User</th><th>Account</th><th>Region</th><th>Event source</th><th>Event name</th></tr></thead><tbody>"
    txt = "AWS - Resource deletion report\n\nHi,\n\nHere are deletion actions done yesterday:\n\n"
    for result in data:
        event_time = result["eventtime"]
        user = result["user"]
        account_id = result["account"]
        region = result["region"]
        event_source = result["eventsource"]
        event_name = result["eventname"]
        html += f"<tr><td>{event_time}</td><td>{user}</td><td>{account_id}</td><td>{region}</td><td>{event_source}</td><td>{event_name}</td></tr>"
        txt += f"{event_time} - {user} - {account_id} - {region} - {event_source} - {event_name}\n"
    html += "</tbody></table></body></html>"
    return html, txt
Don’t forget to verify your identities in SES before sending emails.
Slack notification
Slack notification is done by calling an HTTP “incoming webhook”. Here is the documentation to create one: https://api.slack.com/messaging/webhooks
import json
import os

import urllib3


def send_slack_notification(data):
    for result in data:
        event_time = result["eventtime"]
        user = result["user"]
        account_id = result["account"]
        region = result["region"]
        event_source = result["eventsource"]
        event_name = result["eventname"]
        text = f"{event_time} | Resource deletion alert : {user} deleted a resource in {region} in the account {account_id} with the event {event_source}/{event_name}"
        post_message_to_slack(text)


def post_message_to_slack(text):
    http = urllib3.PoolManager()
    body = {"channel": os.environ["slack_channel"], "text": text}
    response = http.request(
        "POST",
        os.environ["slack_webhook"],
        body=json.dumps(body),
        headers={"Content-Type": "application/json"},
    )
    print(response.data)
Conclusion
This solution can be the first step in your security journey. Its serverless nature makes it simple to deploy and operate, and cheap to run.
Full code is available here: https://github.com/maitelkamel-mks/aws-sample/tree/main/cloudtrail-notifications. As a bonus, you’ll find a Lambda that creates or updates the Athena table according to the account IDs.