// CentralBackend/repositories/matrix/MatrixRepository.js
// 2025-10-16 16:34:56 +02:00
//
// 290 lines
// 8.7 KiB
// JavaScript

const mysql = require('mysql2/promise');
const fs = require('fs');
const path = require('path');
// Runtime environment name; falls back to 'development' when NODE_ENV is unset or empty.
const NODE_ENV = process.env.NODE_ENV || 'development';
/**
 * Builds the TLS options handed to the MySQL driver.
 *
 * Environment variables read:
 * - DB_SSL: TLS is enabled only when this equals "true" (case-insensitive).
 * - DB_SSL_CA_PATH: optional path to a CA certificate file to load.
 * - DB_SSL_REJECT_UNAUTHORIZED: set to "true" to have the server certificate
 *   verified. Defaults to off, which keeps the historical behaviour.
 *
 * @returns {{ ca?: Buffer, rejectUnauthorized: boolean } | undefined}
 *   `undefined` when TLS is disabled, otherwise the TLS options object.
 */
function getSSLConfig() {
  const isTrue = (value) => String(value || '').toLowerCase() === 'true';

  if (!isTrue(process.env.DB_SSL)) return undefined;

  // SECURITY NOTE: with rejectUnauthorized=false the connection is encrypted
  // but the server certificate is not verified, so a loaded CA has no effect.
  // Verification stays opt-in so existing deployments keep connecting; enable
  // DB_SSL_REJECT_UNAUTHORIZED=true wherever the server certificate validates.
  const sslOptions = {
    rejectUnauthorized: isTrue(process.env.DB_SSL_REJECT_UNAUTHORIZED),
  };

  const caPath = process.env.DB_SSL_CA_PATH;
  if (!caPath) return sslOptions;

  const resolved = path.resolve(caPath);
  try {
    // Read directly instead of existsSync + readFileSync: one filesystem
    // access, no window between the check and the read.
    return { ca: fs.readFileSync(resolved), ...sslOptions };
  } catch (err) {
    // Still best-effort (fall back to TLS without a custom CA), but make the
    // misconfiguration visible instead of hiding it.
    console.warn(
      `[MatrixRepository] Could not read DB_SSL_CA_PATH (${resolved}): ${err.message}. Continuing without a custom CA.`
    );
    return sslOptions;
  }
}
// Connection settings. Development reads the DEV_DB_* variables with local
// defaults and never uses TLS; every other environment reads DB_* and takes
// its TLS options from getSSLConfig().
const dbConfig =
  NODE_ENV === 'development'
    ? {
        host: process.env.DEV_DB_HOST || 'localhost',
        port: Number(process.env.DEV_DB_PORT) || 3306,
        user: process.env.DEV_DB_USER || 'root',
        password: process.env.DEV_DB_PASSWORD || '',
        database: process.env.DEV_DB_NAME || 'profitplanet_centralserver',
        ssl: undefined,
      }
    : {
        host: process.env.DB_HOST,
        port: Number(process.env.DB_PORT) || 3306,
        user: process.env.DB_USER,
        password: process.env.DB_PASSWORD,
        database: process.env.DB_NAME,
        ssl: getSSLConfig(),
      };

// Connection pool used by every function in this module.
const pool = mysql.createPool({
  ...dbConfig,
  waitForConnections: true,
  connectionLimit: 10,
  queueLimit: 0,
});
/**
 * Fetches the user whose e-mail matches `email`; the comparison is done in
 * SQL with LOWER() on both sides.
 *
 * @param {object} conn - Connection exposing `query(sql, params)`.
 * @param {string} email - E-mail address of the prospective top node.
 * @returns {Promise<object>} The first matching row (`id`, `email`).
 * @throws {Error} With `status = 404` when no user matches.
 */
