import hashlib
import logging
import os
import re
import shutil
import subprocess
import time
from pathlib import Path
from typing import Dict, List, Any, Optional

from flask import Flask, request, jsonify
# Configure logging: the root logger emits INFO and above.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the processor and the route handlers below.
logger = logging.getLogger(__name__)
# Flask application object that the route and error-handler decorators register on.
app = Flask(__name__)
class SecureFileProcessor:
    """Run a fixed whitelist of operations on files inside approved directories.

    Callers name an operation and a bare filename; the filename is validated,
    looked up in ``ALLOWED_DIRS`` and then handed to either a native Python
    implementation or a fixed external command (never through a shell).
    """

    # Operation name -> how it is carried out. 'subprocess' entries carry the
    # fixed argv prefix; the validated file path is appended as the last argument.
    ALLOWED_OPERATIONS = {
        'backup': {'method': 'native_copy'},
        'info': {'method': 'native_stat'},
        'hash': {'method': 'native_hash'},
        'count': {'method': 'subprocess', 'command': ['wc', '-l']},
        'validate': {'method': 'subprocess', 'command': ['file']}
    }
    # Directories searched, in order, for the requested file. Stored resolved
    # so they can be compared against resolved candidate paths.
    ALLOWED_DIRS = [
        Path('/secure/uploads').resolve(),
        Path('/secure/documents').resolve()
    ]
    # Destination directory for copies made by the 'backup' operation.
    BACKUP_DIR = Path('/secure/backup').resolve()
    MAX_FILE_SIZE = 10 * 1024 * 1024  # 10MB

    # Characters permitted in a filename; applied with fullmatch().
    _FILENAME_PATTERN = re.compile(r'[a-zA-Z0-9._-]+')
    # Digests reported by the 'hash' operation.
    _HASH_ALGORITHMS = ('md5', 'sha1', 'sha256')
    # Read size used when streaming a file through the hashers.
    _HASH_CHUNK_SIZE = 64 * 1024

    def __init__(self):
        """Create the allowed and backup directories if they do not exist."""
        for directory in self.ALLOWED_DIRS + [self.BACKUP_DIR]:
            directory.mkdir(parents=True, exist_ok=True)

    def process_request(self, operation: str, filename: str,
                        options: Optional[Dict] = None) -> Dict[str, Any]:
        """Validate the request and dispatch it to the matching handler.

        Args:
            operation: Key of ``ALLOWED_OPERATIONS``.
            filename: Bare filename to look up in ``ALLOWED_DIRS``.
            options: Extra settings, forwarded only to the backup handler.

        Returns:
            The result dictionary produced by the selected handler.

        Raises:
            ValueError: Unknown operation, invalid filename, file not found,
                or file larger than ``MAX_FILE_SIZE``.
            RuntimeError: An external command timed out or could not be run.
        """
        try:
            if operation not in self.ALLOWED_OPERATIONS:
                raise ValueError(f'Operation not allowed: {operation}')
            file_path = self._validate_file_path(filename)
            op_config = self.ALLOWED_OPERATIONS[operation]
            method = op_config['method']
            if method == 'native_copy':
                return self._native_copy(file_path, options or {})
            if method == 'native_stat':
                return self._native_stat(file_path)
            if method == 'native_hash':
                return self._native_hash(file_path)
            if method == 'subprocess':
                return self._safe_subprocess(op_config['command'], file_path)
            raise ValueError('Unknown operation method')
        except Exception as e:
            # Log every failure here, then let the caller decide the response.
            logger.error(f'File processing error: {e}')
            raise

    def _validate_file_path(self, filename: str) -> Path:
        """Resolve ``filename`` to an existing regular file in an allowed directory.

        Args:
            filename: Bare filename; only ``[a-zA-Z0-9._-]`` characters, at
                most 255 of them, and no ``..`` sequence.

        Returns:
            The resolved path of the first match across ``ALLOWED_DIRS``.

        Raises:
            ValueError: The name is malformed, the file is missing from every
                allowed directory, or it exceeds ``MAX_FILE_SIZE``.
        """
        if not isinstance(filename, str):
            raise ValueError('Invalid filename format')
        # fullmatch, not match with '$': '$' also matches just before a
        # trailing newline, which would let "name\n" through.
        if not self._FILENAME_PATTERN.fullmatch(filename):
            raise ValueError('Invalid filename format')
        if len(filename) > 255:
            raise ValueError('Filename too long')
        if '..' in filename:
            raise ValueError('Directory traversal not allowed')

        for allowed_dir in self.ALLOWED_DIRS:
            # resolve() follows symlinks, so the containment check below is
            # made against the file's real location.
            candidate_path = (allowed_dir / filename).resolve()
            # Compare path components rather than string prefixes: a prefix
            # test would accept a sibling such as "<allowed_dir>_other".
            if allowed_dir not in candidate_path.parents:
                continue
            if candidate_path.is_file():
                if candidate_path.stat().st_size > self.MAX_FILE_SIZE:
                    raise ValueError('File too large')
                return candidate_path
        raise ValueError('File not found in allowed directories')

    def _native_copy(self, source_path: Path, options: Dict) -> Dict[str, Any]:
        """Copy ``source_path`` into ``BACKUP_DIR`` under a timestamped name.

        Args:
            source_path: Validated file to back up.
            options: Accepted for interface compatibility; currently unused.

        Returns:
            Dictionary with the source name, the backup name and the size of
            the copy.
        """
        timestamp = int(time.time())
        stem, suffix = source_path.stem, source_path.suffix
        backup_name = f"{stem}_{timestamp}{suffix}.bak"
        backup_path = self.BACKUP_DIR / backup_name
        # The timestamp has one-second resolution; add a counter so a second
        # backup within the same second does not overwrite the first.
        # Best-effort only: this does not guard against concurrent writers.
        attempt = 0
        while backup_path.exists():
            attempt += 1
            backup_name = f"{stem}_{timestamp}_{attempt}{suffix}.bak"
            backup_path = self.BACKUP_DIR / backup_name
        shutil.copy2(source_path, backup_path)
        return {
            'operation': 'backup',
            'success': True,
            'source': source_path.name,
            'backup': backup_name,
            'size': backup_path.stat().st_size
        }

    def _native_stat(self, file_path: Path) -> Dict[str, Any]:
        """Return size, modification time and access information for a file."""
        stat = file_path.stat()
        return {
            'operation': 'info',
            'filename': file_path.name,
            'size': stat.st_size,
            'size_human': self._format_size(stat.st_size),
            'modified': stat.st_mtime,
            # Last three octal digits of the mode: owner/group/other bits.
            'permissions': oct(stat.st_mode)[-3:],
            'is_readable': os.access(file_path, os.R_OK),
            'is_writable': os.access(file_path, os.W_OK)
        }

    def _native_hash(self, file_path: Path) -> Dict[str, Any]:
        """Compute the MD5, SHA-1 and SHA-256 hex digests of a file.

        The file is read once in fixed-size chunks and each chunk is fed to
        every hasher, so the whole file is never held in memory.
        """
        hashers = {algo: hashlib.new(algo) for algo in self._HASH_ALGORITHMS}
        total = 0
        with open(file_path, 'rb') as f:
            while True:
                chunk = f.read(self._HASH_CHUNK_SIZE)
                if not chunk:
                    break
                total += len(chunk)
                for hasher in hashers.values():
                    hasher.update(chunk)
        return {
            'operation': 'hash',
            'filename': file_path.name,
            'size': total,
            'hashes': {algo: h.hexdigest() for algo, h in hashers.items()}
        }

    def _safe_subprocess(self, base_command: List[str],
                         file_path: Path) -> Dict[str, Any]:
        """Run a fixed command with the file path appended as its last argument.

        Args:
            base_command: Argv prefix taken from ``ALLOWED_OPERATIONS``.
            file_path: Validated path appended to the command.

        Returns:
            Dictionary with the captured stdout/stderr, the return code and a
            ``success`` flag that is true when the return code is 0.

        Raises:
            RuntimeError: The command exceeded the 30-second timeout or could
                not be executed.
        """
        # Argument list, so no shell ever interprets the file name.
        command = base_command + [str(file_path)]
        try:
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                timeout=30,
                cwd=str(file_path.parent)
            )
        except subprocess.TimeoutExpired as e:
            raise RuntimeError('Command execution timeout') from e
        except Exception as e:
            raise RuntimeError(f'Command execution failed: {e}') from e
        return {
            'operation': 'subprocess',
            'command': ' '.join(base_command),
            'filename': file_path.name,
            'stdout': result.stdout,
            'stderr': result.stderr,
            'returncode': result.returncode,
            'success': result.returncode == 0
        }

    def _format_size(self, size: int) -> str:
        """Format a byte count with one decimal place and a B/KB/MB/GB/TB unit."""
        value = float(size)
        for unit in ['B', 'KB', 'MB', 'GB']:
            if value < 1024.0:
                return f"{value:.1f} {unit}"
            value /= 1024.0
        return f"{value:.1f} TB"
