const crypto = require('crypto');
const EventEmitter = require('events');
const WebSocket = require('ws');
// Comprehensive WebSocket rate limiting and resource management
/**
 * Tracks WebSocket connections and enforces connection, message and
 * subscription limits on them.
 *
 * Events emitted:
 *  - 'limitExceeded'          a limit check failed (payload has a `type` field)
 *  - 'connectionRegistered'   after registerConnection()
 *  - 'connectionUnregistered' after unregisterConnection()
 *  - 'subscriptionAdded' / 'subscriptionRemoved'
 *  - 'cleanup'                when cleanup() removed at least one item
 *
 * NOTE(review): entries pushed to a connection's `messageQueue` by
 * recordMessage() are only ever removed by cleanup() once they are older than
 * `messageRetentionMs`; nothing in this class dequeues them earlier, so the
 * MESSAGE_QUEUE_FULL check effectively caps messages per retention period.
 * Confirm that this is the intended behaviour.
 */
class WebSocketResourceManager extends EventEmitter {
  /**
   * @param {object} [options] Limit overrides. Numeric options that are falsy
   *   (including 0) fall back to their default.
   * @param {number} [options.maxConnections=1000] Global connection cap.
   * @param {number} [options.maxConnectionsPerIP=10] Per-IP connection cap.
   * @param {number} [options.maxMessageRate=60] Messages allowed per message window.
   * @param {number} [options.maxMessageSize=10240] Largest accepted message size.
   * @param {number} [options.maxSubscriptionsPerConnection=20]
   * @param {number} [options.connectionTimeoutMs=300000] Inactivity before cleanup() closes a connection.
   * @param {number} [options.messageQueueSize=100] Max tracked queue entries per connection.
   * @param {number} [options.cleanupIntervalMs=60000] Period of the automatic cleanup() timer.
   * @param {number} [options.messageWindowMs=60000] Length of the message rate window.
   * @param {number} [options.subscriptionWindowMs=300000] Length of the subscription rate window.
   * @param {number} [options.maxSubscriptionRate=10] Subscriptions allowed per subscription window.
   * @param {number} [options.messageRetentionMs=300000] Age after which cleanup() drops queue entries.
   * @param {boolean} [options.trustProxy=true] Whether getClientIP() honours
   *   the X-Forwarded-For / X-Real-IP request headers.
   */
  constructor(options = {}) {
    super();
    this.config = {
      maxConnections: options.maxConnections || 1000,
      maxConnectionsPerIP: options.maxConnectionsPerIP || 10,
      maxMessageRate: options.maxMessageRate || 60, // messages per window
      maxMessageSize: options.maxMessageSize || 10240, // 10KB
      maxSubscriptionsPerConnection: options.maxSubscriptionsPerConnection || 20,
      connectionTimeoutMs: options.connectionTimeoutMs || 300000, // 5 minutes
      messageQueueSize: options.messageQueueSize || 100,
      cleanupIntervalMs: options.cleanupIntervalMs || 60000, // 1 minute
      messageWindowMs: options.messageWindowMs || 60000, // 1 minute
      subscriptionWindowMs: options.subscriptionWindowMs || 300000, // 5 minutes
      maxSubscriptionRate: options.maxSubscriptionRate || 10,
      messageRetentionMs: options.messageRetentionMs || 300000, // 5 minutes
      trustProxy: options.trustProxy ?? true
    };
    this.connections = new Map(); // connectionId -> connection record
    this.ipConnections = new Map(); // ip -> Set of connectionIds
    this.rateLimits = new Map(); // only cleared by destroy(); not otherwise used in this class
    this.stats = {
      totalConnections: 0,
      activeConnections: 0,
      messagesProcessed: 0,
      messagesDropped: 0,
      connectionsRejected: 0
    };
    // Start cleanup interval
    this.cleanupInterval = setInterval(() => {
      this.cleanup();
    }, this.config.cleanupIntervalMs);
    // A housekeeping timer should not, on its own, keep the process alive.
    this.cleanupInterval.unref?.();
  }

