diff --git a/AGENTS.md b/AGENTS.md index bcb6484..205927f 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -1,151 +1,22 @@ # AGENTS -## Mission & Audience -- This document lives at the root so every agentic helper knows how to make, run, and reason about the middleware. -- Refer back to `docs/workstation_plan.md` for the architectural story, expected flows, and the canonical payload contract before touching new features. -- Preserve the operational stability that the SQLite queue + delivery worker already provides; avoid accidental schema drift or config leaks. -- Tailor every change to the Node 20+ CommonJS ecosystem and the SQLite-backed persistence layer this repo already embraces. +## Build, Run, Test +- Install dependencies: `npm install` (uses committed `package-lock.json`). +- Start service: `npm start` (runs `node core/index.js`). +- Apply schema: `npm run migrate` (runs `core/maintenance/migrate.js`, reads `core/maintenance/schema.sql`). +- Maintenance CLI: `npm run maintenance -- ` (runs `core/maintenance/maintenance.js`). +- Parser smoke test: `npm test` (runs `node core/parsers.test.js`). +- Instrument config check: `npm run instrument:check` (runs `core/config/instrumentCheck.js`). -## Command Reference +## Entry Points & Data Flow +- Main runtime entrypoint is `core/index.js` (bootstraps app + shutdown hooks). +- Runtime config is read from `config/app.yaml` via `core/config.js` (host + instruments). +- SQLite schema source of truth is `core/maintenance/schema.sql` and is applied by `core/maintenance/migrate.js`. +- Runtime database file is `data/workstation.sqlite` (keep it out of git). -### Install & Bootstrapping -- `npm install` populates `node_modules` (no lockfile generation beyond the committed `package-lock.json`). -- `npm start` is the go-to run command; it migrates the database, primes the instrument cache, spins up connectors, and starts the delivery worker plus health/metrics services. 
-- `npm run migrate` runs `middleware/src/storage/migrate.js` on demand; use it before seeding schema migrations in new environments or CI jobs. +## Config + Environment +- CLQMS auth is `host.apikey` in `config/app.yaml`; avoid committing real tokens. +- `config/app.yaml` contains instrument definitions; onboarding is file-driven. -### Maintenance & Database Care -- `npm run maintenance -- backup` copies `middleware/data/workstation.sqlite` to `workstation.sqlite.bak-`; this file should stay in place and not be committed or removed. -- `npm run maintenance -- vacuum` runs SQLite's `VACUUM` via `middleware/src/scripts/maintenance.js` and logs success/failure to stdout/stderr. -- `npm run maintenance -- prune --days=` deletes `delivery_log` entries older than `` days; default is 30 if `--days` is omitted. - -### Testing & Single-Test Command -- `npm test` executes `node middleware/test/parsers.test.js` and serves as the allowable smoke check until a richer test harness exists. -- To rerun the single parser suite manually, target `node middleware/test/parsers.test.js` directly; it logs success via `console.log` and exits non-zero on failure. - -## Environment & Secrets -- Node 20+ is assumed because the code uses optional chaining, `String.raw`, and other modern primitives; keep the same runtime for development and CI. -- All ports, DB paths, and CLQMS credentials are sourced from `middleware/config/app.yaml` (loaded by `middleware/config/default.js`) as the single runtime config file. -- Treat `CLQMS_TOKEN`, database files, and other secrets as environment-provided values; never embed them in checked-in files. -- `middleware/data/workstation.sqlite` is the runtime database. Don’t delete or reinitialize it from the repository tree unless part of an explicit migration/backup operation. - -## Observability Endpoints -- `/health` returns connector statuses plus pending/retrying/dead-letter counts from `middleware/src/routes/health.js`. 
-- `/health/ready` pings the SQLite queue; any failure there should log an error and respond with `503` per the existing route logic. -- `/metrics` exposes Prometheus-style gauges/counters that read straight from `queue/sqliteQueue`; keep the plaintext format exactly as defined so Prometheus scrapers don't break. -- Health and metrics routers are mounted on `middleware/src/index.js` at ports declared in the config, so any addition should remain consistent with Express middleware ordering. - -## Delivery Runbook & Retry Behavior -- Backoff: `30s -> 2m -> 10m -> 30m -> 2h -> 6h`, max 10 attempts as defined in `config.retries.schedule`. The worker taps `buildNextAttempt` in `deliveryWorker.js` to honor this array. -- Retry transient failures (timeouts, DNS/connection, HTTP 5xx); skip HTTP 400/422 or validation errors and ship those payloads immediately to `dead_letter` with the response body. -- After max attempts move the canonical payload to `dead_letter` with the final error message so postmortem tooling can surface the failure. -- `queue.recordDeliveryAttempt` accompanies every outbound delivery, so keep latency, status, and response code logging aligned with this helper. -- Duplicate detection relies on `utils/hash.dedupeKey`; keep `results` sorted and hashed consistently so deduplication stays stable. -- `deliveryWorker` marks `locked_at`/`locked_by` using `queue.claimPending` and always releases them via `queue.markOutboxStatus` to avoid worker starvation. - -## Instrument Configuration Cache -- Instrument configuration is cached in `instrumentConfig/service.js`; reloads happen on init and via `setInterval`, so mutate the cache through `service.upsert` rather than touching `store` directly. -- `service.reload` parses JSON in the `config` column, logs parsing failures with `logger.warn`, and only keeps rows that successfully parse. 
-- Service helpers expose `list`, `get`, and `byConnector` so connectors can fetch the subset they care about without iterating raw rows. -- Store interactions use `middleware/src/storage/instrumentConfigStore.js`, which leverages `DatabaseClient` and parameterized `ON CONFLICT` upserts; follow that pattern when extending tables. -- `instrumentService.init` must run before connectors start so `processMessage` can enforce instrument-enabled checks and connector matching. -- Always drop payloads with no enabled config or connector mismatch and mark the raw row as `dropped` so operators can trace why a message was ignored. - -## Metrics & Logging Enhancements -- `metrics.js` builds human-readable Prometheus strings via `formatMetric`; keep the helper intact when adding new metrics so type/help annotations stay formatted correctly. -- Metrics route reports pending, retrying, dead letters, delivery attempts, last success timestamp, and average latency; add new stats only when there is a clear operational need. -- Use `queue` helpers (`pendingCount`, `retryingCount`, `deadLetterCount`, `getLastSuccessTimestamp`, `getAverageLatency`, `getDeliveryAttempts`) rather than running fresh queries in routes. -- Always set the response content type to `text/plain; version=0.0.4; charset=utf-8` before returning metrics so Prometheus scrapers accept the payload. -- Health logs should cite both connectors and queue metrics so failure contexts are actionable and correlate with the operational dashboards referenced in `docs/workstation_plan.md`. -- Mask sensitive fields and avoid dumping raw payloads in logs; connectors and parsers add context objects to errors rather than full payload dumps. - -## Maintenance Checklist -- `middleware/src/scripts/maintenance.js` supports the commands `backup`, `vacuum`, and `prune --days=` (default 30); call these from CI or ops scripts when the backlog grows. 
-- `backup` copies the SQLite file before running migrations or schema updates so you can roll back quickly. -- `vacuum` recalculates and rebuilds the DB; wrap it in maintenance windows because it briefly locks the database. -- `prune` deletes old rows from `delivery_log`; use the same threshold as `docs/workstation_plan.md` (default 30 days) unless stakeholders approve a different retention. -- `maintenance` logging uses `console.log`/`console.error` because the script runs outside the Express app; keep those calls simple and exit with non-zero codes on failure to alert CI. -- Document every manual maintenance action in the repository README or a runbook so second-tier operators know what happened. - -## Data & Schema Source of Truth -- All schema statements live in `middleware/db/migrations/00*_*.sql`; the bootstrapper iterates over these files alphabetically via `fs.readdirSync` and `db.exec`, so keep new migrations in that folder and add them with increasing numeric prefixes. -- Table definitions include: `inbox_raw`, `outbox_result`, `delivery_log`, `instrument_config`, and `dead_letter`. An additional migration adds `locked_at` and `locked_by` to `outbox_result`. -- `middleware/src/storage/migrate.js` is idempotent; it applies every `.sql` in the migrations folder unconditionally. Avoid writing irreversible SQL (DROP, ALTER without fallback) unless you also add compensating migrations. -- `DatabaseClient` in `middleware/src/storage/db.js` wraps sqlite3 callbacks in promises; reuse its `run`, `get`, and `all` helpers to keep SQL parameterization consistent and to centralize `busyTimeout` configuration. - -## Code Style Guidelines - -### Modules, Imports, and Exports -- Prefer CommonJS `const ... = require(...)` at the top of each module; grouping local `require`s by directory depth (config, utils, domain) keeps files predictable. -- Export objects/functions via `module.exports = { ... 
}` or `module.exports = ` depending on whether multiple helpers are exported. -- When a file exposes a factory (connectors, queue), return named methods (`start`, `stop`, `onMessage`, `health`) to keep the bootstrapper happy. - -### Formatting & Layout -- Use two spaces for indentation and include semicolons at the end of statements; this matches existing files such as `middleware/src/utils/logger.js` and `index.js`. -- Keep line length reasonable (~100 characters) and break wrapped strings with template literals (see metric formatters) rather than concatenating with `+`. -- Prefer single quotes for strings unless interpolation or escaping makes backticks clearer. -- Keep helper functions (splitters, builders) at the top of parser modules, followed by the main exported parse function. - -### Naming Conventions -- Stick to camelCase for functions, methods, and variables (`processMessage`, `buildNextAttempt`, `messageHandler`). -- Use descriptive object properties that mirror domain terms (`instrument_id`, `result_time`, `connector`, `status`). -- Constants for configuration or retry schedules stay uppercase/lowercase as seen in `config.retries.schedule`; keep them grouped inside `config/default.js`. - -### Async Flow & Error Handling -- Embrace `async/await` everywhere; existing code rarely uses raw promises (except for wrappers like `new Promise((resolve) => ...)`). -- Wrap I/O boundaries in `try/catch` blocks and log failures with structured data via `logger.error({ err: err.message }, '...')` so Pino hooks can parse them. -- When rethrowing an error, ensure the calling context knows whether the failure is fatal (e.g., `processMessage` rethrows after queue logging). -- For connectors, propagate errors through `onError` hooks so the bootstrapper can log them consistently. 
- -### Logging & Diagnostics -- Always prefer `middleware/src/utils/logger.js` instead of `console.log`/`console.error` inside core services; the exception is low-level scripts like `maintenance.js` and migration runners. -- Use structured objects for context (`{ err: err.message, connector: connector.name() }`), especially around delivery failures and config reloads. -- Log positive states (start listening, health server ready) along with port numbers so the runtime state can be traced during deployment. - -### Validation & Canonical Payloads -- Use `zod` for inbound schema checks; validators already live in `middleware/src/routes/instrumentConfig.js` and `middleware/src/normalizers/index.js`. -- Always normalize parser output via `normalize(parsed)` before queue insertion to guarantee `instrument_id`, `sample_id`, `result_time`, and `results` conform to expectations. -- If `normalize` throws, let the caller log the failure and drop the payload silently after marking `inbox_raw` as `failed` to avoid partial writes. - -### Database & Queue Best Practices -- Use `DatabaseClient` for all SQL interactions; it centralizes `busyTimeout` and promise conversion and prevents sqlite3 callback spaghetti. -- Parameterize every statement with `?` placeholders (see `queue/sqliteQueue.js` and `instrumentConfigStore.js`) to avoid SQL injection hazards. -- Always mark `inbox_raw` rows as `processed`, `failed`, or `dropped` after parsing to keep operators aware of what happened. -- When marking `outbox_result` statuses, clear `locked_at/locked_by` and update `attempts`/`next_attempt_at` in one statement so watchers can rely on atomic semantics. - -### Connectors & Pipeline Contracts -- Each connector must provide `name`, `type`, `start`, `stop`, `health`, `onMessage`, and `onError` per the current implementation; keep this contract if you add new protocols. 
-- Keep connector internals event-driven: emit `messageHandler(payload)` and handle `.catch(errorHandler)` to ensure downstream failures get logged. -- For TCP connectors, track connections in `Set`s so `stop()` can destroy them before closing the server. -- Do not assume payload framing beyond what the current parser needs; let the parser module handle splitting text and trimming. - -### Worker & Delivery Guidelines -- The delivery worker polls the queue (`config.worker.batchSize`) and records every attempt via `queue.recordDeliveryAttempt`; add retries in the same pattern if you introduce new failure-handling logic. -- Respect the retry schedule defined in `config.retries.schedule`; `buildNextAttempt` uses `Math.min` to cap indexes, so new delays should append to `config.retries.schedule` only. -- Duplicate detection relies on `utils/hash.dedupeKey`; keep `results` sorted and hashed consistently so deduplication stays stable. -- On HTTP 400/422 responses or too many retries, move payloads to `dead_letter` and log the reason to keep operators informed. - -### Testing & Coverage Expectations -- Parser tests live in `middleware/test/parsers.test.js`; they rely on `node:assert` and deliberately simple sample payloads to avoid external dependencies. -- Add new tests by mimicking that file’s style—plain `assert.strictEqual` checks, no test framework dependencies, and `console.log` success acknowledgment. -- If you enhance the test surface, keep it runnable via `npm test` so agents and CI scripts can still rely on a single command line. - -### Documentation & Storytelling -- Keep `docs/workstation_plan.md` in sync with architectural changes; it surfaces connector flows, phases, retry policies, and maintenance checklists that agents rely on. -- When adding routes/features, document the endpoint, request payload, and expected responses in either `docs/` or inline comments near the route. 
- -## Cursor & Copilot Rules -- No `.cursor/rules/` or `.cursorrules` directories are present in this repo; therefore there are no Cursor-specific constraints to copy here. -- `.github/copilot-instructions.md` is absent as well, so there are no Copilot instructions to enforce or repeat. - -## Final Notes for Agents -- Keep changes isolated to their area of responsibility; the middleware is intentionally minimal, so avoid introducing new bundlers/languages. -- Before opening PRs, rerun `npm run migrate` and `npm test` to verify schema/app coherence. -- Use environment variable overrides from `middleware/config/default.js` when running in staging/production so the same config file can stay committed. -## Additional Notes -- Never revert existing changes you did not make unless explicitly requested, since those changes were made by the user. -- If there are unrelated changes in the working tree, leave them untouched and focus on the files that matter for the ticket. -- Avoid destructive git commands (`git reset --hard`, `git checkout --`) unless the user explicitly requests them. -- If documentation updates were part of your change, add them to `docs/workstation_plan.md` or explain why the doc already covers the behavior. -- When a connector or parser handles a new instrument, double-check `instrument_config` rows to ensure the connector name matches the incoming protocol. -- The `queue` keeps `status`, `attempts`, `next_attempt_at`, and `locked_*` in sync; always update all relevant columns in a single SQL call to avoid race conditions. -- Keep the SQL schema in sync with `middleware/db/migrations`; add new migrations rather than editing existing ones when altering tables. +## Docs Worth Reading +- `docs/workstation_plan.md` describes the intended architecture, retry policy, and maintenance runbook. 
diff --git a/middleware/config/app.yaml b/config/app.yaml similarity index 69% rename from middleware/config/app.yaml rename to config/app.yaml index 8da07bc..acd816a 100644 --- a/middleware/config/app.yaml +++ b/config/app.yaml @@ -1,7 +1,6 @@ host: - url: http://localhost:4000/api/results + url: http://localhost/clqms01/api/results apikey: "" - port: 4001 inst1: enabled: true @@ -18,7 +17,13 @@ inst1: note: ASTM instrument over serial COM translator: parser: astm - engine: overrides + engine: template + file: config/translators/inst1.map + messages: + - HEADER + - PATIENT + - ORDER + - TERMINATOR forceInstrumentId: true meta: translator: msg1 diff --git a/config/translators/inst1.map b/config/translators/inst1.map new file mode 100644 index 0000000..07b9f73 --- /dev/null +++ b/config/translators/inst1.map @@ -0,0 +1,5 @@ +# TinyLink clean-room map template +HEADER = H|\^&|||WST^P1|||||{instrument_id}^System1||P|1|{specimen_id} +PATIENT = P|{patient_id}|{sample_id}|||{last_name}^{first_name}||{birth_date}|{sex}|||||{doctor}| +ORDER = O|1|{sample_id}||{order_tests}||||||{specimen_type}||||{tube_type}||||||||||O| +TERMINATOR = L|1|N diff --git a/middleware/src/runtime/app.js b/core/app.js similarity index 81% rename from middleware/src/runtime/app.js rename to core/app.js index 38eded8..2941a43 100644 --- a/middleware/src/runtime/app.js +++ b/core/app.js @@ -1,12 +1,12 @@ -const config = require('../../config/app'); -const logger = require('../utils/logger'); -const migrate = require('../storage/migrate'); -const { createHttpJsonConnector } = require('../connectors/httpJsonConnector'); -const { createHl7TcpConnector } = require('../connectors/hl7TcpConnector'); -const { createAstmSerialConnector } = require('../connectors/astmSerialConnector'); -const { processMessage } = require('./pipeline'); -const { startWorker, stopWorker } = require('./worker'); -const instrumentConfig = require('../domain/instrumentConfig'); +const config = require('./config/config'); +const 
logger = require('./logger'); +const migrate = require('./maintenance/migrate'); +const { createHttpJsonConnector } = require('./connectors/httpJson'); +const { createHl7TcpConnector } = require('./connectors/tcp'); +const { createAstmSerialConnector } = require('./connectors/serial'); +const { processMessage } = require('./pipeline/pipeline'); +const { startWorker, stopWorker } = require('./worker/worker'); +const instrumentConfig = require('./config/instrumentConfig'); const { createHttpServer } = require('./http'); const connectorFactories = { diff --git a/middleware/config/app.js b/core/config/config.js similarity index 94% rename from middleware/config/app.js rename to core/config/config.js index 2739438..ff9494d 100644 --- a/middleware/config/app.js +++ b/core/config/config.js @@ -2,7 +2,7 @@ const fs = require('fs'); const path = require('path'); const YAML = require('yaml'); -const configPath = path.join(__dirname, 'app.yaml'); +const configPath = path.join(__dirname, '..', '..', 'config', 'app.yaml'); function toInt(value, fallback) { const parsed = Number.parseInt(value, 10); @@ -28,7 +28,7 @@ function buildConfig() { return { env: host.env || 'development', db: { - path: host.db?.path || 'middleware/data/workstation.sqlite', + path: host.db?.path || 'data/workstation.sqlite', busyTimeout: toInt(host.db?.busyTimeout, 5000) }, connectors: { diff --git a/middleware/src/scripts/instrumentCheck.js b/core/config/instrumentCheck.js similarity index 93% rename from middleware/src/scripts/instrumentCheck.js rename to core/config/instrumentCheck.js index 60b473a..e63981e 100644 --- a/middleware/src/scripts/instrumentCheck.js +++ b/core/config/instrumentCheck.js @@ -1,7 +1,7 @@ const { InstrumentConfigValidationError, validateAndLoadInstrumentConfigs -} = require('../domain/instrumentConfig'); +} = require('./instrumentConfig'); function main() { try { diff --git a/middleware/src/domain/instrumentConfig.js b/core/config/instrumentConfig.js similarity index 84% 
rename from middleware/src/domain/instrumentConfig.js rename to core/config/instrumentConfig.js index 9d500fe..acd1d30 100644 --- a/middleware/src/domain/instrumentConfig.js +++ b/core/config/instrumentConfig.js @@ -1,10 +1,27 @@ const fs = require('fs'); -const config = require('../../config/app'); -const logger = require('../utils/logger'); +const path = require('path'); +const config = require('./config'); +const logger = require('../logger'); let cache = new Map(); let refreshInterval; +function resolveTranslatorFilePath(filePath, configFilePath) { + if (!filePath || typeof filePath !== 'string') return ''; + if (path.isAbsolute(filePath)) return filePath; + + const candidates = [ + path.resolve(process.cwd(), filePath) + ]; + + if (configFilePath) { + candidates.push(path.resolve(path.dirname(configFilePath), filePath)); + } + + const matched = candidates.find((candidate) => fs.existsSync(candidate)); + return matched || candidates[0]; +} + function normalizeConnectorType(type) { const value = String(type || '').trim().toLowerCase(); if (value === 'serial' || value === 'astm-serial') return 'astm-serial'; @@ -14,13 +31,6 @@ function normalizeConnectorType(type) { return ''; } -function defaultParserForConnector(connector) { - if (connector === 'astm-serial') return 'astm'; - if (connector === 'hl7-tcp') return 'hl7'; - if (connector === 'http-json') return 'http-json'; - return ''; -} - function toEntityRows(entities = []) { return entities.map((entity) => { const connector = entity.connector && typeof entity.connector === 'object' ? 
entity.connector : {}; @@ -98,20 +108,30 @@ function validateAndLoadInstrumentConfigs({ } if (!translator || typeof translator !== 'object' || Array.isArray(translator)) { - item.translator = { parser: defaultParserForConnector(connector) }; + item.translator = {}; } const resolvedTranslator = item.translator; - if (!resolvedTranslator.parser || typeof resolvedTranslator.parser !== 'string') { - errors.push(`${label}: translator.parser is required`); - continue; - } if (resolvedTranslator.engine && typeof resolvedTranslator.engine !== 'string') { errors.push(`${label}: translator.engine must be a string`); continue; } + const translatorEngine = String(resolvedTranslator.engine || 'overrides').trim().toLowerCase(); + if (translatorEngine === 'template') { + if (!resolvedTranslator.file || typeof resolvedTranslator.file !== 'string') { + errors.push(`${label}: translator.file is required when translator.engine=template`); + continue; + } + const resolvedTranslatorFilePath = resolveTranslatorFilePath(resolvedTranslator.file, configFilePath); + if (!fs.existsSync(resolvedTranslatorFilePath)) { + errors.push(`${label}: translator.file not found: ${resolvedTranslator.file}`); + continue; + } + resolvedTranslator.resolvedFile = resolvedTranslatorFilePath; + } + const connectorConfig = item.connectorConfig && typeof item.connectorConfig === 'object' ? 
item.connectorConfig : {}; @@ -142,7 +162,8 @@ function validateAndLoadInstrumentConfigs({ translator: resolvedTranslator, connectorConfig, files: { - config: configFilePath + config: configFilePath, + translator: resolvedTranslator.resolvedFile || null } }); } diff --git a/middleware/src/connectors/httpJsonConnector.js b/core/connectors/httpJson.js similarity index 95% rename from middleware/src/connectors/httpJsonConnector.js rename to core/connectors/httpJson.js index b1627f1..18b63dc 100644 --- a/middleware/src/connectors/httpJsonConnector.js +++ b/core/connectors/httpJson.js @@ -1,6 +1,6 @@ const express = require('express'); -const config = require('../../config/app'); -const logger = require('../utils/logger'); +const config = require('../config/config'); +const logger = require('../logger'); function createHttpJsonConnector(options = {}) { let server; diff --git a/middleware/src/connectors/astmSerialConnector.js b/core/connectors/serial.js similarity index 97% rename from middleware/src/connectors/astmSerialConnector.js rename to core/connectors/serial.js index 515f8aa..3b3ad16 100644 --- a/middleware/src/connectors/astmSerialConnector.js +++ b/core/connectors/serial.js @@ -1,6 +1,6 @@ const { SerialPort } = require('serialport'); -const config = require('../../config/app'); -const logger = require('../utils/logger'); +const config = require('../config/config'); +const logger = require('../logger'); function createAstmSerialConnector(options = {}) { let port; diff --git a/middleware/src/connectors/hl7TcpConnector.js b/core/connectors/tcp.js similarity index 83% rename from middleware/src/connectors/hl7TcpConnector.js rename to core/connectors/tcp.js index a4b6c57..748cf69 100644 --- a/middleware/src/connectors/hl7TcpConnector.js +++ b/core/connectors/tcp.js @@ -1,6 +1,6 @@ const net = require('net'); -const config = require('../../config/app'); -const logger = require('../utils/logger'); +const config = require('../config/config'); +const logger = 
require('../logger'); function createHl7TcpConnector(options = {}) { let server; @@ -16,14 +16,14 @@ function createHl7TcpConnector(options = {}) { if (!payload) return; messageHandler({ payload, - context: { - connector: 'hl7-tcp', - instrument_id: instrumentId, - remoteAddress: socket.remoteAddress, - remotePort: socket.remotePort, - localPort: socket.localPort || port - } - }).catch(errorHandler); + context: { + connector: 'hl7-tcp', + instrument_id: instrumentId, + remoteAddress: socket.remoteAddress, + remotePort: socket.remotePort, + localPort: socket.localPort || port + } + }).catch(errorHandler); } function attach(socket) { diff --git a/middleware/src/runtime/http.js b/core/http.js similarity index 96% rename from middleware/src/runtime/http.js rename to core/http.js index 535de6a..942be83 100644 --- a/middleware/src/runtime/http.js +++ b/core/http.js @@ -1,6 +1,6 @@ const express = require('express'); -const queue = require('./queue'); -const instrumentConfig = require('../domain/instrumentConfig'); +const queue = require('./queue/queue'); +const instrumentConfig = require('./config/instrumentConfig'); function createHealthRouter(connectors = []) { const router = express.Router(); diff --git a/middleware/src/index.js b/core/index.js similarity index 84% rename from middleware/src/index.js rename to core/index.js index 2da24d4..68f385c 100644 --- a/middleware/src/index.js +++ b/core/index.js @@ -1,5 +1,5 @@ -const logger = require('./utils/logger'); -const { start } = require('./runtime/app'); +const logger = require('./logger'); +const { start } = require('./app'); async function bootstrap() { const { shutdown } = await start(); diff --git a/middleware/src/utils/logger.js b/core/logger.js similarity index 81% rename from middleware/src/utils/logger.js rename to core/logger.js index 8b10802..3bee495 100644 --- a/middleware/src/utils/logger.js +++ b/core/logger.js @@ -1,5 +1,5 @@ const pino = require('pino'); -const config = require('../../config/app'); 
+const config = require('./config/config'); const logger = pino({ level: process.env.LOG_LEVEL || 'info', diff --git a/middleware/src/scripts/maintenance.js b/core/maintenance/maintenance.js similarity index 95% rename from middleware/src/scripts/maintenance.js rename to core/maintenance/maintenance.js index 5aaf219..2fb9f87 100644 --- a/middleware/src/scripts/maintenance.js +++ b/core/maintenance/maintenance.js @@ -1,7 +1,6 @@ const fs = require('fs'); -const path = require('path'); const sqlite3 = require('sqlite3'); -const config = require('../../config/app'); +const config = require('../config/config'); const dbPath = config.db.path; diff --git a/core/maintenance/migrate.js b/core/maintenance/migrate.js new file mode 100644 index 0000000..cb2d31c --- /dev/null +++ b/core/maintenance/migrate.js @@ -0,0 +1,23 @@ +const fs = require('fs'); +const path = require('path'); +const DatabaseClient = require('../queue/db'); +const config = require('../config/config'); + +async function migrate() { + const db = new DatabaseClient(config.db); + const schemaPath = path.join(__dirname, 'schema.sql'); + const payload = fs.readFileSync(schemaPath, 'utf8'); + await db.exec(payload); + await db.close(); +} + +if (require.main === module) { + migrate() + .then(() => console.log('migrations applied')) + .catch((err) => { + console.error('migration failed', err); + process.exit(1); + }); +} + +module.exports = migrate; diff --git a/middleware/db/migrations/001_init.sql b/core/maintenance/schema.sql similarity index 93% rename from middleware/db/migrations/001_init.sql rename to core/maintenance/schema.sql index 086f1a9..6610aaa 100644 --- a/middleware/db/migrations/001_init.sql +++ b/core/maintenance/schema.sql @@ -16,7 +16,9 @@ CREATE TABLE IF NOT EXISTS outbox_result ( attempts INTEGER NOT NULL DEFAULT 0, next_attempt_at INTEGER NOT NULL DEFAULT 0, last_error TEXT NULL, - created_at TEXT NOT NULL DEFAULT (datetime('now')) + created_at TEXT NOT NULL DEFAULT (datetime('now')), + 
locked_at INTEGER NULL, + locked_by TEXT NULL ); CREATE UNIQUE INDEX IF NOT EXISTS idx_outbox_result_dedupe_key diff --git a/middleware/src/utils/hash.js b/core/pipeline/hash.js similarity index 100% rename from middleware/src/utils/hash.js rename to core/pipeline/hash.js diff --git a/middleware/src/domain/normalizer.js b/core/pipeline/normalizer.js similarity index 100% rename from middleware/src/domain/normalizer.js rename to core/pipeline/normalizer.js diff --git a/middleware/src/runtime/pipeline.js b/core/pipeline/pipeline.js similarity index 71% rename from middleware/src/runtime/pipeline.js rename to core/pipeline/pipeline.js index b5e917f..a95fe3f 100644 --- a/middleware/src/runtime/pipeline.js +++ b/core/pipeline/pipeline.js @@ -1,10 +1,9 @@ -const queue = require('./queue'); -const logger = require('../utils/logger'); -const { normalize } = require('../domain/normalizer'); -const { dedupeKey } = require('../utils/hash'); -const instrumentConfig = require('../domain/instrumentConfig'); -const { resolveParser } = require('../domain/parsers'); -const translator = require('../domain/translator'); +const queue = require('../queue/queue'); +const logger = require('../logger'); +const { normalize } = require('./normalizer'); +const { dedupeKey } = require('./hash'); +const instrumentConfig = require('../config/instrumentConfig'); +const translator = require('./translator'); function translatePayload(entry, parsedPayload, connector) { const engineName = entry?.translator?.engine || entry?.translator?.name; @@ -28,11 +27,19 @@ async function processMessage(connector, rawPayload, context = {}) { } const instrumentEntry = matcher.entry; - const parser = resolveParser(connector, instrumentEntry); - const parsed = await parser.parse(rawPayload); - const translated = parser.translate - ? 
parser.translate(instrumentEntry, parsed, connector) - : translatePayload(instrumentEntry, parsed, connector); + const translated = translatePayload(instrumentEntry, { + instrument_id: instrumentEntry.instrument_id, + sample_id: String(context.sample_id || `raw-${Date.now()}`), + result_time: new Date().toISOString(), + results: [{ + test_code: 'RAW', + value: String(rawPayload) + }], + meta: { + raw_payload: rawPayload, + context + } + }, connector); const canonical = normalize(translated); const dedupe = dedupeKey(canonical); const inserted = await queue.insertOutbox(canonical, dedupe); diff --git a/core/pipeline/translator.js b/core/pipeline/translator.js new file mode 100644 index 0000000..080c9fd --- /dev/null +++ b/core/pipeline/translator.js @@ -0,0 +1,195 @@ +const fs = require('fs'); +const path = require('path'); + +const mapCache = new Map(); + +function buildCanonical(entry, parsedPayload, connector) { + const translator = entry && typeof entry.translator === 'object' ? entry.translator : {}; + const canonical = { ...parsedPayload }; + if (translator.forceInstrumentId !== false) { + canonical.instrument_id = entry.instrument_id; + } + canonical.meta = { + ...(parsedPayload.meta || {}), + ...(translator.meta && typeof translator.meta === 'object' ? 
translator.meta : {}), + connector, + instrument_config: entry.config + }; + return canonical; +} + +function resolveTranslatorFilePath(filePath, configFilePath) { + if (!filePath || typeof filePath !== 'string') return ''; + if (path.isAbsolute(filePath)) return filePath; + + const candidates = [ + path.resolve(process.cwd(), filePath) + ]; + + if (configFilePath) { + candidates.push(path.resolve(path.dirname(configFilePath), filePath)); + } + + const matched = candidates.find((candidate) => fs.existsSync(candidate)); + return matched || candidates[0]; +} + +function parseMapFile(fileContent, filePath) { + const lines = fileContent.split(/\r?\n/); + const rows = new Map(); + + lines.forEach((line, index) => { + const trimmed = line.trim(); + if (!trimmed || trimmed.startsWith('#')) return; + + const separator = line.indexOf('='); + if (separator < 0) { + throw new Error(`${filePath}:${index + 1} invalid mapping line (expected KEY = value)`); + } + + const key = line.slice(0, separator).trim(); + const value = line.slice(separator + 1).trim(); + + if (!key) { + throw new Error(`${filePath}:${index + 1} mapping key is required`); + } + + rows.set(key, value); + }); + + return rows; +} + +function loadMapFile(filePath) { + const stat = fs.statSync(filePath); + const cached = mapCache.get(filePath); + if (cached && cached.mtimeMs === stat.mtimeMs) { + return cached.rows; + } + + const content = fs.readFileSync(filePath, 'utf8'); + const rows = parseMapFile(content, filePath); + mapCache.set(filePath, { mtimeMs: stat.mtimeMs, rows }); + return rows; +} + +function getPlaceholderValue(name, context) { + if (Object.hasOwn(context.flat, name)) { + return context.flat[name]; + } + + if (!name.includes('.')) { + return ''; + } + + const parts = name.split('.').filter(Boolean); + let current = context.root; + + for (let i = 0; i < parts.length; i += 1) { + if (!current || typeof current !== 'object') return ''; + current = current[parts[i]]; + } + + return current === 
undefined || current === null ? '' : current; +} + +function renderTemplate(template, context) { + return String(template).replace(/\{([^{}]+)\}/g, (_, rawName) => { + const name = String(rawName || '').trim(); + if (!name) return ''; + const value = getPlaceholderValue(name, context); + return value === undefined || value === null ? '' : String(value); + }); +} + +function buildTemplateContext(entry, parsedPayload, connector) { + const root = { + ...parsedPayload, + instrument_id: parsedPayload.instrument_id || entry.instrument_id, + connector, + config: entry.config || {}, + meta: parsedPayload.meta || {} + }; + + const flat = { + ...root, + ...(root.meta && typeof root.meta === 'object' ? root.meta : {}), + ...(root.config && typeof root.config === 'object' ? root.config : {}) + }; + + if (Array.isArray(parsedPayload.results)) { + flat.order_tests = parsedPayload.results + .map((item) => item && item.test_code) + .filter(Boolean) + .map((testCode) => `^^^${testCode}`) + .join('\\'); + } + + return { root, flat }; +} + +function translateOverrides(entry, parsedPayload, connector) { + const translator = entry && typeof entry.translator === 'object' ? entry.translator : {}; + const overrides = translator.overrides && typeof translator.overrides === 'object' + ? translator.overrides + : {}; + const canonical = buildCanonical(entry, { ...parsedPayload, ...overrides }, connector); + return canonical; +} + +function translateTemplate(entry, parsedPayload, connector) { + const translator = entry && typeof entry.translator === 'object' ? 
entry.translator : {}; + if (!translator.file || typeof translator.file !== 'string') { + throw new Error('translator.file is required for template engine'); + } + + const resolvedFilePath = resolveTranslatorFilePath(translator.file, entry?.files?.config); + if (!fs.existsSync(resolvedFilePath)) { + throw new Error(`translator file not found: ${translator.file}`); + } + + const mapRows = loadMapFile(resolvedFilePath); + const messageKeys = Array.isArray(translator.messages) && translator.messages.length + ? translator.messages.map((value) => String(value)) + : Array.from(mapRows.keys()); + const context = buildTemplateContext(entry, parsedPayload, connector); + const renderedMessages = messageKeys.map((messageKey) => { + if (!mapRows.has(messageKey)) { + throw new Error(`translator message key not found in map file: ${messageKey}`); + } + return { + key: messageKey, + body: renderTemplate(mapRows.get(messageKey), context) + }; + }); + + const canonical = buildCanonical(entry, parsedPayload, connector); + canonical.meta.rendered_messages = renderedMessages; + canonical.meta.translator_file = resolvedFilePath; + return canonical; +} + +const registry = new Map([ + ['overrides', { translate: translateOverrides }], + ['template', { translate: translateTemplate }] +]); + +function resolve(name) { + if (!name) return registry.get('overrides'); + const key = String(name).trim().toLowerCase(); + return registry.get(key) || null; +} + +function translate(entry, parsedPayload, connector, engineName) { + const engine = resolve(engineName); + if (!engine) { + const options = engineName ? 
` (requested: ${engineName})` : ''; + throw new Error(`translator engine not found${options}`); + } + return engine.translate(entry, parsedPayload, connector); +} + +module.exports = { + resolve, + translate +}; diff --git a/middleware/src/storage/db.js b/core/queue/db.js similarity index 97% rename from middleware/src/storage/db.js rename to core/queue/db.js index c04a151..bd26b69 100644 --- a/middleware/src/storage/db.js +++ b/core/queue/db.js @@ -1,5 +1,4 @@ const sqlite3 = require('sqlite3'); -const { promisify } = require('util'); class DatabaseClient { constructor({ path, busyTimeout = 5000 }) { diff --git a/middleware/src/runtime/queue.js b/core/queue/queue.js similarity index 95% rename from middleware/src/runtime/queue.js rename to core/queue/queue.js index 33d9a95..e50b825 100644 --- a/middleware/src/runtime/queue.js +++ b/core/queue/queue.js @@ -1,6 +1,5 @@ -const crypto = require('crypto'); -const DatabaseClient = require('../storage/db'); -const config = require('../../config/app'); +const DatabaseClient = require('./db'); +const config = require('../config/config'); class SqliteQueue { constructor() { @@ -94,14 +93,14 @@ class SqliteQueue { async recordDeliveryAttempt({ outboxId, attempt, status, responseCode, responseBody, latency }) { await this.db.run( - `INSERT INTO delivery_log (outbox_id, attempt, status, response_code, response_body, latency_ms) VALUES (?, ?, ?, ?, ?, ?)` , + `INSERT INTO delivery_log (outbox_id, attempt, status, response_code, response_body, latency_ms) VALUES (?, ?, ?, ?, ?, ?)`, [outboxId, attempt, status, responseCode, responseBody, latency] ); } async moveToDeadLetter(payload, reason) { await this.db.run( - `INSERT INTO dead_letter (payload, reason) VALUES (?, ?)` , + `INSERT INTO dead_letter (payload, reason) VALUES (?, ?)`, [this._serial(payload), reason] ); } diff --git a/core/rawPipeline.test.js b/core/rawPipeline.test.js new file mode 100644 index 0000000..45cce54 --- /dev/null +++ b/core/rawPipeline.test.js @@ -0,0 
const assert = require('node:assert');
const fs = require('fs');
const os = require('os');
const path = require('path');
const translator = require('./pipeline/translator');
const { normalize } = require('./pipeline/normalizer');

/**
 * Raw-pipeline smoke test.
 *
 * Exercises both translator engines against a raw-wrapped canonical seed:
 * - 'overrides': instrument id is forced and the RAW result survives normalize().
 * - 'template': a temp `.map` file renders HEADER/ORDER bodies with placeholder
 *   substitution (including the `order_tests` ^^^-list).
 *
 * Fix over the previous version: the temp map directory is now removed in a
 * `finally` block, so it is no longer leaked when an assertion throws.
 */
function run() {
  const entry = {
    instrument_id: 'inst1',
    config: { location: 'default-lab' },
    translator: {
      engine: 'overrides',
      forceInstrumentId: true,
      meta: { direction: 'mono' }
    }
  };

  const rawPayload = 'ABC|123';
  // Canonical envelope a connector would build around an un-parsed payload.
  const canonicalSeed = {
    instrument_id: entry.instrument_id,
    sample_id: 'raw-1',
    result_time: new Date().toISOString(),
    results: [{ test_code: 'RAW', value: rawPayload }],
    meta: { raw_payload: rawPayload }
  };

  // --- overrides engine ---
  const translated = translator.translate(entry, canonicalSeed, 'astm-serial', entry.translator.engine);
  const normalized = normalize(translated);

  assert.strictEqual(normalized.instrument_id, 'inst1');
  assert.strictEqual(normalized.results[0].test_code, 'RAW');
  assert.strictEqual(normalized.results[0].value, rawPayload);

  // --- template engine, driven by a throwaway .map file ---
  const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'tinylink-map-'));
  try {
    const mapFilePath = path.join(tempDir, 'inst1.map');
    fs.writeFileSync(mapFilePath, [
      '# smoke test map',
      'HEADER = H|{instrument_id}|{connector}',
      'ORDER = O|{sample_id}|{order_tests}'
    ].join('\n'));

    const templateEntry = {
      ...entry,
      files: { config: path.join(process.cwd(), 'config', 'app.yaml') },
      translator: {
        engine: 'template',
        file: mapFilePath,
        messages: ['HEADER', 'ORDER'],
        forceInstrumentId: true
      }
    };

    const templateTranslated = translator.translate(
      templateEntry,
      canonicalSeed,
      'astm-serial',
      templateEntry.translator.engine
    );
    const templateNormalized = normalize(templateTranslated);

    assert.strictEqual(templateNormalized.meta.rendered_messages.length, 2);
    assert.strictEqual(templateNormalized.meta.rendered_messages[0].body, 'H|inst1|astm-serial');
    assert.strictEqual(templateNormalized.meta.rendered_messages[1].body, 'O|raw-1|^^^RAW');
  } finally {
    // Always clean up, even when an assertion above throws.
    fs.rmSync(tempDir, { recursive: true, force: true });
  }

  console.log('Raw pipeline smoke test passed');
}

run();
+- Normalize and translate messages between instrument formats and JSON for CLQMS01. +- Support both download (new requests) and query (SampleID) workflows. + +## Non-Goals +- User interface or manual data entry. +- Business rule orchestration beyond mapping and routing. +- Long-term analytics or reporting. + +## High-Level Architecture +```mermaid +flowchart LR + subgraph Instrument + INST[Instrument] + end + + subgraph App + RCV[Receiver / Raw Capture] + XLT[Translator] + ROUTE[Message Router (planned)] + POLL[Scheduler / Poller (planned)] + end + + subgraph CLQMS01 + CLQ[CLQMS01 Host] + end + + %% Result flow + INST -->|Result data| RCV --> XLT -->|JSON| CLQ + + %% Download request flow + POLL -->|Check new requests| CLQ -->|New requests| ROUTE -->|Instrument message| INST + + %% Query flow + INST -->|Query + SampleID| ROUTE -->|SampleID| CLQ -->|Ordered tests| XLT -->|Instrument response| INST +``` + +## Data Flows +- Result flow (Instrument → CLQMS01): receive instrument output, wrap raw payload into a canonical envelope, translate to JSON, send to CLQMS01. +- Download request flow (CLQMS01 → Instrument): poll CLQMS01 for new requests, map to instrument message, send to instrument. (planned) +- Query flow (Instrument → CLQMS01 → Instrument): instrument sends query with SampleID, fetch ordered tests from CLQMS01, translate to instrument response, send back. (planned) + +## Current vs Planned +- Current: result ingest → raw capture → translator → queue → delivery worker → CLQMS01. +- Planned: scheduler/poller for download requests and message router for download/query workflows. + +## Key Components +- Receiver/Raw Capture: accepts instrument output and records it as raw payload for translation. +- Translator: maps instrument fields to JSON schema and vice versa. +- Message Router: routes messages to the correct workflow and destination. (planned) +- Scheduler/Poller: periodically checks CLQMS01 for new requests. 
(planned) +- CLQMS01 Adapter: handles request/response and authentication to the host. + +## Reliability and Error Handling +- Retry on transient network failures. +- Log and flag translation/normalization errors with raw payload references. +- Idempotency safeguards for resends when possible. + +## Security and Configuration +- CLQMS01 host and API key stored in config. +- Instrument connection details stored in config. +- No secrets committed to source control. diff --git a/docs/example.par b/docs/example.par new file mode 100644 index 0000000..6061f65 --- /dev/null +++ b/docs/example.par @@ -0,0 +1,15 @@ +Prestige24i + +S9 ="Header_record_from_WST". +MESSAGE_I =STX,SP,1,"H|\^&|||WST^P1|||||Prestige24i^System1||P|1|",{SPEC7},CR,ETX,SP,2,CR,LF. + +S9 ="Patient_record_from_WST". +MESSAGE_D =STX,SP,1,"P|",{SPEC1},"|",{PATNUM},"|||", +{LSTNAME},"^",{FSTNAME},"||",{BIRYEAR},{BIRMONT},{BIRDAY},"|",{SEX},"|||||",{DOCTOR},"|",CR,ETX,SP,2,CR,LF. + +S9 ="Order_record_from_WST". +MESSAGE_Y =STX,SP,1,"O|1|",{IDEE},"||", 30("^^^",{CHEMNUM},"\"),{URGENT}, +"||||||",{SPEC4},"||||",{TUBTYPE},"||||||||||O|",CR,ETX,SP,2,CR,LF. + +S9 ="Terminator_record_from_WST". +MESSAGE_F =STX,SP,1,"L|1|N",CR,ETX,SP,2,CR,LF. \ No newline at end of file diff --git a/docs/user_manual.md b/docs/user_manual.md index d16bc80..3022719 100644 --- a/docs/user_manual.md +++ b/docs/user_manual.md @@ -10,7 +10,7 @@ TinyLink sits between laboratory instruments and CLQMS, then handles the heavy l - `http-json` (HTTP endpoint) - `hl7-tcp` (TCP socket) - `astm-serial` (physical serial port/COM) -- Parses incoming payloads and normalizes them into one canonical JSON format. +- Wraps incoming payloads as raw content, then normalizes them into one canonical JSON format via the translator pipeline. - Stores raw and normalized data in SQLite for durability. - Deduplicates repeated payloads using a hash key. - Sends results to CLQMS with automatic retry and backoff. 
@@ -21,7 +21,7 @@ Think of it as a reliable translator + traffic controller for instrument data. ## Default Ports and Endpoints -By default (from `middleware/config/app.yaml`): +By default (from `config/app.yaml`): - Instrument config + health + metrics API: `4001` (from `host.port`) - Instrument connectors are configured per instrument entity (`inst1`, `inst2`, ...) @@ -40,7 +40,7 @@ TinyLink now uses a single file-based configuration. There is no `POST /instrume To add an instrument, keep one config file in: ```text -middleware/ +core/ config/ app.yaml ``` @@ -64,19 +64,55 @@ inst1: config: location: lab-a translator: - parser: astm engine: overrides forceInstrumentId: true meta: profile: astm-default ``` +### Optional: Use per-instrument `.map` translator files + +If you want a simpler per-instrument translator file, use `translator.engine: template` with a `.map` file. + +`config/app.yaml` example: + +```yaml +inst1: + enabled: true + connector: + type: serial + port: COM1 + translator: + engine: template + file: config/translators/inst1.map + messages: [HEADER, PATIENT, ORDER, TERMINATOR] + forceInstrumentId: true +``` + +`config/translators/inst1.map` example: + +```text +# KEY = message body template +HEADER = H|\^&|||WST^P1|||||{instrument_id}^System1||P|1|{specimen_id} +PATIENT = P|{patient_id}|{sample_id}|||{last_name}^{first_name}||{birth_date}|{sex}|||||{doctor}| +ORDER = O|1|{sample_id}||{order_tests}||||||{specimen_type}||||{tube_type}||||||||||O| +TERMINATOR = L|1|N +``` + +Notes: + +- `.map` supports one `KEY = value` template per line. +- Blank lines and `#` comments are ignored. +- Placeholders use `{name}` and missing values default to empty strings. +- Keep protocol framing (`STX`, `ETX`, `CR`, `LF`) outside `.map`; use message body templates only. +- `translator.messages` controls output order. If omitted, all keys from the `.map` file are rendered in file order. 
+ What it does: - `host` contains upstream endpoint and API settings (`url`, `apikey`, `port`). - Every top-level entity other than `host` is an instrument (`inst1`, `inst2`, ...). - `connector` is embedded per instrument, so each instrument has its own connector type/settings. -- `translator` stays embedded per instrument, so each instrument can define parser and metadata. +- `translator` stays embedded per instrument, so each instrument can define mapping rules and metadata. ### Step 2: Restart TinyLink @@ -110,7 +146,7 @@ TinyLink is strict because your audit trail deserves peace and quiet. ## Canonical Payload Shape -After parsing and normalization, TinyLink expects this shape. +After wrapping and normalization, TinyLink expects this shape. Required fields: @@ -178,8 +214,8 @@ Use these to check system status: ### Instrument returns `404` - Confirm exact `instrument_id` spelling and casing. -- Verify the instrument exists in `middleware/config/app.yaml` as its own top-level key (`inst1`, `inst2`, ...). -- Verify that instrument has `connector.type`, connector settings, and `translator.parser`. +- Verify the instrument exists in `config/app.yaml` as its own top-level key (`inst1`, `inst2`, ...). +- Verify that instrument has `connector.type`, connector settings, and `translator.engine`. ### Data is not flowing diff --git a/docs/workstation_plan.md b/docs/workstation_plan.md index e2fe791..47e3043 100644 --- a/docs/workstation_plan.md +++ b/docs/workstation_plan.md @@ -10,13 +10,13 @@ Build a lightweight Node.js service that: ## Responsibilities -- **Middleware:** connector protocols (HTTP JSON, HL7 TCP, ASTM serial), parsing/normalization, schema checks, durable queue, retries, dead-letter, logging, health endpoints. +- **Middleware:** connector protocols (HTTP JSON, HL7 TCP, ASTM serial), raw payload capture + translation/normalization, schema checks, durable queue, retries, dead-letter, logging, health endpoints. 
- **CLQMS:** domain validation, mapping rules, result persistence, workflow/flags/audit. ## Flow 1. Connector captures raw message and writes to `inbox_raw`. -2. Parser turns protocol text into a structured object. +2. Pipeline wraps raw payloads into a canonical envelope. 3. Normalizer maps the object to canonical JSON. 4. Payload lands in `outbox_result` as `pending`. 5. Delivery worker sends to CLQMS and logs attempts. @@ -33,10 +33,10 @@ Build a lightweight Node.js service that: ## Suggested Layout ```text -middleware/ +core/ src/ connectors/ - parsers/ + pipeline/ domain/ runtime/ storage/ @@ -130,7 +130,7 @@ Example: ## Phase 2 Completion Notes -- Instruments are provisioned from `middleware/config/app.yaml`, a single file containing `host` runtime settings and `instruments[]` entries with embedded connector, match, config, and translator settings. +- Instruments are provisioned from `config/app.yaml`, a single file containing `host` runtime settings and `instruments[]` entries with embedded connector, match, config, and translator settings. - The `/instruments` route is read-only for visibility; instrument onboarding is file-driven. - Each connector validates against loaded instrument files so only known, enabled equipment is accepted. - Deduplication now guarded by SHA-256 `dedupe_key`, and instrument metadata is carried through the pipeline. @@ -143,12 +143,12 @@ Example: ## Maintenance, Runbook & Automation -- SQLite maintenance script (`node middleware/src/scripts/maintenance.js`) supports `backup`, `vacuum`, and `prune --days=` to keep the DB performant and reproducible. +- SQLite maintenance script (`node core/maintenance/maintenance.js`) supports `backup`, `vacuum`, and `prune --days=` to keep the DB performant and reproducible. - Daily/weekly checklist: run backup before deployments, vacuum monthly, and prune `delivery_log` older than 30 days (configurable via CLI). 
- Incident checklist: 1) check `/health/ready`; 2) inspect `outbox_result` + `dead_letter`; 3) replay payloads with `pending` or `retrying` status; 4) rotate CLQMS token via env + restart; 5) escalate when dead letters spike or metrics show stale success timestamp. ## Testing & Validation -- Parser smoke tests under `middleware/test/parsers.test.js` verify HL7/ASTM canonical output and keep `normalize()` coverage intact. Run via `npm test`. +- Raw payload smoke tests under `core/rawPipeline.test.js` verify canonical output and keep `normalize()` coverage intact. Run via `npm test`. - Instrument config integrity check runs via `npm run instrument:check`; startup performs the same validation and fails fast on errors. - Future CI can run the same script plus `npm run migrate` ahead of any pull request to ensure schema/queue logic still applies. diff --git a/middleware/config/astm.js b/middleware/config/astm.js deleted file mode 100644 index 862468c..0000000 --- a/middleware/config/astm.js +++ /dev/null @@ -1,21 +0,0 @@ -const { parse } = require('../src/parsers/astmParser'); - -function translate(entry, parsedPayload, connector) { - const translator = entry && typeof entry.translator === 'object' ? entry.translator : {}; - const overrides = translator.overrides && typeof translator.overrides === 'object' - ? translator.overrides - : {}; - const canonical = { ...parsedPayload, ...overrides }; - if (translator.forceInstrumentId !== false) { - canonical.instrument_id = entry.instrument_id; - } - canonical.meta = { - ...(parsedPayload.meta || {}), - ...(translator.meta && typeof translator.meta === 'object' ? 
translator.meta : {}), - connector, - instrument_config: entry.config - }; - return canonical; -} - -module.exports = { parse, translate }; diff --git a/middleware/data/workstation.sqlite b/middleware/data/workstation.sqlite deleted file mode 100644 index 1d16709..0000000 Binary files a/middleware/data/workstation.sqlite and /dev/null differ diff --git a/middleware/db/migrations/002_outbox_locks.sql b/middleware/db/migrations/002_outbox_locks.sql deleted file mode 100644 index 6c1f1b6..0000000 --- a/middleware/db/migrations/002_outbox_locks.sql +++ /dev/null @@ -1,2 +0,0 @@ -ALTER TABLE outbox_result ADD COLUMN locked_at INTEGER NULL; -ALTER TABLE outbox_result ADD COLUMN locked_by TEXT NULL; diff --git a/middleware/src/domain/parsers.js b/middleware/src/domain/parsers.js deleted file mode 100644 index a38959d..0000000 --- a/middleware/src/domain/parsers.js +++ /dev/null @@ -1,34 +0,0 @@ -const parserMap = { - 'http-json': require('../parsers/httpParser'), - 'hl7-tcp': require('../parsers/hl7Parser'), - 'astm-serial': require('../parsers/astmParser'), - hl7: require('../parsers/hl7Parser'), - astm: require('../parsers/astmParser'), - http: require('../parsers/httpParser') -}; - -function resolveCustomParser(parserName) { - if (!parserName || typeof parserName !== 'string') return null; - try { - const custom = require(`../../config/${parserName}`); - if (custom && typeof custom.parse === 'function') { - return custom; - } - } catch (error) { - return null; - } - return null; -} - -function resolveParser(connector, instrumentEntry) { - const parserName = instrumentEntry?.translator?.parser || connector; - const parser = parserMap[parserName] || resolveCustomParser(parserName); - if (!parser) { - throw new Error(`no parser registered for ${parserName}`); - } - return parser; -} - -module.exports = { - resolveParser -}; diff --git a/middleware/src/domain/translator.js b/middleware/src/domain/translator.js deleted file mode 100644 index 8b0210d..0000000 --- 
a/middleware/src/domain/translator.js +++ /dev/null @@ -1,41 +0,0 @@ -function translateOverrides(entry, parsedPayload, connector) { - const translator = entry && typeof entry.translator === 'object' ? entry.translator : {}; - const overrides = translator.overrides && typeof translator.overrides === 'object' - ? translator.overrides - : {}; - const canonical = { ...parsedPayload, ...overrides }; - if (translator.forceInstrumentId !== false) { - canonical.instrument_id = entry.instrument_id; - } - canonical.meta = { - ...(parsedPayload.meta || {}), - ...(translator.meta && typeof translator.meta === 'object' ? translator.meta : {}), - connector, - instrument_config: entry.config - }; - return canonical; -} - -const registry = new Map([ - ['overrides', { translate: translateOverrides }] -]); - -function resolve(name) { - if (!name) return registry.get('overrides'); - const key = String(name).trim().toLowerCase(); - return registry.get(key) || null; -} - -function translate(entry, parsedPayload, connector, engineName) { - const engine = resolve(engineName); - if (!engine) { - const options = engineName ? 
` (requested: ${engineName})` : ''; - throw new Error(`translator engine not found${options}`); - } - return engine.translate(entry, parsedPayload, connector); -} - -module.exports = { - resolve, - translate -}; diff --git a/middleware/src/parsers/astmParser.js b/middleware/src/parsers/astmParser.js deleted file mode 100644 index d928fc3..0000000 --- a/middleware/src/parsers/astmParser.js +++ /dev/null @@ -1,55 +0,0 @@ -function split(line) { - return line.split('|'); -} - -function extractTestCode(raw) { - const candidates = (raw || '').split('^').filter(Boolean); - return candidates[0] || raw || 'UNKNOWN'; -} - -function buildResult(fields) { - return { - test_code: extractTestCode(fields[2]), - value: fields[3] || '0', - unit: fields[4] || undefined, - flag: fields[11] || fields[10] || undefined, - meta: { - channel: fields[1] - } - }; -} - -function parseAstm(message) { - const payload = typeof message === 'string' ? message.trim() : ''; - if (!payload) throw new Error('empty ASTM payload'); - const lines = payload.split(/\r?\n/).filter(Boolean); - const header = lines.find((line) => line.startsWith('H|')) || ''; - const order = lines.find((line) => line.startsWith('O|')) || ''; - const resultLines = lines.filter((line) => line.startsWith('R|')); - - if (!resultLines.length) { - throw new Error('no ASTM R segments'); - } - - const headerFields = header ? split(header) : []; - const orderFields = order ? 
split(order) : []; - const instrument_id = headerFields[3] || 'astm-instrument'; - const sample_id = orderFields[2] || `${instrument_id}-sample`; - const result_time = orderFields[13] || new Date().toISOString(); - - const results = resultLines.map((line) => buildResult(split(line))); - - return { - instrument_id: instrument_id.trim(), - sample_id: sample_id.trim(), - result_time, - results, - meta: { - source_protocol: 'ASTM', - connector: 'astm-serial', - segments: resultLines.length - } - }; -} - -module.exports = { parse: parseAstm }; diff --git a/middleware/src/parsers/hl7Parser.js b/middleware/src/parsers/hl7Parser.js deleted file mode 100644 index 35d667f..0000000 --- a/middleware/src/parsers/hl7Parser.js +++ /dev/null @@ -1,69 +0,0 @@ -function splitFields(segment) { - return segment.split('|'); -} - -function formatHl7Timestamp(value) { - if (!value) return new Date().toISOString(); - const year = value.slice(0, 4); - const month = value.slice(4, 6) || '01'; - const day = value.slice(6, 8) || '01'; - const hour = value.slice(8, 10) || '00'; - const minute = value.slice(10, 12) || '00'; - const second = value.slice(12, 14) || '00'; - if (!year || !month || !day) { - return new Date().toISOString(); - } - return `${year}-${month}-${day}T${hour}:${minute}:${second}Z`; -} - -async function parseHl7(message) { - const payload = typeof message === 'string' ? message.trim() : ''; - if (!payload) { - throw new Error('empty HL7 payload'); - } - - const segments = payload.split(/\r?\n/).filter(Boolean); - const msh = segments.find((line) => line.startsWith('MSH')); - const obr = segments.find((line) => line.startsWith('OBR')); - const obxSegments = segments.filter((line) => line.startsWith('OBX')); - - const mshFields = msh ? splitFields(msh) : []; - const obrFields = obr ? 
splitFields(obr) : []; - - const instrument_id = (obrFields[2] || '').split('^')[0] || 'hl7-instrument'; - const sample_id = (obrFields[3] || obrFields[2] || '').split('^')[0] || `${instrument_id}-sample`; - const result_time = formatHl7Timestamp(obrFields[7] || mshFields[6] || ''); - if (!obxSegments.length) { - throw new Error('no OBX segments in HL7 payload'); - } - - const results = obxSegments.map((segment) => { - const fields = splitFields(segment); - return { - test_code: (fields[3] || 'UNKNOWN').split('^')[0], - value: fields[5] || '0', - unit: fields[6] || undefined, - flag: fields[8] || undefined, - meta: { - observation_id: fields[3] - } - }; - }); - - const canonical = { - instrument_id: instrument_id.trim(), - sample_id: sample_id.trim(), - result_time, - results, - meta: { - source_protocol: 'HL7', - message_id: mshFields[9] || undefined, - connector: 'hl7-tcp', - obx_count: obxSegments.length - } - }; - - return canonical; -} - -module.exports = { parse: parseHl7 }; diff --git a/middleware/src/parsers/httpParser.js b/middleware/src/parsers/httpParser.js deleted file mode 100644 index a121a5c..0000000 --- a/middleware/src/parsers/httpParser.js +++ /dev/null @@ -1,8 +0,0 @@ -async function parseHttp(message) { - if (typeof message === 'string') { - return JSON.parse(message); - } - return message; -} - -module.exports = { parse: parseHttp }; diff --git a/middleware/src/storage/migrate.js b/middleware/src/storage/migrate.js deleted file mode 100644 index 00e910e..0000000 --- a/middleware/src/storage/migrate.js +++ /dev/null @@ -1,83 +0,0 @@ -const fs = require('fs'); -const path = require('path'); -const DatabaseClient = require('./db'); -const config = require('../../config/app'); - -const LOCK_MIGRATION_FILE = '002_outbox_locks.sql'; - -async function ensureMigrationTable(db) { - await db.exec(` - CREATE TABLE IF NOT EXISTS schema_migrations ( - filename TEXT PRIMARY KEY, - applied_at TEXT NOT NULL DEFAULT (datetime('now')) - ); - `); -} - -async 
function migrationIsApplied(db, filename) { - const row = await db.get( - 'SELECT filename FROM schema_migrations WHERE filename = ? LIMIT 1', - [filename] - ); - return Boolean(row); -} - -async function markMigrationApplied(db, filename) { - await db.run( - 'INSERT OR IGNORE INTO schema_migrations (filename) VALUES (?)', - [filename] - ); -} - -async function applyOutboxLockMigration(db) { - const columns = await db.all('PRAGMA table_info(outbox_result)'); - const columnNames = new Set(columns.map((column) => column.name)); - - if (!columnNames.has('locked_at')) { - await db.exec('ALTER TABLE outbox_result ADD COLUMN locked_at INTEGER NULL;'); - } - - if (!columnNames.has('locked_by')) { - await db.exec('ALTER TABLE outbox_result ADD COLUMN locked_by TEXT NULL;'); - } -} - -async function migrate() { - const db = new DatabaseClient(config.db); - const migrationsDir = path.join(__dirname, '..', '..', 'db', 'migrations'); - const files = fs - .readdirSync(migrationsDir) - .filter((name) => name.endsWith('.sql')) - .sort(); - - await ensureMigrationTable(db); - - for (const file of files) { - if (await migrationIsApplied(db, file)) { - continue; - } - - if (file === LOCK_MIGRATION_FILE) { - await applyOutboxLockMigration(db); - await markMigrationApplied(db, file); - continue; - } - - const payload = fs.readFileSync(path.join(migrationsDir, file), 'utf8'); - await db.exec(payload); - await markMigrationApplied(db, file); - } - - await db.close(); -} - -if (require.main === module) { - migrate() - .then(() => console.log('migrations applied')) - .catch((err) => { - console.error('migration failed', err); - process.exit(1); - }); -} - -module.exports = migrate; diff --git a/middleware/test/parsers.test.js b/middleware/test/parsers.test.js deleted file mode 100644 index 8b2684d..0000000 --- a/middleware/test/parsers.test.js +++ /dev/null @@ -1,33 +0,0 @@ -const assert = require('node:assert'); -const { parse: parseHl7 } = require('../src/parsers/hl7Parser'); -const { 
parse: parseAstm } = require('../src/parsers/astmParser'); -const { normalize } = require('../src/normalizers'); - -async function run() { - const hl7Sample = String.raw`MSH|^~\&|LIS|LAB|CLQMS|NORTH|202603261000||ORU^R01|msg-123|P|2.5 -OBR|1|SOMEID|SMP-001||CBC^^^WBC^H|||202603261000 -OBX|1|NM|WBC^White Blood Cell Count||8.2|10^3/uL|N|||F -OBX|2|NM|RBC^Red Blood Cell Count||4.5|10^6/uL|N|||F`; - const astmSample = String.raw`H|\^&|LAB||ASTM1 -P|1 -O|1|SMP-100|12345^Instrument|^^^GLU^^^|| -R|1|^^^GLU^7.2|7.2|mg/dL|||||N| -R|2|^^^ALT^7.4|50|IU/L|||||N|`; - - const hl7Result = await parseHl7(hl7Sample); - assert.strictEqual(hl7Result.results.length, 2, 'HL7 should parse two OBX segments'); - const hl7Normalized = normalize(hl7Result); - assert.strictEqual(hl7Normalized.instrument_id, 'SOMEID'); - assert.strictEqual(hl7Normalized.results[0].test_code, 'WBC'); - - const astmResult = parseAstm(astmSample); - assert.strictEqual(astmResult.results.length, 2, 'ASTM should parse multi R segments'); - const astmNormalized = normalize(astmResult); - assert.strictEqual(astmNormalized.results[0].test_code, 'GLU'); - console.log('Parser smoke tests passed'); -} - -run().catch((err) => { - console.error(err); - process.exit(1); -}); diff --git a/package.json b/package.json index 7ddb040..972164d 100644 --- a/package.json +++ b/package.json @@ -2,16 +2,16 @@ "name": "tinylink", "version": "1.0.0", "description": "Workstation middleware service", - "main": "middleware/src/index.js", + "main": "core/index.js", "directories": { "doc": "docs" }, "scripts": { - "start": "node middleware/src/index.js", - "migrate": "node middleware/src/storage/migrate.js", - "maintenance": "node middleware/src/scripts/maintenance.js", - "instrument:check": "node middleware/src/scripts/instrumentCheck.js", - "test": "node middleware/test/parsers.test.js" + "start": "node core/index.js", + "migrate": "node core/maintenance/migrate.js", + "maintenance": "node core/maintenance/maintenance.js", + 
"instrument:check": "node core/config/instrumentCheck.js", + "test": "node core/rawPipeline.test.js" }, "keywords": [], "author": "",