# Initialize processor: one shared instance used by every route handler.
# Constructing it creates the allowed and backup directories if they are
# missing, so importing this module touches the filesystem.
processor = SecureFileProcessor()
# SECURE: Flask endpoints with proper validation
@app.route('/backup', methods=['POST'])
def backup_file():
    """Back up the file named by the ``source`` field of a JSON request body.

    Returns:
        The processor's result as JSON on success; a 400 JSON error for a
        missing/malformed body, a missing or non-string ``source``, or any
        validation failure raised as ``ValueError``; a 500 JSON error for
        anything else.
    """
    try:
        # silent=True yields None for a missing, non-JSON or malformed body
        # instead of raising, so those requests get the 400 below rather than
        # falling into the generic 500 handler.
        data = request.get_json(silent=True)
        # A JSON array or scalar has no 'source' field; treat it as missing data.
        if not isinstance(data, dict) or not data:
            return jsonify({'error': 'JSON data required'}), 400
        source = data.get('source', '')
        # A non-string 'source' (number, null, list) is a client error.
        if not isinstance(source, str) or not source.strip():
            return jsonify({'error': 'Source filename required'}), 400
        result = processor.process_request('backup', source.strip())
        return jsonify(result)
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        logger.error(f'Backup error: {e}')
        return jsonify({'error': 'Backup operation failed'}), 500
