outreach/lib/database.js
2025-08-15 01:03:38 -08:00

399 lines
11 KiB
JavaScript

const sqlite3 = require("sqlite3").verbose();
const { open } = require("sqlite");
const path = require("path");
const logger = require("./logger");
/**
 * Async data-access layer for the outreach SQLite database.
 *
 * Wraps a single `sqlite` connection (opened by {@link Database#init}) and
 * exposes helpers for firms, campaigns, per-recipient email sends and
 * open/click tracking events. Every method except the constructor, `init`
 * and `close` requires `init()` to have completed first, because they all
 * dereference `this.db`.
 */
class Database {
  constructor() {
    // Connection handle; stays null until init() succeeds.
    this.db = null;
    // The database file lives in ../data relative to this module.
    this.dbPath = path.join(__dirname, "..", "data", "outreach.db");
  }

  /**
   * Open the database file (creating its directory if needed) and make sure
   * the schema exists.
   *
   * Calling it again while a connection is open returns the existing handle
   * instead of opening (and leaking) a second one.
   *
   * @returns {Promise<object>} The open `sqlite` database handle.
   * @throws Rethrows any filesystem, open or schema error after logging it.
   */
  async init() {
    if (this.db) {
      return this.db;
    }

    try {
      // Create data directory if it doesn't exist
      const fs = require("fs");
      const dataDir = path.dirname(this.dbPath);
      if (!fs.existsSync(dataDir)) {
        fs.mkdirSync(dataDir, { recursive: true });
      }

      // Open database connection
      this.db = await open({
        filename: this.dbPath,
        driver: sqlite3.Database,
      });
      logger.info("Database connected", { dbPath: this.dbPath });

      // Create tables
      await this.createTables();
      return this.db;
    } catch (error) {
      logger.error("Database initialization failed", { error: error.message });

      // Do not keep a half-initialised connection around: release it so a
      // later init() starts from a clean state.
      if (this.db) {
        const handle = this.db;
        this.db = null;
        try {
          await handle.close();
        } catch (closeError) {
          logger.warn("Failed to close database after init error", {
            error: closeError.message,
          });
        }
      }
      throw error;
    }
  }

  /**
   * Create all tables and indexes if they do not exist yet.
   *
   * Statements run one at a time, in order, because later tables reference
   * earlier ones. The schema declares FOREIGN KEY clauses, but SQLite only
   * enforces them when `PRAGMA foreign_keys = ON` is set on the connection,
   * and nothing in this class sets it.
   *
   * @returns {Promise<void>}
   */
  async createTables() {
    const statements = [
      // Firms table
      `CREATE TABLE IF NOT EXISTS firms (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        firm_name TEXT NOT NULL,
        location TEXT,
        website TEXT,
        contact_email TEXT NOT NULL,
        state TEXT,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
        updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
        UNIQUE(contact_email)
      )`,
      // Email campaigns table
      `CREATE TABLE IF NOT EXISTS campaigns (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        subject TEXT NOT NULL,
        template_name TEXT NOT NULL,
        test_mode BOOLEAN DEFAULT 0,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
        started_at DATETIME,
        completed_at DATETIME,
        total_emails INTEGER DEFAULT 0,
        sent_emails INTEGER DEFAULT 0,
        failed_emails INTEGER DEFAULT 0
      )`,
      // Email sends table
      `CREATE TABLE IF NOT EXISTS email_sends (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        campaign_id INTEGER,
        firm_id INTEGER,
        recipient_email TEXT NOT NULL,
        subject TEXT NOT NULL,
        status TEXT NOT NULL, -- 'sent', 'failed', 'retry', 'permanent_failure'
        error_type TEXT,
        error_message TEXT,
        retry_count INTEGER DEFAULT 0,
        tracking_id TEXT UNIQUE, -- Add tracking ID for email tracking
        sent_at DATETIME,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY (campaign_id) REFERENCES campaigns (id),
        FOREIGN KEY (firm_id) REFERENCES firms (id)
      )`,
      // Tracking events table for email opens and clicks
      `CREATE TABLE IF NOT EXISTS tracking_events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        tracking_id TEXT NOT NULL,
        event_type TEXT NOT NULL, -- 'open', 'click'
        event_data TEXT, -- JSON data (link_id, target_url, etc.)
        ip_address TEXT,
        user_agent TEXT,
        referer TEXT,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY (tracking_id) REFERENCES email_sends (tracking_id)
      )`,
      // Create indexes for better performance
      `CREATE INDEX IF NOT EXISTS idx_firms_email ON firms(contact_email)`,
      `CREATE INDEX IF NOT EXISTS idx_firms_state ON firms(state)`,
      `CREATE INDEX IF NOT EXISTS idx_email_sends_campaign ON email_sends(campaign_id)`,
      `CREATE INDEX IF NOT EXISTS idx_email_sends_status ON email_sends(status)`,
      `CREATE INDEX IF NOT EXISTS idx_email_sends_tracking ON email_sends(tracking_id)`,
      `CREATE INDEX IF NOT EXISTS idx_tracking_events_tracking_id ON tracking_events(tracking_id)`,
      `CREATE INDEX IF NOT EXISTS idx_tracking_events_type ON tracking_events(event_type)`,
    ];

    for (const statement of statements) {
      await this.db.exec(statement);
    }
    logger.info("Database tables created/verified");
  }