async function ensureUserExistsByEmail(conn, email) {
  const lookupSql = 'SELECT id, email FROM users WHERE LOWER(email) = LOWER(?) LIMIT 1';
  const [matches] = await conn.query(lookupSql, [email]);

  if (matches.length > 0) {
    return matches[0];
  }

  throw Object.assign(new Error('Top node user not found'), { status: 404 });
}
/**
 * Adds the optional columns this module writes to, if they are missing.
 *
 * Best-effort: a failure never aborts the caller. "Duplicate column" is the
 * expected steady state and is ignored; anything else is logged.
 *
 * @param {object} conn - Connection exposing `query(sql)`.
 * @returns {Promise<void>}
 */
async function ensureMatrixSchemaColumns(conn) {
  const statements = [
    'ALTER TABLE matrix_config ADD COLUMN name VARCHAR(255) NULL',
    'ALTER TABLE user_matrix_metadata ADD COLUMN name VARCHAR(255) NULL',
    'ALTER TABLE user_matrix_metadata ADD COLUMN is_active BOOLEAN DEFAULT TRUE',
  ];

  for (const sql of statements) {
    try {
      await conn.query(sql);
    } catch (err) {
      if (err?.code !== 'ER_DUP_FIELDNAME') {
        console.warn(`[MatrixRepository] Schema check failed (${sql}): ${err?.message}`);
      }
    }
  }
}

/**
 * Creates the master matrix configuration (row id = 1 of `matrix_config`) or,
 * when `force` is set, re-points the existing one to a new top node.
 *
 * Also makes sure the top user has its depth-0 closure row and is registered
 * as an active entry in `user_matrix_metadata`.
 *
 * @param {object} params
 * @param {string} params.name - Display name stored for the matrix.
 * @param {string} params.topNodeEmail - E-mail of the user that becomes the top node.
 * @param {boolean} [params.force=false] - Overwrite an existing configuration.
 * @returns {Promise<{name: string, masterTopUserId: *, masterTopUserEmail: string}>}
 * @throws {Error} `status = 404` when the top-node user does not exist.
 * @throws {Error} `status = 409` when a configuration exists and `force` is false.
 */
async function createMatrix({ name, topNodeEmail, force = false }) {
  const registerEgoSql = `
INSERT INTO user_matrix_metadata
(root_user_id, ego_activated_at, last_bfs_fill_at, immediate_children_count, first_free_position, name, is_active)
VALUES
(?, NOW(), NULL, 0, 1, ?, TRUE)
ON DUPLICATE KEY UPDATE
name = VALUES(name),
is_active = TRUE,
ego_activated_at = COALESCE(user_matrix_metadata.ego_activated_at, VALUES(ego_activated_at)),
updated_at = CURRENT_TIMESTAMP
`;

  const conn = await pool.getConnection();
  try {
    // The schema guard must run BEFORE the transaction starts: in MySQL,
    // ALTER TABLE causes an implicit commit, which would end the transaction
    // and leave the FOR UPDATE lock and the writes below unprotected.
    await ensureMatrixSchemaColumns(conn);

    await conn.beginTransaction();

    const topUser = await ensureUserExistsByEmail(conn, topNodeEmail);

    // Lock the configuration row (if present) for the rest of the transaction.
    const [cfgRows] = await conn.query(
      'SELECT id, master_top_user_id, name FROM matrix_config WHERE id = 1 FOR UPDATE'
    );

    if (cfgRows.length === 0) {
      await conn.query(
        'INSERT INTO matrix_config (id, master_top_user_id, name) VALUES (1, ?, ?)',
        [topUser.id, name]
      );
    } else if (!force) {
      const err = new Error('Matrix already exists. Pass force=true to overwrite.');
      err.status = 409;
      throw err;
    } else {
      await conn.query(
        'UPDATE matrix_config SET master_top_user_id = ?, name = ?, updated_at = CURRENT_TIMESTAMP WHERE id = 1',
        [topUser.id, name]
      );
    }

    // Self-closure row (depth 0) for the master top user.
    await conn.query(
      'INSERT IGNORE INTO user_tree_closure (ancestor_user_id, descendant_user_id, depth) VALUES (?, ?, 0)',
      [topUser.id, topUser.id]
    );

    // Register the top user as an ego matrix entry with the name, marked active.
    await conn.query(registerEgoSql, [topUser.id, name]);

    await conn.commit();
    return {
      name,
      masterTopUserId: topUser.id,
      masterTopUserEmail: topUser.email,
    };
  } catch (err) {
    try {
      await conn.rollback();
    } catch (rollbackErr) {
      // Keep the original failure as the one the caller sees.
      console.error(`[MatrixRepository] Rollback failed: ${rollbackErr?.message}`);
    }
    throw err;
  } finally {
    conn.release();
  }
}
/**
 * Verifies that a user with the given id exists.
 *
 * @param {object} conn - Connection exposing `query(sql, params)`.
 * @param {*} userId - Value compared against `users.id`.
 * @returns {Promise<void>} Resolves with no value when the user exists.
 * @throws {Error} With `status = 404` when no row is found.
 */