@app.route('/info/<filename>', methods=['GET'])
def get_file_info(filename):
    """Return the 'info' operation result for ``filename`` as JSON.

    Validation failures (``ValueError``) map to a 400 response; any other
    failure is logged and reported as a 500.
    """
    try:
        info = processor.process_request('info', filename)
        response = jsonify(info)
    except ValueError as exc:
        return jsonify({'error': str(exc)}), 400
    except Exception as exc:
        logger.error(f'Info error: {exc}')
        return jsonify({'error': 'Info operation failed'}), 500
    return response
@app.route('/hash/<filename>', methods=['GET'])
def calculate_hash(filename):
    """Return the 'hash' operation result for ``filename`` as JSON.

    Validation failures (``ValueError``) map to a 400 response; any other
    failure is logged and reported as a 500.
    """
    try:
        digests = processor.process_request('hash', filename)
        response = jsonify(digests)
    except ValueError as exc:
        return jsonify({'error': str(exc)}), 400
    except Exception as exc:
        logger.error(f'Hash error: {exc}')
        return jsonify({'error': 'Hash operation failed'}), 500
    return response
@app.route('/operations', methods=['GET'])
def list_operations():
    """Describe the service: operation names, allowed directories, size limit."""
    payload = {
        'available_operations': list(processor.ALLOWED_OPERATIONS),
        'allowed_directories': list(map(str, processor.ALLOWED_DIRS)),
        'max_file_size': processor.MAX_FILE_SIZE
    }
    return jsonify(payload)
@app.errorhandler(404)
def not_found(error):
    """Answer 404 errors with a JSON error body."""
    body = {'error': 'Endpoint not found'}
    return jsonify(body), 404
@app.errorhandler(500)
def internal_error(error):
    """Answer 500 errors with a JSON error body."""
    body = {'error': 'Internal server error'}
    return jsonify(body), 500
if __name__ == '__main__':
    # Script entry point: serve on the loopback interface only, with the
    # Flask debugger disabled. `time` is imported at module level with the
    # other imports so the backup operation also works when this module is
    # imported by a WSGI server instead of being run as a script.
    app.run(host='127.0.0.1', port=5000, debug=False)