const pool = require('../../database/database');
const Abonemment = require('../../models/Abonemment');

// Shared INSERT used for every row written to coffee_abonement_history.
const INSERT_HISTORY_SQL = `INSERT INTO coffee_abonement_history
    (abonement_id, event_type, event_at, actor_user_id, details, created_at)
  VALUES (?, ?, ?, ?, ?, NOW())`;

/**
 * Trim and lower-case an email address.
 *
 * @param {*} email - Candidate email value.
 * @returns {string|null} The normalized email; `null` when `email` is not a
 *   string, `''` when it is blank (callers treat both as "no email").
 */
function normalizeEmail(email) {
  return typeof email === 'string' ? email.trim().toLowerCase() : null;
}

/**
 * Decode the `details` column of a history row.
 *
 * The value is parsed when it arrives as text. If the driver already decoded
 * it (an object that is not a Buffer) it is returned as-is instead of being
 * passed to JSON.parse, which would throw on a plain object.
 *
 * @param {*} value - Raw column value.
 * @returns {*} The decoded details, or `{}` when the column is empty.
 * @throws {SyntaxError} When the column holds text that is not valid JSON.
 */
function parseJsonColumn(value) {
  if (!value) return {};
  if (typeof value === 'object' && !Buffer.isBuffer(value)) return value;
  return JSON.parse(value);
}

/**
 * Repository for the `coffee_abonements` table, its history table
 * (`coffee_abonement_history`) and the pending-ownership table
 * (`no_user_abo_mails`).
 */
class AbonemmentRepository {
  // Column names of coffee_abonements, cached once per process.
  static _columnsCache = null;

  /**
   * Load (and cache) the set of column names of `coffee_abonements`.
   *
   * @returns {Promise<Set<string>>} Column names of the table in the current schema.
   */
  async loadColumns() {
    if (AbonemmentRepository._columnsCache) return AbonemmentRepository._columnsCache;

    const [rows] = await pool.query(
      `SELECT COLUMN_NAME
         FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'coffee_abonements'`
    );
    const columns = new Set(rows.map((row) => row.COLUMN_NAME));
    AbonemmentRepository._columnsCache = columns;
    return columns;
  }

  /**
   * Tell whether `coffee_abonements` has the given column.
   *
   * @param {string} name - Column name.
   * @returns {Promise<boolean>}
   */
  async hasColumn(name) {
    const columns = await this.loadColumns();
    return columns.has(name);
  }

  /**
   * Run `work` inside a transaction on a dedicated pooled connection.
   *
   * Commits when `work` resolves; rolls back and rethrows the ORIGINAL error
   * when it rejects. A failing rollback is logged instead of replacing that
   * error. The connection is always released.
   *
   * `work` must only use the connection it receives: issuing `pool.query`
   * while this connection is checked out needs a second pooled connection and
   * can starve a small pool under concurrency.
   *
   * @param {(conn: object) => Promise<*>} work - Callback receiving the connection.
   * @returns {Promise<*>} Whatever `work` resolved to.
   */
  async _withTransaction(work) {
    const conn = await pool.getConnection();
    try {
      await conn.beginTransaction();
      const result = await work(conn);
      await conn.commit();
      return result;
    } catch (err) {
      try {
        await conn.rollback();
      } catch (rollbackErr) {
        console.error('[ABONEMENT REPOSITORY] Rollback failed:', rollbackErr);
      }
      throw err;
    } finally {
      conn.release();
    }
  }

  /**
   * Insert one history row through `executor` (the pool or a transaction connection).
   *
   * @param {object} executor - Object exposing `query(sql, params)`.
   * @param {*} abonementId - Abonement the event belongs to.
   * @param {string} eventType - Event type label.
   * @param {*} eventAt - Event timestamp.
   * @param {*} actorUserId - Acting user id, or `null`.
   * @param {object} details - Payload stored as JSON text.
   * @returns {Promise<void>}
   */
  async _insertHistory(executor, abonementId, eventType, eventAt, actorUserId, details) {
    await executor.query(INSERT_HISTORY_SQL, [
      abonementId,
      eventType,
      eventAt,
      actorUserId,
      JSON.stringify(details),
    ]);
  }

