CentralBackend/repositories/matrix/MatrixRepository.js
2025-11-17 22:11:28 +01:00

779 lines
24 KiB
JavaScript

const mysql = require('mysql2/promise');
const fs = require('fs');
const path = require('path');
// Runtime environment name; falls back to 'development' when NODE_ENV is unset or empty.
const NODE_ENV = process.env.NODE_ENV || 'development';
/**
 * Build the TLS options object placed in the `ssl` field of the DB config.
 *
 * Environment variables read:
 *  - DB_SSL: TLS is enabled only when this equals "true" (case-insensitive).
 *  - DB_SSL_CA_PATH: optional path to a CA file; its contents are returned as
 *    `ca` when the file exists and can be read.
 *  - DB_SSL_REJECT_UNAUTHORIZED: set to "true" to request certificate
 *    verification. It defaults to false so existing deployments keep their
 *    current behaviour; NOTE(review): leaving verification off means a
 *    configured CA is not actually enforced — consider enabling it.
 *
 * @returns {{ ca?: Buffer, rejectUnauthorized: boolean } | undefined}
 *   `undefined` when TLS is disabled, otherwise the TLS options.
 */
function getSSLConfig() {
  const useSSL = String(process.env.DB_SSL || '').toLowerCase() === 'true';
  if (!useSSL) return undefined;

  const rejectUnauthorized =
    String(process.env.DB_SSL_REJECT_UNAUTHORIZED || '').toLowerCase() === 'true';

  const caPath = process.env.DB_SSL_CA_PATH;
  if (caPath) {
    const resolved = path.resolve(caPath);
    try {
      if (fs.existsSync(resolved)) {
        return { ca: fs.readFileSync(resolved), rejectUnauthorized };
      }
      console.warn(
        `[MatrixRepository] DB_SSL_CA_PATH does not exist (${resolved}); continuing without a CA file`
      );
    } catch (err) {
      // Still best-effort (fall through to the CA-less config), but no longer silent.
      console.warn(
        `[MatrixRepository] could not read DB_SSL_CA_PATH (${resolved}): ${err.message}`
      );
    }
  }
  return { rejectUnauthorized };
}
// Connection settings. Development reads the DEV_DB_* variables with local
// defaults and never uses TLS; every other environment reads DB_* and takes
// its TLS options from getSSLConfig().
const dbConfig = NODE_ENV === 'development'
  ? {
      host: process.env.DEV_DB_HOST || 'localhost',
      port: Number(process.env.DEV_DB_PORT) || 3306,
      user: process.env.DEV_DB_USER || 'root',
      password: process.env.DEV_DB_PASSWORD || '',
      database: process.env.DEV_DB_NAME || 'profitplanet_centralserver',
      ssl: undefined
    }
  : {
      host: process.env.DB_HOST,
      port: Number(process.env.DB_PORT) || 3306,
      user: process.env.DB_USER,
      password: process.env.DB_PASSWORD,
      database: process.env.DB_NAME,
      ssl: getSSLConfig()
    };

// Connection pool shared by the exported repository functions below.
const pool = mysql.createPool(
  Object.assign({}, dbConfig, {
    waitForConnections: true,
    connectionLimit: 10,
    queueLimit: 0
  })
);
/**
 * Look up a user by e-mail address (case-insensitive comparison in SQL).
 *
 * @param {{ query: Function }} conn - connection exposing a promise-based `query(sql, params)`.
 * @param {string} email - e-mail address to search for.
 * @returns {Promise<{ id: *, email: * }>} the first matching `users` row.
 * @throws {Error} with `status = 404` when no user matches.
 */
async function ensureUserExistsByEmail(conn, email) {
  const [matches] = await conn.query(
    'SELECT id, email FROM users WHERE LOWER(email) = LOWER(?) LIMIT 1',
    [email]
  );
  if (matches.length > 0) {
    return matches[0];
  }
  throw Object.assign(new Error('Top node user not found'), { status: 404 });
}
/**
 * Create the master matrix (matrix_config row id = 1) or, with `force`,
 * overwrite it, and register the top user in user_matrix_metadata.
 *
 * @param {object} args
 * @param {string} args.name - matrix name written to matrix_config and user_matrix_metadata.
 * @param {string} args.topNodeEmail - e-mail of the user that becomes the master top node.
 * @param {boolean} [args.force=false] - required to overwrite an existing matrix_config row.
 * @returns {Promise<{ name: string, masterTopUserId: *, masterTopUserEmail: * }>}
 * @throws {Error} `status = 404` when the top user is not found (raised by
 *   ensureUserExistsByEmail); `status = 409` when the matrix exists and `force` is false.
 */
