from lxml import etree
import xml.etree.ElementTree as ET
import re
from typing import List, Optional, Any
class SecureXMLProcessor:
    """Run whitelisted, injection-resistant XPath lookups on one XML document.

    Caller-supplied *values* (category, price, user ID, attribute value) are
    validated and then bound as XPath variables, so they are never spliced
    into the query text. Element and attribute *names* cannot be bound as
    variables; they are interpolated only after an exact whitelist match.

    Every public query method is best-effort: on any failure it prints a
    diagnostic and returns an empty result instead of raising.
    """

    # Precompiled validation patterns; always applied with fullmatch() so a
    # trailing newline cannot slip past the end anchor.
    _USER_ID_RE = re.compile(r'[a-zA-Z0-9]+')
    _ATTRIBUTE_VALUE_RE = re.compile(r'[a-zA-Z0-9\s._-]+')

    # Caps on the number of nodes handed back to the caller.
    _MAX_PRODUCT_RESULTS = 100
    _MAX_ATTRIBUTE_RESULTS = 50

    def __init__(self, xml_file, parser=None):
        """Parse *xml_file* and set up the whitelists used by the queries.

        Args:
            xml_file: Source passed straight to ``etree.parse``.
            parser: Optional lxml parser passed to ``etree.parse``. ``None``
                keeps lxml's default parser.

        NOTE(review): if *xml_file* can come from an untrusted source, pass a
        hardened parser, e.g.
        ``etree.XMLParser(resolve_entities=False, no_network=True)``.
        """
        self.tree = etree.parse(xml_file, parser=parser)
        # Define allowed fields and attributes
        self.allowed_fields = ['name', 'description', 'category', 'price', 'stock']
        self.allowed_attributes = ['id', 'type', 'status', 'category']
        self.allowed_categories = ['electronics', 'books', 'clothing', 'home']

    def find_products(self, category: str, min_price: float) -> List[Any]:
        """Return up to 100 ``product`` nodes in *category* priced >= *min_price*.

        Args:
            category: Must be one of ``self.allowed_categories``.
            min_price: Finite, non-negative ``int`` or ``float`` (not ``bool``).

        Returns:
            The matching nodes, truncated to 100; ``[]`` on invalid input or
            any query error (a message is printed).
        """
        try:
            # Input validation
            if not self._validate_category(category):
                raise ValueError("Invalid category")
            if not self._validate_price(min_price):
                raise ValueError("Invalid price")
            # Bind both values as XPath variables: the price is then a real
            # XPath number rather than Python's text rendering of it (which
            # could be "nan", "inf", "True" or "1e-07").
            results = self.tree.xpath(
                "//product[category=$category and price >= $min_price]",
                category=category,
                min_price=float(min_price),
            )
            # Limit results to prevent resource exhaustion
            return results[:self._MAX_PRODUCT_RESULTS]
        except Exception as e:
            print(f"Product search error: {e}")
            return []

    def get_user_data(self, user_id: str, field: str) -> Optional[str]:
        """Return the text of child element *field* of the user with *user_id*.

        Args:
            user_id: Alphanumeric ID of at most 20 characters.
            field: Must be one of ``self.allowed_fields``.

        Returns:
            The text of the first match, or ``None`` when there is no match,
            the input is invalid, or the lookup fails (a message is printed
            for the latter two).
        """
        try:
            # Strict input validation
            if not self._validate_user_id(user_id):
                raise ValueError("Invalid user ID")
            if field not in self.allowed_fields:
                raise ValueError(f"Field '{field}' not allowed")
            # Restrict to users section only
            users_nodes = self.tree.xpath("/root/users")
            if not users_nodes:
                raise LookupError("No /root/users section in document")
            # `field` is whitelisted above, so it is safe to interpolate;
            # the ID is bound as a variable.
            result = users_nodes[0].xpath(
                f"user[@id=$user_id]/{field}", user_id=user_id
            )
            return result[0].text if result else None
        except Exception as e:
            print(f"User data access error: {e}")
            return None

    def search_by_attribute(self, attr_name: str, attr_value: str) -> List[Any]:
        """Return up to 50 products whose attribute *attr_name* equals *attr_value*.

        Args:
            attr_name: Must be one of ``self.allowed_attributes``.
            attr_value: At most 100 characters drawn from letters, digits,
                whitespace, ``.``, ``_`` and ``-``.

        Returns:
            The matching ``/root/products/product`` nodes, truncated to 50
            (a warning is printed when truncation happens); ``[]`` on invalid
            input or any query error.
        """
        try:
            # Validate attribute name against whitelist
            if attr_name not in self.allowed_attributes:
                raise ValueError(f"Attribute '{attr_name}' not allowed")
            if not self._validate_attribute_value(attr_value):
                raise ValueError("Invalid attribute value")
            # Absolute path restricts the search to the products section;
            # `attr_name` is whitelisted, the value is bound as a variable.
            results = self.tree.xpath(
                f"/root/products/product[@{attr_name}=$attr_value]",
                attr_value=attr_value,
            )
            # Limit and log results
            limit = self._MAX_ATTRIBUTE_RESULTS
            if len(results) > limit:
                print(f"Warning: Search returned {len(results)} results, limited to {limit}")
            return results[:limit]
        except Exception as e:
            print(f"Attribute search error: {e}")
            return []

    def _validate_category(self, category: str) -> bool:
        """Return True when *category* is a whitelisted string of <= 50 chars."""
        if not isinstance(category, str) or len(category) > 50:
            return False
        return category in self.allowed_categories

    @staticmethod
    def _validate_price(price: Any) -> bool:
        """Return True when *price* is a finite, non-negative real number.

        ``bool`` is rejected explicitly because it is a subclass of ``int``.
        """
        if isinstance(price, bool) or not isinstance(price, (int, float)):
            return False
        try:
            as_float = float(price)
        except OverflowError:
            # An int too large to be represented as a float.
            return False
        # Both comparisons are False for NaN, so NaN is rejected as well.
        return 0 <= as_float < float("inf")

    def _validate_user_id(self, user_id: str) -> bool:
        """Return True when *user_id* is 1-20 ASCII alphanumeric characters."""
        if not isinstance(user_id, str) or len(user_id) > 20:
            return False
        # Only allow alphanumeric user IDs
        return self._USER_ID_RE.fullmatch(user_id) is not None

    def _validate_attribute_value(self, value: str) -> bool:
        """Return True when *value* is 1-100 characters from the allowed set."""
        if not isinstance(value, str) or len(value) > 100:
            return False
        # Allow alphanumerics, whitespace, dots, hyphens, underscores
        return self._ATTRIBUTE_VALUE_RE.fullmatch(value) is not None

    def _escape_xpath_string(self, input_string: str) -> str:
        """Render *input_string* as an XPath 1.0 string expression.

        XPath 1.0 string literals have no escape character, so the quoting
        style is chosen from the quotes the value contains; a value holding
        both kinds is rebuilt with ``concat()``. ``None`` yields ``''``.

        The query methods of this class bind values as XPath variables
        instead; this helper is kept for code that must embed a literal.
        """
        if input_string is None:
            return "''"
        # If no single quotes, wrap in single quotes
        if "'" not in input_string:
            return f"'{input_string}'"
        # If no double quotes, wrap in double quotes
        if '"' not in input_string:
            return f'"{input_string}"'
        # Contains both types of quotes - use concat(): split on the single
        # quotes and re-insert each one as a double-quoted literal.
        pieces = []
        for index, chunk in enumerate(input_string.split("'")):
            if index > 0:
                pieces.append("\"'\"")
            if chunk:
                pieces.append(f"'{chunk}'")
        return f"concat({', '.join(pieces)})"

    def get_safe_product_count(self, category: str) -> int:
        """Return how many ``product`` nodes belong to *category*.

        Only the count is exposed, never the nodes. Returns 0 for a category
        that is not whitelisted, for a non-numeric query result, or on any
        query error (a message is printed in the error case).
        """
        try:
            if not self._validate_category(category):
                return 0
            result = self.tree.xpath(
                "count(//product[category=$category])", category=category
            )
            return int(result) if isinstance(result, (int, float)) else 0
        except Exception as e:
            print(f"Count error: {e}")
            return 0