  /**
   * Insert a new abonement plus its `created` history row in one transaction.
   *
   * `user_id` and `purchaser_user_id` are written only when those columns
   * exist in the table.
   *
   * @param {*} referredBy - Referrer id; stored in `referred_by` and preferred
   *   over `snapshot.actor_user_id` as the history actor.
   * @param {object} snapshot - Field values for the new row.
   * @returns {Promise<Abonemment|null>} The row as re-read after the commit.
   */
  async createAbonement(referredBy, snapshot) {
    console.log('[CREATE ABONEMENT] Incoming snapshot user linking:', {
      actor_user_id: snapshot.actor_user_id,
      user_id: snapshot.user_id,
      purchaser_user_id: snapshot.purchaser_user_id,
      referred_by: referredBy,
    });

    // Resolve optional columns BEFORE checking out the transaction connection:
    // a cold cache triggers a pool query, which must not run while we hold one.
    const hasUserId = await this.hasColumn('user_id');
    const hasPurchaserUserId = await this.hasColumn('purchaser_user_id');

    // Column/value pairs are kept together so they can never drift apart.
    const entries = [
      ['pack_group', snapshot.pack_group || ''],
      ['status', snapshot.status || 'active'],
      ['started_at', snapshot.started_at],
      ['ended_at', snapshot.ended_at || null],
      ['next_billing_at', snapshot.next_billing_at || null],
      ['billing_interval', snapshot.billing_interval],
      ['interval_count', snapshot.interval_count],
      ['price', snapshot.price],
      ['currency', snapshot.currency],
      ['is_auto_renew', snapshot.is_auto_renew ? 1 : 0],
      ['notes', snapshot.notes || null],
      ['pack_breakdown', snapshot.pack_breakdown ? JSON.stringify(snapshot.pack_breakdown) : null],
      ['first_name', snapshot.first_name || null],
      ['last_name', snapshot.last_name || null],
      ['email', snapshot.email || null],
      ['street', snapshot.street || null],
      ['postal_code', snapshot.postal_code || null],
      ['city', snapshot.city || null],
      ['country', snapshot.country || null],
      ['frequency', snapshot.frequency || null],
      ['referred_by', referredBy || null],
    ];
    if (hasUserId) entries.push(['user_id', snapshot.user_id ?? null]);
    if (hasPurchaserUserId) entries.push(['purchaser_user_id', snapshot.purchaser_user_id ?? null]);

    const cols = entries.map(([column]) => column);
    const vals = entries.map(([, value]) => value);
    const placeholders = cols.map(() => '?').join(', ');
    const valueByColumn = new Map(entries);

    console.log('[CREATE ABONEMENT] Final columns:', cols);
    console.log('[CREATE ABONEMENT] Final values preview:', {
      pack_group: vals[0],
      status: vals[1],
      user_id: valueByColumn.get('user_id'),
      purchaser_user_id: valueByColumn.get('purchaser_user_id'),
    });

    const abonementId = await this._withTransaction(async (conn) => {
      const [res] = await conn.query(
        `INSERT INTO coffee_abonements (${cols.join(', ')}, created_at, updated_at)
         VALUES (${placeholders}, NOW(), NOW())`,
        vals
      );
      console.log('[CREATE ABONEMENT] Inserted abonement ID:', res.insertId);

      const historyDetails = { ...(snapshot.details || {}), pack_group: snapshot.pack_group };
      await this._insertHistory(
        conn,
        res.insertId,
        'created',
        snapshot.started_at,
        referredBy || snapshot.actor_user_id || null,
        historyDetails
      );
      return res.insertId;
    });

    // Read back only after the transaction connection has been released.
    const created = await this.getAbonementById(abonementId);
    console.log('[CREATE ABONEMENT] Loaded abonement row after insert:', {
      id: created?.id,
      user_id: created?.user_id,
      purchaser_user_id: created?.purchaser_user_id,
      pack_group: created?.pack_group,
      status: created?.status,
    });
    return created;
  }