  /**
   * Checks the global and per-IP connection limits for an incoming request.
   * A rejection increments `stats.connectionsRejected` and emits 'limitExceeded'.
   *
   * @param {object} req Request object as accepted by getClientIP().
   * @returns {boolean} true when a new connection may be accepted.
   */
  canAcceptConnection(req) {
    const ip = this.getClientIP(req);
    // Check global connection limit
    if (this.stats.activeConnections >= this.config.maxConnections) {
      this.stats.connectionsRejected++;
      this.emit('limitExceeded', {
        type: 'GLOBAL_CONNECTION_LIMIT',
        ip,
        currentCount: this.stats.activeConnections,
        limit: this.config.maxConnections
      });
      return false;
    }
    // Check per-IP connection limit
    const ipConnectionCount = this.ipConnections.get(ip)?.size || 0;
    if (ipConnectionCount >= this.config.maxConnectionsPerIP) {
      this.stats.connectionsRejected++;
      this.emit('limitExceeded', {
        type: 'IP_CONNECTION_LIMIT',
        ip,
        currentCount: ipConnectionCount,
        limit: this.config.maxConnectionsPerIP
      });
      return false;
    }
    return true;
  }

  /**
   * Starts tracking a socket. No limits are checked here; call
   * canAcceptConnection() first. Sets `ws.connectionId` and unregisters the
   * connection when the socket emits 'close' or 'error'.
   *
   * @param {object} ws Socket exposing `on()` and `close()`.
   * @param {object} req Request object as accepted by getClientIP().
   * @returns {string} The generated connection id.
   */
  registerConnection(ws, req) {
    const connectionId = this.generateConnectionId();
    const ip = this.getClientIP(req);
    const now = Date.now();
    const connection = {
      id: connectionId,
      ws,
      ip,
      connectedAt: now,
      lastActivity: now,
      messageCount: 0,
      subscriptions: new Set(),
      messageQueue: [],
      // Timestamps at which the per-window counters below are reset.
      rateLimitResets: {
        messages: now + this.config.messageWindowMs,
        subscriptions: now + this.config.subscriptionWindowMs
      },
      limits: {
        messagesThisWindow: 0,
        subscriptionsThisWindow: 0
      }
    };
    // Register connection
    this.connections.set(connectionId, connection);
    // Track IP connections
    if (!this.ipConnections.has(ip)) {
      this.ipConnections.set(ip, new Set());
    }
    this.ipConnections.get(ip).add(connectionId);
    // Update stats
    this.stats.totalConnections++;
    this.stats.activeConnections++;
    // Set up connection handlers; unregisterConnection() ignores unknown ids,
    // so 'error' followed by 'close' is harmless.
    ws.connectionId = connectionId;
    ws.on('close', () => this.unregisterConnection(connectionId));
    ws.on('error', () => this.unregisterConnection(connectionId));
    this.emit('connectionRegistered', {
      connectionId,
      ip,
      totalConnections: this.stats.activeConnections
    });
    return connectionId;
  }

  /**
   * Stops tracking a connection and emits 'connectionUnregistered'.
   * Does nothing when the id is not tracked. Does not close the socket.
   *
   * @param {string} connectionId
   */
  unregisterConnection(connectionId) {
    const connection = this.connections.get(connectionId);
    if (!connection) return;
    const ip = connection.ip;
    // Remove from connections
    this.connections.delete(connectionId);
    // Remove from IP tracking
    const ipSet = this.ipConnections.get(ip);
    if (ipSet) {
      ipSet.delete(connectionId);
      if (ipSet.size === 0) {
        this.ipConnections.delete(ip);
      }
    }
    // Update stats
    this.stats.activeConnections--;
    this.emit('connectionUnregistered', {
      connectionId,
      ip,
      duration: Date.now() - connection.connectedAt,
      messagesProcessed: connection.messageCount
    });
  }

