CentralBackend/repositories/RateLimitRepository.js
2025-09-07 12:44:01 +02:00

134 lines
5.0 KiB
JavaScript

const { logger } = require('../middleware/logger');
/**
 * Data-access layer for the `rate_limit` table.
 *
 * Every method issues parameterized SQL through the injected connection, logs
 * a `:start` / `:success` / `:error` event, and rethrows the original error.
 *
 * NOTE(review): this class never opens or commits a transaction itself. The
 * `FOR UPDATE` lock taken by {@link RateLimitRepository#getForUpdate} is only
 * useful if the caller has already started a transaction on the same
 * connection — confirm against the calling service.
 */
class RateLimitRepository {
  /**
   * @param {Object} connection - Database handle exposing
   *   `query(sql, params)` that resolves to an array whose first element is
   *   the list of rows (presumably a mysql2 promise connection — verify).
   */
  constructor(connection) {
    this.connection = connection;
  }

  /**
   * Select the rate limit row for a key and window start using
   * `SELECT ... FOR UPDATE`.
   *
   * @param {string} rateKey
   * @param {Date} windowStart
   * @returns {Promise<Object|null>} The first matching row, or `null` if none.
   * @throws Rethrows any error raised by the underlying query.
   */
  async getForUpdate(rateKey, windowStart) {
    logger.info('RateLimitRepository.getForUpdate:start', { rateKey, windowStart });
    try {
      const [rows] = await this.connection.query(
        'SELECT * FROM rate_limit WHERE rate_key = ? AND window_start = ? FOR UPDATE',
        [rateKey, windowStart]
      );
      const row = rows[0] || null;
      logger.info('RateLimitRepository.getForUpdate:success', { rateKey, found: row !== null });
      return row;
    } catch (error) {
      logger.error('RateLimitRepository.getForUpdate:error', { rateKey, error: error.message });
      throw error;
    }
  }

  /**
   * Insert a new rate limit row.
   *
   * @param {Object} data
   * @param {string} data.rate_key
   * @param {Date} data.window_start
   * @param {number} data.count
   * @param {number} data.window_seconds
   * @param {number} data.max
   * @returns {Promise<void>}
   * @throws Rethrows any error raised by the underlying query.
   */
  async insert(data) {
    logger.info('RateLimitRepository.insert:start', { rateKey: data.rate_key, windowStart: data.window_start });
    try {
      await this.connection.query(
        'INSERT INTO rate_limit (rate_key, window_start, count, window_seconds, max) VALUES (?, ?, ?, ?, ?)',
        [data.rate_key, data.window_start, data.count, data.window_seconds, data.max]
      );
      logger.info('RateLimitRepository.insert:success', { rateKey: data.rate_key });
    } catch (error) {
      logger.error('RateLimitRepository.insert:error', { rateKey: data.rate_key, error: error.message });
      throw error;
    }
  }

  /**
   * Overwrite the count of an existing rate limit row.
   *
   * @param {string} rateKey
   * @param {Date} windowStart
   * @param {number} count - New absolute value (not a delta).
   * @returns {Promise<void>}
   * @throws Rethrows any error raised by the underlying query.
   */
  async updateCount(rateKey, windowStart, count) {
    logger.info('RateLimitRepository.updateCount:start', { rateKey, windowStart, count });
    try {
      await this.connection.query(
        'UPDATE rate_limit SET count = ? WHERE rate_key = ? AND window_start = ?',
        [count, rateKey, windowStart]
      );
      logger.info('RateLimitRepository.updateCount:success', { rateKey, count });
    } catch (error) {
      logger.error('RateLimitRepository.updateCount:error', { rateKey, error: error.message });
      throw error;
    }
  }

