// Repository-viewer header captured with the file (not program code):
// 53 lines, 2.2 KiB, JavaScript.

const queue = require('../queue/sqliteQueue');
const config = require('../../config/default');
const logger = require('../utils/logger');
const { normalize } = require('../normalizers');
const { dedupeKey } = require('../utils/hash');
const instrumentService = require('../instrumentConfig/service');
// Parser modules, one per supported connector type. processMessage calls
// `parse(rawPayload)` on whichever module is selected.
const httpParser = require('../parsers/httpParser');
const hl7Parser = require('../parsers/hl7Parser');
const astmParser = require('../parsers/astmParser');

// Connector key -> parser module.
const parserMap = {
  'http-json': httpParser,
  'hl7-tcp': hl7Parser,
  'astm-serial': astmParser,
};
/**
 * Persist a raw connector payload, parse and normalize it, and enqueue the
 * canonical result in the outbox.
 *
 * Flow:
 *  1. Store the raw payload via `queue.insertRaw` and remember its row id.
 *  2. Parse with the parser registered for `connector`, then `normalize`.
 *  3. Drop the message (raw row marked 'dropped') when the instrument has no
 *     enabled config, or when its configured connector differs from the one
 *     the message arrived on.
 *  4. Otherwise attach the instrument config under `canonical.meta`, insert
 *     into the outbox with a dedupe key, and mark the raw row 'processed'.
 *
 * @param {string} connector - Connector key used to select a parser from
 *   `parserMap` (e.g. 'http-json').
 * @param {*} rawPayload - Payload as received; stored as-is and handed to the
 *   selected parser.
 * @returns {Promise<{dropped: true} | {canonical: object, inserted: *, dedupe: *}>}
 *   `{ dropped: true }` when the message was rejected in step 3; otherwise the
 *   canonical record, the result of `queue.insertOutbox`, and the dedupe key.
 * @throws Rethrows the error that interrupted processing (including
 *   "no parser registered for <connector>") after logging it and, when a raw
 *   row id is available, attempting to mark the raw row 'failed'.
 */
async function processMessage(connector, rawPayload) {
  const rawRecord = await queue.insertRaw(connector, rawPayload);
  const rawId = rawRecord?.lastID;

  try {
    // Own-property check: a connector such as 'constructor' or 'toString'
    // must not resolve to something inherited from Object.prototype.
    const parser = Object.prototype.hasOwnProperty.call(parserMap, connector)
      ? parserMap[connector]
      : undefined;
    if (!parser) {
      throw new Error(`no parser registered for ${connector}`);
    }

    const parsed = await parser.parse(rawPayload);
    const canonical = normalize(parsed);

    const instrumentEntry = instrumentService.get(canonical.instrument_id);
    if (!instrumentEntry || !instrumentEntry.enabled) {
      logger.warn({ instrument: canonical.instrument_id, connector }, 'no enabled instrument config, dropping payload');
      await queue.markRawParsed(rawId, 'dropped', 'instrument disabled/no config');
      return { dropped: true };
    }

    if (instrumentEntry.connector !== connector) {
      logger.warn({ instrument: canonical.instrument_id, expected: instrumentEntry.connector, actual: connector }, 'connector mismatch for instrument');
      await queue.markRawParsed(rawId, 'dropped', 'connector mismatch');
      return { dropped: true };
    }

    // The key is computed before the instrument config is attached to
    // `meta` (presumably so config changes do not alter the dedupe key).
    const dedupe = dedupeKey(canonical);
    canonical.meta = { ...(canonical.meta || {}), instrument_config: instrumentEntry.config };

    const inserted = await queue.insertOutbox(canonical, dedupe);
    await queue.markRawParsed(rawId, 'processed');

    // Duplicates are only logged; the caller still gets the normal result
    // and can inspect `inserted.duplicate` itself.
    if (inserted && inserted.duplicate) {
      logger.info({ dedupe, connector }, 'duplicate payload detected');
    }

    return { canonical, inserted, dedupe };
  } catch (error) {
    logger.error({ err: error.message, connector }, 'message processing failed');
    if (rawId) {
      try {
        await queue.markRawParsed(rawId, 'failed', error.message);
      } catch (markError) {
        // A failure while recording the failure must not replace the
        // root-cause error that the caller needs to see.
        logger.error({ err: markError.message, connector, rawId }, 'could not mark raw record as failed');
      }
    }
    throw error;
  }
}
// Public API of this module: only the processing entry point is exported.
module.exports = { processMessage };