async function createMatrix({ name, topNodeEmail, force = false }) {
  const conn = await pool.getConnection();
  try {
    // Best-effort schema upgrades. They run BEFORE the transaction starts:
    // MySQL DDL causes an implicit commit, so issuing ALTER TABLE after
    // beginTransaction() would silently end the transaction and make the
    // FOR UPDATE lock and the rollback below ineffective.
    const schemaUpgrades = [
      `ALTER TABLE matrix_config ADD COLUMN name VARCHAR(255) NULL`,
      `ALTER TABLE user_matrix_metadata ADD COLUMN name VARCHAR(255) NULL`,
      `ALTER TABLE user_matrix_metadata ADD COLUMN is_active BOOLEAN DEFAULT TRUE`,
      `ALTER TABLE user_matrix_metadata ADD COLUMN max_depth INT NULL`
    ];
    for (const ddl of schemaUpgrades) {
      try {
        await conn.query(ddl);
      } catch (ddlErr) {
        // A duplicate column is the expected "already migrated" case. Anything
        // else is still tolerated (as before) but is now logged.
        if (!ddlErr || ddlErr.code !== 'ER_DUP_FIELDNAME') {
          console.warn(
            `[MatrixRepository] schema upgrade skipped: ${ddlErr && ddlErr.message}`
          );
        }
      }
    }

    await conn.beginTransaction();

    const topUser = await ensureUserExistsByEmail(conn, topNodeEmail);

    // Lock the matrix_config row if present.
    const [cfgRows] = await conn.query(
      'SELECT id, master_top_user_id, name FROM matrix_config WHERE id = 1 FOR UPDATE'
    );
    if (!cfgRows.length) {
      await conn.query(
        'INSERT INTO matrix_config (id, master_top_user_id, name) VALUES (1, ?, ?)',
        [topUser.id, name]
      );
    } else {
      if (!force) {
        const err = new Error('Matrix already exists. Pass force=true to overwrite.');
        err.status = 409;
        throw err;
      }
      await conn.query(
        'UPDATE matrix_config SET master_top_user_id = ?, name = ?, updated_at = CURRENT_TIMESTAMP WHERE id = 1',
        [topUser.id, name]
      );
    }

    // Ensure the self-closure row (depth 0) for the master top user.
    await conn.query(
      'INSERT IGNORE INTO user_tree_closure (ancestor_user_id, descendant_user_id, depth) VALUES (?, ?, 0)',
      [topUser.id, topUser.id]
    );

    // Register the top user as an active ego-matrix entry; max_depth NULL is
    // treated as "unlimited" by the readers in this module.
    await conn.query(
      `
INSERT INTO user_matrix_metadata
(root_user_id, ego_activated_at, last_bfs_fill_at, immediate_children_count, first_free_position, name, is_active, max_depth)
VALUES
(?, NOW(), NULL, 0, 1, ?, TRUE, NULL)
ON DUPLICATE KEY UPDATE
name = VALUES(name),
is_active = TRUE,
max_depth = NULL,
ego_activated_at = COALESCE(user_matrix_metadata.ego_activated_at, VALUES(ego_activated_at)),
updated_at = CURRENT_TIMESTAMP
`,
      [topUser.id, name]
    );

    await conn.commit();
    return {
      name,
      masterTopUserId: topUser.id,
      masterTopUserEmail: topUser.email
    };
  } catch (err) {
    // A failing rollback must not mask the original error.
    try { await conn.rollback(); } catch (_) {}
    throw err;
  } finally {
    conn.release();
  }
}
/**
 * Assert that a `users` row with the given id exists.
 *
 * @param {{ query: Function }} conn - connection exposing a promise-based `query(sql, params)`.
 * @param {*} userId - id to look up.
 * @returns {Promise<void>} resolves with no value when the user exists.
 * @throws {Error} with `status = 404` when no row matches.
 */
async function ensureUserExists(conn, userId) {
  const [found] = await conn.query('SELECT id FROM users WHERE id = ?', [userId]);
  if (found.length > 0) {
    return;
  }
  throw Object.assign(new Error('User not found'), { status: 404 });
}
/**
 * Activate (or refresh) the ego matrix rooted at `rootUserId`.
 *
 * In one transaction it: checks that the user exists, ensures the depth-0
 * self-closure row, recomputes the direct-children count and the first free
 * position (1..5) from user_tree_edges, and upserts user_matrix_metadata with
 * max_depth = 5.
 *
 * @param {*} rootUserId - id of the user whose ego matrix is activated.
 * @returns {Promise<{ rootUserId: *, egoActivatedAt: *, lastBfsFillAt: *,
 *   immediateChildrenCount: *, firstFreePosition: * }>} values read back from
 *   user_matrix_metadata after the upsert.
 * @throws {Error} `status = 404` when the user does not exist (raised by ensureUserExists).
 */
