# SECURE: Using AWS Secrets Manager for credential management
import boto3
import json
from botocore.exceptions import ClientError
from functools import lru_cache
import os
class AWSSecretsManager:
    """Fetch secrets and parameters from AWS, caching results per instance.

    Secrets are read from AWS Secrets Manager and parsed as JSON; parameters
    are read from SSM Parameter Store. Successful lookups are remembered on
    the instance itself (at most ``_CACHE_SIZE`` secrets and ``_CACHE_SIZE``
    parameters, least recently used evicted first). Failed lookups are never
    cached.

    The cache lives on the instance rather than in a class-level
    ``lru_cache``: a class-level cache keys on ``self``, which keeps every
    instance (and its boto3 clients) alive for the life of the process and
    makes all instances compete for the same slots.
    """

    # Maximum number of cached entries per lookup kind on one instance.
    _CACHE_SIZE = 32

    def __init__(self, region_name=None):
        """Create the Secrets Manager and SSM clients.

        Args:
            region_name: AWS region to use. Falls back to the
                ``AWS_DEFAULT_REGION`` environment variable, then to
                ``'us-east-1'``.
        """
        self.region_name = region_name or os.getenv('AWS_DEFAULT_REGION', 'us-east-1')
        self.secrets_client = boto3.client('secretsmanager', region_name=self.region_name)
        self.ssm_client = boto3.client('ssm', region_name=self.region_name)
        # Maps lookup kind ('secret' / 'parameter') to its own {key: value} cache.
        self._caches = {}

    def _cached(self, kind, key, fetch):
        """Return the cached value for ``key``, calling ``fetch()`` on a miss.

        Relies on dicts preserving insertion order: the first key is always
        the least recently used one.
        """
        # Created lazily so a subclass that skips ``__init__`` still works.
        caches = getattr(self, '_caches', None)
        if caches is None:
            caches = self._caches = {}
        cache = caches.setdefault(kind, {})

        if key in cache:
            # Pop and re-insert below to mark the entry as most recently used.
            value = cache.pop(key)
        else:
            # If fetch() raises, nothing is stored and the error propagates.
            value = fetch()
            if len(cache) >= self._CACHE_SIZE:
                del cache[next(iter(cache))]
        cache[key] = value
        return value

    def clear_cache(self):
        """Forget every cached secret and parameter (e.g. after a rotation)."""
        self._caches = {}

    def _fetch_secret(self, secret_name):
        """Read one secret from Secrets Manager and parse it as JSON."""
        try:
            response = self.secrets_client.get_secret_value(SecretId=secret_name)
        except ClientError as e:
            code = e.response['Error']['Code']
            if code == 'ResourceNotFoundException':
                raise ValueError(f"Secret {secret_name} not found") from e
            if code == 'InvalidRequestException':
                raise ValueError(f"Invalid request for secret {secret_name}") from e
            raise ValueError(f"Error retrieving secret {secret_name}: {e}") from e

        # Binary secrets carry no 'SecretString'; report that as a ValueError
        # like every other failure instead of leaking a bare KeyError.
        if 'SecretString' not in response:
            raise ValueError(
                f"Secret {secret_name} has no SecretString (binary secrets are not supported)"
            )
        return json.loads(response['SecretString'])

    def _fetch_parameter(self, parameter_name, decrypt):
        """Read one parameter value from SSM Parameter Store."""
        try:
            response = self.ssm_client.get_parameter(
                Name=parameter_name,
                WithDecryption=decrypt,
            )
        except ClientError as e:
            if e.response['Error']['Code'] == 'ParameterNotFound':
                raise ValueError(f"Parameter {parameter_name} not found") from e
            raise ValueError(f"Error retrieving parameter {parameter_name}: {e}") from e
        return response['Parameter']['Value']

    def get_secret(self, secret_name):
        """Retrieve secret from AWS Secrets Manager with caching.

        Args:
            secret_name: Name or ARN passed to Secrets Manager as ``SecretId``.

        Returns:
            The secret's ``SecretString`` parsed as JSON. The cached object
            itself is returned, so callers should treat it as read-only.

        Raises:
            ValueError: If the secret is missing, the request is rejected,
                any other client error occurs, the secret has no string
                payload, or the payload is not valid JSON.
        """
        return self._cached('secret', secret_name, lambda: self._fetch_secret(secret_name))

    def get_parameter(self, parameter_name, decrypt=True):
        """Retrieve parameter from SSM Parameter Store with caching.

        Args:
            parameter_name: Name of the parameter to read.
            decrypt: Passed to SSM as ``WithDecryption``.

        Returns:
            The parameter's value.

        Raises:
            ValueError: If the parameter does not exist or any other client
                error occurs.
        """
        return self._cached(
            'parameter',
            (parameter_name, decrypt),
            lambda: self._fetch_parameter(parameter_name, decrypt),
        )

    def get_database_credentials(self, secret_name):
        """Get database credentials from Secrets Manager.

        Returns:
            The parsed secret, guaranteed to contain ``username``,
            ``password``, ``host``, ``port`` and ``dbname``.

        Raises:
            ValueError: If the secret cannot be read or lacks a required key.
        """
        secret = self.get_secret(secret_name)
        required_keys = ['username', 'password', 'host', 'port', 'dbname']
        missing_keys = [key for key in required_keys if key not in secret]
        if missing_keys:
            raise ValueError(f"Missing database credential keys: {missing_keys}")
        return secret

    def get_api_credentials(self, secret_name):
        """Get API credentials from Secrets Manager.

        Returns:
            The parsed secret, guaranteed to contain ``api_key``.

        Raises:
            ValueError: If the secret cannot be read or has no ``api_key``.
        """
        secret = self.get_secret(secret_name)
        if 'api_key' not in secret:
            raise ValueError("API key not found in secret")
        return secret
