Large AWS organizations face many security challenges: granular permissions, encryption, networking, compliance… One of them is monitoring. This article presents a comprehensive solution for monitoring and detecting risky activities in an AWS environment.
Solution overview
This solution takes place in an AWS Organization with a master account and a dedicated security account. It leverages CloudTrail, Athena and Lambda.
Centralized CloudTrail logs storage
The first step in this solution is to configure CloudTrail to collect activity logs. Centralizing logs in an S3 bucket is essential for easy and efficient management.
CloudTrail trail
In the master account, create an "Organization trail" with these settings:
- Apply trail to my organization: Enabled for all accounts
- Multi-region: yes
- Trail log location: centralized S3 bucket name
- Log file validation: enabled
- Log file SSE-KMS encryption: disabled (encryption is done by S3)
- Event type: Management events
- API activity: Read & Write
Be careful when enabling data events: if you have workloads that make intensive use of S3, you may end up with a tremendous amount of logs, with a direct cost impact.
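The same trail can also be created programmatically. Below is a minimal sketch with boto3 (trail and bucket names are placeholders to adapt); by default, create_trail records management events only, which matches the settings above:

```python
def build_trail_config(trail_name, bucket_name):
    """Build the kwargs for CloudTrail's create_trail call,
    mirroring the console settings listed above."""
    return {
        "Name": trail_name,
        "S3BucketName": bucket_name,
        "IsMultiRegionTrail": True,        # Multi-region: yes
        "IsOrganizationTrail": True,       # Apply trail to my organization
        "EnableLogFileValidation": True,   # Log file validation: enabled
    }


def create_organization_trail(trail_name, bucket_name):
    import boto3  # must run in the master account with CloudTrail permissions

    client = boto3.client("cloudtrail")
    return client.create_trail(**build_trail_config(trail_name, bucket_name))
```

If you later need data events, they are added separately with put_event_selectors, with the cost caveat above in mind.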
S3 Bucket
In the security account, create a bucket with SSE encryption enabled. Edit the bucket policy to allow CloudTrail to push log files. Template:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AWSCloudTrailAclCheck20150319",
      "Effect": "Allow",
      "Principal": {
        "Service": ["cloudtrail.amazonaws.com"]
      },
      "Action": "s3:GetBucketAcl",
      "Resource": "arn:aws:s3:::[BUCKET_NAME]",
      "Condition": {
        "StringEquals": {
          "aws:SourceArn": "arn:aws:cloudtrail:[REGION]:[SECURITY_ACCOUNT_ID]:trail/[TRAIL_NAME]"
        }
      }
    },
    {
      "Sid": "AWSCloudTrailWrite20150319",
      "Effect": "Allow",
      "Principal": {
        "Service": ["cloudtrail.amazonaws.com"]
      },
      "Action": "s3:PutObject",
      "Resource": "arn:aws:s3:::[BUCKET_NAME]/AWSLogs/[SECURITY_ACCOUNT_ID]/*",
      "Condition": {
        "StringEquals": {
          "s3:x-amz-acl": "bucket-owner-full-control",
          "aws:SourceArn": "arn:aws:cloudtrail:[REGION]:[SECURITY_ACCOUNT_ID]:trail/[TRAIL_NAME]"
        }
      }
    },
    {
      "Sid": "AWSCloudTrailOrganizationWrite20150319",
      "Effect": "Allow",
      "Principal": {
        "Service": ["cloudtrail.amazonaws.com"]
      },
      "Action": "s3:PutObject",
      "Resource": "arn:aws:s3:::[BUCKET_NAME]/AWSLogs/[ORGANIZATION_ID]/*",
      "Condition": {
        "StringEquals": {
          "s3:x-amz-acl": "bucket-owner-full-control",
          "aws:SourceArn": "arn:aws:cloudtrail:[REGION]:[SECURITY_ACCOUNT_ID]:trail/[TRAIL_NAME]"
        }
      }
    }
  ]
}
For FinOps and compliance, we recommend creating a lifecycle policy to move S3 objects to a cheaper storage class and to delete logs once they exceed your retention period.
Log analysis
Athena plays a central role in this solution by allowing SQL querying of logs stored in S3.
Table creation
To optimize cost and performance, we leverage partitioning as the logs are stored using this pattern: "[BUCKET_NAME]/AWSLogs/[ORGANIZATION_ID]/[ACCOUNT_ID]/CloudTrail/[REGION]/[YYYY]/[MM]/[DD]".
This leads to the following query to create the table:
CREATE EXTERNAL TABLE cloudtrail_logs (
    eventVersion STRING,
    userIdentity STRUCT<
        type: STRING,
        principalId: STRING,
        arn: STRING,
        accountId: STRING,
        invokedBy: STRING,
        accessKeyId: STRING,
        userName: STRING,
        sessionContext: STRUCT<
            attributes: STRUCT<
                mfaAuthenticated: STRING,
                creationDate: STRING>,
            sessionIssuer: STRUCT<
                type: STRING,
                principalId: STRING,
                arn: STRING,
                accountId: STRING,
                userName: STRING>,
            ec2RoleDelivery: STRING,
            webIdFederationData: MAP<STRING, STRING>
        >
    >,
    eventTime STRING,
    eventSource STRING,
    eventName STRING,
    awsRegion STRING,
    sourceIpAddress STRING,
    userAgent STRING,
    errorCode STRING,
    errorMessage STRING,
    requestparameters STRING,
    responseelements STRING,
    additionaleventdata STRING,
    requestId STRING,
    eventId STRING,
    readOnly STRING,
    resources ARRAY<
        STRUCT<
            arn: STRING,
            accountId: STRING,
            type: STRING
        >
    >,
    eventType STRING,
    apiVersion STRING,
    recipientAccountId STRING,
    serviceEventDetails STRING,
    sharedEventID STRING,
    vpcendpointid STRING,
    tlsDetails STRUCT<
        tlsVersion: STRING,
        cipherSuite: STRING,
        clientProvidedHostHeader: STRING
    >
)
PARTITIONED BY (`account` STRING, `region` STRING, `timestamp` STRING)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://[BUCKET_NAME]/AWSLogs/[ORGANIZATION_ID]/'
TBLPROPERTIES (
    'projection.enabled'='true',
    'projection.region.type'='enum',
    'projection.region.values'='us-east-1,us-east-2,us-west-1,us-west-2,ca-central-1,sa-east-1,eu-central-1,eu-west-1,eu-west-2,eu-south-1,eu-west-3,eu-north-1,af-south-1,me-south-1,ap-east-1,ap-south-1,ap-northeast-2,ap-southeast-1,ap-southeast-2,ap-northeast-1',
    'projection.timestamp.type'='date',
    'projection.timestamp.format'='yyyy/MM/dd',
    'projection.timestamp.interval'='1',
    'projection.timestamp.interval.unit'='DAYS',
    'projection.timestamp.range'='2020/01/01,NOW',
    'projection.account.type'='enum',
    'projection.account.values'='[COMMA_SEPARATED_ACCOUNT_IDS_LIST]',
    'storage.location.template'='s3://[BUCKET_NAME]/AWSLogs/[ORGANIZATION_ID]/${account}/CloudTrail/${region}/${timestamp}/'
)
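Note that the 'projection.account.values' enum must be kept in sync with the accounts of the organization. One way to build it, sketched here with boto3 (function names are ours), is to list the accounts with the Organizations API and format them as a comma-separated string:

```python
def format_account_ids(accounts):
    """Turn Organizations list_accounts entries into the
    comma-separated value expected by projection.account.values."""
    return ",".join(sorted(account["Id"] for account in accounts))


def fetch_account_ids():
    import boto3  # must run with organizations:ListAccounts permission

    org = boto3.client("organizations")
    accounts = []
    # list_accounts is paginated, so iterate over all pages
    for page in org.get_paginator("list_accounts").paginate():
        accounts.extend(page["Accounts"])
    return format_account_ids(accounts)
```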
Example queries
Detect root logins during the last hour:
SELECT
eventtime,
account,
useridentity.type as usertype,
useridentity.arn as identity
FROM "default"."cloudtrail_logs"
WHERE
"timestamp" = format_datetime(current_timestamp, 'Y/MM/dd')
AND from_iso8601_timestamp(eventtime) > date_trunc('second', current_timestamp - interval '1' hour)
AND eventSource = 'signin.amazonaws.com'
AND eventName = 'ConsoleLogin'
AND useridentity.type = 'Root'
ORDER BY eventtime
Detect resource deletions today:
SELECT
eventtime,
account,
useridentity.type as usertype,
useridentity.arn as identity,
eventsource,
eventname
FROM "default"."cloudtrail_logs"
WHERE
"timestamp" = format_datetime(current_timestamp, 'Y/MM/dd')
AND (lower(eventName) like '%delete%' OR lower(eventName) like '%remove%')
ORDER BY eventtime
Send notifications
Lambda
A Lambda function is triggered by a scheduled event rule. The schedule depends on the timeframe of the query (today, yesterday, last hour, last week…). Thanks to boto3, the AWS SDK for Python, the function starts the Athena query execution.
...
today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
params = {
    "region": "eu-west-1",
    "database": "default",
    "bucket": "aws-athena-query-results-123456789-eu-west-1",
    "path": "temp/athena/output",
    "query": f'SELECT eventtime, \
        account, \
        useridentity.arn AS identity \
        FROM "default"."cloudtrail_logs" \
        WHERE "timestamp" = \'{yesterday.strftime("%Y/%m/%d")}\' \
        AND eventSource = \'signin.amazonaws.com\' \
        AND eventName = \'ConsoleLogin\' \
        AND useridentity.type = \'Root\' \
        ORDER BY eventtime',
}

client = boto3.client("athena", region_name=params["region"])

# Start the query; the response contains the query execution ID
response = client.start_query_execution(
    QueryString=params["query"],
    QueryExecutionContext={"Database": params["database"]},
    ResultConfiguration={
        "OutputLocation": f's3://{params["bucket"]}/{params["path"]}'
    },
)
...
The results of the query are then downloaded and parsed to prepare the content of the notification. The notification destination depends on your organization; below are two examples.
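Downloading and parsing can be done with the Athena get_query_results API. The sketch below (function names are ours) polls until the query finishes, then turns Athena's ResultSet, whose first row holds the column names, into a list of dicts:

```python
import time


def parse_result_set(result_set):
    """Convert an Athena ResultSet into a list of {column: value} dicts."""
    rows = result_set["Rows"]
    # The first row of a SELECT result contains the column headers
    headers = [col["VarCharValue"] for col in rows[0]["Data"]]
    return [
        {h: col.get("VarCharValue", "") for h, col in zip(headers, row["Data"])}
        for row in rows[1:]
    ]


def get_query_rows(client, query_execution_id):
    # Poll until the query leaves the QUEUED/RUNNING states
    while True:
        state = client.get_query_execution(QueryExecutionId=query_execution_id)[
            "QueryExecution"
        ]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(1)
    if state != "SUCCEEDED":
        raise RuntimeError(f"Athena query ended in state {state}")
    results = client.get_query_results(QueryExecutionId=query_execution_id)
    return parse_result_set(results["ResultSet"])
```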
Email notification
Emails are sent via the Simple Email Service (SES). Again, we use boto3 to call its API.
import os

import boto3


def send_email(data):
    ses = boto3.client('ses')
    sender = os.environ["email_sender"]
    recipients = [os.environ["email_recipient"]]
    # build_email_content turns the query results into HTML and text bodies
    html, txt = build_email_content(data)
    response = ses.send_email(
        Source=sender,
        Destination={'ToAddresses': recipients},
        Message={
            'Subject': {
                'Data': "AWS - Resource deletion report",
                'Charset': 'UTF-8'
            },
            'Body': {
                'Html': {
                    'Data': html,
                    'Charset': 'UTF-8'
                },
                'Text': {
                    'Data': txt,
                    'Charset': 'UTF-8'
                }
            }
        }
    )
Don't forget to verify your identities in SES before sending emails.
Slack notification
Slack notifications are sent by calling an HTTP "incoming webhook". Here is the documentation to create one: https://api.slack.com/messaging/webhooks
import json
import os

import urllib3


def send_slack_notification(data):
    for result in data:
        event_time = result["eventtime"]
        user = result["user"]
        account_id = result["account"]
        region = result["region"]
        event_source = result["eventsource"]
        event_name = result["eventname"]
        text = f"{event_time} | Resource deletion alert : {user} deleted a resource in {region} in the account {account_id} with the event {event_source}/{event_name}"
        post_message_to_slack(text)


def post_message_to_slack(text):
    http = urllib3.PoolManager()
    body = {"channel": os.environ["slack_channel"], "text": text}
    response = http.request(
        "POST",
        os.environ["slack_webhook"],
        body=json.dumps(body),
        headers={"Content-Type": "application/json"},
    )
Conclusion
This solution can be the first step in your security journey. Its serverless nature makes it simple to deploy and operate, and inexpensive to run.
The full code is available here: https://github.com/maitelkamel-mks/aws-sample/tree/main/cloudtrail-notifications. As a bonus, you'll find a Lambda function that creates or updates the Athena table according to the account IDs.

