import ldap
from ldap.filter import escape_filter_chars
import logging
import re
class SecureLdapSearch:
def __init__(self, server_url):
self.conn = ldap.initialize(server_url)
# SECURE: Using limited service account
self.conn.simple_bind_s('cn=app-service,ou=services,dc=example,dc=com', 'service_pass')
self.logger = logging.getLogger(__name__)
def search_users(self, search_term, max_results=50):
# Input validation
if not self._is_valid_search_term(search_term):
self.logger.warning(f"Invalid search term rejected: {search_term}")
return []
# SECURE: Proper LDAP filter escaping
escaped_term = escape_filter_chars(search_term)
ldap_filter = f"(&(objectClass=person)(|(cn=*{escaped_term}*)(mail=*{escaped_term}*)))"
# Log the search for monitoring
self.logger.info(f"LDAP search: {ldap_filter}")
try:
# Restricted search with limited scope and attributes
result = self.conn.search_s(
'ou=users,dc=example,dc=com', # Restricted base DN
ldap.SCOPE_ONELEVEL, # Limited scope
ldap_filter,
['cn', 'mail', 'uid'] # Only necessary attributes
)
# Limit results to prevent resource exhaustion
return result[:max_results]
except ldap.LDAPError as e:
self.logger.error(f"LDAP search error: {e}")
return []
def _is_valid_search_term(self, term):
# Length validation
if not term or len(term) < 2 or len(term) > 100:
return False
# Character validation - allow only safe characters
if not re.match(r'^[a-zA-Z0-9@._\s-]+$', term):
return False
# Check for obvious injection attempts
suspicious_patterns = ['*)(', '|(', '))', '\\*', 'objectClass']
term_lower = term.lower()
if any(pattern in term_lower for pattern in suspicious_patterns):
return False
return True
def __del__(self):
try:
self.conn.unbind()
except:
pass