async function activateEgoMatrix(rootUserId) {
  const conn = await pool.getConnection();
  try {
    await conn.beginTransaction();
    await ensureUserExists(conn, rootUserId);

    await conn.query(
      'INSERT IGNORE INTO user_tree_closure (ancestor_user_id, descendant_user_id, depth) VALUES (?, ?, 0)',
      [rootUserId, rootUserId]
    );

    const [childRows] = await conn.query(
      'SELECT position FROM user_tree_edges WHERE parent_user_id = ? ORDER BY position ASC',
      [rootUserId]
    );
    // Normalise to numbers (as addUserToMatrix does) so a driver returning
    // positions as strings cannot make an occupied slot look free.
    const takenPositions = new Set(childRows.map((row) => Number(row.position)));
    const freeSlot = [1, 2, 3, 4, 5].find((slot) => !takenPositions.has(slot));
    const firstFreePosition = freeSlot === undefined ? null : freeSlot;
    const immediateChildrenCount = childRows.length;

    await conn.query(
      `
INSERT INTO user_matrix_metadata
(root_user_id, ego_activated_at, last_bfs_fill_at, immediate_children_count, first_free_position, max_depth)
VALUES
(?, NOW(), NULL, ?, ?, 5)
ON DUPLICATE KEY UPDATE
ego_activated_at = COALESCE(ego_activated_at, VALUES(ego_activated_at)),
immediate_children_count = VALUES(immediate_children_count),
first_free_position = VALUES(first_free_position),
max_depth = 5,
updated_at = CURRENT_TIMESTAMP
`,
      [rootUserId, immediateChildrenCount, firstFreePosition]
    );

    const [metaRows] = await conn.query(
      'SELECT ego_activated_at, last_bfs_fill_at, immediate_children_count, first_free_position FROM user_matrix_metadata WHERE root_user_id = ?',
      [rootUserId]
    );
    // Fall back to the values just computed if the row cannot be read back.
    const meta = metaRows[0] || {
      ego_activated_at: null,
      last_bfs_fill_at: null,
      immediate_children_count: immediateChildrenCount,
      first_free_position: firstFreePosition
    };

    await conn.commit();
    return {
      rootUserId,
      egoActivatedAt: meta.ego_activated_at,
      lastBfsFillAt: meta.last_bfs_fill_at,
      immediateChildrenCount: meta.immediate_children_count,
      firstFreePosition: meta.first_free_position
    };
  } catch (err) {
    // A failing rollback must not replace the error that caused it.
    try { await conn.rollback(); } catch (_) {}
    throw err;
  } finally {
    conn.release();
  }
}
/**
 * Summarise every registered matrix (rows of user_matrix_metadata).
 *
 * For each matrix it reports the number of closure rows within the matrix's
 * max_depth (NULL max_depth is treated as unlimited), and overall it reports
 * the number of distinct users under the active matrices.
 *
 * @returns {Promise<{ activeMatrices: number, totalMatrices: number,
 *   totalUsersSubscribed: number, matrices: Array<object> }>}
 */
