tinylink/core/pipeline/pipeline.js

61 lines
2.3 KiB
JavaScript

const queue = require('../queue/queue');
const logger = require('../util/logger');
const { normalize } = require('./normalizer');
const { dedupeKey } = require('./hash');
const instrumentConfig = require('../config/instrumentConfig');
const translator = require('./translator');
/**
 * Hands a parsed payload to the translator module, telling it which engine
 * the instrument entry asks for.
 *
 * The engine name is taken from `entry.translator.engine`; when that is
 * missing or falsy, `entry.translator.name` is used instead. If neither is
 * present the translator receives `undefined` as the engine name.
 *
 * @param {object} entry - Instrument config entry (may lack a `translator` section).
 * @param {object} parsedPayload - Payload object to translate.
 * @param {*} connector - Connector identifier, passed through untouched.
 * @returns {*} Whatever `translator.translate` returns.
 */
function translatePayload(entry, parsedPayload, connector) {
  const translatorSettings = entry?.translator;
  let engineName = translatorSettings?.engine;
  if (!engineName) {
    engineName = translatorSettings?.name;
  }
  return translator.translate(entry, parsedPayload, connector, engineName);
}
/**
 * Runs one inbound payload through the pipeline.
 *
 * Steps, in order:
 *   1. Store the raw payload via `queue.insertRaw` and remember the insert's `lastID`.
 *   2. Resolve the instrument config for this connector/context. A `no_match`
 *      or `ambiguous` result marks the raw record as `dropped` and stops.
 *   3. Wrap the raw payload in a single-result envelope (test code `RAW`),
 *      translate it, normalize it and compute its dedupe key.
 *   4. Insert the canonical record into the outbox and mark the raw record `processed`.
 *
 * On any failure after step 1 the raw record is marked `failed` (best effort)
 * and the original error is rethrown.
 *
 * @param {*} connector - Connector identifier the payload arrived on.
 * @param {*} rawPayload - Payload as received; stringified into the `RAW` result value.
 * @param {object} [context={}] - Message context; `sample_id` is used when truthy.
 * @returns {Promise<{dropped: true} | {canonical: *, inserted: *, dedupe: *}>}
 *   `{ dropped: true }` when no single instrument config matched, otherwise the
 *   canonical record, the outbox insert result and the dedupe key.
 * @throws Rethrows whatever error interrupted processing. An error raised by
 *   `queue.insertRaw` itself propagates without any raw-record bookkeeping.
 */
async function processMessage(connector, rawPayload, context = {}) {
  const rawRecord = await queue.insertRaw(connector, rawPayload);
  const rawId = rawRecord?.lastID;

  try {
    const matcher = instrumentConfig.resolveForMessage(connector, context);

    if (matcher.status === 'no_match') {
      logger.warn({ connector, context }, 'no matching instrument config, dropping payload');
      await queue.markRawParsed(rawId, 'dropped', 'no matching instrument config');
      return { dropped: true };
    }

    if (matcher.status === 'ambiguous') {
      logger.warn({ connector, context, matches: matcher.matches }, 'ambiguous instrument match, dropping payload');
      await queue.markRawParsed(rawId, 'dropped', 'ambiguous instrument match');
      return { dropped: true };
    }

    const instrumentEntry = matcher.entry;

    // No parsing happens here: the raw payload is wrapped as one `RAW` result
    // and the translator is left to interpret it.
    const parsedPayload = {
      instrument_id: instrumentEntry.instrument_id,
      // Falls back to a timestamp-based id when the context carries no usable sample id.
      sample_id: String(context.sample_id || `raw-${Date.now()}`),
      result_time: new Date().toISOString(),
      results: [{
        test_code: 'RAW',
        value: String(rawPayload)
      }],
      meta: {
        raw_payload: rawPayload,
        context
      }
    };

    const translated = translatePayload(instrumentEntry, parsedPayload, connector);
    const canonical = normalize(translated);
    const dedupe = dedupeKey(canonical);
    const inserted = await queue.insertOutbox(canonical, dedupe);
    await queue.markRawParsed(rawId, 'processed');

    if (inserted && inserted.duplicate) {
      logger.info({ dedupe, connector }, 'duplicate payload detected');
    }

    return { canonical, inserted, dedupe };
  } catch (error) {
    // Thrown values are not guaranteed to be Error instances.
    const reason = error?.message ?? String(error);
    logger.error({ err: reason, connector }, 'message processing failed');

    if (rawId) {
      try {
        await queue.markRawParsed(rawId, 'failed', reason);
      } catch (markError) {
        // Bookkeeping must not replace the error that actually broke processing;
        // record the secondary failure and fall through to rethrow the original.
        logger.error(
          { err: markError?.message ?? String(markError), connector, rawId },
          'could not mark raw record as failed'
        );
      }
    }

    throw error;
  }
}
// Public surface of this module: only processMessage is exported;
// translatePayload is not exposed to other modules.
module.exports = { processMessage };