refactor: modularize middleware runtime and domain layers

- move pipeline, queue, and config logic into runtime/domain modules

- remove legacy client/routes/normalizers/pipeline wiring from core

- update connector usage and instrument check to new domain entrypoints

- document new structure and add translator engine overrides in configs

- align package metadata name in lockfile
This commit is contained in:
mahdahar 2026-04-07 07:30:07 +07:00
parent dc6cca71cf
commit 10638ceb1b
22 changed files with 367 additions and 338 deletions

View File

@ -65,6 +65,7 @@ inst1:
location: lab-a
translator:
parser: astm
engine: overrides
forceInstrumentId: true
meta:
profile: astm-default

View File

@ -37,12 +37,10 @@ middleware/
src/
connectors/
parsers/
normalizers/
pipeline/
queue/
client/
domain/
runtime/
storage/
routes/
scripts/
utils/
index.js
db/migrations/

View File

@ -18,6 +18,7 @@ inst1:
note: ASTM instrument over serial COM
translator:
parser: astm
engine: overrides
forceInstrumentId: true
meta:
translator: msg1

21
middleware/config/astm.js Normal file
View File

@ -0,0 +1,21 @@
const { parse } = require('../src/parsers/astmParser');
function translate(entry, parsedPayload, connector) {
const translator = entry && typeof entry.translator === 'object' ? entry.translator : {};
const overrides = translator.overrides && typeof translator.overrides === 'object'
? translator.overrides
: {};
const canonical = { ...parsedPayload, ...overrides };
if (translator.forceInstrumentId !== false) {
canonical.instrument_id = entry.instrument_id;
}
canonical.meta = {
...(parsedPayload.meta || {}),
...(translator.meta && typeof translator.meta === 'object' ? translator.meta : {}),
connector,
instrument_config: entry.config
};
return canonical;
}
module.exports = { parse, translate };

View File

