AWS Security Lake: Centralized Security Analytics and SIEM Integration

7 min read

AWS Security Lake aggregates security data from across your AWS environment and third-party sources into a centralized data lake stored in S3. All data is normalized to the Open Cybersecurity Schema Framework (OCSF), which removes much of the schema-translation work security engineering teams otherwise spend writing and maintaining per-source parsers. This guide covers practical deployment, source configuration, SIEM integration, Athena-based threat hunting, and cost management.

Understanding OCSF and Security Lake Architecture

The OCSF Schema Advantage

Before Security Lake, correlating CloudTrail events with VPC Flow Logs and Security Hub findings required writing custom parsers for each source's schema. OCSF normalizes everything into a common event format with consistent field names for actors, resources, timestamps, and severities.

Security Lake stores data as Apache Parquet files partitioned by source, region, account, and time. This partitioning is critical for query performance and cost, since Athena charges per byte scanned and partitions let you skip irrelevant data entirely.
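For example, a CloudTrail management-event object lands under a path shaped roughly like this (the bucket suffix and exact partition keys are illustrative and can vary by deployment):

s3://aws-security-data-lake-us-east-1-<suffix>/aws/CLOUD_TRAIL_MGMT/2.0/
    region=us-east-1/accountId=123456789012/eventDay=20240130/<object>.gz.parquet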

{
  "ocsf_schema_example": {
    "class_name": "Authentication",
    "class_uid": 3002,
    "category_name": "Identity & Access Management",
    "severity_id": 1,
    "severity": "Informational",
    "activity_name": "Logon",
    "activity_id": 1,
    "time": 1706630400000,
    "actor": {
      "user": {
        "name": "[email protected]",
        "uid": "AIDACKCEVSQ6C2EXAMPLE",
        "type": "IAMUser"
      },
      "session": {
        "uid": "session-abc123",
        "mfa": true
      }
    },
    "src_endpoint": {
      "ip": "203.0.113.50",
      "location": {
        "country": "US",
        "region": "Virginia"
      }
    },
    "dst_endpoint": {
      "svc_name": "signin.amazonaws.com"
    },
    "status": "Success",
    "status_id": 1,
    "metadata": {
      "product": {
        "name": "CloudTrail",
        "vendor_name": "AWS"
      },
      "version": "1.1.0"
    },
    "cloud": {
      "account_uid": "123456789012",
      "region": "us-east-1",
      "provider": "AWS"
    }
  }
}

The key benefit is that the same query structure works regardless of whether the underlying event came from CloudTrail, VPC Flow Logs, or a third-party source. Field names like actor.user.name, src_endpoint.ip, and status are consistent across all event classes.
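For instance, the same predicate shape works against both the CloudTrail and VPC Flow tables (the table names follow the default Security Lake naming used in the queries later in this post):

-- Identical filter, two different sources
SELECT time, severity, src_endpoint.ip
FROM amazon_security_lake_table_us_east_1_cloud_trail_mgmt_2_0
WHERE src_endpoint.ip = '203.0.113.50';

SELECT time, severity, src_endpoint.ip
FROM amazon_security_lake_table_us_east_1_vpc_flow_2_0
WHERE src_endpoint.ip = '203.0.113.50';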

Source Integration

Enabling Native AWS Sources

Security Lake natively supports CloudTrail management events, CloudTrail data events (S3 and Lambda), VPC Flow Logs, Route 53 resolver query logs, Security Hub findings, and EKS audit logs. Enable these through the Security Lake console or API in the delegated administrator account.

import boto3
import json

class SecurityLakeManager:
    def __init__(self, region='us-east-1'):
        self.client = boto3.client('securitylake', region_name=region)
        self.region = region

    def configure_aws_sources(self, accounts, source_configs):
        """Enable native AWS log sources for specified accounts."""
        configurations = []

        for source in source_configs:
            configurations.append({
                'accounts': accounts,
                'regions': [self.region],
                'sourceName': source['name'],
                'sourceVersion': source.get('version', '2.0')
            })

        response = self.client.create_aws_log_source(
            sources=configurations
        )

        for failed in response.get('failed', []):
            print(f"Failed to enable source: {failed}")

        return response

    def create_custom_source(self, source_name, event_classes, crawler_role_arn):
        """Register a custom OCSF source for application security logs."""
        response = self.client.create_custom_log_source(
            sourceName=source_name,
            sourceVersion='1.0',
            configuration={
                'crawlerConfiguration': {
                    'roleArn': crawler_role_arn
                },
                'providerIdentity': {
                    'externalId': f'security-lake-custom-{source_name}',
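                    # ARN format arn:aws:iam::<account-id>:role/..., so index 4 is the account ID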
                    'principal': crawler_role_arn.split(':')[4]
                }
            },
            eventClasses=event_classes
        )

        print(f"Custom source created: {source_name}")
        print(f"  S3 location: {response['source']['attributes']['s3']['bucket']}")
        print(f"  Write role: {response['source']['providerIdentity']}")

        return response['source']

    def create_siem_subscriber(self, subscriber_name, siem_account_id, sources):
        """Create a subscriber for SIEM data access."""
        source_list = []
        for source in sources:
            if source.get('custom'):
                source_list.append({
                    'customLogSource': {
                        'sourceName': source['name'],
                        'sourceVersion': source.get('version', '1.0')
                    }
                })
            else:
                source_list.append({
                    'awsLogSource': {
                        'sourceName': source['name'],
                        'sourceVersion': source.get('version', '2.0')
                    }
                })

        response = self.client.create_subscriber(
            subscriberName=subscriber_name,
            subscriberIdentity={
                'externalId': f'siem-{subscriber_name}-ext-id',
                'principal': siem_account_id
            },
            sources=source_list,
            accessTypes=['S3']
        )

        subscriber = response['subscriber']

        # Notify the subscriber when new data arrives; note the notification
        # API takes the subscriber ID, not the subscriber name
        self.client.create_subscriber_notification(
            subscriberId=subscriber['subscriberId'],
            configuration={
                'sqsNotificationConfiguration': {}
            }
        )

        return subscriber

# Usage
lake = SecurityLakeManager()

# Enable CloudTrail and VPC Flow Logs for member accounts
lake.configure_aws_sources(
    accounts=['111122223333', '444455556666'],
    source_configs=[
        {'name': 'CLOUD_TRAIL_MGMT', 'version': '2.0'},
        {'name': 'VPC_FLOW', 'version': '2.0'},
        {'name': 'ROUTE53', 'version': '2.0'},
        {'name': 'SH_FINDINGS', 'version': '2.0'}
    ]
)
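
# Grant a SIEM account read access to CloudTrail and VPC Flow data.
# The subscriber name and SIEM account ID here are illustrative.
lake.create_siem_subscriber(
    subscriber_name='splunk-prod',
    siem_account_id='777788889999',
    sources=[
        {'name': 'CLOUD_TRAIL_MGMT'},
        {'name': 'VPC_FLOW'}
    ]
)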

For custom sources, your application writes OCSF-formatted Parquet files to the S3 location provided by Security Lake. The Glue crawler registered during source creation automatically updates the data catalog so Athena can query the new data.
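A minimal sketch of the write side using pyarrow; the bucket, key layout, and record fields below are illustrative assumptions, and a real event needs the full required attribute set for its OCSF class:

import io
import time

import boto3
import pyarrow as pa
import pyarrow.parquet as pq

# Skeleton OCSF Authentication (3002) record with only a few base fields
record = {
    'class_uid': [3002],
    'activity_id': [1],
    'severity_id': [1],
    'status_id': [1],
    'time': [int(time.time() * 1000)],
}

# Serialize to Parquet in memory
buf = io.BytesIO()
pq.write_table(pa.table(record), buf)

# Placeholder bucket/key; use the location returned by create_custom_log_source
# (response['source']['provider']['location']) and its partition convention
boto3.client('s3').put_object(
    Bucket='aws-security-data-lake-example-bucket',
    Key='ext/my-app-source/region=us-east-1/accountId=123456789012/eventDay=20240130/events.parquet',
    Body=buf.getvalue(),
)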

Querying with Athena for Threat Hunting

Practical Investigation Queries

Athena queries against Security Lake data use standard SQL. The OCSF schema makes cross-source correlation straightforward because you can join on consistent fields like IP addresses, user identities, and timestamps.

#!/bin/bash
# Athena queries for Security Lake threat hunting

# 1. Find failed console logins followed by successful API calls from same IP
# (potential credential stuffing that succeeded)
QUERY_1="
SELECT
    auth.src_endpoint.ip AS source_ip,
    auth.actor.user.name AS username,
    auth.time AS auth_time,
    auth.status AS auth_status,
    api.activity_name AS subsequent_action,
    api.time AS action_time
FROM amazon_security_lake_table_us_east_1_cloud_trail_mgmt_2_0 auth
JOIN amazon_security_lake_table_us_east_1_cloud_trail_mgmt_2_0 api
    ON auth.src_endpoint.ip = api.src_endpoint.ip
    AND api.time BETWEEN auth.time AND auth.time + 3600000
WHERE auth.class_uid = 3002
    AND auth.status = 'Failure'
    AND api.class_uid = 6003
    AND api.status = 'Success'
    AND auth.time > CAST(to_unixtime(current_timestamp - interval '24' hour) * 1000 AS bigint)
ORDER BY auth.time DESC
LIMIT 100;
"

# 2. Detect unusual cross-account role assumptions
QUERY_2="
SELECT
    actor.user.name AS assuming_principal,
    cloud.account_uid AS target_account,
    dst_endpoint.svc_name AS assumed_role,
    COUNT(*) AS assumption_count,
    ARRAY_AGG(DISTINCT src_endpoint.ip) AS source_ips
FROM amazon_security_lake_table_us_east_1_cloud_trail_mgmt_2_0
WHERE activity_name = 'AssumeRole'
    AND status = 'Success'
    AND actor.user.account.uid != cloud.account_uid
    AND time > CAST(to_unixtime(current_timestamp - interval '7' day) * 1000 AS bigint)
GROUP BY actor.user.name, cloud.account_uid, dst_endpoint.svc_name
HAVING COUNT(*) > 50
ORDER BY assumption_count DESC;
"

# 3. Correlate VPC flow denies with CloudTrail activity from same source
QUERY_3="
SELECT
    flow.src_endpoint.ip AS blocked_ip,
    flow.dst_endpoint.port AS target_port,
    COUNT(DISTINCT flow.dst_endpoint.ip) AS targets_scanned,
    trail.activity_name AS api_activity,
    trail.actor.user.name AS api_identity
FROM amazon_security_lake_table_us_east_1_vpc_flow_2_0 flow
LEFT JOIN amazon_security_lake_table_us_east_1_cloud_trail_mgmt_2_0 trail
    ON flow.src_endpoint.ip = trail.src_endpoint.ip
    AND trail.time BETWEEN flow.time - 3600000 AND flow.time + 3600000
WHERE flow.action = 'Denied'
    AND flow.time > CAST(to_unixtime(current_timestamp - interval '24' hour) * 1000 AS bigint)
GROUP BY flow.src_endpoint.ip, flow.dst_endpoint.port, trail.activity_name, trail.actor.user.name
HAVING COUNT(DISTINCT flow.dst_endpoint.ip) > 10
ORDER BY targets_scanned DESC;
"

echo "Running threat hunting query: Failed logins followed by successful API calls"
aws athena start-query-execution \
  --query-string "$QUERY_1" \
  --work-group "security-lake-investigations" \
  --result-configuration "OutputLocation=s3://security-lake-query-results/"
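
start-query-execution returns immediately with an execution ID; for scripted hunts you typically poll get-query-execution until the state settles, then page through the results. A minimal sketch using the same workgroup and output location as above:

EXECUTION_ID=$(aws athena start-query-execution \
  --query-string "$QUERY_2" \
  --work-group "security-lake-investigations" \
  --result-configuration "OutputLocation=s3://security-lake-query-results/" \
  --query 'QueryExecutionId' --output text)

# Poll until the query leaves the QUEUED/RUNNING states
while true; do
  STATE=$(aws athena get-query-execution \
    --query-execution-id "$EXECUTION_ID" \
    --query 'QueryExecution.Status.State' --output text)
  [ "$STATE" = "SUCCEEDED" ] && break
  { [ "$STATE" = "FAILED" ] || [ "$STATE" = "CANCELLED" ]; } && exit 1
  sleep 5
done

aws athena get-query-results --query-execution-id "$EXECUTION_ID" --max-items 20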

These queries demonstrate the core value of OCSF normalization: correlating network denies with authentication events and API activity using consistent field names. Without a normalized schema, each of these joins would require custom field mappings.

Cost Management

Storage and Query Optimization

Security Lake costs come from three sources: S3 storage, Glue crawler runs, and Athena query scans.

  • Partition pruning is the most impactful optimization. Always include time, region, and account filters in Athena queries to avoid scanning the entire lake.
  • Configure lifecycle rules to transition data older than 90 days to S3 Glacier Instant Retrieval and data older than 365 days to Glacier Deep Archive (a sketch follows this list).
  • For high-volume sources like VPC Flow Logs, consider enabling them only in accounts and regions where you have production workloads rather than organization-wide.
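Retention can be managed through Security Lake's own data lake settings rather than hand-written S3 lifecycle rules. A minimal boto3 sketch, assuming a single-region us-east-1 deployment; the storage-class strings and day thresholds are illustrative assumptions, so confirm them against the UpdateDataLake API reference:

import boto3

client = boto3.client('securitylake', region_name='us-east-1')

# Tier data down as it ages; the storage-class values below are assumptions,
# not verified enum strings
client.update_data_lake(
    configurations=[{
        'region': 'us-east-1',
        'lifecycleConfiguration': {
            'transitions': [
                {'days': 90, 'storageClass': 'GLACIER_IR'},
                {'days': 365, 'storageClass': 'DEEP_ARCHIVE'}
            ],
            'expiration': {'days': 730}  # example retention cap
        }
    }]
)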

Use rollup regions to consolidate data from multiple regions into a single region for querying, reducing the number of Athena queries needed for cross-region investigations. But be aware that rollup adds S3 cross-region transfer costs, so only roll up regions you actively query.
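Rollup is configured per contributing region in the data lake settings. A hedged sketch, assuming us-east-1 is the rollup region; the role name is hypothetical:

import boto3

client = boto3.client('securitylake', region_name='us-east-1')

client.update_data_lake(
    configurations=[{
        'region': 'us-west-2',  # contributing region whose data is copied out
        'replicationConfiguration': {
            'regions': ['us-east-1'],  # rollup region that receives the copies
            # Hypothetical role; it needs replication permissions on the lake buckets
            'roleArn': 'arn:aws:iam::123456789012:role/SecurityLakeReplicationRole'
        }
    }]
)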

Securing Security Analytics with AccessLens

Security Lake centralizes your security data, but the IAM policies controlling who can access that data, create subscribers, or modify source configurations are themselves a critical attack surface. An attacker who can create a subscriber gains read access to your entire security telemetry. An attacker who can modify sources can create blind spots by disabling log collection.

AccessLens protects your security analytics infrastructure by providing:

  • IAM access mapping for Security Lake resources, showing exactly which principals can read, write, or administer your security data lake
  • Subscriber access review that audits which external accounts and roles have been granted access to your security data
  • Cross-account trust analysis that verifies Security Lake delegated administrator permissions are properly scoped
  • Permission change alerts that notify you when IAM policies affecting Security Lake, Athena, or Glue resources are modified
  • Compliance validation that ensures access to security logs meets regulatory requirements for log integrity and access control

Your security data lake is only trustworthy if the access controls protecting it are verified continuously.

Secure your security analytics with AccessLens and ensure the IAM layer protecting your Security Lake deployment is as comprehensive as the data it contains.

Ready to secure your AWS environment?

Get comprehensive IAM visibility across all your AWS accounts in minutes.