const { logger } = require('../../middleware/logger');

/** Number of milliseconds in one day; used to turn a day count into a cutoff date. */
const MS_PER_DAY = 24 * 60 * 60 * 1000;

/**
 * Data-access helper for the `rate_limit` table.
 *
 * Every method logs a `:start` entry, then either a `:success` entry or an
 * `:error` entry (after which the original error is rethrown unchanged).
 *
 * NOTE(review): this class never starts or commits a transaction. The
 * `FOR UPDATE` lock taken by `getForUpdate` is therefore only useful if the
 * caller already runs these calls inside one on the same connection - confirm.
 */
class RateLimitRepository {
  /**
   * @param {Object} connection - Object exposing `query(sql, params)` that
   *   resolves to an array whose first element is the result rows
   *   (presumably a mysql2/promise connection - confirm against the caller).
   */
  constructor(connection) {
    this.connection = connection;
  }

  /**
   * Get and lock the rate limit row for a key and window_start.
   * @param {string} rateKey
   * @param {Date} windowStart
   * @returns {Promise<Object|null>} The first matching row, or `null` when none exists.
   * @throws Rethrows any error raised by the query.
   */
  async getForUpdate(rateKey, windowStart) {
    logger.info('RateLimitRepository.getForUpdate:start', { rateKey, windowStart });
    try {
      const [rows] = await this.connection.query(
        `SELECT * FROM rate_limit WHERE rate_key = ? AND window_start = ? FOR UPDATE`,
        [rateKey, windowStart]
      );
      const row = rows[0] || null;
      logger.info('RateLimitRepository.getForUpdate:success', { rateKey, found: row !== null });
      return row;
    } catch (error) {
      logger.error('RateLimitRepository.getForUpdate:error', { rateKey, error: error.message });
      throw error;
    }
  }

  /**
   * Insert a new rate limit row.
   * @param {Object} data - Column values keyed by column name: `rate_key`,
   *   `window_start`, `count`, `window_seconds` and `max`.
   * @returns {Promise<void>}
   * @throws Rethrows any error raised by the query.
   */
  async insert(data) {
    logger.info('RateLimitRepository.insert:start', { rateKey: data.rate_key, windowStart: data.window_start });
    try {
      await this.connection.query(
        `INSERT INTO rate_limit (rate_key, window_start, count, window_seconds, max) VALUES (?, ?, ?, ?, ?)`,
        [data.rate_key, data.window_start, data.count, data.window_seconds, data.max]
      );
      logger.info('RateLimitRepository.insert:success', { rateKey: data.rate_key });
    } catch (error) {
      logger.error('RateLimitRepository.insert:error', { rateKey: data.rate_key, error: error.message });
      throw error;
    }
  }

  /**
   * Update count for an existing rate limit row.
   * The stored count is overwritten with `count`; it is not added to it.
   * @param {string} rateKey
   * @param {Date} windowStart
   * @param {number} count - New absolute value for the `count` column.
   * @returns {Promise<void>}
   * @throws Rethrows any error raised by the query.
   */
  async updateCount(rateKey, windowStart, count) {
    logger.info('RateLimitRepository.updateCount:start', { rateKey, windowStart, count });
    try {
      await this.connection.query(
        // The embedded line break is plain whitespace to SQL; it is kept so
        // the statement text sent to the server stays exactly as it was.
        'UPDATE rate_limit SET count = ? WHERE rate_key = ? \nAND window_start = ?',
        [count, rateKey, windowStart]
      );
      logger.info('RateLimitRepository.updateCount:success', { rateKey, count });
    } catch (error) {
      logger.error('RateLimitRepository.updateCount:error', { rateKey, error: error.message });
      throw error;
    }
  }

  /**
   * Increment the counter for a key and window, creating the row when it is missing.
   *
   * When a row exists it is fetched with `getForUpdate`, its count is raised by
   * `increment`, and that same row object is returned with the new count.
   * `windowSeconds` and `max` are only written when a new row is inserted; an
   * existing row keeps whatever values it already stores.
   *
   * NOTE(review): the read, update and insert are separate statements. Nothing
   * here prevents two callers from both finding no row and both inserting, so
   * atomicity depends on the caller's transaction and on the table's key
   * constraints - confirm before relying on it.
   *
   * @param {string} rateKey
   * @param {Date} windowStart
   * @param {number} windowSeconds
   * @param {number} max
   * @param {number} [increment=1]
   * @returns {Promise<Object>} The updated row, or an object holding the five
   *   inserted column values when the row was created.
   * @throws Rethrows any error raised by the underlying queries.
   */
  async incrementOrCreate(rateKey, windowStart, windowSeconds, max, increment = 1) {
    logger.info('RateLimitRepository.incrementOrCreate:start', { rateKey, windowStart, windowSeconds, max, increment });
    try {
      // Lock row if exists
      const existing = await this.getForUpdate(rateKey, windowStart);
      let result;
      if (existing) {
        const newCount = existing.count + increment;
        await this.updateCount(rateKey, windowStart, newCount);
        existing.count = newCount;
        result = existing;
      } else {
        result = {
          rate_key: rateKey,
          window_start: windowStart,
          count: increment,
          window_seconds: windowSeconds,
          max,
        };
        await this.insert(result);
      }
      logger.info('RateLimitRepository.incrementOrCreate:success', { rateKey });
      return result;
    } catch (error) {
      logger.error('RateLimitRepository.incrementOrCreate:error', { rateKey, error: error.message });
      throw error;
    }
  }

  /**
   * Cleanup old rate limit rows older than the specified number of days.
   *
   * `days` is coerced with `Number()`, so numeric strings are accepted. A
   * negative, non-numeric or out-of-range value is rejected before any query
   * runs: a negative value would place the cutoff in the future and delete
   * every row, and a non-numeric value would produce an invalid date.
   *
   * @param {number} days - Non-negative age threshold in days.
   * @returns {Promise<void>}
   * @throws {RangeError} When `days` does not yield a valid cutoff in the past.
   * @throws Rethrows any error raised by the query.
   */
  async cleanupOldRows(days) {
    logger.info('RateLimitRepository.cleanupOldRows:start', { days });
    try {
      const dayCount = Number(days);
      const cutoff = new Date(Date.now() - dayCount * MS_PER_DAY);
      // `!(x >= 0)` also catches NaN; the getTime() check catches values so
      // large that the resulting timestamp falls outside the valid Date range.
      if (!(dayCount >= 0) || Number.isNaN(cutoff.getTime())) {
        throw new RangeError(`days must be a non-negative finite number, got: ${String(days)}`);
      }
      await this.connection.query(
        'DELETE FROM rate_limit WHERE window_start < ?',
        [cutoff]
      );
      logger.info('RateLimitRepository.cleanupOldRows:success', { days });
    } catch (error) {
      logger.error('RateLimitRepository.cleanupOldRows:error', { days, error: error.message });
      throw error;
    }
  }
}

module.exports = RateLimitRepository;