const sqlite3 = require("sqlite3").verbose(); const { open } = require("sqlite"); const path = require("path"); const logger = require("./logger"); class Database { constructor() { this.db = null; this.dbPath = path.join(__dirname, "..", "data", "outreach.db"); } async init() { try { // Create data directory if it doesn't exist const fs = require("fs"); const dataDir = path.dirname(this.dbPath); if (!fs.existsSync(dataDir)) { fs.mkdirSync(dataDir, { recursive: true }); } // Open database connection this.db = await open({ filename: this.dbPath, driver: sqlite3.Database, }); logger.info("Database connected", { dbPath: this.dbPath }); // Create tables await this.createTables(); return this.db; } catch (error) { logger.error("Database initialization failed", { error: error.message }); throw error; } } async createTables() { const tables = [ // Firms table `CREATE TABLE IF NOT EXISTS firms ( id INTEGER PRIMARY KEY AUTOINCREMENT, firm_name TEXT NOT NULL, location TEXT, website TEXT, contact_email TEXT NOT NULL, state TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, UNIQUE(contact_email) )`, // Email campaigns table `CREATE TABLE IF NOT EXISTS campaigns ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, subject TEXT NOT NULL, template_name TEXT NOT NULL, test_mode BOOLEAN DEFAULT 0, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, started_at DATETIME, completed_at DATETIME, total_emails INTEGER DEFAULT 0, sent_emails INTEGER DEFAULT 0, failed_emails INTEGER DEFAULT 0 )`, // Email sends table `CREATE TABLE IF NOT EXISTS email_sends ( id INTEGER PRIMARY KEY AUTOINCREMENT, campaign_id INTEGER, firm_id INTEGER, recipient_email TEXT NOT NULL, subject TEXT NOT NULL, status TEXT NOT NULL, -- 'sent', 'failed', 'retry', 'permanent_failure' error_type TEXT, error_message TEXT, retry_count INTEGER DEFAULT 0, tracking_id TEXT UNIQUE, -- Add tracking ID for email tracking sent_at DATETIME, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (campaign_id) REFERENCES campaigns (id), FOREIGN KEY (firm_id) REFERENCES firms (id) )`, // Tracking events table for email opens and clicks `CREATE TABLE IF NOT EXISTS tracking_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, tracking_id TEXT NOT NULL, event_type TEXT NOT NULL, -- 'open', 'click' event_data TEXT, -- JSON data (link_id, target_url, etc.) ip_address TEXT, user_agent TEXT, referer TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (tracking_id) REFERENCES email_sends (tracking_id) )`, // Create indexes for better performance `CREATE INDEX IF NOT EXISTS idx_firms_email ON firms(contact_email)`, `CREATE INDEX IF NOT EXISTS idx_firms_state ON firms(state)`, `CREATE INDEX IF NOT EXISTS idx_email_sends_campaign ON email_sends(campaign_id)`, `CREATE INDEX IF NOT EXISTS idx_email_sends_status ON email_sends(status)`, `CREATE INDEX IF NOT EXISTS idx_email_sends_tracking ON email_sends(tracking_id)`, `CREATE INDEX IF NOT EXISTS idx_tracking_events_tracking_id ON tracking_events(tracking_id)`, `CREATE INDEX IF NOT EXISTS idx_tracking_events_type ON tracking_events(event_type)`, ]; for (const table of tables) { await this.db.exec(table); } logger.info("Database tables created/verified"); } // Firm operations async insertFirm(firmData) { const { firmName, location, website, contactEmail, state } = firmData; try { const result = await this.db.run( `INSERT OR IGNORE INTO firms (firm_name, location, website, contact_email, state) VALUES (?, ?, ?, ?, ?)`, [firmName, location, website, contactEmail, state] ); return result.lastID; } catch (error) { logger.error("Failed to insert firm", { firmData, error: error.message }); throw error; } } async insertFirmsBatch(firms) { const stmt = await this.db.prepare( `INSERT OR IGNORE INTO firms (firm_name, location, website, contact_email, state) VALUES (?, ?, ?, ?, ?)` ); let inserted = 0; for (const firm of firms) { try { const result = await stmt.run([ firm.firmName, firm.location, firm.website, firm.contactEmail || firm.email, firm.state, ]); if (result.changes > 0) inserted++; } catch (error) { logger.warn("Failed to insert firm", { firm: firm.firmName, error: error.message, }); } } await stmt.finalize(); logger.info(`Inserted ${inserted} firms into database`); return inserted; } async getFirms(limit = null, offset = 0) { const sql = limit ? `SELECT * FROM firms ORDER BY id LIMIT ? OFFSET ?` : `SELECT * FROM firms ORDER BY id`; const params = limit ? [limit, offset] : []; return await this.db.all(sql, params); } async getFirmByEmail(email) { return await this.db.get("SELECT * FROM firms WHERE contact_email = ?", [ email, ]); } async getFirmById(id) { return await this.db.get("SELECT * FROM firms WHERE id = ?", [id]); } async removeDuplicateFirms() { // Remove duplicates keeping the first occurrence const result = await this.db.run(` DELETE FROM firms WHERE id NOT IN ( SELECT MIN(id) FROM firms GROUP BY contact_email ) `); logger.info(`Removed ${result.changes} duplicate firms`); return result.changes; } // Campaign operations async createCampaign(campaignData) { const { name, subject, templateName, testMode } = campaignData; const result = await this.db.run( `INSERT INTO campaigns (name, subject, template_name, test_mode) VALUES (?, ?, ?, ?)`, [name, subject, templateName, testMode ? 1 : 0] ); logger.info("Campaign created", { campaignId: result.lastID, name }); return result.lastID; } async startCampaign(campaignId, totalEmails) { await this.db.run( `UPDATE campaigns SET started_at = CURRENT_TIMESTAMP, total_emails = ? WHERE id = ?`, [totalEmails, campaignId] ); } async completeCampaign(campaignId, stats) { await this.db.run( `UPDATE campaigns SET completed_at = CURRENT_TIMESTAMP, sent_emails = ?, failed_emails = ? WHERE id = ?`, [stats.sent, stats.failed, campaignId] ); } // Email send tracking async logEmailSend(emailData) { const { campaignId, firmId, recipientEmail, subject, status, errorType, errorMessage, retryCount, trackingId, } = emailData; const result = await this.db.run( `INSERT INTO email_sends (campaign_id, firm_id, recipient_email, subject, status, error_type, error_message, retry_count, tracking_id, sent_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, [ campaignId, firmId, recipientEmail, subject, status, errorType, errorMessage, retryCount, trackingId, status === "sent" ? new Date().toISOString() : null, ] ); return result.lastID; } async updateEmailSendStatus( emailSendId, status, errorType = null, errorMessage = null ) { await this.db.run( `UPDATE email_sends SET status = ?, error_type = ?, error_message = ?, sent_at = CASE WHEN ? = 'sent' THEN CURRENT_TIMESTAMP ELSE sent_at END WHERE id = ?`, [status, errorType, errorMessage, status, emailSendId] ); } // Statistics and reporting async getCampaignStats(campaignId) { const stats = await this.db.all( ` SELECT status, COUNT(*) as count FROM email_sends WHERE campaign_id = ? GROUP BY status `, [campaignId] ); const result = { sent: 0, failed: 0, retry: 0, permanent_failure: 0, }; stats.forEach((stat) => { result[stat.status] = stat.count; }); return result; } async getFailedEmails(campaignId) { return await this.db.all( ` SELECT es.*, f.firm_name, f.contact_email FROM email_sends es JOIN firms f ON es.firm_id = f.id WHERE es.campaign_id = ? AND es.status IN ('failed', 'permanent_failure') ORDER BY es.created_at DESC `, [campaignId] ); } async close() { if (this.db) { await this.db.close(); this.db = null; logger.info("Database connection closed"); } } // Utility methods async getTableCounts() { const counts = {}; const tables = ["firms", "campaigns", "email_sends", "tracking_events"]; for (const table of tables) { const result = await this.db.get( `SELECT COUNT(*) as count FROM ${table}` ); counts[table] = result.count; } return counts; } // Tracking events methods async storeTrackingEvent(trackingId, eventType, eventData = {}) { const query = ` INSERT INTO tracking_events (tracking_id, event_type, event_data, ip_address, user_agent, referer) VALUES (?, ?, ?, ?, ?, ?) `; await this.db.run(query, [ trackingId, eventType, JSON.stringify(eventData), eventData.ip || null, eventData.userAgent || null, eventData.referer || null, ]); } async getTrackingEvents(trackingId) { const query = ` SELECT * FROM tracking_events WHERE tracking_id = ? ORDER BY created_at ASC `; const events = await this.db.all(query, [trackingId]); return events.map((event) => ({ ...event, event_data: JSON.parse(event.event_data || "{}"), })); } async getTrackingStats(campaignId) { const query = ` SELECT es.tracking_id, es.recipient_email, COUNT(CASE WHEN te.event_type = 'open' THEN 1 END) as opens, COUNT(CASE WHEN te.event_type = 'click' THEN 1 END) as clicks, MIN(CASE WHEN te.event_type = 'open' THEN te.created_at END) as first_open, MIN(CASE WHEN te.event_type = 'click' THEN te.created_at END) as first_click FROM email_sends es LEFT JOIN tracking_events te ON es.tracking_id = te.tracking_id WHERE es.campaign_id = ? AND es.status = 'sent' GROUP BY es.tracking_id, es.recipient_email ORDER BY es.sent_at ASC `; return await this.db.all(query, [campaignId]); } } module.exports = new Database();