async function getMatrixStats() {
  const conn = await pool.getConnection();
  try {
    // Every registered matrix with its root user's e-mail.
    const [matrixRows] = await conn.query(
      `
SELECT m.root_user_id, m.name, m.is_active, m.ego_activated_at, u.email
FROM user_matrix_metadata m
JOIN users u ON u.id = m.root_user_id
ORDER BY COALESCE(m.ego_activated_at, u.created_at) ASC
`
    );
    const rootIds = matrixRows.map((row) => row.root_user_id);

    // matrix_config lookup is best-effort: on failure no matrixConfigId is exposed.
    let masterCfg = null;
    try {
      const [cfgRows] = await conn.query(
        `SELECT id, master_top_user_id FROM matrix_config ORDER BY id ASC LIMIT 1`
      );
      masterCfg = cfgRows[0] || null;
    } catch (_) {}

    // Nothing registered: report zeros.
    if (rootIds.length === 0) {
      return {
        activeMatrices: 0,
        totalMatrices: 0,
        totalUsersSubscribed: 0,
        matrices: []
      };
    }

    // Per-matrix user counts, limited by each matrix's max_depth.
    const [countRows] = await conn.query(
      `
SELECT c.ancestor_user_id AS root_id, COUNT(*) AS cnt
FROM user_tree_closure c
JOIN user_matrix_metadata m
ON m.root_user_id = c.ancestor_user_id
WHERE c.depth BETWEEN 0 AND COALESCE(m.max_depth, 2147483647)
AND c.ancestor_user_id IN (?)
GROUP BY c.ancestor_user_id
`,
      [rootIds]
    );
    const usersPerRoot = new Map();
    for (const row of countRows) {
      usersPerRoot.set(Number(row.root_id), Number(row.cnt));
    }

    // Distinct users across the active matrices, again honouring max_depth.
    const [activeRows] = await conn.query(
      `SELECT root_user_id FROM user_matrix_metadata WHERE is_active = TRUE AND root_user_id IN (?)`,
      [rootIds]
    );
    const activeIds = activeRows.map((row) => row.root_user_id);
    let totalUsersSubscribed = 0;
    if (activeIds.length > 0) {
      const [distinctRows] = await conn.query(
        `
SELECT COUNT(DISTINCT c.descendant_user_id) AS total
FROM user_tree_closure c
JOIN user_matrix_metadata m
ON m.root_user_id = c.ancestor_user_id
WHERE m.is_active = TRUE
AND c.ancestor_user_id IN (?)
AND c.depth BETWEEN 0 AND COALESCE(m.max_depth, 2147483647)
`,
        [activeIds]
      );
      totalUsersSubscribed = Number(distinctRows[0]?.total || 0);
    }

    const matrices = matrixRows.map((row) => {
      const rootId = Number(row.root_user_id);
      // Only the master matrix carries the matrix_config id.
      const isMaster = Boolean(masterCfg) && rootId === Number(masterCfg.master_top_user_id);
      return {
        rootUserId: row.root_user_id,
        name: row.name || null,
        isActive: Boolean(row.is_active),
        usersCount: usersPerRoot.get(rootId) || 0,
        createdAt: row.ego_activated_at || null,
        topNodeEmail: row.email,
        matrixConfigId: isMaster ? Number(masterCfg.id) : null
      };
    });

    return {
      activeMatrices: matrices.filter((matrix) => matrix.isActive).length,
      totalMatrices: matrixRows.length,
      totalUsersSubscribed,
      matrices
    };
  } finally {
    conn.release();
  }
}
/**
 * Resolve the root user id of a matrix from whichever identifier is supplied.
 *
 * Precedence: a positive integer `rootUserId` is returned directly; otherwise
 * `matrixId` is looked up in matrix_config; otherwise `topNodeEmail` is looked
 * up in users (case-insensitive).
 *
 * A pooled connection is checked out only when a lookup is actually needed, so
 * the direct `rootUserId` path and the "nothing supplied" error path do not
 * touch the pool.
 *
 * @param {object} args
 * @param {*} [args.rootUserId] - parsed with Number.parseInt(…, 10).
 * @param {*} [args.matrixId] - matrix_config.id, parsed the same way.
 * @param {string} [args.topNodeEmail] - e-mail of the matrix's top user.
 * @returns {Promise<number>} the resolved root user id.
 * @throws {Error} `status = 404` when the matrix or user is not found;
 *   `status = 400` when no usable identifier was supplied.
 */
async function resolveRootUserId({ rootUserId, matrixId, topNodeEmail }) {
  // Prefer a direct numeric rootUserId: no database access required.
  const rid = Number.parseInt(rootUserId, 10);
  if (Number.isFinite(rid) && rid > 0) return rid;

  const mid = Number.parseInt(matrixId, 10);
  const hasMatrixId = Number.isFinite(mid) && mid > 0;
  // Non-string values are treated as "no e-mail" instead of raising a TypeError.
  const email = typeof topNodeEmail === 'string' ? topNodeEmail.trim() : '';

  if (!hasMatrixId && !email) {
    const err = new Error('rootUserId required (or provide matrixId/topNodeEmail)');
    err.status = 400;
    throw err;
  }

  const conn = await pool.getConnection();
  try {
    if (hasMatrixId) {
      // matrix_config.id -> master_top_user_id
      const [rows] = await conn.query(
        'SELECT master_top_user_id FROM matrix_config WHERE id = ? LIMIT 1',
        [mid]
      );
      if (!rows.length) {
        const err = new Error('Matrix not found');
        err.status = 404;
        throw err;
      }
      return Number(rows[0].master_top_user_id);
    }

    // Resolve by top-node e-mail.
    const [rows] = await conn.query(
      'SELECT id FROM users WHERE LOWER(email) = LOWER(?) LIMIT 1',
      [email]
    );
    if (!rows.length) {
      const err = new Error('Top node user not found');
      err.status = 404;
      throw err;
    }
    return Number(rows[0].id);
  } finally {
    conn.release();
  }
}
/**
 * Search for users that could be added to the matrix rooted at `rootUserId`.
 *
 * Users already present in the root's closure (including the root itself) and
 * users whose role is not 'user' are excluded. `q` is matched as a
 * case-insensitive substring against e-mail, personal full name and company name.
 *
 * @param {object} args
 * @param {string} [args.q] - search text; a missing value matches everything.
 * @param {string} [args.type='all'] - 'personal' or 'company' filters by user_type; other values do not filter.
 * @param {*} args.rootUserId - root of the matrix whose members are excluded.
 * @param {*} [args.limit=20] - page size; invalid values fall back to 20.
 * @param {*} [args.offset=0] - page offset; invalid values fall back to 0.
 * @returns {Promise<{ total: number, items: Array<{ userId: number, email: *, userType: *, name: string }> }>}
 * @throws {Error} `status = 400` when `rootUserId` is not numeric.
 */