  /**
   * Increment the counter for a key/window, creating the row when missing.
   *
   * If the row exists it is locked, its count is raised by `increment`, and
   * the (mutated) row is returned. Otherwise a new row is inserted with
   * `count = increment`. When the insert loses a race against a concurrent
   * writer and fails with a duplicate-key error, the now-existing row is
   * locked and incremented instead of failing the request.
   *
   * @param {string} rateKey
   * @param {Date} windowStart
   * @param {number} windowSeconds
   * @param {number} max
   * @param {number} [increment=1]
   * @returns {Promise<Object>} The updated or newly created row.
   * @throws Rethrows any error raised by the underlying queries.
   */
  async incrementOrCreate(rateKey, windowStart, windowSeconds, max, increment = 1) {
    logger.info('RateLimitRepository.incrementOrCreate:start', { rateKey, windowStart, windowSeconds, max, increment });

    // Applies the increment to an already-locked row and mirrors it locally.
    const bump = async (row) => {
      // Drivers may hand back BIGINT/DECIMAL columns as strings; coerce so
      // that `+` adds instead of concatenating.
      const newCount = Number(row.count) + increment;
      await this.updateCount(rateKey, windowStart, newCount);
      row.count = newCount;
      return row;
    };

    try {
      const existing = await this.getForUpdate(rateKey, windowStart);
      if (existing) {
        const updated = await bump(existing);
        logger.info('RateLimitRepository.incrementOrCreate:success', { rateKey });
        return updated;
      }

      const fresh = {
        rate_key: rateKey,
        window_start: windowStart,
        count: increment,
        window_seconds: windowSeconds,
        max: max
      };

      try {
        await this.insert(fresh);
      } catch (insertError) {
        const isDuplicate = Boolean(insertError) &&
          (insertError.code === 'ER_DUP_ENTRY' || insertError.errno === 1062);
        if (!isDuplicate) {
          throw insertError;
        }
        // Another writer created the row between our SELECT and INSERT.
        logger.info('RateLimitRepository.incrementOrCreate:duplicateRetry', { rateKey });
        const raced = await this.getForUpdate(rateKey, windowStart);
        if (!raced) {
          // Row vanished again (or the key differs); surface the real failure.
          throw insertError;
        }
        const updated = await bump(raced);
        logger.info('RateLimitRepository.incrementOrCreate:success', { rateKey });
        return updated;
      }

      logger.info('RateLimitRepository.incrementOrCreate:success', { rateKey });
      return Object.assign({}, fresh);
    } catch (error) {
      logger.error('RateLimitRepository.incrementOrCreate:error', { rateKey, error: error.message });
      throw error;
    }
  }

  /**
   * Delete rate limit rows whose `window_start` is older than `days` days.
   *
   * @param {number} days - Non-negative, finite age threshold in days.
   *   Numeric strings are accepted and coerced, as before.
   * @returns {Promise<void>}
   * @throws {RangeError} If `days` is not a finite, non-negative number; a
   *   negative value would put the cutoff in the future and wipe live rows.
   * @throws Rethrows any error raised by the underlying query.
   */
  async cleanupOldRows(days) {
    logger.info('RateLimitRepository.cleanupOldRows:start', { days });
    try {
      const MS_PER_DAY = 24 * 60 * 60 * 1000;
      const dayCount = Number(days);
      if (!Number.isFinite(dayCount) || dayCount < 0) {
        throw new RangeError(`cleanupOldRows: days must be a finite, non-negative number (received ${String(days)})`);
      }
      const cutoff = new Date(Date.now() - dayCount * MS_PER_DAY);
      if (Number.isNaN(cutoff.getTime())) {
        throw new RangeError(`cleanupOldRows: days is out of range (received ${String(days)})`);
      }
      await this.connection.query(
        'DELETE FROM rate_limit WHERE window_start < ?', [cutoff]
      );
      logger.info('RateLimitRepository.cleanupOldRows:success', { days });
    } catch (error) {
      logger.error('RateLimitRepository.cleanupOldRows:error', { days, error: error.message });
      throw error;
    }
  }
}
module.exports = RateLimitRepository;