  /**
   * Checks the size, rate and queue limits for one inbound message. Every
   * rejection of a tracked connection increments `stats.messagesDropped` and
   * emits 'limitExceeded'. Does not record the message; see recordMessage().
   *
   * @param {string} connectionId
   * @param {number} messageSize Size compared against `maxMessageSize`.
   * @returns {boolean} false when the connection is unknown or a limit is hit.
   */
  canProcessMessage(connectionId, messageSize) {
    const connection = this.connections.get(connectionId);
    if (!connection) return false;
    const now = Date.now();
    // Check message size limit
    if (messageSize > this.config.maxMessageSize) {
      // Counted like the other rejections so the drop statistic is complete.
      this.stats.messagesDropped++;
      this.emit('limitExceeded', {
        type: 'MESSAGE_SIZE_LIMIT',
        connectionId,
        messageSize,
        limit: this.config.maxMessageSize
      });
      return false;
    }
    // Reset rate limit window if expired
    if (now > connection.rateLimitResets.messages) {
      connection.limits.messagesThisWindow = 0;
      connection.rateLimitResets.messages = now + this.config.messageWindowMs;
    }
    // Check message rate limit
    if (connection.limits.messagesThisWindow >= this.config.maxMessageRate) {
      this.stats.messagesDropped++;
      this.emit('limitExceeded', {
        type: 'MESSAGE_RATE_LIMIT',
        connectionId,
        currentRate: connection.limits.messagesThisWindow,
        limit: this.config.maxMessageRate
      });
      return false;
    }
    // Check message queue size
    if (connection.messageQueue.length >= this.config.messageQueueSize) {
      this.stats.messagesDropped++;
      this.emit('limitExceeded', {
        type: 'MESSAGE_QUEUE_FULL',
        connectionId,
        queueSize: connection.messageQueue.length,
        limit: this.config.messageQueueSize
      });
      return false;
    }
    return true;
  }

  /**
   * Records an accepted message: refreshes `lastActivity`, bumps the counters
   * and appends a `{ timestamp, size }` entry to the connection's queue.
   * Does nothing when the id is not tracked.
   *
   * @param {string} connectionId
   * @param {number} messageSize
   */
  recordMessage(connectionId, messageSize) {
    const connection = this.connections.get(connectionId);
    if (!connection) return;
    const now = Date.now();
    connection.lastActivity = now;
    connection.messageCount++;
    connection.limits.messagesThisWindow++;
    this.stats.messagesProcessed++;
    // Add to processing queue
    connection.messageQueue.push({
      timestamp: now,
      size: messageSize
    });
  }

  /**
   * Checks the subscription count and subscription rate limits. A rejection
   * emits 'limitExceeded'. Does not record the subscription.
   *
   * @param {string} connectionId
   * @param {*} channel Not inspected by this check.
   * @returns {boolean} false when the connection is unknown or a limit is hit.
   */
  canSubscribe(connectionId, channel) {
    const connection = this.connections.get(connectionId);
    if (!connection) return false;
    const now = Date.now();
    // Check subscription limit
    if (connection.subscriptions.size >= this.config.maxSubscriptionsPerConnection) {
      this.emit('limitExceeded', {
        type: 'SUBSCRIPTION_LIMIT',
        connectionId,
        currentCount: connection.subscriptions.size,
        limit: this.config.maxSubscriptionsPerConnection
      });
      return false;
    }
    // Reset subscription rate limit window if expired
    if (now > connection.rateLimitResets.subscriptions) {
      connection.limits.subscriptionsThisWindow = 0;
      connection.rateLimitResets.subscriptions = now + this.config.subscriptionWindowMs;
    }
    // Check subscription rate limit (prevent subscription flooding)
    if (connection.limits.subscriptionsThisWindow >= this.config.maxSubscriptionRate) {
      this.emit('limitExceeded', {
        type: 'SUBSCRIPTION_RATE_LIMIT',
        connectionId,
        currentRate: connection.limits.subscriptionsThisWindow,
        limit: this.config.maxSubscriptionRate
      });
      return false;
    }
    return true;
  }

  /**
   * Adds a channel to the connection's subscription set, counts it against
   * the subscription rate window and emits 'subscriptionAdded'.
   *
   * @param {string} connectionId
   * @param {*} channel
   */
  recordSubscription(connectionId, channel) {
    const connection = this.connections.get(connectionId);
    if (!connection) return;
    connection.subscriptions.add(channel);
    connection.limits.subscriptionsThisWindow++;
    this.emit('subscriptionAdded', {
      connectionId,
      channel,
      totalSubscriptions: connection.subscriptions.size
    });
  }

  /**
   * Removes a channel from the connection's subscription set and emits
   * 'subscriptionRemoved' (also when the channel was not subscribed).
   *
   * @param {string} connectionId
   * @param {*} channel
   */
  removeSubscription(connectionId, channel) {
    const connection = this.connections.get(connectionId);
    if (!connection) return;
    connection.subscriptions.delete(channel);
    this.emit('subscriptionRemoved', {
      connectionId,
      channel,
      totalSubscriptions: connection.subscriptions.size
    });
  }

