const mysql = require('mysql2/promise'); const fs = require('fs'); const path = require('path'); const NODE_ENV = process.env.NODE_ENV || 'development'; function getSSLConfig() { const useSSL = String(process.env.DB_SSL || '').toLowerCase() === 'true'; const caPath = process.env.DB_SSL_CA_PATH; if (!useSSL) return undefined; try { if (caPath) { const resolved = path.resolve(caPath); if (fs.existsSync(resolved)) { return { ca: fs.readFileSync(resolved), rejectUnauthorized: false }; } } } catch (_) {} return { rejectUnauthorized: false }; } let dbConfig; if (NODE_ENV === 'development') { dbConfig = { host: process.env.DEV_DB_HOST || 'localhost', port: Number(process.env.DEV_DB_PORT) || 3306, user: process.env.DEV_DB_USER || 'root', password: process.env.DEV_DB_PASSWORD || '', database: process.env.DEV_DB_NAME || 'profitplanet_centralserver', ssl: undefined }; } else { dbConfig = { host: process.env.DB_HOST, port: Number(process.env.DB_PORT) || 3306, user: process.env.DB_USER, password: process.env.DB_PASSWORD, database: process.env.DB_NAME, ssl: getSSLConfig() }; } const pool = mysql.createPool({ ...dbConfig, waitForConnections: true, connectionLimit: 10, queueLimit: 0 }); async function ensureUserExistsByEmail(conn, email) { const [rows] = await conn.query('SELECT id, email FROM users WHERE LOWER(email) = LOWER(?) LIMIT 1', [email]); if (!rows.length) { const err = new Error('Top node user not found'); err.status = 404; throw err; } return rows[0]; } async function createMatrix({ name, topNodeEmail, force = false }) { const conn = await pool.getConnection(); try { await conn.beginTransaction(); // ensure "name" column exists on matrix_config (safe if already exists) try { await conn.query(`ALTER TABLE matrix_config ADD COLUMN name VARCHAR(255) NULL`); } catch (_) {} // ensure user_matrix_metadata has needed columns (safe if exists) try { await conn.query(`ALTER TABLE user_matrix_metadata ADD COLUMN name VARCHAR(255) NULL`); } catch (_) {} try { await conn.query(`ALTER TABLE user_matrix_metadata ADD COLUMN is_active BOOLEAN DEFAULT TRUE`); } catch (_) {} const topUser = await ensureUserExistsByEmail(conn, topNodeEmail); // lock matrix_config row if present const [cfgRows] = await conn.query( 'SELECT id, master_top_user_id, name FROM matrix_config WHERE id = 1 FOR UPDATE' ); if (!cfgRows.length) { await conn.query( 'INSERT INTO matrix_config (id, master_top_user_id, name) VALUES (1, ?, ?)', [topUser.id, name] ); } else { const current = cfgRows[0]; if (!force) { const err = new Error('Matrix already exists. Pass force=true to overwrite.'); err.status = 409; throw err; } await conn.query( 'UPDATE matrix_config SET master_top_user_id = ?, name = ?, updated_at = CURRENT_TIMESTAMP WHERE id = 1', [topUser.id, name] ); } // ensure self-closure for the master top user await conn.query( 'INSERT IGNORE INTO user_tree_closure (ancestor_user_id, descendant_user_id, depth) VALUES (?, ?, 0)', [topUser.id, topUser.id] ); // Also register this top user as an ego matrix entry with name and mark active await conn.query( ` INSERT INTO user_matrix_metadata (root_user_id, ego_activated_at, last_bfs_fill_at, immediate_children_count, first_free_position, name, is_active) VALUES (?, NOW(), NULL, 0, 1, ?, TRUE) ON DUPLICATE KEY UPDATE name = VALUES(name), is_active = TRUE, ego_activated_at = COALESCE(user_matrix_metadata.ego_activated_at, VALUES(ego_activated_at)), updated_at = CURRENT_TIMESTAMP `, [topUser.id, name] ); await conn.commit(); return { name, masterTopUserId: topUser.id, masterTopUserEmail: topUser.email }; } catch (err) { await conn.rollback(); throw err; } finally { conn.release(); } } async function ensureUserExists(conn, userId) { const [rows] = await conn.query('SELECT id FROM users WHERE id = ?', [userId]); if (!rows.length) { const err = new Error('User not found'); err.status = 404; throw err; } } async function activateEgoMatrix(rootUserId) { const conn = await pool.getConnection(); try { await conn.beginTransaction(); await ensureUserExists(conn, rootUserId); await conn.query( 'INSERT IGNORE INTO user_tree_closure (ancestor_user_id, descendant_user_id, depth) VALUES (?, ?, 0)', [rootUserId, rootUserId] ); const [childRows] = await conn.query( 'SELECT position FROM user_tree_edges WHERE parent_user_id = ? ORDER BY position ASC', [rootUserId] ); const positions = new Set(childRows.map(r => r.position)); let firstFreePosition = null; for (let i = 1; i <= 5; i++) { if (!positions.has(i)) { firstFreePosition = i; break; } } const immediateChildrenCount = childRows.length; await conn.query( ` INSERT INTO user_matrix_metadata (root_user_id, ego_activated_at, last_bfs_fill_at, immediate_children_count, first_free_position) VALUES (?, NOW(), NULL, ?, ?) ON DUPLICATE KEY UPDATE ego_activated_at = COALESCE(ego_activated_at, VALUES(ego_activated_at)), immediate_children_count = VALUES(immediate_children_count), first_free_position = VALUES(first_free_position), updated_at = CURRENT_TIMESTAMP `, [rootUserId, immediateChildrenCount, firstFreePosition] ); const [metaRows] = await conn.query( 'SELECT ego_activated_at, last_bfs_fill_at, immediate_children_count, first_free_position FROM user_matrix_metadata WHERE root_user_id = ?', [rootUserId] ); const meta = metaRows[0] || { ego_activated_at: null, last_bfs_fill_at: null, immediate_children_count: immediateChildrenCount, first_free_position: firstFreePosition }; await conn.commit(); return { rootUserId, egoActivatedAt: meta.ego_activated_at, lastBfsFillAt: meta.last_bfs_fill_at, immediateChildrenCount: meta.immediate_children_count, firstFreePosition: meta.first_free_position }; } catch (err) { await conn.rollback(); throw err; } finally { conn.release(); } } async function getMatrixStats() { const conn = await pool.getConnection(); try { // list all matrices (ego entries) const [matRows] = await conn.query( ` SELECT m.root_user_id, m.name, m.is_active, m.ego_activated_at, u.email FROM user_matrix_metadata m JOIN users u ON u.id = m.root_user_id ORDER BY COALESCE(m.ego_activated_at, u.created_at) ASC ` ); const rootIds = matRows.map(r => r.root_user_id); let countsByRoot = new Map(); if (rootIds.length) { // counts per matrix up to depth 5 const [cntRows] = await conn.query( ` SELECT ancestor_user_id AS root_id, COUNT(*) AS cnt FROM user_tree_closure WHERE depth BETWEEN 0 AND 5 AND ancestor_user_id IN (?) GROUP BY ancestor_user_id `, [rootIds] ); countsByRoot = new Map(cntRows.map(r => [Number(r.root_id), Number(r.cnt)])); // total distinct users across all active matrices const [activeIdsRows] = await conn.query( `SELECT root_user_id FROM user_matrix_metadata WHERE is_active = TRUE AND root_user_id IN (?)`, [rootIds] ); const activeIds = activeIdsRows.map(r => r.root_user_id); let totalDistinct = 0; if (activeIds.length) { const [totalDistinctRows] = await conn.query( ` SELECT COUNT(DISTINCT descendant_user_id) AS total FROM user_tree_closure WHERE depth BETWEEN 0 AND 5 AND ancestor_user_id IN (?) `, [activeIds] ); totalDistinct = Number(totalDistinctRows[0]?.total || 0); } const activeMatrices = matRows.filter(r => !!r.is_active).length; const totalMatrices = matRows.length; const matrices = matRows.map(r => ({ rootUserId: r.root_user_id, name: r.name || null, isActive: !!r.is_active, usersCount: countsByRoot.get(Number(r.root_user_id)) || 0, createdAt: r.ego_activated_at || null, topNodeEmail: r.email })); return { activeMatrices, totalMatrices, totalUsersSubscribed: totalDistinct, matrices }; } // no matrices registered return { activeMatrices: 0, totalMatrices: 0, totalUsersSubscribed: 0, matrices: [] }; } finally { conn.release(); } } module.exports = { createMatrix, activateEgoMatrix, getMatrixStats };