  /**
   * Fetch one abonement by primary key.
   *
   * @param {*} id - Abonement id.
   * @returns {Promise<Abonemment|null>} The abonement, or `null` when no row matches.
   */
  async getAbonementById(id) {
    const [rows] = await pool.query(`SELECT * FROM coffee_abonements WHERE id = ?`, [id]);
    return rows[0] ? new Abonemment(rows[0]) : null;
  }

  /**
   * List a user's abonements, newest first.
   *
   * @param {*} userId - Value matched against `user_id`.
   * @param {object} [options]
   * @param {string} [options.status] - Optional status filter.
   * @param {number} [options.limit=50] - Page size (coerced with `Number`).
   * @param {number} [options.offset=0] - Page offset (coerced with `Number`).
   * @returns {Promise<Abonemment[]>}
   */
  async listByUser(userId, { status, limit = 50, offset = 0 } = {}) {
    const params = [userId];
    let sql = `SELECT * FROM coffee_abonements WHERE user_id = ?`;
    if (status) {
      sql += ` AND status = ?`;
      params.push(status);
    }
    sql += ` ORDER BY created_at DESC LIMIT ? OFFSET ?`;
    params.push(Number(limit), Number(offset));

    const [rows] = await pool.query(sql, params);
    return rows.map((row) => new Abonemment(row));
  }

  /**
   * Set status and `ended_at` of an abonement (no history row is written).
   *
   * @param {*} id - Abonement id.
   * @param {string} newStatus - New status value.
   * @param {object} [options]
   * @param {*} [options.ended_at] - End timestamp; falsy values are stored as NULL.
   * @param {*} [options.updated_at] - Update timestamp; defaults to now.
   * @returns {Promise<Abonemment|null>} The row as re-read after the update.
   */
  async updateStatus(id, newStatus, { ended_at, updated_at = new Date() } = {}) {
    await pool.query(
      `UPDATE coffee_abonements SET status = ?, ended_at = ?, updated_at = ? WHERE id = ?`,
      [newStatus, ended_at || null, updated_at, id]
    );
    return this.getAbonementById(id);
  }

  /**
   * Set `next_billing_at` of an abonement (no history row is written).
   *
   * @param {*} id - Abonement id.
   * @param {*} next_billing_at - New next-billing timestamp.
   * @param {*} [updated_at] - Update timestamp; defaults to now.
   * @returns {Promise<Abonemment|null>} The row as re-read after the update.
   */
  async updateBilling(id, next_billing_at, updated_at = new Date()) {
    await pool.query(
      `UPDATE coffee_abonements SET next_billing_at = ?, updated_at = ? WHERE id = ?`,
      [next_billing_at, updated_at, id]
    );
    return this.getAbonementById(id);
  }

  /**
   * Append a history row outside of any transaction.
   *
   * @param {*} abonementId - Abonement id.
   * @param {string} eventType - Event type label.
   * @param {*} actorUserId - Acting user id; falsy values are stored as NULL.
   * @param {object} [details={}] - Payload stored as JSON text.
   * @param {*} [eventAt] - Event timestamp; defaults to now.
   * @returns {Promise<void>}
   */
  async appendHistory(abonementId, eventType, actorUserId, details = {}, eventAt = new Date()) {
    await this._insertHistory(pool, abonementId, eventType, eventAt, actorUserId || null, details || {});
  }

  /**
   * Update status/`ended_at` and write a history row atomically.
   *
   * @param {*} id - Abonement id.
   * @param {string} newStatus - New status value.
   * @param {object} [historyPayload] - Optional `ended_at`, `updated_at`,
   *   `event_type` (default `'updated'`), `event_at`, `actor_user_id`, `details`.
   * @returns {Promise<Abonemment|null>} The row as re-read after the commit.
   */
  async transitionStatus(id, newStatus, historyPayload = {}) {
    await this._withTransaction(async (conn) => {
      await conn.query(
        `UPDATE coffee_abonements SET status = ?, ended_at = ?, updated_at = ? WHERE id = ?`,
        [newStatus, historyPayload.ended_at || null, historyPayload.updated_at || new Date(), id]
      );
      await this._insertHistory(
        conn,
        id,
        historyPayload.event_type || 'updated',
        historyPayload.event_at || new Date(),
        historyPayload.actor_user_id || null,
        historyPayload.details || {}
      );
    });
    return this.getAbonementById(id);
  }