  // Firm operations

  /**
   * Insert a firm, or leave the existing row untouched when a firm with the
   * same contact email is already stored.
   *
   * @param {object} firmData
   * @param {string} firmData.firmName
   * @param {string} [firmData.location]
   * @param {string} [firmData.website]
   * @param {string} [firmData.contactEmail] Contact address; `firmData.email`
   *   is accepted as a fallback, matching insertFirmsBatch().
   * @param {string} [firmData.state]
   * @returns {Promise<number|null>} Id of the inserted row, or of the
   *   already-existing row with that email; null when the insert was ignored
   *   and no row with that email exists.
   * @throws Rethrows database errors after logging them.
   */
  async insertFirm(firmData) {
    const { firmName, location, website, state } = firmData;
    const contactEmail = firmData.contactEmail || firmData.email;

    try {
      const result = await this.db.run(
        `INSERT OR IGNORE INTO firms (firm_name, location, website, contact_email, state)
         VALUES (?, ?, ?, ?, ?)`,
        [firmName, location, website, contactEmail, state]
      );

      if (result.changes > 0) {
        return result.lastID;
      }

      // The insert was ignored, so lastID does not describe this firm (it is
      // left over from an earlier insert). Resolve the real id by email.
      const existing = await this.db.get(
        "SELECT id FROM firms WHERE contact_email = ?",
        [contactEmail]
      );
      return existing ? existing.id : null;
    } catch (error) {
      logger.error("Failed to insert firm", { firmData, error: error.message });
      throw error;
    }
  }

  /**
   * Insert many firms with one prepared statement, skipping rows that
   * conflict with an existing contact email.
   *
   * A failure on one firm is logged and does not stop the batch.
   *
   * @param {Iterable<object>} firms Objects with `firmName`, `location`,
   *   `website`, `contactEmail` (or `email`) and `state`.
   * @returns {Promise<number>} Number of rows actually inserted.
   */
  async insertFirmsBatch(firms) {
    const stmt = await this.db.prepare(
      `INSERT OR IGNORE INTO firms (firm_name, location, website, contact_email, state)
       VALUES (?, ?, ?, ?, ?)`
    );

    let inserted = 0;
    try {
      for (const firm of firms) {
        try {
          const result = await stmt.run([
            firm.firmName,
            firm.location,
            firm.website,
            firm.contactEmail || firm.email,
            firm.state,
          ]);
          if (result.changes > 0) inserted++;
        } catch (error) {
          logger.warn("Failed to insert firm", {
            firm: firm.firmName,
            error: error.message,
          });
        }
      }
    } finally {
      // Always release the prepared statement, even if iteration throws
      // (e.g. `firms` is not iterable).
      await stmt.finalize();
    }

    logger.info(`Inserted ${inserted} firms into database`);
    return inserted;
  }