async function getUserSearchCandidates({ q, type = 'all', rootUserId, limit = 20, offset = 0 }) {
  // A NaN id would be sent to MySQL as the bare token NaN and fail as a SQL
  // error; reject it up front as a client error instead.
  const rid = Number(rootUserId);
  if (!Number.isFinite(rid)) {
    const err = new Error('Invalid rootUserId');
    err.status = 400;
    throw err;
  }

  // LIMIT/OFFSET must be non-negative integers; anything else uses the default.
  const toNonNegativeInt = (value, fallback) => {
    const n = Math.trunc(Number(value));
    return Number.isFinite(n) && n >= 0 ? n : fallback;
  };
  const pageSize = toNonNegativeInt(limit, 20);
  const pageOffset = toNonNegativeInt(offset, 0);

  const conn = await pool.getConnection();
  try {
    const whereParts = [];
    const params = [];

    // Exclude users already in the matrix (including the root).
    whereParts.push(`u.id NOT IN (
SELECT descendant_user_id
FROM user_tree_closure
WHERE ancestor_user_id = ?
)`);
    params.push(rid);

    // Only regular users are candidates.
    whereParts.push(`u.role = 'user'`);

    // Optional user_type filter.
    if (type === 'personal' || type === 'company') {
      whereParts.push(`u.user_type = ?`);
      params.push(type);
    }

    // Search filter (email OR personal full name OR company name). A missing
    // q becomes '%%', i.e. no text restriction, instead of throwing.
    const like = `%${String(q == null ? '' : q).toLowerCase()}%`;
    whereParts.push(`(
LOWER(u.email) LIKE ?
OR LOWER(CONCAT(COALESCE(p.first_name, ''), ' ', COALESCE(p.last_name, ''))) LIKE ?
OR LOWER(COALESCE(c.company_name, '')) LIKE ?
)`);
    params.push(like, like, like);

    const whereSql = `WHERE ${whereParts.join(' AND ')}`;
    const baseJoins = `
LEFT JOIN personal_profiles p ON p.user_id = u.id
LEFT JOIN company_profiles c ON c.user_id = u.id
`;

    const countSql = `
SELECT COUNT(*) AS total
FROM users u
${baseJoins}
${whereSql}
`;
    const [countRows] = await conn.query(countSql, params);
    const total = Number(countRows[0]?.total || 0);

    const selectSql = `
SELECT
u.id AS userId,
u.email,
u.user_type AS userType,
CASE
WHEN u.user_type = 'personal' THEN TRIM(CONCAT(COALESCE(p.first_name, ''), ' ', COALESCE(p.last_name, '')))
WHEN u.user_type = 'company' THEN COALESCE(c.company_name, '')
ELSE ''
END AS name
FROM users u
${baseJoins}
${whereSql}
ORDER BY u.created_at DESC, u.email ASC
LIMIT ? OFFSET ?
`;
    const [rows] = await conn.query(selectSql, [...params, pageSize, pageOffset]);

    const items = rows.map((r) => ({
      userId: Number(r.userId),
      email: r.email,
      userType: r.userType,
      name: r.name || ''
    }));
    return { total, items };
  } finally {
    conn.release();
  }
}
/**
 * List the users below a matrix root, ordered by depth, then creation time, then id.
 *
 * The effective depth is the requested `maxDepth` clamped to the root's
 * max_depth from user_matrix_metadata (a NULL or missing policy does not clamp).
 *
 * @param {object} args
 * @param {*} args.rootUserId - root user id; must be a positive number.
 * @param {*} [args.maxDepth=5] - deepest level to return; invalid or negative values become 0.
 * @param {*} [args.limit=100] - page size; invalid values fall back to 100.
 * @param {*} [args.offset=0] - page offset; invalid values fall back to 0.
 * @param {boolean} [args.includeRoot=false] - include the depth-0 row (the root itself).
 * @returns {Promise<Array<object>>} rows with userId, email, userType, role,
 *   depth, level (same value as depth), parentUserId, position, name
 *   (falls back to the e-mail when empty) and createdAt.
 * @throws {Error} `status = 400` when `rootUserId` is invalid.
 */