  /**
   * Update `next_billing_at` and write a history row atomically.
   *
   * @param {*} id - Abonement id.
   * @param {*} nextBillingAt - New next-billing timestamp.
   * @param {object} [historyPayload] - Optional `updated_at`, `event_type`
   *   (default `'renewed'`), `event_at`, `actor_user_id`, `details`.
   * @returns {Promise<Abonemment|null>} The row as re-read after the commit.
   */
  async transitionBilling(id, nextBillingAt, historyPayload = {}) {
    await this._withTransaction(async (conn) => {
      await conn.query(
        `UPDATE coffee_abonements SET next_billing_at = ?, updated_at = ? WHERE id = ?`,
        [nextBillingAt, historyPayload.updated_at || new Date(), id]
      );
      await this._insertHistory(
        conn,
        id,
        historyPayload.event_type || 'renewed',
        historyPayload.event_at || new Date(),
        historyPayload.actor_user_id || null,
        historyPayload.details || {}
      );
    });
    return this.getAbonementById(id);
  }

  /**
   * List active, auto-renewing abonements whose `next_billing_at` is at or
   * before `now`, oldest due date first.
   *
   * @param {*} now - Cut-off timestamp.
   * @returns {Promise<Abonemment[]>}
   */
  async listDueForBilling(now) {
    const [rows] = await pool.query(
      `SELECT * FROM coffee_abonements
        WHERE status = 'active'
          AND is_auto_renew = 1
          AND next_billing_at IS NOT NULL
          AND next_billing_at <= ?
        ORDER BY next_billing_at ASC`,
      [now]
    );
    return rows.map((row) => new Abonemment(row));
  }

  /**
   * List active abonements whose `coffee_table_id` equals `productId`.
   *
   * NOTE(review): `coffee_table_id` is not written anywhere in this
   * repository - confirm the column still exists in the schema.
   *
   * @param {*} productId - Value matched against `coffee_table_id`.
   * @returns {Promise<Abonemment[]>}
   */
  async listActiveByProduct(productId) {
    const [rows] = await pool.query(
      `SELECT * FROM coffee_abonements WHERE coffee_table_id = ? AND status = 'active'`,
      [productId]
    );
    return rows.map((row) => new Abonemment(row));
  }

  /**
   * List the history of an abonement in chronological order.
   *
   * @param {*} abonementId - Abonement id.
   * @returns {Promise<object[]>} Plain objects with `id`, `abonement_id`,
   *   `event_type`, `event_at`, `actor_user_id`, `details` (decoded) and `created_at`.
   */
  async listHistory(abonementId) {
    const [rows] = await pool.query(
      `SELECT * FROM coffee_abonement_history WHERE abonement_id = ? ORDER BY event_at ASC, id ASC`,
      [abonementId]
    );
    return rows.map((row) => ({
      id: row.id,
      abonement_id: row.abonement_id,
      event_type: row.event_type,
      event_at: row.event_at,
      actor_user_id: row.actor_user_id,
      details: parseJsonColumn(row.details),
      created_at: row.created_at,
    }));
  }

  /**
   * Find the newest active or paused abonement for a user and pack group.
   *
   * `user_id` is compared with the NULL-safe `<=>` operator, so a nullish
   * `userId` matches rows whose `user_id` is NULL.
   *
   * @param {*} userId - User id, or nullish for user-less abonements.
   * @param {string} packGroup - Pack group; falsy values are matched as `''`.
   * @returns {Promise<Abonemment|null>}
   */
  async findActiveOrPausedByUserAndProduct(userId, packGroup) {
    const normalizedUserId = userId ?? null;
    const normalizedPackGroup = packGroup || '';
    const [rows] = await pool.query(
      `SELECT * FROM coffee_abonements
        WHERE pack_group = ?
          AND user_id <=> ?
          AND status IN ('active','paused')
        ORDER BY created_at DESC
        LIMIT 1`,
      [normalizedPackGroup, normalizedUserId]
    );
    return rows[0] ? new Abonemment(rows[0]) : null;
  }