class SecureAWSServiceManager:
    """Build service clients and connection strings from stored credentials.

    All credentials are obtained through the secrets manager handed to the
    constructor; nothing is read from hard-coded values.
    """

    def __init__(self, secrets_manager=None):
        """Store the secrets manager and start with an empty client cache.

        Args:
            secrets_manager: Object providing ``get_secret``,
                ``get_database_credentials`` and ``get_api_credentials``.
                A new ``AWSSecretsManager`` is created when omitted.
        """
        self.secrets_manager = secrets_manager or AWSSecretsManager()
        # S3 clients already built, keyed by the secret name that configured them.
        self._clients = {}

    def get_s3_client_with_credentials(self, credentials_secret_name):
        """Get S3 client using credentials from Secrets Manager.

        The client is built once per secret name and reused afterwards.

        Args:
            credentials_secret_name: Secret holding ``access_key_id`` and
                ``secret_access_key``, optionally ``session_token`` and
                ``region`` (defaults to ``'us-east-1'``).

        Returns:
            A boto3 S3 client configured with those credentials.

        Raises:
            ValueError: If the secret cannot be read or lacks a required key.
        """
        if credentials_secret_name in self._clients:
            return self._clients[credentials_secret_name]

        credentials = self.secrets_manager.get_secret(credentials_secret_name)
        required_keys = ['access_key_id', 'secret_access_key']
        missing_keys = [key for key in required_keys if key not in credentials]
        if missing_keys:
            raise ValueError(f"Missing AWS credential keys: {missing_keys}")

        client = boto3.client(
            's3',
            aws_access_key_id=credentials['access_key_id'],
            aws_secret_access_key=credentials['secret_access_key'],
            aws_session_token=credentials.get('session_token'),
            region_name=credentials.get('region', 'us-east-1'),
        )
        self._clients[credentials_secret_name] = client
        return client

    def get_rds_connection_string(self, db_secret_name):
        """Get RDS connection string from Secrets Manager.

        Username, password and database name are percent-encoded so that
        characters such as ``@``, ``:``, ``/`` or ``#`` in a credential cannot
        break the URL structure. Values made only of unreserved characters
        are emitted unchanged.

        Args:
            db_secret_name: Secret holding ``username``, ``password``,
                ``host``, ``port`` and ``dbname``.

        Returns:
            A ``postgresql://user:password@host:port/dbname`` URL. It embeds
            the password, so it must not be logged.

        Raises:
            ValueError: If the secret cannot be read or lacks a required key.
        """
        # Imported here so the block does not depend on the file's import list.
        from urllib.parse import quote

        db_creds = self.secrets_manager.get_database_credentials(db_secret_name)
        # safe='' also encodes '/', which quote() leaves alone by default.
        username = quote(str(db_creds['username']), safe='')
        password = quote(str(db_creds['password']), safe='')
        dbname = quote(str(db_creds['dbname']), safe='')
        return (
            f"postgresql://{username}:{password}"
            f"@{db_creds['host']}:{db_creds['port']}/{dbname}"
        )

    def get_external_api_client(self, api_secret_name):
        """Get external API client with credentials from Secrets Manager.

        Args:
            api_secret_name: Secret holding ``api_key`` and optionally
                ``base_url`` and ``timeout`` (defaults to 30).

        Returns:
            An ``ExternalAPIClient`` built from those values.

        Raises:
            ValueError: If the secret cannot be read or has no ``api_key``.
        """
        api_creds = self.secrets_manager.get_api_credentials(api_secret_name)
        # NOTE(review): ExternalAPIClient is not defined in the visible part
        # of this file -- confirm it is imported or defined elsewhere.
        return ExternalAPIClient(
            api_key=api_creds['api_key'],
            base_url=api_creds.get('base_url'),
            timeout=api_creds.get('timeout', 30),
        )
# ---------------------------------------------------------------------------
# Usage example
# ---------------------------------------------------------------------------
# Step 1 - store the credentials in AWS Secrets Manager:
#
#   aws secretsmanager create-secret \
#       --name "prod/app/aws-credentials" \
#       --description "AWS credentials for production app" \
#       --secret-string '{"access_key_id":"AKIA...","secret_access_key":"...","region":"us-east-1"}'
#
# Step 2 - resolve them from the application, as below.
#
# NOTE(review): these statements sit at module top level, so they run (and
# call AWS) whenever this module is imported; consider a `__main__` guard if
# the module is imported by other code.
secretsManager = AWSSecretsManager()
serviceManager = SecureAWSServiceManager(secrets_manager=secretsManager)

# S3 client built from the access keys stored in the secret.
s3_client = serviceManager.get_s3_client_with_credentials(
    'prod/app/aws-credentials'
)

# PostgreSQL URL assembled from the database secret.
db_url = serviceManager.get_rds_connection_string(
    'prod/app/database-credentials'
)