Anders & A-Cube S.r.l. / March 1, 2024 • 2 min read
A3-Shared-Notifier is a production-grade serverless notification service built with AWS Lambda and Python 3.12. It provides reliable, scalable email delivery through AWS SES with flexible triggering via SQS, SNS, and direct Lambda invocation, supporting both HTML and plain text content.
┌───────────────────────────────────────────────────┐
│                  Trigger Sources                  │
│       AWS SQS • AWS SNS • Direct Invocation       │
└─────────────────────────┬─────────────────────────┘
                          │
┌─────────────────────────▼─────────────────────────┐
│                AWS Lambda Function                │
│    Python 3.12 • Email Processing • Validation    │
└──────────┬─────────────────────────────┬──────────┘
           │                             │
┌──────────▼──────────┐       ┌──────────▼──────────┐
│ AWS SES             │       │ CloudWatch          │
│ - Email Send        │       │ - Logs              │
│ - Bounce Track      │       │ - Metrics           │
│ - Complaint Mgmt    │       │ - Alarms            │
└─────────────────────┘       └─────────────────────┘
SQS/SNS Event → Lambda Handler → Validate Payload
                                         ↓
                                Parse Email Content
                                         ↓
                             Generate HTML/Text Email
                                         ↓
                                 Send via AWS SES
                                         ↓
                                 Log to CloudWatch → Success
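The first step of this flow depends on which trigger delivered the event. As a rough illustration, the three envelopes can be reduced to the same payload as sketched below. The `Records`, `body`, and `Sns` / `Message` keys follow the standard AWS event shapes; the helper name `extract_payloads` and the sample values are assumptions made for this sketch, not part of the service.

```python
import json
from typing import Any, Dict, List

# Hypothetical sample payload; the keys match the message format the service expects.
PAYLOAD = {
    "sender": "noreply@example.com",
    "recipients": ["user@example.com"],
    "subject": "Hello",
    "text_content": "Plain text body",
}

# SQS delivers the payload as a JSON string under Records[].body.
sqs_event = {"Records": [{"eventSource": "aws:sqs", "body": json.dumps(PAYLOAD)}]}

# SNS delivers the payload as a JSON string under Records[].Sns.Message.
sns_event = {
    "Records": [{"EventSource": "aws:sns", "Sns": {"Message": json.dumps(PAYLOAD)}}]
}

# A direct invocation passes the payload itself as the event.
direct_event = dict(PAYLOAD)


def extract_payloads(event: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Reduce any of the three envelopes to a list of message payloads."""
    if "Records" not in event:
        return [event]
    payloads = []
    for record in event["Records"]:
        raw = record["body"] if "body" in record else record["Sns"]["Message"]
        payloads.append(json.loads(raw))
    return payloads


if __name__ == "__main__":
    for name, event in [("sqs", sqs_event), ("sns", sns_event), ("direct", direct_event)]:
        print(name, extract_payloads(event) == [PAYLOAD])
```

Events like `sqs_event` can also be saved as JSON files and passed to `sam local invoke` with `--event`.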
Main Handler (src/handler.py):
import json
import boto3
import logging
from typing import Dict, Any, List
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

logger = logging.getLogger()
logger.setLevel(logging.INFO)

ses_client = boto3.client('ses')


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    AWS Lambda handler for email notifications
    Supports SQS, SNS, and direct invocation
    """
    try:
        # Parse event based on source
        messages = parse_event(event)
        results = []
        for message in messages:
            result = process_message(message)
            results.append(result)
        return {
            'statusCode': 200,
            'body': json.dumps({
                'processed': len(results),
                'results': results
            })
        }
    except Exception as e:
        logger.error(f'Error processing event: {str(e)}', exc_info=True)
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

def parse_event(event: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Parse event from different sources"""
    if 'Records' in event:
        # SQS or SNS event
        return [
            json.loads(record['body']) if 'body' in record
            else json.loads(record['Sns']['Message'])
            for record in event['Records']
        ]
    else:
        # Direct invocation
        return [event]

def process_message(message: Dict[str, Any]) -> Dict[str, Any]:
    """Process single email message"""
    # Validate required fields
    validate_message(message)
    # Build email
    email = build_email(
        sender=message['sender'],
        recipients=message['recipients'],
        subject=message['subject'],
        text_content=message.get('text_content', ''),
        html_content=message.get('html_content', ''),
        bucket_name=message.get('bucket_name')
    )
    # Send via SES
    response = ses_client.send_raw_email(
        Source=message['sender'],
        Destinations=message['recipients'],
        RawMessage={'Data': email.as_string()}
    )
    logger.info(f"Email sent successfully. MessageId: {response['MessageId']}")
    return {
        'success': True,
        'messageId': response['MessageId'],
        'recipients': message['recipients']
    }

def validate_message(message: Dict[str, Any]) -> None:
    """Validate message structure"""
    required_fields = ['sender', 'recipients', 'subject']
    for field in required_fields:
        if field not in message:
            raise ValueError(f'Missing required field: {field}')
    if not isinstance(message['recipients'], list):
        raise ValueError('Recipients must be a list')
    if not message['recipients']:
        raise ValueError('Recipients list cannot be empty')
    # Validate at least one content type present
    if not message.get('text_content') and not message.get('html_content'):
        raise ValueError('Either text_content or html_content must be provided')

def build_email(
    sender: str,
    recipients: List[str],
    subject: str,
    text_content: str = '',
    html_content: str = '',
    bucket_name: str = ''
) -> MIMEMultipart:
    """Build MIME multipart email"""
    msg = MIMEMultipart('alternative')
    msg['From'] = sender
    msg['To'] = ', '.join(recipients)
    msg['Subject'] = subject
    # Add text version
    if text_content:
        part1 = MIMEText(text_content, 'plain')
        msg.attach(part1)
    # Add HTML version
    if html_content:
        part2 = MIMEText(html_content, 'html')
        msg.attach(part2)
    # Add S3 attachment reference if provided
    if bucket_name:
        msg.add_header('X-S3-Bucket', bucket_name)
    return msg

SQS Message Format:
{
  "sender": "noreply@acubeapi.com",
  "recipients": ["user@example.com"],
  "subject": "Your Invoice is Ready",
  "text_content": "Your invoice #12345 is now available for download.",
  "html_content": "<h1>Invoice Ready</h1><p>Your invoice #12345 is now available.</p>",
  "bucket_name": "acube-invoices"
}

SNS Topic Integration:
import json
from typing import Any, Dict

import boto3

sns_client = boto3.client('sns')


def publish_notification(topic_arn: str, email_data: Dict[str, Any]) -> None:
    """Publish email notification to SNS topic"""
    sns_client.publish(
        TopicArn=topic_arn,
        Message=json.dumps(email_data),
        Subject='Email Notification Request'
    )

SQS Queue Processing:
import json
from typing import Any, Dict

import boto3


def send_to_queue(queue_url: str, email_data: Dict[str, Any]) -> None:
    """Send email notification to SQS queue"""
    sqs_client = boto3.client('sqs')
    sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(email_data)
    )

SAM Template (template.yaml):
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Globals:
  Function:
    Timeout: 30
    Runtime: python3.12
    MemorySize: 256
    Environment:
      Variables:
        LOG_LEVEL: INFO

Resources:
  NotifierFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/
      Handler: handler.lambda_handler
      Description: Email notification service
      Policies:
        - SESCrudPolicy:
            IdentityName: "*"
      Events:

Local Invocation:
# Build Lambda function
sam build

# Local invocation with test event
sam local invoke NotifierFunction \
  --env-vars local/env.json \
  --event tests/events/sqs-email-event.json

# Start local API Gateway
sam local start-api

# Local debugging
sam local invoke NotifierFunction \
  --debug-port 5858 \
  --env-vars local/env.json \
  --event tests/events/sqs-email-event.json

Unit Tests (tests/unit/test_handler.py):
import unittest
from unittest.mock import patch, MagicMock

from src.handler import lambda_handler, validate_message, build_email


class TestNotifierHandler(unittest.TestCase):
    def test_validate_message_success(self):
        """Test message validation with valid data"""
        message = {
            'sender': 'test@example.com',
            'recipients': ['user@example.com'],
            'subject': 'Test',
            'text_content': 'Hello'
        }
        # Should not raise exception
        validate_message(message)

    def test_validate_message_missing_sender(self):
        """Test validation fails with missing sender"""
        message = {
            'recipients': ['user@example.com'],
            'subject': 'Test',
            'text_content': 'Hello'
        }
        # validate_message raises ValueError for any missing required field
        with self.assertRaises(ValueError):
            validate_message(message)

Integration Tests (tests/integration/test_ses.py):
import boto3
import pytest

from src.handler import lambda_handler


@pytest.fixture
def ses_client():
    return boto3.client('ses', region_name='eu-west-1')


def test_send_email_integration(ses_client):
    """Integration test with actual SES (requires verified emails)"""
    event = {
        'sender': 'verified-sender@acubeapi.com',
        'recipients': ['verified-recipient@acubeapi.com'],
        'subject': 'Integration Test',
        'text_content': 'This is an integration test',
        'html_content': '<p>This is an integration test</p>'
    }
    # Direct invocation: the event itself is the message payload
    response = lambda_handler(event, None)
    assert response['statusCode'] == 200

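Integration tests send real email and need verified SES identities, so the sending path is usually covered in unit tests with a mocked client instead. The sketch below shows one way to do that with `MagicMock`, asserting on the arguments passed to `send_raw_email`. To stay self-contained, `send_email` is a hypothetical stand-in that takes the client as a parameter; in the handler module the client is a module-level global and would more likely be replaced with `unittest.mock.patch`.

```python
import unittest
from email.mime.text import MIMEText
from typing import Any, Dict
from unittest.mock import MagicMock


def send_email(ses_client: Any, message: Dict[str, Any]) -> str:
    """Hypothetical stand-in for the send step; returns the SES MessageId."""
    mime = MIMEText(message["text_content"], "plain")
    mime["From"] = message["sender"]
    mime["To"] = ", ".join(message["recipients"])
    mime["Subject"] = message["subject"]
    response = ses_client.send_raw_email(
        Source=message["sender"],
        Destinations=message["recipients"],
        RawMessage={"Data": mime.as_string()},
    )
    return response["MessageId"]


class TestSendEmail(unittest.TestCase):
    def test_send_uses_mocked_ses(self):
        # The mock records the call and returns a canned SES response.
        fake_ses = MagicMock()
        fake_ses.send_raw_email.return_value = {"MessageId": "test-id"}
        message = {
            "sender": "test@example.com",
            "recipients": ["user@example.com"],
            "subject": "Test",
            "text_content": "Hello",
        }

        message_id = send_email(fake_ses, message)

        self.assertEqual(message_id, "test-id")
        fake_ses.send_raw_email.assert_called_once()
        kwargs = fake_ses.send_raw_email.call_args.kwargs
        self.assertEqual(kwargs["Destinations"], ["user@example.com"])
        self.assertIn("Subject: Test", kwargs["RawMessage"]["Data"])
```

A test like this runs without AWS credentials, for example with `python -m unittest` or `pytest`.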
Makefile:
.PHONY: init build up down test tests.unit tests.integration lint invoke.local

init:
	@$(MAKE) build
	@$(MAKE) up

build:
	docker-compose build

up:
	docker-compose up -d

down:
	docker-compose down

test:
	@$(MAKE) tests.unit
	@$(MAKE) tests.integration

tests.unit:
	docker-compose exec python pytest tests/unit -v

tests.integration:
	docker-compose exec python pytest tests/integration -v

lint:
	cfn-lint template.yaml
	yamllint template.yaml

invoke.local:
	sam local invoke NotifierFunction \
		--env-vars local/env.json \
		--event tests/events/sqs-email-event.json \
		--docker-network acube-notifier_default

Deploy to Dev:
# Build Lambda function
sam build -u

# Deploy to dev environment
sam deploy \
  --profile acube-dev \
  --config-env dev \
  --no-confirm-changeset

Deploy to Production:
# Build inside a container (-u is the short form of --use-container)
sam build --use-container

# Deploy to production
sam deploy \
  --profile acube-prod \
  --config-env production \
  --no-confirm-changeset \
  --parameter-overrides \
    Environment=production \
    LogRetentionDays=90

SAM Config (samconfig.toml):
version = 0.1
[default.deploy.parameters]
stack_name = "A3-Shared-Notifier"
region = "eu-west-1"
capabilities = "CAPABILITY_IAM"
parameter_overrides = "Environment=dev"
[dev.deploy.parameters]
stack_name = "A3-Shared-Notifier-Dev"
parameter_overrides = "Environment=dev LogRetentionDays=7"
[production.deploy.parameters]
stack_name = "A3-Shared-Notifier-Prod"
parameter_overrides = "Environment=production LogRetentionDays=90"

Structured Logging:
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def log_event(event_type: str, data: Dict[str, Any]) -> None:
    """Log structured event to CloudWatch"""
    log_entry = {
        # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated in Python 3.12)
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'event_type': event_type,
        'data': data
    }
    logger.info(json.dumps(log_entry))

# Usage (inside process_message, after the SES call)
log_event('email_sent', {
    'message_id': response['MessageId'],
    'recipients': message['recipients']
})

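`log_event` structures only the calls that go through it. If every log record should be emitted as JSON, one option is a custom `logging.Formatter`. The following is a minimal sketch using only the standard library; the names `JsonFormatter` and `configure_json_logging`, the logger name `notifier`, and the chosen field names are assumptions made for illustration, not part of the service.

```python
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object (illustrative field names)."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Include the traceback when the record carries exception info.
        if record.exc_info:
            entry["exception"] = self.formatException(record.exc_info)
        return json.dumps(entry)


def configure_json_logging(level: int = logging.INFO) -> logging.Logger:
    """Attach a JSON-formatting stream handler to a dedicated logger."""
    logger = logging.getLogger("notifier")
    logger.setLevel(level)
    logger.handlers.clear()
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    # Avoid duplicate output through handlers attached to the root logger.
    logger.propagate = False
    return logger


if __name__ == "__main__":
    log = configure_json_logging()
    log.info("Email sent to %d recipient(s)", 1)
```

Inside Lambda the runtime typically attaches its own handler to the root logger, so it may be simpler to set the formatter on that existing handler rather than add a second one.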
Resources:
  ErrorAlarm:
    Type: AWS::CloudWatch::Alarm
    Properties:
      AlarmName: NotifierErrors
      MetricName: Errors
      Namespace: AWS/Lambda
      Statistic: Sum
      Period: 300
      EvaluationPeriods: 1
      Threshold: 5
      ComparisonOperator: GreaterThanThreshold
      Dimensions:
        - Name: FunctionName
          Value: !Ref NotifierFunction

A3-Shared-Notifier demonstrates a production-ready serverless architecture on AWS Lambda, providing reliable, scalable email notifications backed by unit and integration tests, structured logging, and CloudWatch alarms.
License: Proprietary (A-Cube S.r.l.)