  /**
   * Re-activate an existing abonement with the values of a new subscribe
   * request and record a `merged_subscribe` history row, atomically.
   *
   * `started_at` and `purchaser_user_id` are only filled when currently NULL;
   * `purchaser_user_id` is only touched when the column exists.
   *
   * @param {*} id - Id of the abonement to update.
   * @param {object} snapshot - New field values.
   * @returns {Promise<Abonemment|null>} The row as re-read after the commit.
   */
  async updateExistingAbonementForSubscribe(id, snapshot) {
    // Resolved before the transaction so no pool query runs while a
    // transaction connection is checked out.
    const hasPurchaser = await this.hasColumn('purchaser_user_id');

    // Each entry is [clause, ...boundValues]. Keeping clause and values
    // together guarantees that placeholders and parameters stay aligned even
    // though some clauses (status, updated_at) bind no value.
    const assignments = [
      [`status = 'active'`],
      [`pack_group = ?`, snapshot.pack_group || ''],
      [`started_at = IFNULL(started_at, ?)`, snapshot.started_at],
      [`next_billing_at = ?`, snapshot.next_billing_at],
      [`billing_interval = ?`, snapshot.billing_interval],
      [`interval_count = ?`, snapshot.interval_count],
      [`price = ?`, snapshot.price],
      [`currency = ?`, snapshot.currency],
      [`is_auto_renew = ?`, snapshot.is_auto_renew ? 1 : 0],
      [`notes = ?`, snapshot.notes || null],
      [`recipient_email = ?`, snapshot.recipient_email || null],
    ];
    if (hasPurchaser) {
      assignments.push([
        `purchaser_user_id = IFNULL(purchaser_user_id, ?)`,
        snapshot.purchaser_user_id ?? null,
      ]);
    }
    assignments.push(
      [`pack_breakdown = ?`, snapshot.pack_breakdown ? JSON.stringify(snapshot.pack_breakdown) : null],
      [`updated_at = NOW()`]
    );

    const setClause = assignments.map(([clause]) => clause).join(', ');
    const params = assignments.flatMap(([, ...values]) => values);

    await this._withTransaction(async (conn) => {
      await conn.query(`UPDATE coffee_abonements SET ${setClause} WHERE id = ?`, [...params, id]);

      const historyDetails = {
        ...(snapshot.details || {}),
        reused_existing: true,
        pack_group: snapshot.pack_group,
      };
      if (snapshot.recipient) historyDetails.recipient = snapshot.recipient;

      await this._insertHistory(
        conn,
        id,
        'merged_subscribe',
        snapshot.started_at,
        snapshot.actor_user_id || null,
        historyDetails
      );
    });

    return this.getAbonementById(id);
  }

  /**
   * Upsert a pending row into the `no_user_aboo_mails` table, ignoring the
   * error when that table does not exist (`ER_NO_SUCH_TABLE`).
   *
   * NOTE(review): nothing in this class calls this helper, and the email is
   * stored as given (not normalized) - confirm whether it is still needed.
   *
   * @param {string} email - Email address.
   * @param {*} abonementId - Abonement id.
   * @param {*} createdByUserId - Creating user id; falsy values are stored as NULL.
   * @returns {Promise<void>}
   */
  async tryAliasInsert(email, abonementId, createdByUserId) {
    try {
      await pool.query(
        `INSERT INTO no_user_aboo_mails
            (email, abonement_id, status, source, created_by_user_id, created_at, updated_at)
          VALUES (?, ?, 'pending', 'subscribe', ?, NOW(), NOW())
          ON DUPLICATE KEY UPDATE
            status = 'pending',
            source = 'subscribe',
            created_by_user_id = VALUES(created_by_user_id),
            updated_at = NOW()`,
        [email, abonementId, createdByUserId || null]
      );
    } catch (e) {
      if (!(e && e.code === 'ER_NO_SUCH_TABLE')) throw e;
    }
  }