  /**
   * Closes and unregisters connections idle for longer than
   * `connectionTimeoutMs`, and drops queue entries older than
   * `messageRetentionMs` from the remaining ones. Emits 'cleanup' only when
   * something was removed.
   */
  cleanup() {
    const now = Date.now();
    let cleanedConnections = 0;
    let cleanedMessages = 0;
    for (const [connectionId, connection] of this.connections.entries()) {
      const inactiveTime = now - connection.lastActivity;
      // Close connections that have been inactive too long
      if (inactiveTime > this.config.connectionTimeoutMs) {
        try {
          connection.ws.close(1000, 'Inactive connection timeout');
        } catch {
          // Best effort: the record is dropped below whether or not close() worked.
        }
        this.unregisterConnection(connectionId);
        cleanedConnections++;
        continue;
      }
      // Clean up old messages from queue
      const oldMessageCount = connection.messageQueue.length;
      connection.messageQueue = connection.messageQueue.filter(
        (msg) => now - msg.timestamp < this.config.messageRetentionMs
      );
      cleanedMessages += oldMessageCount - connection.messageQueue.length;
    }
    if (cleanedConnections > 0 || cleanedMessages > 0) {
      this.emit('cleanup', {
        cleanedConnections,
        cleanedMessages,
        activeConnections: this.stats.activeConnections
      });
    }
  }

  /**
   * @param {string} connectionId
   * @returns {object|null} Snapshot of one connection with its IP masked,
   *   or null when the id is not tracked.
   */
  getConnectionStats(connectionId) {
    const connection = this.connections.get(connectionId);
    if (!connection) return null;
    const now = Date.now();
    return {
      id: connectionId,
      ip: this.anonymizeIP(connection.ip), // Anonymize for privacy
      connectedAt: connection.connectedAt,
      lastActivity: connection.lastActivity,
      uptime: now - connection.connectedAt,
      inactiveTime: now - connection.lastActivity,
      messageCount: connection.messageCount,
      subscriptionCount: connection.subscriptions.size,
      queueSize: connection.messageQueue.length,
      limits: {
        messagesThisWindow: connection.limits.messagesThisWindow,
        subscriptionsThisWindow: connection.limits.subscriptionsThisWindow
      }
    };
  }

  /**
   * @returns {object} The counters in `stats` plus masked per-IP counts (only
   *   IPs with more than one connection), the average age of current
   *   connections in ms, and the ten most subscribed channels.
   */
  getGlobalStats() {
    return {
      ...this.stats,
      connectionsPerIP: Array.from(this.ipConnections.entries()).map(
        ([ip, connections]) => ({
          ip: this.anonymizeIP(ip),
          count: connections.size
        })
      ).filter((entry) => entry.count > 1), // Only show IPs with multiple connections
      avgConnectionDuration: this.calculateAvgConnectionDuration(),
      topChannels: this.getTopChannels()
    };
  }

  /**
   * @returns {number} Rounded mean time since connect, in ms, over the
   *   currently tracked connections; 0 when there are none.
   */
  calculateAvgConnectionDuration() {
    if (this.connections.size === 0) return 0;
    const now = Date.now();
    const totalDuration = Array.from(this.connections.values())
      .reduce((sum, conn) => sum + (now - conn.connectedAt), 0);
    return Math.round(totalDuration / this.connections.size);
  }

  /**
   * @returns {Array<{channel: *, subscribers: number}>} Up to ten channels,
   *   ordered by descending subscriber count.
   */
  getTopChannels() {
    const channelCounts = new Map();
    for (const connection of this.connections.values()) {
      for (const channel of connection.subscriptions) {
        channelCounts.set(channel, (channelCounts.get(channel) || 0) + 1);
      }
    }
    return Array.from(channelCounts.entries())
      .sort((a, b) => b[1] - a[1])
      .slice(0, 10)
      .map(([channel, count]) => ({ channel, subscribers: count }));
  }

