Large AWS organizations face many security challenges: granular permissions, encryption, network segmentation, compliance — and monitoring. Monitoring is often the last thing teams set up, yet it is what separates organizations that detect a breach in hours from those that discover it months later. This article provides a comprehensive solution for monitoring and detecting risky activities in an AWS environment using CloudTrail, Athena, and Lambda.
Why AWS Security Monitoring Matters
The AWS shared responsibility model puts the burden of detecting unauthorized activity squarely on you. AWS protects the infrastructure; you protect what runs on it. In practice, the most common sources of incidents in AWS environments are not sophisticated external attackers — they are insider threats, misconfigured IAM roles, and compromised access keys.
A few scenarios that happen in real organizations:
- A developer's access key leaked in a public GitHub repository triggers thousands of API calls from an unknown IP within minutes of exposure.
- An IAM role with overly broad permissions gets attached to a public-facing Lambda, allowing privilege escalation through a chained set of policy attachments.
- A contractor account that should have been deprovisioned months ago is used to exfiltrate data from S3.
Reactive tooling — scanning for vulnerabilities after the fact — is not enough. You need continuous, near-real-time visibility into what is happening across every account in your organization. CloudTrail is the foundation of any AWS security program: it records every API call made in your accounts, by any principal, from any source. Without it, investigations are blind.
The solution below builds on that foundation by adding a queryable layer (Athena) and an alerting layer (Lambda + SES/Slack), giving you actionable signals without requiring expensive third-party SIEMs.
Solution overview
This solution is designed for an AWS Organization with a master account and a dedicated security account. It leverages CloudTrail, Athena, and Lambda.
Centralized CloudTrail log storage
The first step in this solution is to configure CloudTrail to collect activity logs. Centralizing all logs in a single S3 bucket is essential for easy and efficient management.
CloudTrail trail
In the master account, create an "Organization trail" with these settings:
- Apply trail to my organization: Enabled for all accounts
- Multi-region: yes
- Trail log location: centralized S3 bucket name
- Log file validation: enabled
- Log file SSE-KMS encryption: disabled (encryption is handled by the S3 bucket itself)
- Event type: Management events
- API activity: Read & Write
Be careful when enabling data events: if you have workloads that make intensive use of S3, you may end up with a tremendous volume of logs, with a corresponding cost impact.
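If you do need data events for a handful of sensitive buckets, advanced event selectors let you scope logging to specific resources instead of everything. A sketch of such a selector (the bucket name is a placeholder you must replace):

```json
[
  {
    "Name": "Log S3 data events for one sensitive bucket only",
    "FieldSelectors": [
      { "Field": "eventCategory", "Equals": ["Data"] },
      { "Field": "resources.type", "Equals": ["AWS::S3::Object"] },
      { "Field": "resources.ARN", "StartsWith": ["arn:aws:s3:::[SENSITIVE_BUCKET_NAME]/"] }
    ]
  }
]
```

It can be applied with `aws cloudtrail put-event-selectors --trail-name [TRAIL_NAME] --advanced-event-selectors file://selectors.json`.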
S3 Bucket
In the security account, create a bucket with SSE encryption enabled. Edit the bucket policy to allow CloudTrail to push log files. Template:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AWSCloudTrailAclCheck20150319",
      "Effect": "Allow",
      "Principal": {
        "Service": ["cloudtrail.amazonaws.com"]
      },
      "Action": "s3:GetBucketAcl",
      "Resource": "arn:aws:s3:::[BUCKET_NAME]",
      "Condition": {
        "StringEquals": {
          "aws:SourceArn": "arn:aws:cloudtrail:[REGION]:[SECURITY_ACCOUNT_ID]:trail/[TRAIL_NAME]"
        }
      }
    },
    {
      "Sid": "AWSCloudTrailWrite20150319",
      "Effect": "Allow",
      "Principal": {
        "Service": ["cloudtrail.amazonaws.com"]
      },
      "Action": "s3:PutObject",
      "Resource": "arn:aws:s3:::[BUCKET_NAME]/AWSLogs/[SECURITY_ACCOUNT_ID]/*",
      "Condition": {
        "StringEquals": {
          "s3:x-amz-acl": "bucket-owner-full-control",
          "aws:SourceArn": "arn:aws:cloudtrail:[REGION]:[SECURITY_ACCOUNT_ID]:trail/[TRAIL_NAME]"
        }
      }
    },
    {
      "Sid": "AWSCloudTrailOrganizationWrite20150319",
      "Effect": "Allow",
      "Principal": {
        "Service": ["cloudtrail.amazonaws.com"]
      },
      "Action": "s3:PutObject",
      "Resource": "arn:aws:s3:::[BUCKET_NAME]/AWSLogs/[ORGANIZATION_ID]/*",
      "Condition": {
        "StringEquals": {
          "s3:x-amz-acl": "bucket-owner-full-control",
          "aws:SourceArn": "arn:aws:cloudtrail:[REGION]:[SECURITY_ACCOUNT_ID]:trail/[TRAIL_NAME]"
        }
      }
    }
  ]
}
For FinOps and compliance reasons, we recommend creating a lifecycle policy to move S3 objects to a cheaper storage class and to delete logs past your retention period.
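As an illustration, a lifecycle configuration of the following shape transitions logs to an infrequent-access class after 90 days, to Glacier after 180, and expires them after two years. The thresholds are arbitrary examples; adjust them to your own retention requirements:

```json
{
  "Rules": [
    {
      "ID": "cloudtrail-log-retention",
      "Status": "Enabled",
      "Filter": { "Prefix": "AWSLogs/" },
      "Transitions": [
        { "Days": 90, "StorageClass": "STANDARD_IA" },
        { "Days": 180, "StorageClass": "GLACIER" }
      ],
      "Expiration": { "Days": 730 }
    }
  ]
}
```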
Cost estimation
One concern that often delays adoption is cost. Here is a realistic breakdown for a 10-account organization:
CloudTrail
- Management events: the first copy of management events delivered to S3 is free per account. Additional copies cost $2.00 per 100,000 events. For most accounts running typical workloads, a month of management events stays well under $5 per account.
- Data events (S3 object-level, Lambda invocations): $0.10 per 100,000 events. Enable these selectively — high-traffic S3 buckets can generate millions of events per day.
Athena
- $5 per TB of data scanned. Partition projection (configured in the table creation query below) dramatically reduces scanned data by skipping irrelevant partitions. A typical daily query against a 10-account org scanning a single day's logs will process less than 100 MB — a fraction of a cent per query run.
Lambda
- Near-zero cost. The function runs on a schedule (e.g., hourly or daily), executes for a few seconds, and stays within the free tier for most organizations.
Total realistic cost: $20–$50/month for a 10-account organization, heavily dominated by CloudTrail if you enable data events. Management-events-only monitoring often stays under $10/month.
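To sanity-check these figures for your own volumes, the arithmetic is simple. A small sketch (the function name and the input volumes are illustrative assumptions, not measured values):

```python
def monthly_cost_estimate(data_events_millions_per_day: float,
                          athena_gb_scanned_per_day: float,
                          days: int = 30) -> dict:
    """Rough monthly USD cost of the CloudTrail + Athena pipeline."""
    # CloudTrail data events: $0.10 per 100,000 events
    cloudtrail = data_events_millions_per_day * 1_000_000 / 100_000 * 0.10 * days
    # Athena: $5 per TB scanned (1 TB = 1024 GB)
    athena = athena_gb_scanned_per_day / 1024 * 5.0 * days
    return {"cloudtrail": round(cloudtrail, 2),
            "athena": round(athena, 2),
            "total": round(cloudtrail + athena, 2)}

# Example: 1M S3 data events/day, 0.1 GB scanned by daily queries
print(monthly_cost_estimate(1.0, 0.1))
# → {'cloudtrail': 30.0, 'athena': 0.01, 'total': 30.01}
```

Note how the Athena side is negligible next to data events, which is why partition projection plus management-events-only trails keep the bill low.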
Logs analysis
Athena plays a central role in this solution by allowing SQL querying of logs stored in S3.
Table creation
To optimize cost and performance, we leverage partitioning as the logs are stored using this pattern: "[BUCKET_NAME]/AWSLogs/[ORGANIZATION_ID]/[ACCOUNT_ID]/CloudTrail/[REGION]/[YYYY]/[MM]/[DD]".
This leads to the following table creation query:
CREATE EXTERNAL TABLE cloudtrail_logs (
    eventVersion STRING,
    userIdentity STRUCT<
        type: STRING,
        principalId: STRING,
        arn: STRING,
        accountId: STRING,
        invokedBy: STRING,
        accessKeyId: STRING,
        userName: STRING,
        sessionContext: STRUCT<
            attributes: STRUCT<
                mfaAuthenticated: STRING,
                creationDate: STRING>,
            sessionIssuer: STRUCT<
                type: STRING,
                principalId: STRING,
                arn: STRING,
                accountId: STRING,
                userName: STRING>,
            ec2RoleDelivery: STRING,
            webIdFederationData: MAP<STRING, STRING>
        >
    >,
    eventTime STRING,
    eventSource STRING,
    eventName STRING,
    awsRegion STRING,
    sourceIpAddress STRING,
    userAgent STRING,
    errorCode STRING,
    errorMessage STRING,
    requestParameters STRING,
    responseElements STRING,
    additionalEventData STRING,
    requestId STRING,
    eventId STRING,
    readOnly STRING,
    resources ARRAY<STRUCT<
        arn: STRING,
        accountId: STRING,
        type: STRING>>,
    eventType STRING,
    apiVersion STRING,
    recipientAccountId STRING,
    serviceEventDetails STRING,
    sharedEventID STRING,
    vpcEndpointId STRING,
    tlsDetails STRUCT<
        tlsVersion: STRING,
        cipherSuite: STRING,
        clientProvidedHostHeader: STRING>
)
PARTITIONED BY (`account` string, `region` string, `timestamp` string)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://[BUCKET_NAME]/AWSLogs/[ORGANIZATION_ID]/'
TBLPROPERTIES (
    'projection.enabled' = 'true',
    'projection.region.type' = 'enum',
    'projection.region.values' = 'us-east-1,us-east-2,us-west-1,us-west-2,ca-central-1,sa-east-1,eu-central-1,eu-west-1,eu-west-2,eu-south-1,eu-west-3,eu-north-1,af-south-1,me-south-1,ap-east-1,ap-south-1,ap-northeast-2,ap-southeast-1,ap-southeast-2,ap-northeast-1',
    'projection.timestamp.type' = 'date',
    'projection.timestamp.format' = 'yyyy/MM/dd',
    'projection.timestamp.interval' = '1',
    'projection.timestamp.interval.unit' = 'DAYS',
    'projection.timestamp.range' = '2020/01/01,NOW',
    'projection.account.type' = 'enum',
    'projection.account.values' = '[COMMA_SEPARATED_ACCOUNT_IDS_LIST]',
    'storage.location.template' = 's3://[BUCKET_NAME]/AWSLogs/[ORGANIZATION_ID]/${account}/CloudTrail/${region}/${timestamp}/'
)
Query examples
Detect root login during last hour:
SELECT
eventtime,
account,
useridentity.type as usertype,
useridentity.arn as identity
FROM "default"."cloudtrail_logs"
WHERE
"timestamp" = format_datetime(current_timestamp, 'Y/MM/dd')
AND from_iso8601_timestamp(eventtime) > date_trunc('second', current_timestamp - interval '1' hour)
AND eventSource = 'signin.amazonaws.com'
AND eventName = 'ConsoleLogin'
AND useridentity.type = 'Root'
ORDER BY eventtime
Detect resource deletion today:
SELECT
eventtime,
account,
useridentity.type as usertype,
useridentity.arn as identity,
eventsource,
eventname
FROM "default"."cloudtrail_logs"
WHERE
"timestamp" = format_datetime(current_timestamp, 'Y/MM/dd')
AND (lower(eventName) like '%delete%' OR lower(eventName) like '%remove%')
ORDER BY eventtime
Detect IAM policy changes:
SELECT
eventtime,
account,
awsregion,
useridentity.arn as identity,
eventname,
requestparameters
FROM "default"."cloudtrail_logs"
WHERE
"timestamp" = format_datetime(current_timestamp, 'Y/MM/dd')
AND eventSource = 'iam.amazonaws.com'
AND eventName IN (
'AttachUserPolicy', 'DetachUserPolicy',
'AttachRolePolicy', 'DetachRolePolicy',
'CreatePolicy', 'DeletePolicy',
'PutUserPolicy', 'DeleteUserPolicy',
'PutRolePolicy', 'DeleteRolePolicy'
)
ORDER BY eventtime
Detect security group ingress rules opened to the world:
SELECT
eventtime,
account,
awsregion,
useridentity.arn as identity,
eventname,
requestparameters
FROM "default"."cloudtrail_logs"
WHERE
"timestamp" = format_datetime(current_timestamp, 'Y/MM/dd')
AND eventSource = 'ec2.amazonaws.com'
AND eventName = 'AuthorizeSecurityGroupIngress'
AND (
requestparameters LIKE '%0.0.0.0/0%'
OR requestparameters LIKE '%::/0%'
)
ORDER BY eventtime
Detect S3 bucket policy changes:
SELECT
eventtime,
account,
awsregion,
useridentity.arn as identity,
eventname,
requestparameters
FROM "default"."cloudtrail_logs"
WHERE
"timestamp" = format_datetime(current_timestamp, 'Y/MM/dd')
AND eventSource = 's3.amazonaws.com'
AND eventName IN ('PutBucketPolicy', 'DeleteBucketPolicy', 'PutBucketAcl')
ORDER BY eventtime
Detect console logins without MFA:
SELECT
eventtime,
account,
sourceipaddress,
useridentity.arn as identity,
useridentity.type as usertype
FROM "default"."cloudtrail_logs"
WHERE
"timestamp" = format_datetime(current_timestamp, 'Y/MM/dd')
AND eventSource = 'signin.amazonaws.com'
AND eventName = 'ConsoleLogin'
AND useridentity.type != 'Root'
AND json_extract_scalar(additionaleventdata, '$.MFAUsed') = 'No'
ORDER BY eventtime
Send notifications
Lambda
A Lambda function is triggered by a scheduled event rule. The schedule depends on the timeframe of the query (today, yesterday, last hour, last week…). Thanks to boto3, the AWS SDK for Python, the function starts the Athena query execution.
import datetime

import boto3

...
today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)

params = {
    "region": "eu-west-1",
    "database": "default",
    "bucket": "aws-athena-query-results-123456789-eu-west-1",
    "path": "temp/athena/output",
    "query": f"""
        SELECT eventtime, account, useridentity.arn AS identity
        FROM "default"."cloudtrail_logs"
        WHERE "timestamp" = '{yesterday.strftime("%Y/%m/%d")}'
          AND eventSource = 'signin.amazonaws.com'
          AND eventName = 'ConsoleLogin'
          AND useridentity.type = 'Root'
        ORDER BY eventtime
    """,
}

client = boto3.client("athena")

# Start the query; the response contains the query execution ID
response_query_execution_id = client.start_query_execution(
    QueryString=params["query"],
    QueryExecutionContext={"Database": params["database"]},
    ResultConfiguration={
        "OutputLocation": "s3://" + params["bucket"] + "/" + params["path"]
    },
)
...
The results of the query are then downloaded and parsed to prepare the content of the notification. The notification destination will depend on your organization. Below are two examples.
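In practice you poll `get_query_execution` until the state is SUCCEEDED, then call `get_query_results`, which returns a `ResultSet` of rows where the first row holds the column headers. A minimal parser turning that structure into a list of dicts could look like this (the helper name is mine; error handling and pagination are omitted):

```python
def parse_athena_results(response: dict) -> list:
    """Convert an Athena get_query_results response into a list of dicts.

    The first row of the ResultSet contains the column names; each
    cell value is under the "VarCharValue" key.
    """
    rows = response["ResultSet"]["Rows"]
    headers = [col.get("VarCharValue", "") for col in rows[0]["Data"]]
    results = []
    for row in rows[1:]:
        values = [col.get("VarCharValue", "") for col in row["Data"]]
        results.append(dict(zip(headers, values)))
    return results
```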
Email notification
Emails are sent via Amazon Simple Email Service (SES). Again, we use boto3 to call its API.
def send_email(data):
    ses = boto3.client('ses')
    sender = os.environ["email_sender"]
    recipients = [os.environ["email_recipient"]]
    html, txt = build_email_content(data)
    response = ses.send_email(
        Source=sender,
        Destination={'ToAddresses': recipients},
        Message={
            'Subject': {
                'Data': "AWS - Resource deletion report",
                'Charset': 'UTF-8'
            },
            'Body': {
                'Html': {'Data': html, 'Charset': 'UTF-8'},
                'Text': {'Data': txt, 'Charset': 'UTF-8'}
            }
        }
    )
Don't forget to verify your identities in SES before sending emails.
Slack notification
Slack notifications are sent by calling an HTTP "incoming webhook". Here is the documentation to create one: https://api.slack.com/messaging/webhooks
def send_slack_notification(data):
    for result in data:
        event_time = result["eventtime"]
        user = result["user"]
        account_id = result["account"]
        region = result["region"]
        event_source = result["eventsource"]
        event_name = result["eventname"]
        text = (
            f"{event_time} | Resource deletion alert: {user} deleted a resource "
            f"in {region} in the account {account_id} "
            f"with the event {event_source}/{event_name}"
        )
        post_message_to_slack(text)


def post_message_to_slack(text):
    http = urllib3.PoolManager()
    body = {"channel": os.environ["slack_channel"], "text": text}
    response = http.request(
        "POST",
        os.environ["slack_webhook"],
        body=json.dumps(body),
        headers={"Content-Type": "application/json"},
    )
Alert on what matters — avoiding alert fatigue
The queries above will catch real threats, but they will also fire on legitimate automation. A CI/CD pipeline that deploys infrastructure every hour will produce dozens of IAM and security group events per day. If every event triggers a notification, your team will start ignoring them within a week.
A few practical techniques to keep the signal-to-noise ratio high:
Use result counts as a gate. In your Lambda function, only send a notification if the query returns results above a threshold. A single DeletePolicy event might be a developer cleaning up a test policy. Ten DeletePolicy events in one hour from the same identity warrants attention.
# Only alert if more than N results were returned
MIN_RESULTS_TO_ALERT = 1  # set to higher values for noisy queries

results = get_query_results(query_execution_id)
if len(results) >= MIN_RESULTS_TO_ALERT:
    send_slack_notification(results)
Suppress known service accounts. Most organizations have IAM roles used by automation — Terraform, CI/CD pipelines, data pipelines. Add a filter to your queries to exclude their ARNs:
AND useridentity.arn NOT LIKE '%terraform-deployer%'
AND useridentity.arn NOT LIKE '%github-actions%'
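The same suppression can also be applied post-query in the Lambda function, which avoids editing every saved query when the allow-list changes. A sketch (the role name fragments are hypothetical examples):

```python
# ARN fragments of known automation identities to suppress (examples)
SUPPRESSED_ARN_FRAGMENTS = ["terraform-deployer", "github-actions"]


def filter_known_automation(results: list) -> list:
    """Drop events whose identity ARN matches a known automation role."""
    return [
        r for r in results
        if not any(frag in r.get("identity", "") for frag in SUPPRESSED_ARN_FRAGMENTS)
    ]
```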
Prioritize by event severity. Not all events deserve the same response time. A useful mental model:
| Severity | Examples | Response |
|---|---|---|
| Critical | Root login, IAM policy attach on unknown identity | Alert immediately (real-time via EventBridge) |
| High | Security group open to 0.0.0.0/0, S3 bucket policy change | Alert within 1 hour (scheduled Lambda) |
| Medium | Resource deletion, console login without MFA | Daily digest |
| Low | Read-only access from unusual region | Weekly review |
Start by implementing alerts only for the Critical and High tiers. Expand once you have tuned the suppression rules and your team has built the habit of reviewing them.
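This tiering can be encoded directly in the Lambda so each finding is routed to the right channel. A sketch mirroring the table above (the mapping is deliberately partial; extend it with the events that matter to you):

```python
# Partial event-to-severity mapping; anything unknown defaults to "medium"
SEVERITY_BY_EVENT = {
    "AttachUserPolicy": "critical",
    "AttachRolePolicy": "critical",
    "AuthorizeSecurityGroupIngress": "high",
    "PutBucketPolicy": "high",
    "DeleteBucketPolicy": "high",
}


def classify(event_name: str, user_type: str = "") -> str:
    """Map an event to a severity tier; root console logins are always critical."""
    if event_name == "ConsoleLogin" and user_type == "Root":
        return "critical"
    return SEVERITY_BY_EVENT.get(event_name, "medium")
```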
Use time-based thresholds wisely. The root login query uses a 1-hour window. For less urgent events, use a daily window and send a digest rather than per-event alerts. This keeps alert volume predictable and gives responders context from the full day's activity.
Conclusion
This solution can be the first step in your security journey. Its serverless nature makes it simple to deploy and operate, and cheap to run — no agents to manage, no SIEM license to negotiate, and no proprietary data format to deal with. CloudTrail logs are yours, stored in your own S3 bucket, queryable with standard SQL.
When an IAM event fires or a security group is opened to the internet, understanding the blast radius requires more than a log entry. VizCon complements this monitoring setup by giving you a live visual map of your AWS network topology — when an alert fires, you can immediately see which resources are connected to the affected security group, which VPCs are involved, and what the exposure surface looks like. Correlating a security event with a network diagram cuts investigation time significantly.
What's next
This solution is intentionally simple to deploy, but there are several natural extensions worth considering once it is running:
Real-time alerting with EventBridge. The scheduled Lambda approach introduces a lag (up to 1 hour for hourly schedules). For Critical-tier events — root logins, IAM policy changes — you can replace the schedule trigger with an EventBridge rule that matches specific CloudTrail event patterns and invokes Lambda directly. This reduces detection latency to seconds.
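As an illustration, an EventBridge event pattern matching root console sign-ins could look like the sketch below (console sign-in events are delivered in us-east-1, so create the rule there; validate the pattern against your own events before relying on it):

```json
{
  "detail-type": ["AWS Console Sign In via CloudTrail"],
  "detail": {
    "eventName": ["ConsoleLogin"],
    "userIdentity": { "type": ["Root"] }
  }
}
```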
AWS Security Hub. Security Hub aggregates findings from GuardDuty, Inspector, Macie, and partner integrations into a single pane of glass. It also accepts custom findings via its API, so you can push results from your Athena queries into Security Hub for unified tracking and workflow integration (e.g., automatic Jira ticket creation).
Amazon GuardDuty. GuardDuty uses machine learning to detect anomalies in CloudTrail, VPC Flow Logs, and DNS logs that rule-based queries would miss — unusual API call patterns from a new geography, credential exfiltration attempts, or communication with known malicious IPs. It is a natural complement to the explicit detection queries above: your queries catch known-bad patterns, GuardDuty catches unknown-bad behavior.
Athena query federation. As your organization grows, you can extend the same query pattern to VPC Flow Logs and Route 53 DNS logs by creating additional Athena tables. This gives you a unified security query layer across network, DNS, and control-plane activity without any additional infrastructure.
Full code is available here: https://github.com/maitelkamel-mks/aws-sample/tree/main/cloudtrail-notifications. As a bonus, you'll find a Lambda function that creates or updates the Athena table according to the account IDs.