async function getMatrixUsers({ rootUserId, maxDepth = 5, limit = 100, offset = 0, includeRoot = false }) {
  // Validate before checking a connection out of the pool.
  const rid = Number(rootUserId);
  if (!Number.isFinite(rid) || rid <= 0) {
    const err = new Error('Invalid rootUserId');
    err.status = 400;
    throw err;
  }

  // LIMIT/OFFSET must be non-negative integers, otherwise MySQL rejects the
  // statement; anything else uses the default.
  const toNonNegativeInt = (value, fallback) => {
    const n = Math.trunc(Number(value));
    return Number.isFinite(n) && n >= 0 ? n : fallback;
  };
  const pageSize = toNonNegativeInt(limit, 100);
  const pageOffset = toNonNegativeInt(offset, 0);

  const conn = await pool.getConnection();
  try {
    // Per-root depth policy; null means unlimited.
    let policyDepth = null;
    try {
      const [pRows] = await conn.query(
        'SELECT max_depth FROM user_matrix_metadata WHERE root_user_id = ? LIMIT 1',
        [rid]
      );
      if (pRows.length) {
        policyDepth = pRows[0].max_depth == null ? null : Number(pRows[0].max_depth);
      }
    } catch (_) {
      // Best-effort: a failed policy lookup leaves the depth unclamped.
    }

    // Requested depth sanitisation.
    let requestedDepth = Number(maxDepth);
    if (!Number.isFinite(requestedDepth) || requestedDepth < 0) requestedDepth = 0;

    // Clamp to policy: min(requested, policy) when a policy is set.
    const depthLimit = policyDepth == null ? requestedDepth : Math.min(requestedDepth, policyDepth);

    // Depth 0 is the root's self-closure row; skip it unless includeRoot is set.
    const startDepth = includeRoot ? 0 : 1;
    if (startDepth > depthLimit) {
      return [];
    }

    const sql = `
SELECT
c.descendant_user_id AS userId,
c.depth AS depth,
u.email,
u.user_type AS userType,
u.role,
u.created_at AS createdAt,
e.parent_user_id AS parentUserId,
e.position AS position,
CASE
WHEN u.user_type = 'personal'
THEN TRIM(CONCAT(COALESCE(pp.first_name,''), ' ', COALESCE(pp.last_name,'')))
WHEN u.user_type = 'company'
THEN COALESCE(cp.company_name,'')
ELSE ''
END AS name
FROM user_tree_closure c
JOIN users u ON u.id = c.descendant_user_id
LEFT JOIN user_tree_edges e
ON e.child_user_id = c.descendant_user_id
AND e.parent_user_id != e.child_user_id
LEFT JOIN personal_profiles pp ON pp.user_id = u.id
LEFT JOIN company_profiles cp ON cp.user_id = u.id
WHERE c.ancestor_user_id = ?
AND c.depth BETWEEN ? AND ?
ORDER BY c.depth ASC, u.created_at ASC, u.id ASC
LIMIT ? OFFSET ?
`;
    const [rows] = await conn.query(sql, [rid, startDepth, depthLimit, pageSize, pageOffset]);

    return rows.map((r) => ({
      userId: Number(r.userId),
      email: r.email,
      userType: r.userType,
      role: r.role,
      depth: Number(r.depth),
      level: Number(r.depth),
      parentUserId: r.parentUserId ? Number(r.parentUserId) : null,
      position: r.position != null ? Number(r.position) : null,
      name: r.name || r.email,
      createdAt: r.createdAt
    }));
  } finally {
    conn.release();
  }
}
/**
 * Return the lowest child position in 1..5 that is not in `usedPositions`,
 * or null when all five positions are taken.
 */
function firstFreeMatrixPosition(usedPositions) {
  for (let slot = 1; slot <= 5; slot++) {
    if (!usedPositions.has(slot)) return slot;
  }
  return null;
}

/**
 * Place a user into the matrix identified by rootUserId / matrixId / topNodeEmail.
 *
 * Parent selection: an explicit `parentUserId` wins; otherwise the creator of
 * the first referral token the child used. If that parent is not in the matrix
 * the call fails unless `forceParentFallback` is set, in which case the root is
 * used. If the chosen parent already has five children, the nearest descendant
 * of that parent with a free position (within the root's max_depth, when set)
 * is used instead.
 *
 * @param {object} args
 * @param {*} [args.rootUserId] - see resolveRootUserId.
 * @param {*} [args.matrixId] - see resolveRootUserId.
 * @param {string} [args.topNodeEmail] - see resolveRootUserId.
 * @param {*} args.childUserId - id of the user to add; must be a positive number.
 * @param {boolean} [args.forceParentFallback=false] - place under the root when the parent is outside the matrix.
 * @param {*} [args.parentUserId] - preferred parent.
 * @param {*} [args.actorUserId] - recorded in the audit log.
 * @returns {Promise<{ rootUserId: number, parentUserId: number, childUserId: number,
 *   position: number, remainingFreeSlots: number }>} `parentUserId` is the
 *   parent the child was actually attached to.
 * @throws {Error} `status = 400` for an invalid child id; `404` when the child
 *   or a referral parent is not found; `409` when the parent is outside the
 *   matrix, the depth limit is reached, the user is already in the matrix, or
 *   no free position exists. Errors from resolveRootUserId propagate unchanged.
 */