  /**
   * List firms ordered by id.
   *
   * @param {number|null} [limit=null] Page size. Any falsy value (null, 0)
   *   returns every row and `offset` is then ignored.
   * @param {number} [offset=0] Rows to skip when `limit` is set.
   * @returns {Promise<object[]>} Matching firm rows.
   */
  async getFirms(limit = null, offset = 0) {
    if (limit) {
      return await this.db.all(
        `SELECT * FROM firms ORDER BY id LIMIT ? OFFSET ?`,
        [limit, offset]
      );
    }
    return await this.db.all(`SELECT * FROM firms ORDER BY id`, []);
  }

  /**
   * Look up a firm by its contact email.
   *
   * @param {string} email
   * @returns {Promise<object|undefined>} The firm row, if any.
   */
  async getFirmByEmail(email) {
    return await this.db.get("SELECT * FROM firms WHERE contact_email = ?", [
      email,
    ]);
  }

  /**
   * Look up a firm by primary key.
   *
   * @param {number} id
   * @returns {Promise<object|undefined>} The firm row, if any.
   */
  async getFirmById(id) {
    return await this.db.get("SELECT * FROM firms WHERE id = ?", [id]);
  }

  /**
   * Delete firms that share a contact email, keeping the lowest id per email.
   *
   * @returns {Promise<number>} Number of rows deleted.
   */
  async removeDuplicateFirms() {
    // Remove duplicates keeping the first occurrence
    const result = await this.db.run(`
      DELETE FROM firms
      WHERE id NOT IN (
        SELECT MIN(id)
        FROM firms
        GROUP BY contact_email
      )
    `);
    logger.info(`Removed ${result.changes} duplicate firms`);
    return result.changes;
  }

  // Campaign operations

  /**
   * Create a campaign record.
   *
   * @param {object} campaignData
   * @param {string} campaignData.name
   * @param {string} campaignData.subject
   * @param {string} campaignData.templateName
   * @param {boolean} [campaignData.testMode] Stored as 1 or 0.
   * @returns {Promise<number>} Id of the new campaign.
   */
  async createCampaign(campaignData) {
    const { name, subject, templateName, testMode } = campaignData;
    const result = await this.db.run(
      `INSERT INTO campaigns (name, subject, template_name, test_mode)
       VALUES (?, ?, ?, ?)`,
      [name, subject, templateName, testMode ? 1 : 0]
    );
    logger.info("Campaign created", { campaignId: result.lastID, name });
    return result.lastID;
  }

  /**
   * Stamp a campaign as started and record how many emails it will send.
   *
   * @param {number} campaignId
   * @param {number} totalEmails
   * @returns {Promise<void>}
   */
  async startCampaign(campaignId, totalEmails) {
    await this.db.run(
      `UPDATE campaigns
       SET started_at = CURRENT_TIMESTAMP, total_emails = ?
       WHERE id = ?`,
      [totalEmails, campaignId]
    );
  }

  /**
   * Stamp a campaign as completed and store its final counters.
   *
   * @param {number} campaignId
   * @param {{sent: number, failed: number}} stats
   * @returns {Promise<void>}
   */
  async completeCampaign(campaignId, stats) {
    await this.db.run(
      `UPDATE campaigns
       SET completed_at = CURRENT_TIMESTAMP,
           sent_emails = ?,
           failed_emails = ?
       WHERE id = ?`,
      [stats.sent, stats.failed, campaignId]
    );
  }

  // Email send tracking

