import hashlib
import os
import re
import secrets
import sqlite3
from pathlib import Path
from urllib.parse import unquote

from flask import Flask, request, send_file, abort, jsonify
# Flask application object; the route and error-handler decorators below register against it.
app = Flask(__name__)
class SecureFileManager:
    """Validate file names, confine paths to a base directory and track files by token.

    Registered files are recorded in a SQLite table together with a SHA-256
    digest of their content; the digest is re-checked on every lookup.

    NOTE(review): the connection is opened with ``check_same_thread=False`` and
    shared by every caller without a lock. Confirm the deployment does not issue
    concurrent writes through this object.
    """

    # Number of URL-decoding passes attempted before a name is treated as over-encoded.
    _MAX_DECODE_ROUNDS = 3
    # Files are hashed in chunks of this size so large files are never fully loaded.
    _HASH_CHUNK_SIZE = 64 * 1024
    # Tokens are exactly 32 lowercase hex characters (used with fullmatch).
    _TOKEN_RE = re.compile(r'[a-f0-9]{32}')
    # Patterns that make a file name unacceptable (matched case-insensitively).
    _DANGEROUS_PATTERNS = (
        r'\.\.',  # Parent directory
        r'[<>:"|?*\\]',  # Dangerous characters
        r'[\x00-\x1f\x7f]',  # Control characters, including NUL and newlines
        # Windows reserves these device names with or without an extension.
        r'^(CON|PRN|AUX|NUL|COM[1-9]|LPT[1-9])(\.|$)',
        r'^\.',  # Hidden files
    )

    def __init__(self, base_dir, db_path='files.db'):
        """Set up the manager and open its tracking database.

        Args:
            base_dir: Directory that every managed path must stay inside.
            db_path: SQLite database location; defaults to ``files.db``.
        """
        self.base_dir = Path(base_dir).resolve()
        self.allowed_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.pdf', '.txt'}
        # Stored for callers; no method of this class enforces it.
        self.max_file_size = 10 * 1024 * 1024  # 10MB
        self.db_path = db_path
        # Initialize file database
        self.init_db()

    def init_db(self):
        """Initialize SQLite database for file tracking."""
        self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
        self.conn.execute('''
            CREATE TABLE IF NOT EXISTS files (
                id INTEGER PRIMARY KEY,
                token TEXT UNIQUE NOT NULL,
                filename TEXT NOT NULL,
                file_path TEXT NOT NULL,
                file_hash TEXT NOT NULL,
                upload_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        self.conn.commit()

    @classmethod
    def _hash_file(cls, file_path):
        """Return the SHA-256 hex digest of the file at *file_path*, read in chunks."""
        digest = hashlib.sha256()
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(cls._HASH_CHUNK_SIZE), b''):
                digest.update(chunk)
        return digest.hexdigest()

    def validate_filename(self, filename):
        """Return the sanitized base name of *filename*.

        The name is URL-decoded repeatedly, stripped of directory components,
        checked against the dangerous patterns, and must carry an allowed
        extension.

        Raises:
            ValueError: If the name is empty, not a string, over-encoded,
                matches a dangerous pattern, has a disallowed extension, or
                has an invalid length.
        """
        if not filename or not isinstance(filename, str):
            raise ValueError("Invalid filename")

        # URL decode multiple times to catch double encoding
        decoded = filename
        for _ in range(self._MAX_DECODE_ROUNDS):
            previous, decoded = decoded, unquote(decoded)
            if decoded == previous:
                break
        else:
            # Every round changed the value. If yet another round would still
            # change it, escapes remain hidden in the name, so refuse it
            # rather than pass along a name that decodes further downstream.
            if unquote(decoded) != decoded:
                raise ValueError("Filename is encoded too many times")

        # Remove path components
        clean_name = os.path.basename(decoded)

        # Validate against dangerous patterns
        for pattern in self._DANGEROUS_PATTERNS:
            if re.search(pattern, clean_name, re.IGNORECASE):
                raise ValueError(f"Filename contains invalid pattern: {pattern}")

        # Check extension
        ext = Path(clean_name).suffix.lower()
        if ext not in self.allowed_extensions:
            raise ValueError(f"Extension {ext} not allowed")

        # Length validation
        if not 0 < len(clean_name) <= 255:
            raise ValueError("Invalid filename length")

        return clean_name

    def secure_path_join(self, filename):
        """Return the resolved path of *filename* inside the base directory.

        Raises:
            ValueError: If the name fails validation or the resolved path
                falls outside the base directory.
        """
        safe_name = self.validate_filename(filename)
        full_path = (self.base_dir / safe_name).resolve()

        # Resolving follows symlinks, so re-check containment afterwards.
        try:
            full_path.relative_to(self.base_dir)
        except ValueError:
            raise ValueError("Path traversal attempt detected") from None
        return full_path

    def register_file(self, filename, file_path):
        """Record a file in the database and return its access token.

        Args:
            filename: Name stored for the file and reported back on lookup.
            file_path: Location of the file; it is read to compute its digest.

        Returns:
            A random token of 32 lowercase hex characters.
        """
        # 16 random bytes rendered as 32 hex characters, the format
        # get_file_by_token accepts.
        token = secrets.token_hex(16)

        # Calculate file hash for integrity
        file_hash = self._hash_file(file_path)

        # The connection context manager commits on success and rolls back on error.
        with self.conn:
            self.conn.execute(
                'INSERT INTO files (token, filename, file_path, file_hash) VALUES (?, ?, ?, ?)',
                (token, filename, str(file_path), file_hash)
            )
        return token

    def get_file_by_token(self, token):
        """Look up a registered file by token and verify its integrity.

        Returns:
            A dict with the keys ``filename``, ``path`` and ``hash``.

        Raises:
            ValueError: If the token is malformed or the file content no
                longer matches the stored digest.
            FileNotFoundError: If the token is unknown or the file is gone.
        """
        # fullmatch rejects a trailing newline, which '^...$' with match() lets through.
        if not isinstance(token, str) or not self._TOKEN_RE.fullmatch(token):
            raise ValueError("Invalid token format")

        row = self.conn.execute(
            'SELECT filename, file_path, file_hash FROM files WHERE token = ?',
            (token,)
        ).fetchone()
        if row is None:
            raise FileNotFoundError("File not found")
        filename, file_path, stored_hash = row

        # Open directly instead of testing for existence first, so the file
        # cannot disappear between the check and the read.
        try:
            current_hash = self._hash_file(file_path)
        except FileNotFoundError:
            raise FileNotFoundError("File no longer exists") from None

        # Verify file integrity
        if current_hash != stored_hash:
            raise ValueError("File integrity check failed")

        return {
            'filename': filename,
            'path': file_path,
            'hash': stored_hash
        }

    def list_files(self):
        """Return token, filename and upload date of every registered file, newest first."""
        cursor = self.conn.execute(
            'SELECT token, filename, upload_date FROM files ORDER BY upload_date DESC'
        )
        return [
            {'token': token, 'filename': filename, 'upload_date': upload_date}
            for token, filename, upload_date in cursor.fetchall()
        ]
# Module-level file manager shared by the route handlers below. 'uploads' is
# resolved to an absolute path when the manager is constructed, and
# construction also opens the SQLite tracking database as an import-time side effect.
file_manager = SecureFileManager('uploads')
# SECURE: Token-based file download
@app.route('/download/<token>')
def download_file(token):
    """Send the file registered under *token* as an attachment.

    The URL rule must declare ``<token>``; without it Flask calls the view
    with no arguments and the request fails before any lookup happens.

    Responds 404 when the token is malformed or unknown, or the file is
    missing or fails its integrity check, and 500 on any other error.
    """
    try:
        file_info = file_manager.get_file_by_token(token)
        response = send_file(
            file_info['path'],
            as_attachment=True,
            download_name=file_info['filename'],
        )
    except (ValueError, FileNotFoundError) as e:
        app.logger.warning(f"Download attempt failed: {e}")
        abort(404)
    except Exception as e:
        # Log with traceback; the client only ever sees a generic 500.
        app.logger.exception(f"Download error: {e}")
        abort(500)

    # Security headers
    response.headers['X-Content-Type-Options'] = 'nosniff'
    response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
    return response
# SECURE: Protected image serving
@app.route('/images/<token>')
def serve_image(token):
    """Serve the file registered under *token* inline if it is an image.

    The URL rule must declare ``<token>`` so Flask passes it to the view.

    Responds 400 when the registered file name does not have an image
    extension, 404 when the token is malformed or unknown, or the file is
    missing or fails its integrity check, and 500 on any other error.
    """
    image_extensions = {'.jpg', '.jpeg', '.png', '.gif'}
    try:
        file_info = file_manager.get_file_by_token(token)
        # Verify it's an image file
        ext = Path(file_info['filename']).suffix.lower()
        response = send_file(file_info['path']) if ext in image_extensions else None
    except (ValueError, FileNotFoundError):
        abort(404)
    except Exception as e:
        app.logger.error(f"Image serve error: {e}")
        abort(500)

    if response is None:
        # Raised outside the try block: abort() works by raising, and the
        # generic handler above would otherwise convert this 400 into a 500.
        abort(400)

    response.headers['X-Content-Type-Options'] = 'nosniff'
    return response
# SECURE: Template access with whitelist
@app.route('/template')
def get_template():
    """Return the content of a whitelisted template chosen by the ``name`` query argument.

    Responds 400 for a name outside the whitelist, 404 when the template file
    is missing or resolves outside the base directory, and 500 on any other
    read error.
    """
    # Whitelist of allowed templates
    allowed_templates = {
        'default': 'default.html',
        'login': 'login.html',
        'dashboard': 'dashboard.html'
    }
    template_key = request.args.get('name', 'default')
    if template_key not in allowed_templates:
        abort(400)

    # The file names come from the constant whitelist above, never from the
    # request, so the upload-name validation is not applied here: it only
    # accepts upload extensions and would reject every '.html' template.
    base_dir = file_manager.base_dir
    template_path = (base_dir / allowed_templates[template_key]).resolve()

    # Resolving follows symlinks, so still confirm the file stays in the base directory.
    try:
        template_path.relative_to(base_dir)
    except ValueError:
        abort(404)

    try:
        with open(template_path, 'r', encoding='utf-8') as f:
            return f.read()
    except FileNotFoundError:
        abort(404)
    except Exception as e:
        app.logger.error(f"Template error: {e}")
        abort(500)
# File listing endpoint
@app.route('/files')
def list_files():
    """Return every registered file as JSON, or a JSON error with status 500 on failure."""
    try:
        return jsonify({'files': file_manager.list_files()})
    except Exception as e:
        app.logger.error(f"File listing error: {e}")
        error_body = {'error': 'Unable to list files'}
        return jsonify(error_body), 500
# Error handlers
@app.errorhandler(404)
def not_found(error):
    """Answer any 404 with a JSON error body."""
    body = jsonify({'error': 'Resource not found'})
    return body, 404
@app.errorhandler(500)
def internal_error(error):
    """Answer any 500 with a JSON error body."""
    body = jsonify({'error': 'Internal server error'})
    return body, 500
if __name__ == "__main__":
    # Keep debug disabled: debug=True must never be used in production.
    app.run(debug=False)