  /**
   * Resolves the address used for per-IP accounting.
   *
   * With `trustProxy` enabled (the default) the first X-Forwarded-For entry,
   * then X-Real-IP, take precedence over the socket address. These headers
   * are supplied by the client unless a proxy overwrites them, so a directly
   * exposed server should set `trustProxy: false`; otherwise the per-IP limit
   * can be sidestepped by sending a different header value per connection.
   *
   * @param {object} req Object with `headers` and `socket.remoteAddress`.
   * @returns {string} The resolved address, or 'unknown'.
   */
  getClientIP(req) {
    if (this.config.trustProxy) {
      const forwarded = req.headers['x-forwarded-for']?.split(',')[0]?.trim();
      const proxied = forwarded || req.headers['x-real-ip'];
      if (proxied) return proxied;
    }
    return req.socket.remoteAddress || 'unknown';
  }

  /**
   * Masks an address for reporting by replacing every run of hexadecimal
   * digits with 'XXX', which covers both IPv4 octets and IPv6 groups.
   *
   * @param {string} [ip]
   * @returns {string|undefined} Masked address; undefined when `ip` is nullish.
   */
  anonymizeIP(ip) {
    return ip?.replace(/[0-9a-f]+/gi, 'XXX');
  }

  /**
   * @returns {string} 32 hex characters from 16 random bytes.
   */
  generateConnectionId() {
    return crypto.randomBytes(16).toString('hex');
  }

  /**
   * Stops the cleanup timer, asks every tracked socket to close with code
   * 1001 and forgets all connections. Safe to call more than once.
   */
  destroy() {
    if (this.cleanupInterval) {
      clearInterval(this.cleanupInterval);
      this.cleanupInterval = null;
    }
    // Close all connections
    for (const connection of this.connections.values()) {
      try {
        connection.ws.close(1001, 'Server shutting down');
      } catch {
        // Ignore errors during shutdown
      }
    }
    this.connections.clear();
    this.ipConnections.clear();
    this.rateLimits.clear();
    // The records are gone, so later 'close' events cannot decrement this.
    this.stats.activeConnections = 0;
  }
}
// Usage example with WebSocket server
// Example limits; options not listed here use the manager's defaults.
const resourceManager = new WebSocketResourceManager({
  maxSubscriptionsPerConnection: 15,
  maxMessageSize: 8 * 1024, // 8KB
  maxMessageRate: 30, // 30 messages per minute
  maxConnectionsPerIP: 5,
  maxConnections: 1000
});

// The handshake is refused when the manager reports that a connection limit
// has already been reached.
const wss = new WebSocket.Server({
  port: 8080,
  verifyClient({ req }) {
    return resourceManager.canAcceptConnection(req);
  }
});
// Registers every accepted socket with the resource manager and gates each
// inbound message through its limits before the message is parsed.
wss.on('connection', (ws, req) => {
  const connectionId = resourceManager.registerConnection(ws, req);

  // Sends an error frame ({ type, code, message }) to this client.
  const sendError = (code, message) => {
    ws.send(JSON.stringify({ type: 'error', code, message }));
  };

  ws.on('message', (rawMessage) => {
    const messageSize = Buffer.byteLength(rawMessage);
    if (!resourceManager.canProcessMessage(connectionId, messageSize)) {
      // canProcessMessage() only returns a boolean, so the size check is
      // repeated here to avoid reporting an oversized message as rate limiting.
      if (messageSize > resourceManager.config.maxMessageSize) {
        sendError('MESSAGE_TOO_LARGE', 'Message size limit exceeded');
      } else {
        sendError('RATE_LIMITED', 'Rate limit exceeded');
      }
      return;
    }
    resourceManager.recordMessage(connectionId, messageSize);

    // Only the parse is guarded, so failures in the handling code added below
    // are not misreported to the client as INVALID_JSON.
    let message;
    try {
      message = JSON.parse(rawMessage);
    } catch (error) {
      sendError('INVALID_JSON', 'Invalid message format');
      return;
    }
    // Handle message based on type
  });
});
// Monitor resource usage
// Each monitored event is logged through the listed console method, prefixed
// with its label. The console method is looked up when the event fires.
for (const [eventName, level, label] of [
  ['limitExceeded', 'warn', 'Resource limit exceeded:'],
  ['cleanup', 'info', 'Resource cleanup completed:']
]) {
  resourceManager.on(eventName, (payload) => {
    console[level](label, payload);
  });
}