  /**
   * Record one email send attempt.
   *
   * `sent_at` is filled only when `status` is "sent". Omitted optional
   * fields are stored as NULL, except `retryCount`, which falls back to 0 so
   * the column keeps its documented default instead of receiving NULL.
   *
   * NOTE(review): `sent_at` is written here as an ISO-8601 string, whereas
   * updateEmailSendStatus() writes SQLite's CURRENT_TIMESTAMP format. The two
   * formats do not sort consistently as text; confirm which one consumers
   * expect before unifying them.
   *
   * @param {object} emailData
   * @param {number} [emailData.campaignId]
   * @param {number} [emailData.firmId]
   * @param {string} emailData.recipientEmail
   * @param {string} emailData.subject
   * @param {string} emailData.status
   * @param {string} [emailData.errorType]
   * @param {string} [emailData.errorMessage]
   * @param {number} [emailData.retryCount=0]
   * @param {string} [emailData.trackingId]
   * @returns {Promise<number>} Id of the new email_sends row.
   */
  async logEmailSend(emailData) {
    const {
      campaignId,
      firmId,
      recipientEmail,
      subject,
      status,
      errorType = null,
      errorMessage = null,
      retryCount,
      trackingId = null,
    } = emailData;

    const sentAt = status === "sent" ? new Date().toISOString() : null;

    const result = await this.db.run(
      `INSERT INTO email_sends
       (campaign_id, firm_id, recipient_email, subject, status, error_type, error_message, retry_count, tracking_id, sent_at)
       VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
      [
        campaignId,
        firmId,
        recipientEmail,
        subject,
        status,
        errorType,
        errorMessage,
        // An explicit NULL would bypass the column's DEFAULT 0.
        retryCount == null ? 0 : retryCount,
        trackingId,
        sentAt,
      ]
    );
    return result.lastID;
  }

  /**
   * Update the status (and error details) of an existing send record.
   * `sent_at` is set to the current time only when the new status is "sent";
   * otherwise its previous value is kept.
   *
   * @param {number} emailSendId
   * @param {string} status
   * @param {string|null} [errorType=null]
   * @param {string|null} [errorMessage=null]
   * @returns {Promise<void>}
   */
  async updateEmailSendStatus(
    emailSendId,
    status,
    errorType = null,
    errorMessage = null
  ) {
    await this.db.run(
      `UPDATE email_sends
       SET status = ?, error_type = ?, error_message = ?,
           sent_at = CASE WHEN ? = 'sent' THEN CURRENT_TIMESTAMP ELSE sent_at END
       WHERE id = ?`,
      [status, errorType, errorMessage, status, emailSendId]
    );
  }

  // Statistics and reporting

  /**
   * Count a campaign's send records per status.
   *
   * @param {number} campaignId
   * @returns {Promise<object>} Counts keyed by status. The keys `sent`,
   *   `failed`, `retry` and `permanent_failure` are always present (0 when
   *   absent); any other status found in the table is added as well.
   */
  async getCampaignStats(campaignId) {
    const rows = await this.db.all(
      `
      SELECT
        status,
        COUNT(*) as count
      FROM email_sends
      WHERE campaign_id = ?
      GROUP BY status
      `,
      [campaignId]
    );

    const totals = {
      sent: 0,
      failed: 0,
      retry: 0,
      permanent_failure: 0,
    };
    for (const { status, count } of rows) {
      totals[status] = count;
    }
    return totals;
  }

  /**
   * List a campaign's failed sends joined with their firm, newest first.
   * Sends whose firm_id matches no firm are omitted (inner join).
   *
   * @param {number} campaignId
   * @returns {Promise<object[]>}
   */
  async getFailedEmails(campaignId) {
    return await this.db.all(
      `
      SELECT es.*, f.firm_name, f.contact_email
      FROM email_sends es
      JOIN firms f ON es.firm_id = f.id
      WHERE es.campaign_id = ? AND es.status IN ('failed', 'permanent_failure')
      ORDER BY es.created_at DESC
      `,
      [campaignId]
    );
  }

  /**
   * Close the connection if one is open; safe to call repeatedly.
   *
   * @returns {Promise<void>}
   */
  async close() {
    if (!this.db) {
      return;
    }
    await this.db.close();
    this.db = null;
    logger.info("Database connection closed");
  }

  // Utility methods

  /**
   * Count the rows of every table managed by this class.
   *
   * @returns {Promise<object>} Row counts keyed by table name.
   */
  async getTableCounts() {
    const counts = {};
    // Table names come from this fixed list, never from input, so
    // interpolating them into the SQL text is safe.
    const tables = ["firms", "campaigns", "email_sends", "tracking_events"];
    for (const table of tables) {
      const row = await this.db.get(`SELECT COUNT(*) as count FROM ${table}`);
      counts[table] = row.count;
    }
    return counts;
  }

  // Tracking events methods

  /**
   * Store an open/click event for a tracked email.
   *
   * The whole `eventData` object is saved as JSON; its `ip`, `userAgent` and
   * `referer` fields are additionally copied into dedicated columns.
   *
   * @param {string} trackingId
   * @param {string} eventType
   * @param {object|null} [eventData={}] A null value is treated like `{}`.
   * @returns {Promise<void>}
   */
  async storeTrackingEvent(trackingId, eventType, eventData = {}) {
    // The default parameter only covers `undefined`; guard against null too.
    const data = eventData || {};
    const query = `
      INSERT INTO tracking_events (tracking_id, event_type, event_data, ip_address, user_agent, referer)
      VALUES (?, ?, ?, ?, ?, ?)
    `;
    await this.db.run(query, [
      trackingId,
      eventType,
      JSON.stringify(data),
      data.ip || null,
      data.userAgent || null,
      data.referer || null,
    ]);
  }

  /**
   * Fetch all events for a tracking id, oldest first, with `event_data`
   * parsed from JSON.
   *
   * A row whose `event_data` is empty or is not valid JSON yields `{}`
   * (malformed rows are logged) so that one bad row cannot fail the whole
   * query.
   *
   * @param {string} trackingId
   * @returns {Promise<object[]>}
   */
  async getTrackingEvents(trackingId) {
    const query = `
      SELECT * FROM tracking_events
      WHERE tracking_id = ?
      ORDER BY created_at ASC
    `;
    const events = await this.db.all(query, [trackingId]);

    return events.map((event) => {
      let parsed = {};
      try {
        parsed = JSON.parse(event.event_data || "{}");
      } catch (error) {
        logger.warn("Malformed tracking event data", {
          eventId: event.id,
          trackingId,
          error: error.message,
        });
      }
      return { ...event, event_data: parsed };
    });
  }

  /**
   * Summarise opens and clicks per sent email of a campaign.
   *
   * @param {number} campaignId
   * @returns {Promise<object[]>} One row per sent email with `tracking_id`,
   *   `recipient_email`, `opens`, `clicks`, `first_open` and `first_click`,
   *   ordered by send time.
   */
  async getTrackingStats(campaignId) {
    const query = `
      SELECT
        es.tracking_id,
        es.recipient_email,
        COUNT(CASE WHEN te.event_type = 'open' THEN 1 END) as opens,
        COUNT(CASE WHEN te.event_type = 'click' THEN 1 END) as clicks,
        MIN(CASE WHEN te.event_type = 'open' THEN te.created_at END) as first_open,
        MIN(CASE WHEN te.event_type = 'click' THEN te.created_at END) as first_click
      FROM email_sends es
      LEFT JOIN tracking_events te ON es.tracking_id = te.tracking_id
      WHERE es.campaign_id = ? AND es.status = 'sent'
      GROUP BY es.tracking_id, es.recipient_email
      ORDER BY es.sent_at ASC
    `;
    return await this.db.all(query, [campaignId]);
  }
}
// Export one shared Database instance; it is constructed when this module is
// first loaded, with no connection open until init() is called on it.
module.exports = new Database();