diff --git a/docs/user_manual.md b/docs/user_manual.md index 629ba2e..d16bc80 100644 --- a/docs/user_manual.md +++ b/docs/user_manual.md @@ -65,6 +65,7 @@ inst1: location: lab-a translator: parser: astm + engine: overrides forceInstrumentId: true meta: profile: astm-default diff --git a/docs/workstation_plan.md b/docs/workstation_plan.md index 51ccbb5..e2fe791 100644 --- a/docs/workstation_plan.md +++ b/docs/workstation_plan.md @@ -37,12 +37,10 @@ middleware/ src/ connectors/ parsers/ - normalizers/ - pipeline/ - queue/ - client/ + domain/ + runtime/ storage/ - routes/ + scripts/ utils/ index.js db/migrations/ diff --git a/middleware/config/app.yaml b/middleware/config/app.yaml index af6c8fa..8da07bc 100644 --- a/middleware/config/app.yaml +++ b/middleware/config/app.yaml @@ -18,6 +18,7 @@ inst1: note: ASTM instrument over serial COM translator: parser: astm + engine: overrides forceInstrumentId: true meta: translator: msg1 diff --git a/middleware/config/astm.js b/middleware/config/astm.js new file mode 100644 index 0000000..862468c --- /dev/null +++ b/middleware/config/astm.js @@ -0,0 +1,21 @@ +const { parse } = require('../src/parsers/astmParser'); + +function translate(entry, parsedPayload, connector) { + const translator = entry && typeof entry.translator === 'object' ? entry.translator : {}; + const overrides = translator.overrides && typeof translator.overrides === 'object' + ? translator.overrides + : {}; + const canonical = { ...parsedPayload, ...overrides }; + if (translator.forceInstrumentId !== false) { + canonical.instrument_id = entry.instrument_id; + } + canonical.meta = { + ...(parsedPayload.meta || {}), + ...(translator.meta && typeof translator.meta === 'object' ? translator.meta : {}), + connector, + instrument_config: entry.config + }; + return canonical; +} + +module.exports = { parse, translate }; diff --git a/middleware/src/instrumentConfig/validator.js b/middleware/src/domain/instrumentConfig.js similarity index 56% rename from middleware/src/instrumentConfig/validator.js rename to middleware/src/domain/instrumentConfig.js index ba94076..9d500fe 100644 --- a/middleware/src/instrumentConfig/validator.js +++ b/middleware/src/domain/instrumentConfig.js @@ -1,5 +1,9 @@ const fs = require('fs'); const config = require('../../config/app'); +const logger = require('../utils/logger'); + +let cache = new Map(); +let refreshInterval; function normalizeConnectorType(type) { const value = String(type || '').trim().toLowerCase(); @@ -103,6 +107,10 @@ function validateAndLoadInstrumentConfigs({ errors.push(`${label}: translator.parser is required`); continue; } + if (resolvedTranslator.engine && typeof resolvedTranslator.engine !== 'string') { + errors.push(`${label}: translator.engine must be a string`); + continue; + } const connectorConfig = item.connectorConfig && typeof item.connectorConfig === 'object' ? item.connectorConfig @@ -146,7 +154,102 @@ function validateAndLoadInstrumentConfigs({ return entries; } +async function reload() { + const rows = validateAndLoadInstrumentConfigs(); + const next = new Map(); + rows.forEach((row) => { + try { + if (row.error) { + throw new Error(row.error); + } + if (!row.instrument_id || !row.connector) { + throw new Error('instrument_id and connector are required'); + } + next.set(row.instrument_id, { + instrument_id: row.instrument_id, + connector: row.connector, + enabled: Boolean(row.enabled), + config: row.config || {}, + match: row.match || {}, + translator: row.translator || {}, + connectorConfig: row.connectorConfig || {}, + files: row.files || null + }); + } catch (err) { + logger.warn({ instrument: row.instrument_id, err: err.message }, 'failed parsing instrument config, skipping'); + } + }); + cache = next; +} + +async function init({ refreshMs = 30_000 } = {}) { + await reload(); + if (refreshInterval) clearInterval(refreshInterval); + refreshInterval = setInterval(() => { + reload().catch((err) => logger.error({ err: err.message }, 'instrument config reload failed')); + }, refreshMs); +} + +function list() { + return Array.from(cache.values()); +} + +function get(instrumentId) { + return cache.get(instrumentId) || null; +} + +function byConnector(connector) { + return list().filter((entry) => entry.connector === connector && entry.enabled); +} + +function addressesEqual(expected, actual) { + if (!expected) return true; + if (!actual) return false; + const normalize = (value) => String(value).replace('::ffff:', ''); + return normalize(expected) === normalize(actual); +} + +function portsEqual(expected, actual) { + if (expected === undefined || expected === null || expected === '') return true; + if (actual === undefined || actual === null || actual === '') return false; + return Number(expected) === Number(actual); +} + +function comPortsEqual(expected, actual) { + if (expected === undefined || expected === null || expected === '') return true; + if (actual === undefined || actual === null || actual === '') return false; + return String(expected).trim().toLowerCase() === String(actual).trim().toLowerCase(); +} + +function matches(entry, connector, context = {}) { + if (context.instrument_id && context.instrument_id !== entry.instrument_id) return false; + if (!entry.enabled || entry.connector !== connector) return false; + const rule = entry.match || {}; + if (!portsEqual(rule.localPort, context.localPort)) return false; + if (!portsEqual(rule.remotePort, context.remotePort)) return false; + if (!addressesEqual(rule.remoteAddress, context.remoteAddress)) return false; + if (!comPortsEqual(rule.comPort || rule.serialPort, context.comPort || context.serialPort)) return false; + return true; +} + +function resolveForMessage(connector, context = {}) { + const candidates = byConnector(connector).filter((entry) => matches(entry, connector, context)); + if (!candidates.length) { + return { status: 'no_match', matches: [] }; + } + if (candidates.length > 1) { + return { status: 'ambiguous', matches: candidates.map((entry) => entry.instrument_id) }; + } + return { status: 'matched', entry: candidates[0] }; +} + module.exports = { InstrumentConfigValidationError, - validateAndLoadInstrumentConfigs + validateAndLoadInstrumentConfigs, + init, + list, + get, + byConnector, + reload, + resolveForMessage }; diff --git a/middleware/src/normalizers/index.js b/middleware/src/domain/normalizer.js similarity index 100% rename from middleware/src/normalizers/index.js rename to middleware/src/domain/normalizer.js diff --git a/middleware/src/domain/parsers.js b/middleware/src/domain/parsers.js new file mode 100644 index 0000000..a38959d --- /dev/null +++ b/middleware/src/domain/parsers.js @@ -0,0 +1,34 @@ +const parserMap = { + 'http-json': require('../parsers/httpParser'), + 'hl7-tcp': require('../parsers/hl7Parser'), + 'astm-serial': require('../parsers/astmParser'), + hl7: require('../parsers/hl7Parser'), + astm: require('../parsers/astmParser'), + http: require('../parsers/httpParser') +}; + +function resolveCustomParser(parserName) { + if (!parserName || typeof parserName !== 'string') return null; + try { + const custom = require(`../../config/${parserName}`); + if (custom && typeof custom.parse === 'function') { + return custom; + } + } catch (error) { + return null; + } + return null; +} + +function resolveParser(connector, instrumentEntry) { + const parserName = instrumentEntry?.translator?.parser || connector; + const parser = parserMap[parserName] || resolveCustomParser(parserName); + if (!parser) { + throw new Error(`no parser registered for ${parserName}`); + } + return parser; +} + +module.exports = { + resolveParser +}; diff --git a/middleware/src/domain/translator.js b/middleware/src/domain/translator.js new file mode 100644 index 0000000..8b0210d --- /dev/null +++ b/middleware/src/domain/translator.js @@ -0,0 +1,41 @@ +function translateOverrides(entry, parsedPayload, connector) { + const translator = entry && typeof entry.translator === 'object' ? entry.translator : {}; + const overrides = translator.overrides && typeof translator.overrides === 'object' + ? translator.overrides + : {}; + const canonical = { ...parsedPayload, ...overrides }; + if (translator.forceInstrumentId !== false) { + canonical.instrument_id = entry.instrument_id; + } + canonical.meta = { + ...(parsedPayload.meta || {}), + ...(translator.meta && typeof translator.meta === 'object' ? translator.meta : {}), + connector, + instrument_config: entry.config + }; + return canonical; +} + +const registry = new Map([ + ['overrides', { translate: translateOverrides }] +]); + +function resolve(name) { + if (!name) return registry.get('overrides'); + const key = String(name).trim().toLowerCase(); + return registry.get(key) || null; +} + +function translate(entry, parsedPayload, connector, engineName) { + const engine = resolve(engineName); + if (!engine) { + const options = engineName ? ` (requested: ${engineName})` : ''; + throw new Error(`translator engine not found${options}`); + } + return engine.translate(entry, parsedPayload, connector); +} + +module.exports = { + resolve, + translate +}; diff --git a/middleware/src/index.js b/middleware/src/index.js index 9b977f2..2da24d4 100644 --- a/middleware/src/index.js +++ b/middleware/src/index.js @@ -1,92 +1,21 @@ -const express = require('express'); -const config = require('../config/app'); const logger = require('./utils/logger'); -const migrate = require('./storage/migrate'); -const { createHttpJsonConnector } = require('./connectors/httpJsonConnector'); -const { createHl7TcpConnector } = require('./connectors/hl7TcpConnector'); -const { createAstmSerialConnector } = require('./connectors/astmSerialConnector'); -const { processMessage } = require('./pipeline/workflow'); -const { startWorker, stopWorker } = require('./pipeline/deliveryWorker'); -const instrumentService = require('./instrumentConfig/service'); -const { validateAndLoadInstrumentConfigs } = require('./instrumentConfig/validator'); -const { createHealthRouter } = require('./routes/health'); -const { router: instrumentRouter } = require('./routes/instrumentConfig'); -const metricsRouter = require('./routes/metrics'); +const { start } = require('./runtime/app'); async function bootstrap() { - validateAndLoadInstrumentConfigs(); - await migrate(); - await instrumentService.init(); - - const connectorFactories = { - 'http-json': createHttpJsonConnector, - 'hl7-tcp': createHl7TcpConnector, - 'astm-serial': createAstmSerialConnector - }; - const connectors = instrumentService.list() - .filter((entry) => entry.enabled) - .map((entry) => { - const createConnector = connectorFactories[entry.connector]; - if (!createConnector) { - logger.warn({ connector: entry.connector, instrument_id: entry.instrument_id }, 'unknown connector in instrument config, skipping startup'); - return null; - } - return createConnector({ - ...(entry.connectorConfig || {}), - instrument_id: entry.instrument_id - }); - }) - .filter(Boolean); - - if (!connectors.length) { - logger.warn('no enabled connectors configured, ingestion listeners are disabled'); - } - - connectors.forEach((connector) => { - connector.onMessage(async (incoming) => { - try { - const payload = incoming && Object.prototype.hasOwnProperty.call(incoming, 'payload') - ? incoming.payload - : incoming; - const context = incoming && incoming.context ? incoming.context : {}; - await processMessage(connector.name(), payload, context); - } catch (err) { - logger.error({ err: err.message, connector: connector.name() }, 'pipeline error'); - } - }); - connector.onError((err) => { - logger.error({ err: err.message }, `${connector.name()} emitted error`); - }); - }); - - await Promise.all(connectors.map((connector) => connector.start())); - await startWorker(); - - const app = express(); - app.use('/health', createHealthRouter(connectors)); - app.use('/instruments', instrumentRouter); - app.use('/metrics', metricsRouter); - app.listen(config.healthPort, () => { - logger.info({ port: config.healthPort }, 'health server ready'); - }); + const { shutdown } = await start(); process.on('SIGINT', async () => { logger.info('shutdown signal received'); - await shutdown(connectors); + await shutdown(); process.exit(0); }); process.on('SIGTERM', async () => { logger.info('terminate signal received'); - await shutdown(connectors); + await shutdown(); process.exit(0); }); } -async function shutdown(connectors) { - await stopWorker(); - await Promise.all(connectors.map((connector) => connector.stop())); -} - bootstrap().catch((err) => { logger.fatal({ err: err.message }, 'failed to start middleware'); process.exit(1); diff --git a/middleware/src/instrumentConfig/service.js b/middleware/src/instrumentConfig/service.js deleted file mode 100644 index fbca788..0000000 --- a/middleware/src/instrumentConfig/service.js +++ /dev/null @@ -1,122 +0,0 @@ -const store = require('../storage/instrumentConfigFileStore'); -const logger = require('../utils/logger'); - -let cache = new Map(); -let refreshInterval; - -async function reload() { - const rows = await store.list(); - const next = new Map(); - rows.forEach((row) => { - try { - if (row.error) { - throw new Error(row.error); - } - if (!row.instrument_id || !row.connector) { - throw new Error('instrument_id and connector are required'); - } - next.set(row.instrument_id, { - instrument_id: row.instrument_id, - connector: row.connector, - enabled: Boolean(row.enabled), - config: row.config || {}, - match: row.match || {}, - translator: row.translator || {}, - connectorConfig: row.connectorConfig || {}, - files: row.files || null - }); - } catch (err) { - logger.warn({ instrument: row.instrument_id, err: err.message }, 'failed parsing instrument config, skipping'); - } - }); - cache = next; -} - -async function init({ refreshMs = 30_000 } = {}) { - await reload(); - if (refreshInterval) clearInterval(refreshInterval); - refreshInterval = setInterval(() => { - reload().catch((err) => logger.error({ err: err.message }, 'instrument config reload failed')); - }, refreshMs); -} - -function list() { - return Array.from(cache.values()); -} - -function get(instrumentId) { - return cache.get(instrumentId) || null; -} - -function byConnector(connector) { - return list().filter((entry) => entry.connector === connector && entry.enabled); -} - -function addressesEqual(expected, actual) { - if (!expected) return true; - if (!actual) return false; - const normalize = (value) => String(value).replace('::ffff:', ''); - return normalize(expected) === normalize(actual); -} - -function portsEqual(expected, actual) { - if (expected === undefined || expected === null || expected === '') return true; - if (actual === undefined || actual === null || actual === '') return false; - return Number(expected) === Number(actual); -} - -function comPortsEqual(expected, actual) { - if (expected === undefined || expected === null || expected === '') return true; - if (actual === undefined || actual === null || actual === '') return false; - return String(expected).trim().toLowerCase() === String(actual).trim().toLowerCase(); -} - -function matches(entry, connector, context = {}) { - if (context.instrument_id && context.instrument_id !== entry.instrument_id) return false; - if (!entry.enabled || entry.connector !== connector) return false; - const rule = entry.match || {}; - if (!portsEqual(rule.localPort, context.localPort)) return false; - if (!portsEqual(rule.remotePort, context.remotePort)) return false; - if (!addressesEqual(rule.remoteAddress, context.remoteAddress)) return false; - if (!comPortsEqual(rule.comPort || rule.serialPort, context.comPort || context.serialPort)) return false; - return true; -} - -function resolveForMessage(connector, context = {}) { - const candidates = byConnector(connector).filter((entry) => matches(entry, connector, context)); - if (!candidates.length) { - return { status: 'no_match', matches: [] }; - } - if (candidates.length > 1) { - return { status: 'ambiguous', matches: candidates.map((entry) => entry.instrument_id) }; - } - return { status: 'matched', entry: candidates[0] }; -} - -function applyTranslator(entry, parsedPayload, connector) { - const translator = entry.translator || {}; - const overrides = translator.overrides && typeof translator.overrides === 'object' - ? translator.overrides - : {}; - const canonical = { ...parsedPayload, ...overrides }; - if (translator.forceInstrumentId !== false) { - canonical.instrument_id = entry.instrument_id; - } - canonical.meta = { - ...(parsedPayload.meta || {}), - ...(translator.meta && typeof translator.meta === 'object' ? translator.meta : {}), - connector, - instrument_config: entry.config - }; - return canonical; -} - -module.exports = { - init, - list, - get, - byConnector, - reload, - resolveForMessage, - applyTranslator -}; diff --git a/middleware/src/routes/health.js b/middleware/src/routes/health.js deleted file mode 100644 index c26ff7f..0000000 --- a/middleware/src/routes/health.js +++ /dev/null @@ -1,35 +0,0 @@ -const express = require('express'); -const queue = require('../queue/sqliteQueue'); - -function createHealthRouter(connectors = []) { - const router = express.Router(); - - router.get('/', async (req, res) => { - const connectorStatuses = connectors.map((connector) => connector.health()); - const pending = await queue.pendingCount(); - const retrying = await queue.retryingCount(); - const deadLetters = await queue.deadLetterCount(); - res.json({ - status: 'ok', - connectors: connectorStatuses, - metrics: { - pending, - retrying, - deadLetters - } - }); - }); - - router.get('/ready', async (req, res) => { - try { - await queue.ping(); - res.json({ status: 'ready' }); - } catch (err) { - res.status(503).json({ status: 'unready', reason: err.message }); - } - }); - - return router; -} - -module.exports = { createHealthRouter }; diff --git a/middleware/src/routes/instrumentConfig.js b/middleware/src/routes/instrumentConfig.js deleted file mode 100644 index 1872f7b..0000000 --- a/middleware/src/routes/instrumentConfig.js +++ /dev/null @@ -1,18 +0,0 @@ -const express = require('express'); -const service = require('../instrumentConfig/service'); - -const router = express.Router(); - -router.get('/', async (req, res) => { - res.json(service.list()); -}); - -router.get('/:id', async (req, res) => { - const entry = service.get(req.params.id); - if (!entry) { - return res.status(404).json({ error: 'not found' }); - } - res.json(entry); -}); - -module.exports = { router }; diff --git a/middleware/src/runtime/app.js b/middleware/src/runtime/app.js new file mode 100644 index 0000000..38eded8 --- /dev/null +++ b/middleware/src/runtime/app.js @@ -0,0 +1,84 @@ +const config = require('../../config/app'); +const logger = require('../utils/logger'); +const migrate = require('../storage/migrate'); +const { createHttpJsonConnector } = require('../connectors/httpJsonConnector'); +const { createHl7TcpConnector } = require('../connectors/hl7TcpConnector'); +const { createAstmSerialConnector } = require('../connectors/astmSerialConnector'); +const { processMessage } = require('./pipeline'); +const { startWorker, stopWorker } = require('./worker'); +const instrumentConfig = require('../domain/instrumentConfig'); +const { createHttpServer } = require('./http'); + +const connectorFactories = { + 'http-json': createHttpJsonConnector, + 'hl7-tcp': createHl7TcpConnector, + 'astm-serial': createAstmSerialConnector +}; + +function buildConnectors() { + return instrumentConfig.list() + .filter((entry) => entry.enabled) + .map((entry) => { + const createConnector = connectorFactories[entry.connector]; + if (!createConnector) { + logger.warn({ connector: entry.connector, instrument_id: entry.instrument_id }, 'unknown connector in instrument config, skipping startup'); + return null; + } + return createConnector({ + ...(entry.connectorConfig || {}), + instrument_id: entry.instrument_id + }); + }) + .filter(Boolean); +} + +function attachConnectorHandlers(connectors) { + connectors.forEach((connector) => { + connector.onMessage(async (incoming) => { + try { + const payload = incoming && Object.prototype.hasOwnProperty.call(incoming, 'payload') + ? incoming.payload + : incoming; + const context = incoming && incoming.context ? incoming.context : {}; + await processMessage(connector.name(), payload, context); + } catch (err) { + logger.error({ err: err.message, connector: connector.name() }, 'pipeline error'); + } + }); + connector.onError((err) => { + logger.error({ err: err.message }, `${connector.name()} emitted error`); + }); + }); +} + +async function start() { + instrumentConfig.validateAndLoadInstrumentConfigs(); + await migrate(); + await instrumentConfig.init(); + + const connectors = buildConnectors(); + if (!connectors.length) { + logger.warn('no enabled connectors configured, ingestion listeners are disabled'); + } + + attachConnectorHandlers(connectors); + await Promise.all(connectors.map((connector) => connector.start())); + await startWorker(); + + const app = createHttpServer(connectors); + const server = app.listen(config.healthPort, () => { + logger.info({ port: config.healthPort }, 'health server ready'); + }); + + async function shutdown() { + await stopWorker(); + await Promise.all(connectors.map((connector) => connector.stop())); + if (server) { + await new Promise((resolve) => server.close(resolve)); + } + } + + return { connectors, server, shutdown }; +} + +module.exports = { start }; diff --git a/middleware/src/client/clqmsClient.js b/middleware/src/runtime/client.js similarity index 100% rename from middleware/src/client/clqmsClient.js rename to middleware/src/runtime/client.js diff --git a/middleware/src/routes/metrics.js b/middleware/src/runtime/http.js similarity index 50% rename from middleware/src/routes/metrics.js rename to middleware/src/runtime/http.js index 0a3df95..535de6a 100644 --- a/middleware/src/routes/metrics.js +++ b/middleware/src/runtime/http.js @@ -1,7 +1,53 @@ const express = require('express'); -const queue = require('../queue/sqliteQueue'); +const queue = require('./queue'); +const instrumentConfig = require('../domain/instrumentConfig'); -const router = express.Router(); +function createHealthRouter(connectors = []) { + const router = express.Router(); + + router.get('/', async (req, res) => { + const connectorStatuses = connectors.map((connector) => connector.health()); + const pending = await queue.pendingCount(); + const retrying = await queue.retryingCount(); + const deadLetters = await queue.deadLetterCount(); + res.json({ + status: 'ok', + connectors: connectorStatuses, + metrics: { + pending, + retrying, + deadLetters + } + }); + }); + + router.get('/ready', async (req, res) => { + try { + await queue.ping(); + res.json({ status: 'ready' }); + } catch (err) { + res.status(503).json({ status: 'unready', reason: err.message }); + } + }); + + return router; +} + +const instrumentRouter = express.Router(); + +instrumentRouter.get('/', async (req, res) => { + res.json(instrumentConfig.list()); +}); + +instrumentRouter.get('/:id', async (req, res) => { + const entry = instrumentConfig.get(req.params.id); + if (!entry) { + return res.status(404).json({ error: 'not found' }); + } + res.json(entry); +}); + +const metricsRouter = express.Router(); function formatMetric(name, value, type = 'gauge', help = '') { const lines = []; @@ -13,7 +59,7 @@ function formatMetric(name, value, type = 'gauge', help = '') { return lines.join('\n'); } -router.get('/', async (req, res) => { +metricsRouter.get('/', async (req, res) => { try { const pending = await queue.pendingCount(); const retrying = await queue.retryingCount(); @@ -37,4 +83,12 @@ router.get('/', async (req, res) => { } }); -module.exports = router; +function createHttpServer(connectors) { + const app = express(); + app.use('/health', createHealthRouter(connectors)); + app.use('/instruments', instrumentRouter); + app.use('/metrics', metricsRouter); + return app; +} + +module.exports = { createHttpServer }; diff --git a/middleware/src/pipeline/workflow.js b/middleware/src/runtime/pipeline.js similarity index 63% rename from middleware/src/pipeline/workflow.js rename to middleware/src/runtime/pipeline.js index fb05753..b5e917f 100644 --- a/middleware/src/pipeline/workflow.js +++ b/middleware/src/runtime/pipeline.js @@ -1,32 +1,21 @@ -const queue = require('../queue/sqliteQueue'); +const queue = require('./queue'); const logger = require('../utils/logger'); -const { normalize } = require('../normalizers'); +const { normalize } = require('../domain/normalizer'); const { dedupeKey } = require('../utils/hash'); -const instrumentService = require('../instrumentConfig/service'); +const instrumentConfig = require('../domain/instrumentConfig'); +const { resolveParser } = require('../domain/parsers'); +const translator = require('../domain/translator'); -const parserMap = { - 'http-json': require('../parsers/httpParser'), - 'hl7-tcp': require('../parsers/hl7Parser'), - 'astm-serial': require('../parsers/astmParser'), - hl7: require('../parsers/hl7Parser'), - astm: require('../parsers/astmParser'), - http: require('../parsers/httpParser') -}; - -function resolveParser(connector, instrumentEntry) { - const parserName = instrumentEntry?.translator?.parser || connector; - const parser = parserMap[parserName]; - if (!parser) { - throw new Error(`no parser registered for ${parserName}`); - } - return parser; +function translatePayload(entry, parsedPayload, connector) { + const engineName = entry?.translator?.engine || entry?.translator?.name; + return translator.translate(entry, parsedPayload, connector, engineName); } async function processMessage(connector, rawPayload, context = {}) { const rawRecord = await queue.insertRaw(connector, rawPayload); const rawId = rawRecord?.lastID; try { - const matcher = instrumentService.resolveForMessage(connector, context); + const matcher = instrumentConfig.resolveForMessage(connector, context); if (matcher.status === 'no_match') { logger.warn({ connector, context }, 'no matching instrument config, dropping payload'); await queue.markRawParsed(rawId, 'dropped', 'no matching instrument config'); @@ -41,7 +30,9 @@ async function processMessage(connector, rawPayload, context = {}) { const instrumentEntry = matcher.entry; const parser = resolveParser(connector, instrumentEntry); const parsed = await parser.parse(rawPayload); - const translated = instrumentService.applyTranslator(instrumentEntry, parsed, connector); + const translated = parser.translate + ? parser.translate(instrumentEntry, parsed, connector) + : translatePayload(instrumentEntry, parsed, connector); const canonical = normalize(translated); const dedupe = dedupeKey(canonical); const inserted = await queue.insertOutbox(canonical, dedupe); diff --git a/middleware/src/queue/sqliteQueue.js b/middleware/src/runtime/queue.js similarity index 100% rename from middleware/src/queue/sqliteQueue.js rename to middleware/src/runtime/queue.js diff --git a/middleware/src/pipeline/deliveryWorker.js b/middleware/src/runtime/worker.js similarity index 97% rename from middleware/src/pipeline/deliveryWorker.js rename to middleware/src/runtime/worker.js index 323dcb6..be7b9df 100644 --- a/middleware/src/pipeline/deliveryWorker.js +++ b/middleware/src/runtime/worker.js @@ -1,5 +1,5 @@ -const queue = require('../queue/sqliteQueue'); -const client = require('../client/clqmsClient'); +const queue = require('./queue'); +const client = require('./client'); const logger = require('../utils/logger'); const config = require('../../config/app'); diff --git a/middleware/src/scripts/instrumentCheck.js b/middleware/src/scripts/instrumentCheck.js index 6780baa..60b473a 100644 --- a/middleware/src/scripts/instrumentCheck.js +++ b/middleware/src/scripts/instrumentCheck.js @@ -1,7 +1,7 @@ const { InstrumentConfigValidationError, validateAndLoadInstrumentConfigs -} = require('../instrumentConfig/validator'); +} = require('../domain/instrumentConfig'); function main() { try { diff --git a/middleware/src/storage/instrumentConfigFileStore.js b/middleware/src/storage/instrumentConfigFileStore.js deleted file mode 100644 index af39fc4..0000000 --- a/middleware/src/storage/instrumentConfigFileStore.js +++ /dev/null @@ -1,9 +0,0 @@ -const { validateAndLoadInstrumentConfigs } = require('../instrumentConfig/validator'); - -class InstrumentConfigFileStore { - async list() { - return validateAndLoadInstrumentConfigs(); - } -} - -module.exports = new InstrumentConfigFileStore(); diff --git a/middleware/src/storage/instrumentConfigStore.js b/middleware/src/storage/instrumentConfigStore.js deleted file mode 100644 index cee56c4..0000000 --- a/middleware/src/storage/instrumentConfigStore.js +++ /dev/null @@ -1,31 +0,0 @@ -const DatabaseClient = require('./db'); -const config = require('../../config/app'); - -class InstrumentConfigStore { - constructor() { - this.db = new DatabaseClient(config.db); - } - - async list() { - return this.db.all(`SELECT instrument_id, connector, enabled, config FROM instrument_config`); - } - - async get(instrumentId) { - return this.db.get( - `SELECT instrument_id, connector, enabled, config FROM instrument_config WHERE instrument_id = ?`, - [instrumentId] - ); - } - - async upsert({ instrument_id, connector, enabled = 1, config: cfg = {} }) { - const payload = JSON.stringify(cfg); - await this.db.run( - `INSERT INTO instrument_config (instrument_id, connector, enabled, config) VALUES (?, ?, ?, ?) - ON CONFLICT(instrument_id) DO UPDATE SET connector = excluded.connector, enabled = excluded.enabled, config = excluded.config`, - [instrument_id, connector, Number(enabled), payload] - ); - return this.get(instrument_id); - } -} - -module.exports = new InstrumentConfigStore(); diff --git a/package-lock.json b/package-lock.json index 28d5be8..ccd5e73 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5,7 +5,7 @@ "requires": true, "packages": { "": { - "name": "pandalink", + "name": "tinylink", "version": "1.0.0", "license": "ISC", "dependencies": { @@ -1577,19 +1577,6 @@ "url": "https://opencollective.com/express" } }, - "node_modules/picomatch": { - "version": "4.0.4", - "resolved": "https://registry.npmjs.org/picomatch/-/picomatch-4.0.4.tgz", - "integrity": "sha512-QP88BAKvMam/3NxH6vj2o21R6MjxZUAd6nlwAS/pnGvN9IVLocLHxGYIzFhg6fUQ+5th6P4dv4eW9jX3DSIj7A==", - "license": "MIT", - "optional": true, - "engines": { - "node": ">=12" - }, - "funding": { - "url": "https://github.com/sponsors/jonschlinkert" - } - }, "node_modules/pino": { "version": "10.3.1", "resolved": "https://registry.npmjs.org/pino/-/pino-10.3.1.tgz",