  /**
   * Insert or refresh a pending ownership row in `no_user_abo_mails` for an
   * email. Does nothing when the email is not a string or is blank.
   *
   * This writes to the primary table only; `tryAliasInsert` is not invoked.
   *
   * @param {string} email - Email address (trimmed and lower-cased before use).
   * @param {*} abonementId - Abonement id.
   * @param {*} createdByUserId - Creating user id; falsy values are stored as NULL.
   * @returns {Promise<void>}
   */
  async upsertNoUserAboMail(email, abonementId, createdByUserId) {
    const normalized = normalizeEmail(email);
    console.log('[UPSERT NO USER ABO MAIL] Normalized email:', normalized);
    console.log('[UPSERT NO USER ABO MAIL] Abonement ID:', abonementId);
    console.log('[UPSERT NO USER ABO MAIL] Created by user ID:', createdByUserId);

    if (!normalized) {
      console.log('[UPSERT NO USER ABO MAIL] Skipping due to invalid email');
      return;
    }

    try {
      await pool.query(
        `INSERT INTO no_user_abo_mails
            (email, abonement_id, status, source, created_by_user_id, created_at, updated_at)
          VALUES (?, ?, 'pending', 'subscribe', ?, NOW(), NOW())
          ON DUPLICATE KEY UPDATE
            status = 'pending',
            source = 'subscribe',
            created_by_user_id = VALUES(created_by_user_id),
            updated_at = NOW()`,
        [normalized, abonementId, createdByUserId || null]
      );
      console.log('[UPSERT NO USER ABO MAIL] Successfully inserted/updated record');
    } catch (err) {
      console.error('[UPSERT NO USER ABO MAIL] Error inserting/updating record:', err);
      throw err;
    }
  }

  /**
   * Fetch the pending ownership rows for an email.
   *
   * @param {string} email - Email address (trimmed and lower-cased before use).
   * @returns {Promise<object[]>} Raw rows; `[]` when the email is not a string or is blank.
   */
  async findPendingNoUserAboMailsByEmail(email) {
    const normalized = normalizeEmail(email);
    console.log('[FIND PENDING NO USER ABO MAILS] Normalized email:', normalized);

    if (!normalized) {
      console.log('[FIND PENDING NO USER ABO MAILS] Skipping due to invalid email');
      return [];
    }

    const [rows] = await pool.query(
      `SELECT * FROM no_user_abo_mails WHERE email = ? AND status = 'pending'`,
      [normalized]
    );
    console.log('[FIND PENDING NO USER ABO MAILS] Found rows:', rows);
    return rows || [];
  }

  /**
   * Mark a pending ownership row as linked.
   *
   * @param {*} id - Id of the `no_user_abo_mails` row.
   * @returns {Promise<void>}
   */
  async markNoUserAboMailLinked(id) {
    console.log('[MARK NO USER ABO MAIL LINKED] Marking record as linked for ID:', id);
    await pool.query(
      `UPDATE no_user_abo_mails SET status = 'linked', updated_at = NOW() WHERE id = ?`,
      [id]
    );
    console.log('[MARK NO USER ABO MAIL LINKED] Successfully marked record as linked');
  }

  /**
   * Set the `user_id` of an abonement.
   *
   * @param {*} abonementId - Abonement id.
   * @param {*} userId - User id to store.
   * @returns {Promise<void>}
   */
  async linkAbonementToUser(abonementId, userId) {
    await pool.query(
      `UPDATE coffee_abonements SET user_id = ?, updated_at = NOW() WHERE id = ?`,
      [userId, abonementId]
    );
  }

  /**
   * List abonements with the given referrer and email, newest first.
   *
   * @param {*} referredBy - Value matched against `referred_by`.
   * @param {string} email - Value matched against `email` (used as given).
   * @returns {Promise<Abonemment[]>}
   */
  async findByReferredByAndEmail(referredBy, email) {
    const [rows] = await pool.query(
      `SELECT * FROM coffee_abonements WHERE referred_by = ? AND email = ? ORDER BY created_at DESC`,
      [referredBy, email]
    );
    return rows.map((row) => new Abonemment(row));
  }
}

module.exports = AbonemmentRepository;