tinylink/core/worker/worker.js
mahdahar 10f8dbbb83 refactor: consolidate core runtime and docs
Move middleware sources into core/, refresh config paths, and update design/user docs to reflect the raw payload pipeline.
2026-04-07 11:30:11 +07:00

141 lines
4.1 KiB
JavaScript

const queue = require('../queue/queue');
const client = require('./client');
const logger = require('../logger');
const config = require('../config/config');
// Error codes that isTransientError treats as retryable network failures.
const transientCodes = new Set([
  'ECONNRESET',
  'ETIMEDOUT',
  'ECONNREFUSED',
  'EAI_AGAIN',
  'ENETUNREACH'
]);

/**
 * Returns a promise that resolves (with no value) once `ms` milliseconds
 * have elapsed on a timer.
 */
const sleep = (ms) =>
  new Promise((resolve) => {
    setTimeout(resolve, ms);
  });

// Promise returned by the most recent loop() call; stopWorker awaits it.
let workerPromise;
// Flag polled by loop(); startWorker sets it and stopWorker clears it.
let running = false;
/**
 * Computes when the next delivery attempt is due, as Unix epoch seconds.
 *
 * The delay is looked up in config.retries.schedule at position
 * `attempts - 1`, clamped to the schedule bounds: values past the end reuse
 * the last entry and values below 1 use the first entry. If the selected
 * entry is not a finite number, the last entry is tried, then 60 seconds.
 * A configured delay of 0 is honoured (retry is due immediately).
 *
 * @param {number} attempts - Attempts made so far (1 selects the first delay).
 * @returns {number} Epoch seconds at which the next attempt may run.
 */
function buildNextAttempt(attempts) {
  const schedule = config.retries.schedule;
  const lastIndex = schedule.length - 1;
  // Clamp on both sides so a non-positive attempt count cannot index -1.
  const index = Math.max(0, Math.min(attempts - 1, lastIndex));
  const fallbackDelaySeconds = 60;
  const candidates = [schedule[index], schedule[lastIndex], fallbackDelaySeconds];
  // Number.isFinite accepts 0, unlike a truthiness check, and rejects
  // undefined/NaN so a broken schedule entry falls through to the next one.
  const delaySeconds = candidates.find((value) => Number.isFinite(value));
  return Math.floor(Date.now() / 1000) + delaySeconds;
}
/**
 * Decides whether a delivery error is worth retrying.
 *
 * A missing error counts as transient. Otherwise the error is transient when
 * its `code` is one of transientCodes, or when its `message` contains
 * "timeout" (case-insensitive).
 *
 * @param {*} err - The value thrown by the delivery call.
 * @returns {boolean} true when the failure should be retried.
 */
function isTransientError(err) {
  if (!err) {
    return true;
  }

  const codeIsTransient = Boolean(err.code) && transientCodes.has(err.code);
  if (codeIsTransient) {
    return true;
  }

  const text = err.message;
  if (!text) {
    return false;
  }
  return text.toLowerCase().includes('timeout');
}
/**
 * Makes one delivery attempt for an outbox entry and persists the outcome.
 *
 * Outcomes:
 * - 2xx response: the entry is marked `processed`.
 * - HTTP 400 or 422: the entry is dead-lettered immediately with reason
 *   `HTTP <code>`.
 * - Any other response: the entry is marked `retrying` with the time from
 *   buildNextAttempt, or dead-lettered with the response body as reason once
 *   config.retries.maxAttempts is reached.
 * - client.deliver throws: the entry is marked `retrying` when
 *   isTransientError accepts the error and attempts remain; otherwise it is
 *   dead-lettered with the error message.
 *
 * Only the client.deliver call is guarded. A rejection from the queue
 * bookkeeping calls propagates to the caller instead of being mistaken for a
 * delivery failure, which would otherwise record a second, failed attempt and
 * could dead-letter a message that was in fact delivered.
 *
 * @param {object} entry - Outbox row; reads `id`, `attempts` and
 *   `canonical_payload` (a JSON string).
 * @returns {Promise<void>}
 * @throws When `canonical_payload` is not valid JSON or a queue call rejects.
 */