async function ensureUserExists(conn, userId) {
  const [found] = await conn.query('SELECT id FROM users WHERE id = ?', [userId]);
  if (found.length > 0) {
    return;
  }

  throw Object.assign(new Error('User not found'), { status: 404 });
}
/**
 * Activates (or refreshes) the ego matrix rooted at `rootUserId`.
 *
 * Inside one transaction it checks the user exists, inserts the depth-0
 * closure row if absent, recomputes the immediate child count and the first
 * unused position in 1..5 from `user_tree_edges`, upserts those values into
 * `user_matrix_metadata`, and reads the stored row back.
 *
 * @param {*} rootUserId - Id of the user whose matrix is activated.
 * @returns {Promise<{rootUserId: *, egoActivatedAt: *, lastBfsFillAt: *,
 *   immediateChildrenCount: number, firstFreePosition: (number|null)}>}
 *   `firstFreePosition` is null when positions 1..5 are all taken.
 * @throws {Error} `status = 404` (from ensureUserExists) when the user is missing.
 */
async function activateEgoMatrix(rootUserId) {
  const upsertMetadataSql = `
INSERT INTO user_matrix_metadata
(root_user_id, ego_activated_at, last_bfs_fill_at, immediate_children_count, first_free_position)
VALUES
(?, NOW(), NULL, ?, ?)
ON DUPLICATE KEY UPDATE
ego_activated_at = COALESCE(ego_activated_at, VALUES(ego_activated_at)),
immediate_children_count = VALUES(immediate_children_count),
first_free_position = VALUES(first_free_position),
updated_at = CURRENT_TIMESTAMP
`;
  const candidateSlots = [1, 2, 3, 4, 5];

  const conn = await pool.getConnection();
  try {
    await conn.beginTransaction();
    await ensureUserExists(conn, rootUserId);

    // Depth-0 closure row; INSERT IGNORE leaves an existing row untouched.
    await conn.query(
      'INSERT IGNORE INTO user_tree_closure (ancestor_user_id, descendant_user_id, depth) VALUES (?, ?, 0)',
      [rootUserId, rootUserId]
    );

    const [edges] = await conn.query(
      'SELECT position FROM user_tree_edges WHERE parent_user_id = ? ORDER BY position ASC',
      [rootUserId]
    );
    const taken = new Set(edges.map((edge) => edge.position));
    const openSlot = candidateSlots.find((slot) => !taken.has(slot));
    const firstFreePosition = openSlot === undefined ? null : openSlot;
    const immediateChildrenCount = edges.length;

    await conn.query(upsertMetadataSql, [rootUserId, immediateChildrenCount, firstFreePosition]);

    const [storedRows] = await conn.query(
      'SELECT ego_activated_at, last_bfs_fill_at, immediate_children_count, first_free_position FROM user_matrix_metadata WHERE root_user_id = ?',
      [rootUserId]
    );
    // If the row cannot be read back, report the values that were just computed.
    const computedFallback = {
      ego_activated_at: null,
      last_bfs_fill_at: null,
      immediate_children_count: immediateChildrenCount,
      first_free_position: firstFreePosition,
    };
    const stored = storedRows[0] || computedFallback;

    await conn.commit();

    const {
      ego_activated_at: egoActivatedAt,
      last_bfs_fill_at: lastBfsFillAt,
      immediate_children_count: storedChildrenCount,
      first_free_position: storedFirstFree,
    } = stored;
    return {
      rootUserId,
      egoActivatedAt,
      lastBfsFillAt,
      immediateChildrenCount: storedChildrenCount,
      firstFreePosition: storedFirstFree,
    };
  } catch (err) {
    await conn.rollback();
    throw err;
  } finally {
    conn.release();
  }
}
/**
 * Summarises every registered matrix (one row of `user_matrix_metadata` each).
 *
 * For each matrix, `usersCount` is the number of closure rows under its root
 * with depth 0..5. `totalUsersSubscribed` is the number of distinct
 * descendants (depth 0..5) across roots whose `is_active` is TRUE.
 *
 * @returns {Promise<{activeMatrices: number, totalMatrices: number,
 *   totalUsersSubscribed: number, matrices: Array<object>}>}
 *   All counters are 0 and `matrices` is empty when nothing is registered.
 */