async function addUserToMatrix({
  rootUserId,
  matrixId,
  topNodeEmail,
  childUserId,
  forceParentFallback = false,
  parentUserId,
  actorUserId
}) {
  // Resolve the root BEFORE checking out a connection: resolveRootUserId uses
  // the pool itself, and holding one pooled connection while waiting for a
  // second can exhaust the pool under concurrent calls.
  const rid = await resolveRootUserId({ rootUserId, matrixId, topNodeEmail });

  const cId = Number(childUserId);
  if (!Number.isFinite(cId) || cId <= 0) {
    const err = new Error('Invalid childUserId');
    err.status = 400;
    throw err;
  }

  const conn = await pool.getConnection();
  try {
    await conn.beginTransaction();

    const [childRows] = await conn.query('SELECT id FROM users WHERE id = ? LIMIT 1', [cId]);
    if (!childRows.length) {
      const err = new Error('Child user not found');
      err.status = 404;
      throw err;
    }

    // Explicit parent, else the creator of the first referral token the child used.
    let parentId = Number(parentUserId) > 0 ? Number(parentUserId) : null;
    if (!parentId) {
      const [refRows] = await conn.query(
        `
SELECT t.created_by_user_id AS parent_user_id
FROM referral_token_usage u
JOIN referral_tokens t
ON t.id = u.referral_token_id -- FIX: correct column name
WHERE u.used_by_user_id = ?
ORDER BY u.id ASC
LIMIT 1
`,
        [cId]
      );
      if (refRows.length) parentId = Number(refRows[0].parent_user_id);
    }
    if (!parentId) {
      const err = new Error('Referral parent not found');
      err.status = 404;
      throw err;
    }

    const [parentInMatrixRows] = await conn.query(
      `
SELECT depth FROM user_tree_closure
WHERE ancestor_user_id = ? AND descendant_user_id = ?
LIMIT 1
`,
      [rid, parentId]
    );
    if (!parentInMatrixRows.length) {
      if (forceParentFallback) {
        parentId = rid;
      } else {
        const err = new Error('Referral parent not in matrix');
        err.status = 409;
        throw err;
      }
    }
    const parentDepth = parentId === rid ? 0 : Number(parentInMatrixRows[0]?.depth || 0);

    // Per-root max_depth policy; null means unlimited.
    let maxDepthPolicy = null;
    try {
      const [pRows] = await conn.query(
        'SELECT max_depth FROM user_matrix_metadata WHERE root_user_id = ? LIMIT 1',
        [rid]
      );
      if (pRows.length) maxDepthPolicy = pRows[0].max_depth == null ? null : Number(pRows[0].max_depth);
    } catch (_) {
      // Best-effort: a failed policy lookup leaves the depth unrestricted.
    }
    if (maxDepthPolicy != null && parentDepth >= maxDepthPolicy) {
      const err = new Error(`Cannot add beyond max depth ${maxDepthPolicy}`);
      err.status = 409;
      throw err;
    }

    const [dupRows] = await conn.query(
      `
SELECT 1 FROM user_tree_closure
WHERE ancestor_user_id = ? AND descendant_user_id = ?
LIMIT 1
`,
      [rid, cId]
    );
    if (dupRows.length) {
      const err = new Error('User already in matrix');
      err.status = 409;
      throw err;
    }

    // Try direct placement under the chosen parent.
    const [childPosRows] = await conn.query(
      `SELECT position FROM user_tree_edges WHERE parent_user_id = ? ORDER BY position ASC`,
      [parentId]
    );
    let assignPos = firstFreeMatrixPosition(new Set(childPosRows.map((r) => Number(r.position))));
    // Child count (before the insert) of the parent finally used.
    let parentChildCountBefore = childPosRows.length;

    if (!assignPos) {
      // Parent is full: pick the nearest descendant of parentId with fewer
      // than 5 children whose child would still be within the depth policy.
      const [candRows] = await conn.query(
        `
SELECT
pc.descendant_user_id AS candidate_id,
rc.depth AS depth_from_root
FROM user_tree_closure pc
JOIN user_tree_closure rc
ON rc.descendant_user_id = pc.descendant_user_id
AND rc.ancestor_user_id = ?
LEFT JOIN (
SELECT parent_user_id, COUNT(*) AS cnt
FROM user_tree_edges
GROUP BY parent_user_id
) ch ON ch.parent_user_id = pc.descendant_user_id
WHERE pc.ancestor_user_id = ?
AND pc.descendant_user_id NOT IN (?, ?)
AND COALESCE(ch.cnt, 0) < 5
AND (? IS NULL OR rc.depth + 1 <= ?)
ORDER BY pc.depth ASC, pc.descendant_user_id ASC
LIMIT 1
`,
        [rid, parentId, parentId, cId, maxDepthPolicy, maxDepthPolicy]
      );
      if (!candRows.length) {
        const err = new Error(
          maxDepthPolicy != null
            ? `No free positions under referral parent within depth ${maxDepthPolicy}.`
            : 'No free positions under referral parent.'
        );
        err.status = 409;
        throw err;
      }

      // Re-read the candidate's positions to pick the concrete slot.
      const candidateParentId = Number(candRows[0].candidate_id);
      const [candPosRows] = await conn.query(
        `SELECT position FROM user_tree_edges WHERE parent_user_id = ? ORDER BY position ASC`,
        [candidateParentId]
      );
      const candAssign = firstFreeMatrixPosition(new Set(candPosRows.map((r) => Number(r.position))));
      if (!candAssign) {
        const err = new Error('Concurrent update: candidate parent has no free positions');
        err.status = 409;
        throw err;
      }

      parentId = candidateParentId;
      assignPos = candAssign;
      parentChildCountBefore = candPosRows.length;
    }

    await conn.query(
      `INSERT INTO user_tree_edges (parent_user_id, child_user_id, position) VALUES (?,?,?)`,
      [parentId, cId, assignPos]
    );
    await conn.query(
      `INSERT IGNORE INTO user_tree_closure (ancestor_user_id, descendant_user_id, depth) VALUES (?, ?, 0)`,
      [cId, cId]
    );

    // Link the child to every ancestor of the parent at depth + 1.
    const [ancestorRows] = await conn.query(
      `SELECT ancestor_user_id, depth FROM user_tree_closure WHERE descendant_user_id = ? ORDER BY depth ASC`,
      [parentId]
    );
    if (ancestorRows.length) {
      // Every interpolated value is coerced with Number(), so only numeric
      // text reaches the SQL string.
      const values = ancestorRows
        .map((r) => `(${Number(r.ancestor_user_id)}, ${cId}, ${Number(r.depth) + 1})`)
        .join(',');
      await conn.query(
        `INSERT IGNORE INTO user_tree_closure (ancestor_user_id, descendant_user_id, depth) VALUES ${values}`
      );
    }

    // Refresh the root's cached counters when the child went directly under the root.
    let remainingFreeSlots;
    if (parentId === rid) {
      const [rootChildrenRows] = await conn.query(
        `SELECT position FROM user_tree_edges WHERE parent_user_id = ? ORDER BY position ASC`,
        [rid]
      );
      const firstFree = firstFreeMatrixPosition(
        new Set(rootChildrenRows.map((r) => Number(r.position)))
      );
      await conn.query(
        `UPDATE user_matrix_metadata
SET immediate_children_count = ?,
first_free_position = ?
WHERE root_user_id = ?`,
        [rootChildrenRows.length, firstFree, rid]
      );
      remainingFreeSlots = 5 - rootChildrenRows.length;
    } else {
      remainingFreeSlots = 5 - (parentChildCountBefore + 1);
    }

    // Optional audit log; failures are deliberately ignored.
    try {
      await conn.query(
        `
INSERT INTO user_action_logs
(action, performed_by_user_id, affected_user_id, details, created_at)
VALUES
('matrix_add_user', ?, ?, ?, NOW())
`,
        [
          Number(actorUserId) || null,
          cId,
          JSON.stringify({ rootUserId: rid, parentUserId: parentId, position: assignPos })
        ]
      );
    } catch (_) {}

    await conn.commit();
    // Report the parent actually used (after referral lookup / fallback / BFS),
    // not the raw input parameter, which may be undefined or different.
    return { rootUserId: rid, parentUserId: parentId, childUserId: cId, position: assignPos, remainingFreeSlots };
  } catch (err) {
    try { await conn.rollback(); } catch (_) {}
    throw err;
  } finally {
    conn.release();
  }
}
// Public repository API. getSSLConfig, ensureUserExists and
// ensureUserExistsByEmail are not exported and stay private to this module.
module.exports = {
createMatrix,
activateEgoMatrix,
getMatrixStats,
resolveRootUserId,
getUserSearchCandidates,
getMatrixUsers,
addUserToMatrix
};