async function handleEntry(entry) {
  // NOTE(review): a malformed canonical_payload throws here before any
  // attempt is recorded; confirm whether such entries should be dead-lettered.
  const payload = JSON.parse(entry.canonical_payload);
  const attemptNumber = entry.attempts + 1;
  const nowSeconds = () => Math.floor(Date.now() / 1000);

  // Marks the outbox row as dead-lettered and hands the payload to the dead-letter store.
  const deadLetter = async (lastError) => {
    await queue.markOutboxStatus(entry.id, 'dead_letter', {
      attempts: attemptNumber,
      lastError,
      nextAttemptAt: nowSeconds()
    });
    await queue.moveToDeadLetter(payload, lastError);
  };

  // Marks the outbox row for another attempt at the time given by buildNextAttempt.
  const scheduleRetry = (lastError) =>
    queue.markOutboxStatus(entry.id, 'retrying', {
      attempts: attemptNumber,
      lastError,
      nextAttemptAt: buildNextAttempt(attemptNumber)
    });

  let response;
  try {
    response = await client.deliver(payload);
  } catch (error) {
    // Only failures of the delivery call itself end up here.
    await queue.recordDeliveryAttempt({
      outboxId: entry.id,
      attempt: attemptNumber,
      status: 'failure',
      responseCode: null,
      responseBody: error.message,
      latency: null
    });
    const shouldDeadLetter =
      attemptNumber >= config.retries.maxAttempts || !isTransientError(error);
    if (shouldDeadLetter) {
      await deadLetter(error.message);
    } else {
      await scheduleRetry(error.message);
    }
    return;
  }

  const succeeded = response.code >= 200 && response.code < 300;
  await queue.recordDeliveryAttempt({
    outboxId: entry.id,
    attempt: attemptNumber,
    status: succeeded ? 'success' : 'failure',
    responseCode: response.code,
    responseBody: response.body,
    latency: response.latency
  });

  if (succeeded) {
    await queue.markOutboxStatus(entry.id, 'processed', {
      attempts: attemptNumber,
      lastError: null,
      nextAttemptAt: nowSeconds()
    });
    return;
  }

  const httpReason = `HTTP ${response.code}`;

  // 400 and 422 are dead-lettered without consuming further attempts.
  if (response.code === 400 || response.code === 422) {
    await deadLetter(httpReason);
    return;
  }

  if (attemptNumber >= config.retries.maxAttempts) {
    await deadLetter(response.body);
    return;
  }

  await scheduleRetry(httpReason);
}
/**
 * Polling loop of the delivery worker; runs until `running` becomes false.
 *
 * Each iteration claims up to config.worker.batchSize entries for
 * config.worker.workerId and passes them to handleEntry one at a time. An
 * empty batch, a failed claim, or a batch in which at least one entry failed
 * is followed by a pause of config.worker.pollInterval.
 *
 * A failure in handleEntry is logged and isolated to that entry, so the
 * remaining entries of an already-claimed batch are still processed.
 *
 * @returns {Promise<void>} Settles once the loop has observed `running === false`.
 */
async function loop() {
  while (running) {
    try {
      const batch = await queue.claimPending(config.worker.batchSize, config.worker.workerId);
      if (!batch.length) {
        await sleep(config.worker.pollInterval);
        continue;
      }

      let hadFailure = false;
      for (const entry of batch) {
        try {
          await handleEntry(entry);
        } catch (err) {
          hadFailure = true;
          logger.error({ err: err.message, outboxId: entry.id }, 'delivery worker error');
        }
      }

      // Back off after a troubled batch so a persistent fault does not spin the loop.
      if (hadFailure) {
        await sleep(config.worker.pollInterval);
      }
    } catch (err) {
      logger.error({ err: err.message }, 'delivery worker error');
      await sleep(config.worker.pollInterval);
    }
  }
}
/**
 * Starts the delivery loop unless the worker is already flagged as running.
 *
 * Sets `running` and stores the promise returned by loop() in
 * `workerPromise`; the loop itself is not awaited here.
 *
 * NOTE(review): if this is called after `running` was cleared but before the
 * previous loop() promise has settled, that earlier loop may see `running`
 * as true again and continue alongside the new one. Confirm callers always
 * await stopWorker() before restarting.
 *
 * @returns {Promise<void>}
 */
async function startWorker() {
  if (!running) {
    running = true;
    workerPromise = loop();
  }
}
/**
 * Requests shutdown of the delivery loop and waits for it to finish.
 *
 * Clears `running` right away, then awaits `workerPromise` when one has been
 * stored. Returns immediately if no loop promise exists.
 *
 * @returns {Promise<void>}
 */
async function stopWorker() {
  running = false;
  if (!workerPromise) {
    return;
  }
  await workerPromise;
}
// Public API: only the lifecycle controls are exported; the polling loop and
// per-entry handling stay private to this module.
module.exports = { startWorker, stopWorker };