@ -1,5 +1,9 @@
const fs = require('fs');
const config = require('../../config/app');
const logger = require('../utils/logger');
let cache = new Map();
let refreshInterval;
function normalizeConnectorType(type) {
const value = String(type || '').trim().toLowerCase();
@ -103,6 +107,10 @@ function validateAndLoadInstrumentConfigs({
errors.push(`${label}: translator.parser is required`);
continue;
}
if (resolvedTranslator.engine && typeof resolvedTranslator.engine !== 'string') {
errors.push(`${label}: translator.engine must be a string`);
continue;
}
const connectorConfig = item.connectorConfig && typeof item.connectorConfig === 'object'
? item.connectorConfig
@ -146,7 +154,102 @@ function validateAndLoadInstrumentConfigs({
return entries;
}
async function reload() {
const rows = validateAndLoadInstrumentConfigs();
const next = new Map();
rows.forEach((row) => {
try {
if (row.error) {
throw new Error(row.error);
}
if (!row.instrument_id || !row.connector) {
throw new Error('instrument_id and connector are required');
}
next.set(row.instrument_id, {
instrument_id: row.instrument_id,
connector: row.connector,
enabled: Boolean(row.enabled),
config: row.config || {},
match: row.match || {},
translator: row.translator || {},
connectorConfig: row.connectorConfig || {},
files: row.files || null
});
} catch (err) {
logger.warn({ instrument: row.instrument_id, err: err.message }, 'failed parsing instrument config, skipping');
}
});
cache = next;
}
async function init({ refreshMs = 30_000 } = {}) {
await reload();
if (refreshInterval) clearInterval(refreshInterval);
refreshInterval = setInterval(() => {
reload().catch((err) => logger.error({ err: err.message }, 'instrument config reload failed'));
}, refreshMs);
}
function list() {
return Array.from(cache.values());
}
function get(instrumentId) {
return cache.get(instrumentId) || null;
}
function byConnector(connector) {
return list().filter((entry) => entry.connector === connector && entry.enabled);
}
function addressesEqual(expected, actual) {
if (!expected) return true;
if (!actual) return false;
const normalize = (value) => String(value).replace('::ffff:', '');
return normalize(expected) === normalize(actual);
}
function portsEqual(expected, actual) {
if (expected === undefined || expected === null || expected === '') return true;
if (actual === undefined || actual === null || actual === '') return false;
return Number(expected) === Number(actual);
}
function comPortsEqual(expected, actual) {
if (expected === undefined || expected === null || expected === '') return true;
if (actual === undefined || actual === null || actual === '') return false;
return String(expected).trim().toLowerCase() === String(actual).trim().toLowerCase();
}
function matches(entry, connector, context = {}) {
if (context.instrument_id && context.instrument_id !== entry.instrument_id) return false;
if (!entry.enabled || entry.connector !== connector) return false;
const rule = entry.match || {};
if (!portsEqual(rule.localPort, context.localPort)) return false;
if (!portsEqual(rule.remotePort, context.remotePort)) return false;
if (!addressesEqual(rule.remoteAddress, context.remoteAddress)) return false;
if (!comPortsEqual(rule.comPort || rule.serialPort, context.comPort || context.serialPort)) return false;
return true;
}
function resolveForMessage(connector, context = {}) {
const candidates = byConnector(connector).filter((entry) => matches(entry, connector, context));
if (!candidates.length) {
return { status: 'no_match', matches: [] };
}
if (candidates.length > 1) {
return { status: 'ambiguous', matches: candidates.map((entry) => entry.instrument_id) };
}
return { status: 'matched', entry: candidates[0] };
}
module.exports = {
InstrumentConfigValidationError,
validateAndLoadInstrumentConfigs
validateAndLoadInstrumentConfigs,
init,
list,
get,
byConnector,
reload,
resolveForMessage
};

View File

@ -0,0 +1,34 @@
const parserMap = {
'http-json': require('../parsers/httpParser'),
'hl7-tcp': require('../parsers/hl7Parser'),
'astm-serial': require('../parsers/astmParser'),
hl7: require('../parsers/hl7Parser'),
astm: require('../parsers/astmParser'),
http: require('../parsers/httpParser')
};
function resolveCustomParser(parserName) {
if (!parserName || typeof parserName !== 'string') return null;
try {
const custom = require(`../../config/${parserName}`);
if (custom && typeof custom.parse === 'function') {
return custom;
}
} catch (error) {
return null;
}
return null;
}
function resolveParser(connector, instrumentEntry) {
const parserName = instrumentEntry?.translator?.parser || connector;
const parser = parserMap[parserName] || resolveCustomParser(parserName);
if (!parser) {
throw new Error(`no parser registered for ${parserName}`);
}
return parser;
}
module.exports = {
resolveParser
};

View File

@ -0,0 +1,41 @@
function translateOverrides(entry, parsedPayload, connector) {
const translator = entry && typeof entry.translator === 'object' ? entry.translator : {};
const overrides = translator.overrides && typeof translator.overrides === 'object'
? translator.overrides
: {};
const canonical = { ...parsedPayload, ...overrides };
if (translator.forceInstrumentId !== false) {
canonical.instrument_id = entry.instrument_id;
}
canonical.meta = {
...(parsedPayload.meta || {}),
...(translator.meta && typeof translator.meta === 'object' ? translator.meta : {}),
connector,
instrument_config: entry.config
};
return canonical;
}
const registry = new Map([
['overrides', { translate: translateOverrides }]
]);
function resolve(name) {
if (!name) return registry.get('overrides');
const key = String(name).trim().toLowerCase();
return registry.get(key) || null;
}
function translate(entry, parsedPayload, connector, engineName) {
const engine = resolve(engineName);
if (!engine) {
const options = engineName ? ` (requested: ${engineName})` : '';
throw new Error(`translator engine not found${options}`);
}
return engine.translate(entry, parsedPayload, connector);
}
module.exports = {
resolve,
translate
};

View File

@ -1,92 +1,21 @@
const express = require('express');
const config = require('../config/app');
const logger = require('./utils/logger');
const migrate = require('./storage/migrate');
const { createHttpJsonConnector } = require('./connectors/httpJsonConnector');
const { createHl7TcpConnector } = require('./connectors/hl7TcpConnector');
const { createAstmSerialConnector } = require('./connectors/astmSerialConnector');
const { processMessage } = require('./pipeline/workflow');
const { startWorker, stopWorker } = require('./pipeline/deliveryWorker');
const instrumentService = require('./instrumentConfig/service');
const { validateAndLoadInstrumentConfigs } = require('./instrumentConfig/validator');
const { createHealthRouter } = require('./routes/health');
const { router: instrumentRouter } = require('./routes/instrumentConfig');
const metricsRouter = require('./routes/metrics');
const { start } = require('./runtime/app');
async function bootstrap() {
validateAndLoadInstrumentConfigs();
await migrate();
await instrumentService.init();
const connectorFactories = {
'http-json': createHttpJsonConnector,
'hl7-tcp': createHl7TcpConnector,
'astm-serial': createAstmSerialConnector
};
const connectors = instrumentService.list()
.filter((entry) => entry.enabled)
.map((entry) => {
const createConnector = connectorFactories[entry.connector];
if (!createConnector) {
logger.warn({ connector: entry.connector, instrument_id: entry.instrument_id }, 'unknown connector in instrument config, skipping startup');
return null;
}
return createConnector({
...(entry.connectorConfig || {}),
instrument_id: entry.instrument_id
});
})
.filter(Boolean);
if (!connectors.length) {
logger.warn('no enabled connectors configured, ingestion listeners are disabled');
}
connectors.forEach((connector) => {
connector.onMessage(async (incoming) => {
try {
const payload = incoming && Object.prototype.hasOwnProperty.call(incoming, 'payload')
? incoming.payload
: incoming;
const context = incoming && incoming.context ? incoming.context : {};
await processMessage(connector.name(), payload, context);
} catch (err) {
logger.error({ err: err.message, connector: connector.name() }, 'pipeline error');
}
});
connector.onError((err) => {
logger.error({ err: err.message }, `${connector.name()} emitted error`);
});
});
await Promise.all(connectors.map((connector) => connector.start()));
await startWorker();
const app = express();
app.use('/health', createHealthRouter(connectors));
app.use('/instruments', instrumentRouter);
app.use('/metrics', metricsRouter);
app.listen(config.healthPort, () => {
logger.info({ port: config.healthPort }, 'health server ready');
});
const { shutdown } = await start();
process.on('SIGINT', async () => {
logger.info('shutdown signal received');
await shutdown(connectors);
await shutdown();
process.exit(0);
});
process.on('SIGTERM', async () => {
logger.info('terminate signal received');
await shutdown(connectors);
await shutdown();
process.exit(0);
});
}
async function shutdown(connectors) {
await stopWorker();
await Promise.all(connectors.map((connector) => connector.stop()));
}
bootstrap().catch((err) => {
logger.fatal({ err: err.message }, 'failed to start middleware');
process.exit(1);

View File

@ -1,122 +0,0 @@
const store = require('../storage/instrumentConfigFileStore');
const logger = require('../utils/logger');
let cache = new Map();
let refreshInterval;
async function reload() {
const rows = await store.list();
const next = new Map();
rows.forEach((row) => {
try {
if (row.error) {
throw new Error(row.error);
}
if (!row.instrument_id || !row.connector) {
throw new Error('instrument_id and connector are required');
}
next.set(row.instrument_id, {
instrument_id: row.instrument_id,
connector: row.connector,
enabled: Boolean(row.enabled),
config: row.config || {},
match: row.match || {},
translator: row.translator || {},
connectorConfig: row.connectorConfig || {},
files: row.files || null
});
} catch (err) {
logger.warn({ instrument: row.instrument_id, err: err.message }, 'failed parsing instrument config, skipping');
}
});
cache = next;
}
async function init({ refreshMs = 30_000 } = {}) {
await reload();
if (refreshInterval) clearInterval(refreshInterval);
refreshInterval = setInterval(() => {
reload().catch((err) => logger.error({ err: err.message }, 'instrument config reload failed'));
}, refreshMs);
}
function list() {
return Array.from(cache.values());
}
function get(instrumentId) {
return cache.get(instrumentId) || null;
}
function byConnector(connector) {
return list().filter((entry) => entry.connector === connector && entry.enabled);
}
function addressesEqual(expected, actual) {
if (!expected) return true;
if (!actual) return false;
const normalize = (value) => String(value).replace('::ffff:', '');
return normalize(expected) === normalize(actual);
}
function portsEqual(expected, actual) {
if (expected === undefined || expected === null || expected === '') return true;
if (actual === undefined || actual === null || actual === '') return false;
return Number(expected) === Number(actual);
}
function comPortsEqual(expected, actual) {
if (expected === undefined || expected === null || expected === '') return true;
if (actual === undefined || actual === null || actual === '') return false;
return String(expected).trim().toLowerCase() === String(actual).trim().toLowerCase();
}
function matches(entry, connector, context = {}) {
if (context.instrument_id && context.instrument_id !== entry.instrument_id) return false;
if (!entry.enabled || entry.connector !== connector) return false;
const rule = entry.match || {};
if (!portsEqual(rule.localPort, context.localPort)) return false;
if (!portsEqual(rule.remotePort, context.remotePort)) return false;
if (!addressesEqual(rule.remoteAddress, context.remoteAddress)) return false;
if (!comPortsEqual(rule.comPort || rule.serialPort, context.comPort || context.serialPort)) return false;
return true;
}
function resolveForMessage(connector, context = {}) {
const candidates = byConnector(connector).filter((entry) => matches(entry, connector, context));
if (!candidates.length) {
return { status: 'no_match', matches: [] };
}
if (candidates.length > 1) {
return { status: 'ambiguous', matches: candidates.map((entry) => entry.instrument_id) };
}
return { status: 'matched', entry: candidates[0] };
}
function applyTranslator(entry, parsedPayload, connector) {
const translator = entry.translator || {};
const overrides = translator.overrides && typeof translator.overrides === 'object'
? translator.overrides
: {};
const canonical = { ...parsedPayload, ...overrides };
if (translator.forceInstrumentId !== false) {
canonical.instrument_id = entry.instrument_id;
}
canonical.meta = {
...(parsedPayload.meta || {}),
...(translator.meta && typeof translator.meta === 'object' ? translator.meta : {}),
connector,
instrument_config: entry.config
};
return canonical;
}
module.exports = {
init,
list,
get,
byConnector,
reload,
resolveForMessage,
applyTranslator
};

View File

@ -1,35 +0,0 @@
const express = require('express');
const queue = require('../queue/sqliteQueue');
function createHealthRouter(connectors = []) {
const router = express.Router();
router.get('/', async (req, res) => {
const connectorStatuses = connectors.map((connector) => connector.health());
const pending = await queue.pendingCount();
const retrying = await queue.retryingCount();
const deadLetters = await queue.deadLetterCount();
res.json({
status: 'ok',
connectors: connectorStatuses,
metrics: {
pending,
retrying,
deadLetters
}
});
});
router.get('/ready', async (req, res) => {
try {
await queue.ping();
res.json({ status: 'ready' });
} catch (err) {
res.status(503).json({ status: 'unready', reason: err.message });
}
});
return router;
}
module.exports = { createHealthRouter };

View File

@ -1,18 +0,0 @@
const express = require('express');
const service = require('../instrumentConfig/service');
const router = express.Router();
router.get('/', async (req, res) => {
res.json(service.list());
});
router.get('/:id', async (req, res) => {
const entry = service.get(req.params.id);
if (!entry) {
return res.status(404).json({ error: 'not found' });
}
res.json(entry);
});
module.exports = { router };

View File

@ -0,0 +1,84 @@
const config = require('../../config/app');
const logger = require('../utils/logger');
const migrate = require('../storage/migrate');
const { createHttpJsonConnector } = require('../connectors/httpJsonConnector');
const { createHl7TcpConnector } = require('../connectors/hl7TcpConnector');
const { createAstmSerialConnector } = require('../connectors/astmSerialConnector');
const { processMessage } = require('./pipeline');
const { startWorker, stopWorker } = require('./worker');
const instrumentConfig = require('../domain/instrumentConfig');
const { createHttpServer } = require('./http');
const connectorFactories = {
'http-json': createHttpJsonConnector,
'hl7-tcp': createHl7TcpConnector,
'astm-serial': createAstmSerialConnector
};
function buildConnectors() {
return instrumentConfig.list()
.filter((entry) => entry.enabled)
.map((entry) => {
const createConnector = connectorFactories[entry.connector];
if (!createConnector) {
logger.warn({ connector: entry.connector, instrument_id: entry.instrument_id }, 'unknown connector in instrument config, skipping startup');
return null;
}
return createConnector({
...(entry.connectorConfig || {}),
instrument_id: entry.instrument_id
});
})
.filter(Boolean);
}
function attachConnectorHandlers(connectors) {
connectors.forEach((connector) => {
connector.onMessage(async (incoming) => {
try {
const payload = incoming && Object.prototype.hasOwnProperty.call(incoming, 'payload')
? incoming.payload
: incoming;
const context = incoming && incoming.context ? incoming.context : {};
await processMessage(connector.name(), payload, context);
} catch (err) {
logger.error({ err: err.message, connector: connector.name() }, 'pipeline error');
}
});
connector.onError((err) => {
logger.error({ err: err.message }, `${connector.name()} emitted error`);
});
});
}
async function start() {
instrumentConfig.validateAndLoadInstrumentConfigs();
await migrate();
await instrumentConfig.init();
const connectors = buildConnectors();
if (!connectors.length) {
logger.warn('no enabled connectors configured, ingestion listeners are disabled');
}
attachConnectorHandlers(connectors);
await Promise.all(connectors.map((connector) => connector.start()));
await startWorker();
const app = createHttpServer(connectors);
const server = app.listen(config.healthPort, () => {
logger.info({ port: config.healthPort }, 'health server ready');
});
async function shutdown() {
await stopWorker();
await Promise.all(connectors.map((connector) => connector.stop()));
if (server) {
await new Promise((resolve) => server.close(resolve));
}
}
return { connectors, server, shutdown };
}
module.exports = { start };

View File

@ -1,7 +1,53 @@
const express = require('express');
const queue = require('../queue/sqliteQueue');
const queue = require('./queue');
const instrumentConfig = require('../domain/instrumentConfig');
const router = express.Router();
function createHealthRouter(connectors = []) {
const router = express.Router();
router.get('/', async (req, res) => {
const connectorStatuses = connectors.map((connector) => connector.health());
const pending = await queue.pendingCount();
const retrying = await queue.retryingCount();
const deadLetters = await queue.deadLetterCount();
res.json({
status: 'ok',
connectors: connectorStatuses,
metrics: {
pending,
retrying,
deadLetters
}
});
});
router.get('/ready', async (req, res) => {
try {
await queue.ping();
res.json({ status: 'ready' });
} catch (err) {
res.status(503).json({ status: 'unready', reason: err.message });
}
});
return router;
}
const instrumentRouter = express.Router();
instrumentRouter.get('/', async (req, res) => {
res.json(instrumentConfig.list());
});
instrumentRouter.get('/:id', async (req, res) => {
const entry = instrumentConfig.get(req.params.id);
if (!entry) {
return res.status(404).json({ error: 'not found' });
}
res.json(entry);
});
const metricsRouter = express.Router();
function formatMetric(name, value, type = 'gauge', help = '') {
const lines = [];
@ -13,7 +59,7 @@ function formatMetric(name, value, type = 'gauge', help = '') {
return lines.join('\n');
}
router.get('/', async (req, res) => {
metricsRouter.get('/', async (req, res) => {
try {
const pending = await queue.pendingCount();
const retrying = await queue.retryingCount();
@ -37,4 +83,12 @@ router.get('/', async (req, res) => {
}
});
module.exports = router;
function createHttpServer(connectors) {
const app = express();
app.use('/health', createHealthRouter(connectors));
app.use('/instruments', instrumentRouter);
app.use('/metrics', metricsRouter);
return app;
}
module.exports = { createHttpServer };

View File

@ -1,32 +1,21 @@
const queue = require('../queue/sqliteQueue');
const queue = require('./queue');
const logger = require('../utils/logger');
const { normalize } = require('../normalizers');
const { normalize } = require('../domain/normalizer');
const { dedupeKey } = require('../utils/hash');
const instrumentService = require('../instrumentConfig/service');
const instrumentConfig = require('../domain/instrumentConfig');
const { resolveParser } = require('../domain/parsers');
const translator = require('../domain/translator');
const parserMap = {
'http-json': require('../parsers/httpParser'),
'hl7-tcp': require('../parsers/hl7Parser'),
'astm-serial': require('../parsers/astmParser'),
hl7: require('../parsers/hl7Parser'),
astm: require('../parsers/astmParser'),
http: require('../parsers/httpParser')
};
function resolveParser(connector, instrumentEntry) {
const parserName = instrumentEntry?.translator?.parser || connector;
const parser = parserMap[parserName];
if (!parser) {
throw new Error(`no parser registered for ${parserName}`);
}
return parser;
function translatePayload(entry, parsedPayload, connector) {
const engineName = entry?.translator?.engine || entry?.translator?.name;
return translator.translate(entry, parsedPayload, connector, engineName);
}
async function processMessage(connector, rawPayload, context = {}) {
const rawRecord = await queue.insertRaw(connector, rawPayload);
const rawId = rawRecord?.lastID;
try {
const matcher = instrumentService.resolveForMessage(connector, context);
const matcher = instrumentConfig.resolveForMessage(connector, context);
if (matcher.status === 'no_match') {
logger.warn({ connector, context }, 'no matching instrument config, dropping payload');
await queue.markRawParsed(rawId, 'dropped', 'no matching instrument config');
@ -41,7 +30,9 @@ async function processMessage(connector, rawPayload, context = {}) {
const instrumentEntry = matcher.entry;
const parser = resolveParser(connector, instrumentEntry);
const parsed = await parser.parse(rawPayload);
const translated = instrumentService.applyTranslator(instrumentEntry, parsed, connector);
const translated = parser.translate
? parser.translate(instrumentEntry, parsed, connector)
: translatePayload(instrumentEntry, parsed, connector);
const canonical = normalize(translated);
const dedupe = dedupeKey(canonical);
const inserted = await queue.insertOutbox(canonical, dedupe);

View File

@ -1,5 +1,5 @@
const queue = require('../queue/sqliteQueue');
const client = require('../client/clqmsClient');
const queue = require('./queue');
const client = require('./client');
const logger = require('../utils/logger');
const config = require('../../config/app');

View File

@ -1,7 +1,7 @@
const {
InstrumentConfigValidationError,
validateAndLoadInstrumentConfigs
} = require('../instrumentConfig/validator');
} = require('../domain/instrumentConfig');
function main() {
try {

View File

@ -1,9 +0,0 @@
const { validateAndLoadInstrumentConfigs } = require('../instrumentConfig/validator');
class InstrumentConfigFileStore {
async list() {
return validateAndLoadInstrumentConfigs();
}
}
module.exports = new InstrumentConfigFileStore();

View File

@ -1,31 +0,0 @@
const DatabaseClient = require('./db');
const config = require('../../config/app');
class InstrumentConfigStore {
constructor() {
this.db = new DatabaseClient(config.db);
}
async list() {
return this.db.all(`SELECT instrument_id, connector, enabled, config FROM instrument_config`);
}
async get(instrumentId) {
return this.db.get(
`SELECT instrument_id, connector, enabled, config FROM instrument_config WHERE instrument_id = ?`,
[instrumentId]
);
}
async upsert({ instrument_id, connector, enabled = 1, config: cfg = {} }) {
const payload = JSON.stringify(cfg);
await this.db.run(
`INSERT INTO instrument_config (instrument_id, connector, enabled, config) VALUES (?, ?, ?, ?)
ON CONFLICT(instrument_id) DO UPDATE SET connector = excluded.connector, enabled = excluded.enabled, config = excluded.config`,
[instrument_id, connector, Number(enabled), payload]
);
return this.get(instrument_id);
}
}
module.exports = new InstrumentConfigStore();

15
package-lock.json generated
View File

@ -5,7 +5,7 @@
"requires": true,
"packages": {
"": {
"name": "pandalink",
"name": "tinylink",
"version": "1.0.0",
"license": "ISC",
"dependencies": {
@ -1577,19 +1577,6 @@
"url": "https://opencollective.com/express"
}
},
"node_modules/picomatch": {
"version": "4.0.4",
"resolved": "https://registry.npmjs.org/picomatch/-/picomatch-4.0.4.tgz",
"integrity": "sha512-QP88BAKvMam/3NxH6vj2o21R6MjxZUAd6nlwAS/pnGvN9IVLocLHxGYIzFhg6fUQ+5th6P4dv4eW9jX3DSIj7A==",
"license": "MIT",
"optional": true,
"engines": {
"node": ">=12"
},
"funding": {
"url": "https://github.com/sponsors/jonschlinkert"
}
},
"node_modules/pino": {
"version": "10.3.1",
"resolved": "https://registry.npmjs.org/pino/-/pino-10.3.1.tgz",