26 lines
779 B
JavaScript

const logger = require('../util/logger');
const { processMessage } = require('../pipeline/pipeline');
async function ingestIncomingMessage(connectorName, incoming) {
const payload = incoming && Object.prototype.hasOwnProperty.call(incoming, 'payload')
? incoming.payload
: incoming;
const context = incoming && incoming.context ? incoming.context : {};
return processMessage(connectorName, payload, context);
}
function createIngestMessageHandler(connector) {
return async (incoming) => {
try {
await ingestIncomingMessage(connector.name(), incoming);
} catch (err) {
logger.error({ err: err.message, connector: connector.name() }, 'pipeline error');
}
};
}
module.exports = {
ingestIncomingMessage,
createIngestMessageHandler
};