async function getMatrixStats() {
  const listMatricesSql = `
SELECT m.root_user_id, m.name, m.is_active, m.ego_activated_at, u.email
FROM user_matrix_metadata m
JOIN users u ON u.id = m.root_user_id
ORDER BY COALESCE(m.ego_activated_at, u.created_at) ASC
`;
  const usersPerRootSql = `
SELECT ancestor_user_id AS root_id, COUNT(*) AS cnt
FROM user_tree_closure
WHERE depth BETWEEN 0 AND 5
AND ancestor_user_id IN (?)
GROUP BY ancestor_user_id
`;
  const activeRootsSql = `SELECT root_user_id FROM user_matrix_metadata WHERE is_active = TRUE AND root_user_id IN (?)`;
  const distinctUsersSql = `
SELECT COUNT(DISTINCT descendant_user_id) AS total
FROM user_tree_closure
WHERE depth BETWEEN 0 AND 5
AND ancestor_user_id IN (?)
`;

  const conn = await pool.getConnection();
  try {
    const [matrixRows] = await conn.query(listMatricesSql);

    // Nothing registered: skip the counting queries entirely.
    if (matrixRows.length === 0) {
      return {
        activeMatrices: 0,
        totalMatrices: 0,
        totalUsersSubscribed: 0,
        matrices: [],
      };
    }

    const rootIds = matrixRows.map((row) => row.root_user_id);

    // Users per matrix, keyed by numeric root id.
    const [countRows] = await conn.query(usersPerRootSql, [rootIds]);
    const usersByRoot = new Map();
    for (const row of countRows) {
      usersByRoot.set(Number(row.root_id), Number(row.cnt));
    }

    // Distinct users across the active matrices only.
    const [activeRows] = await conn.query(activeRootsSql, [rootIds]);
    const activeRootIds = activeRows.map((row) => row.root_user_id);
    let totalUsersSubscribed = 0;
    if (activeRootIds.length > 0) {
      const [distinctRows] = await conn.query(distinctUsersSql, [activeRootIds]);
      totalUsersSubscribed = Number(distinctRows[0]?.total || 0);
    }

    const matrices = matrixRows.map((row) => ({
      rootUserId: row.root_user_id,
      name: row.name || null,
      isActive: Boolean(row.is_active),
      usersCount: usersByRoot.get(Number(row.root_user_id)) || 0,
      createdAt: row.ego_activated_at || null,
      topNodeEmail: row.email,
    }));

    return {
      activeMatrices: matrices.filter((matrix) => matrix.isActive).length,
      totalMatrices: matrices.length,
      totalUsersSubscribed,
      matrices,
    };
  } finally {
    conn.release();
  }
}
// Public API of this repository module; only these three functions are exported.
module.exports = {
createMatrix,
activateEgoMatrix,
getMatrixStats
};