const queue = require('../queue/sqliteQueue'); const logger = require('../utils/logger'); const { normalize } = require('../normalizers'); const { dedupeKey } = require('../utils/hash'); const instrumentService = require('../instrumentConfig/service'); const parserMap = { 'http-json': require('../parsers/httpParser'), 'hl7-tcp': require('../parsers/hl7Parser'), 'astm-serial': require('../parsers/astmParser'), hl7: require('../parsers/hl7Parser'), astm: require('../parsers/astmParser'), http: require('../parsers/httpParser') }; function resolveParser(connector, instrumentEntry) { const parserName = instrumentEntry?.translator?.parser || connector; const parser = parserMap[parserName]; if (!parser) { throw new Error(`no parser registered for ${parserName}`); } return parser; } async function processMessage(connector, rawPayload, context = {}) { const rawRecord = await queue.insertRaw(connector, rawPayload); const rawId = rawRecord?.lastID; try { const matcher = instrumentService.resolveForMessage(connector, context); if (matcher.status === 'no_match') { logger.warn({ connector, context }, 'no matching instrument config, dropping payload'); await queue.markRawParsed(rawId, 'dropped', 'no matching instrument config'); return { dropped: true }; } if (matcher.status === 'ambiguous') { logger.warn({ connector, context, matches: matcher.matches }, 'ambiguous instrument match, dropping payload'); await queue.markRawParsed(rawId, 'dropped', 'ambiguous instrument match'); return { dropped: true }; } const instrumentEntry = matcher.entry; const parser = resolveParser(connector, instrumentEntry); const parsed = await parser.parse(rawPayload); const translated = instrumentService.applyTranslator(instrumentEntry, parsed, connector); const canonical = normalize(translated); const dedupe = dedupeKey(canonical); const inserted = await queue.insertOutbox(canonical, dedupe); await queue.markRawParsed(rawId, 'processed'); if (inserted && inserted.duplicate) { logger.info({ dedupe, connector }, 'duplicate payload detected'); } return { canonical, inserted, dedupe }; } catch (error) { logger.error({ err: error.message, connector }, 'message processing failed'); if (rawId) { await queue.markRawParsed(rawId, 'failed', error.message); } throw error; } } module.exports = { processMessage };