From 4a8b26a1aa9c8ba7e91e7f5c4487e0e346f27b8a Mon Sep 17 00:00:00 2001 From: Luc Claustres Date: Wed, 27 Mar 2024 19:54:24 +0100 Subject: [PATCH] feat: Improve reliability with faulty apps (closes #129) --- .dockerignore | 4 +- README.md | 1 - .../docker-compose.yml => docker-compose.yml | 14 ++- example/gateway/dockerfile | 8 +- example/gateway/package.json | 14 +-- example/service/dockerfile | 8 +- example/service/package.json | 10 +- lib/index.js | 85 +++++++++----- lib/publish.js | 20 ++-- lib/register.js | 97 +++++++++++---- lib/utils.js | 8 ++ test/index.test.js | 101 +--------------- test/network.test.js | 110 ++++++++++++++++++ test/utils.js | 97 +++++++++++++++ 14 files changed, 393 insertions(+), 184 deletions(-) rename example/docker-compose.yml => docker-compose.yml (60%) create mode 100644 test/network.test.js create mode 100644 test/utils.js diff --git a/.dockerignore b/.dockerignore index 93b8e8f..0c99107 100644 --- a/.dockerignore +++ b/.dockerignore @@ -15,8 +15,6 @@ yarn-error.log* node_modules # Project -example coverage -lib -src + diff --git a/README.md b/README.md index 1500119..ae59d0f 100644 --- a/README.md +++ b/README.md @@ -244,7 +244,6 @@ This launches a gateway and two replicas of the microservice. Wait a couple of s The same example is available based on a Docker compose file: ``` -cd ./example // Start docker-compose up -d // Stop when you've finished diff --git a/example/docker-compose.yml b/docker-compose.yml similarity index 60% rename from example/docker-compose.yml rename to docker-compose.yml index 8c1f0e5..9a68ea3 100644 --- a/example/docker-compose.yml +++ b/docker-compose.yml @@ -1,8 +1,10 @@ -version: '2' +version: '3' services: gateway: - build: ./gateway + build: + context: . + dockerfile: ./example/gateway/dockerfile container_name: gateway image: feathers/gateway ports: @@ -10,7 +12,9 @@ services: networks: - feathers service1: - build: ./service + build: + context: . + dockerfile: ./example/service/dockerfile container_name: service1 image: feathers/service1 ports: @@ -18,7 +22,9 @@ services: networks: - feathers service2: - build: ./service + build: + context: . + dockerfile: ./example/service/dockerfile container_name: service2 image: feathers/service2 ports: diff --git a/example/gateway/dockerfile b/example/gateway/dockerfile index ee9849e..e7805dc 100644 --- a/example/gateway/dockerfile +++ b/example/gateway/dockerfile @@ -1,10 +1,12 @@ -FROM node:12-buster +FROM node:16-bullseye MAINTAINER Luc Claustres -WORKDIR /opt/app -COPY . /opt/app +COPY . /opt/feathers-distributed +WORKDIR /opt/feathers-distributed +RUN yarn install +WORKDIR /opt/feathers-distributed/example/gateway RUN yarn install EXPOSE 3030 diff --git a/example/gateway/package.json b/example/gateway/package.json index 1232db0..8905035 100644 --- a/example/gateway/package.json +++ b/example/gateway/package.json @@ -29,13 +29,13 @@ "mocha": "mocha test/ --recursive" }, "dependencies": { - "@feathersjs/authentication": "^5.0.0-pre.22", - "@feathersjs/authentication-local": "^5.0.0-pre.22", - "@feathersjs/configuration": "^5.0.0-pre.22", - "@feathersjs/errors": "^5.0.0-pre.22", - "@feathersjs/express": "^5.0.0-pre.22", - "@feathersjs/feathers": "^5.0.0-pre.22", - "@feathersjs/socketio": "^5.0.0-pre.22", + "@feathersjs/authentication": "^5.0.8", + "@feathersjs/authentication-local": "^5.0.8", + "@feathersjs/configuration": "^5.0.8", + "@feathersjs/errors": "^5.0.8", + "@feathersjs/express": "^5.0.8", + "@feathersjs/feathers": "^5.0.8", + "@feathersjs/socketio": "^5.0.8", "body-parser": "^1.18.1", "compression": "^1.7.0", "cors": "^2.8.4", diff --git a/example/service/dockerfile b/example/service/dockerfile index d33823c..abbcf5d 100644 --- a/example/service/dockerfile +++ b/example/service/dockerfile @@ -1,10 +1,12 @@ -FROM node:12-buster +FROM node:16-bullseye MAINTAINER Luc Claustres -WORKDIR /opt/app -COPY . /opt/app +COPY . /opt/feathers-distributed +WORKDIR /opt/feathers-distributed +RUN yarn install +WORKDIR /opt/feathers-distributed/example/service RUN yarn install EXPOSE 3031 diff --git a/example/service/package.json b/example/service/package.json index f1fbb70..6479e5e 100644 --- a/example/service/package.json +++ b/example/service/package.json @@ -29,11 +29,11 @@ "mocha": "mocha test/ --recursive" }, "dependencies": { - "@feathersjs/configuration": "^5.0.0-pre.22", - "@feathersjs/errors": "^5.0.0-pre.22", - "@feathersjs/express": "^5.0.0-pre.22", - "@feathersjs/feathers": "^5.0.0-pre.22", - "@feathersjs/socketio": "^5.0.0-pre.22", + "@feathersjs/configuration": "^5.0.8", + "@feathersjs/errors": "^5.0.8", + "@feathersjs/express": "^5.0.8", + "@feathersjs/feathers": "^5.0.8", + "@feathersjs/socketio": "^5.0.8", "body-parser": "^1.18.1", "compression": "^1.7.0", "cors": "^2.8.4", diff --git a/lib/index.js b/lib/index.js index c88bbff..af0fc8c 100644 --- a/lib/index.js +++ b/lib/index.js @@ -1,12 +1,12 @@ import { promisify } from 'util' import errors from '@feathersjs/errors' import makeCote from 'cote' -import { v4 as uuid } from 'uuid' import makeDebug from 'debug' import portfinder from 'portfinder' -import { DEFAULT_METHODS, DEFAULT_EVENTS, HealthcheckService } from './utils.js' +import { v4 as uuid } from 'uuid' +import { DEFAULT_METHODS, DEFAULT_EVENTS, COMPONENTS, HealthcheckService } from './utils.js' import { publishService, unpublishService, publishServices } from './publish.js' -import { registerService, unregisterService, registerApplication } from './register.js' +import { registerService, unregisterService, registerApplication, unregisterApplication } from './register.js' const { Unavailable } = errors const debug = makeDebug('feathers-distributed') @@ -19,18 +19,23 @@ export async function initialize (app) { // Placeholder for request/events managers for remote services app.serviceRequesters = {} app.serviceEventsSubscribers = {} + // Placeholder for remote app replicas + app.remoteApps = {} // This subscriber listen to an event each time a remote app service has been registered app.serviceSubscriber = new app.cote.Subscriber({ - name: 'feathers services subscriber', + name: COMPONENTS.SERVICES_SUBSCRIBER, namespace: 'services', key: 'services', - subscribesTo: ['service', 'service-removed'] + subscribesTo: ['service', 'service-removed'], + appUuid: app.uuid, + appDistributionKey: app.distributionKey }, app.coteOptions) - debug('Services subscriber ready for app with uuid ' + app.uuid + ' and key ' + app.distributionKey) + debug('Services subscriber ready for app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey) // When a remote service is declared create the local proxy interface to it app.serviceSubscriber.on('service', async serviceDescriptor => { - // When a new app pops up create the required proxy to it first + // When a new app pops up ensure the required proxy to it is created first + // Indeed this should be done by new component detection but as it is based on a check interval it might occur later await registerApplication(app, serviceDescriptor) registerService(app, serviceDescriptor) }) @@ -43,35 +48,41 @@ export async function initialize (app) { await promisify(setTimeout)(app.distributionOptions.componentDelay) // This publisher publishes an event each time a local app or service is registered app.servicePublisher = new app.cote.Publisher({ - name: 'feathers services publisher', + name: COMPONENTS.SERVICES_PUBLISHER, namespace: 'services', key: 'services', - broadcasts: ['service', 'service-removed'] + broadcasts: ['service', 'service-removed'], + appUuid: app.uuid, + appDistributionKey: app.distributionKey }, app.coteOptions) - debug('Services publisher ready for app with uuid ' + app.uuid + ' and key ' + app.distributionKey) - // Dispatcher of service events to other nodes) { + debug('Services publisher ready for app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey) + // Dispatcher of service events to other nodes if (app.distributionOptions.publishEvents) { // Wait before instanciating new component to avoid too much concurrency on port allocation await promisify(setTimeout)(app.distributionOptions.componentDelay) app.serviceEventsPublisher = new app.cote.Publisher({ - name: 'feathers service events publisher', + name: COMPONENTS.SERVICES_EVENTS_PUBLISHER, namespace: app.distributionKey, key: app.distributionKey, - broadcasts: app.distributionOptions.distributedEvents || DEFAULT_EVENTS + broadcasts: app.distributionOptions.distributedEvents || DEFAULT_EVENTS, + appUuid: app.uuid, + appDistributionKey: app.distributionKey }, app.coteOptions) - debug('Service events publisher ready for local app with uuid ' + app.uuid + ' and key ' + app.distributionKey) + debug('Service events publisher ready for local app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey) } // Wait before instanciating new component to avoid too much concurrency on port allocation await promisify(setTimeout)(app.distributionOptions.componentDelay) // Create the response manager for local services const methods = app.distributionOptions.distributedMethods || DEFAULT_METHODS app.serviceResponder = new app.cote.Responder({ - name: 'feathers services responder', + name: COMPONENTS.SERVICES_RESPONDER, namespace: app.distributionKey, key: app.distributionKey, - requests: methods.concat(['healthcheck']) + requests: methods.concat(['healthcheck']), + appUuid: app.uuid, + appDistributionKey: app.distributionKey }, app.coteOptions) - debug('Service responder ready for local app with uuid ' + app.uuid + ' and key ' + app.distributionKey) + debug('Service responder ready for local app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey) debug('Registering listeners for ', methods.concat(['healthcheck'])) // Answer requests from other nodes if (methods.includes('find')) { @@ -151,9 +162,27 @@ export async function initialize (app) { // Each time a new app pops up we republish local services so that // service distribution does not depend on the initialization order of the apps - app.servicePublisher.on('cote:added', (data) => { publishServices(app) }) - // FIXME: we should manage apps going offline - app.servicePublisher.on('cote:removed', (data) => { }) + app.servicePublisher.on('cote:added', (data) => { + // As this event is emitted for all cote components filtering one should be sufficient + if (data.advertisement.name !== COMPONENTS.SERVICES_SUBSCRIBER) return + const key = data.advertisement.appDistributionKey + const uuid = data.advertisement.appUuid + const shortUuid = data.advertisement.appUuid.split('-')[0] + debug('New component detected for app with uuid ' + shortUuid + ' and key ' + key + ' from app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey) + // When a new app pops up create the required proxy to it first + registerApplication(app, { uuid, shortUuid, key }) + }) + // Manage app going offline + app.servicePublisher.on('cote:removed', (data) => { + // As this event is emitted for all cote components filtering one should be sufficient + if (data.advertisement.name !== COMPONENTS.SERVICES_SUBSCRIBER) return + const key = data.advertisement.appDistributionKey + const uuid = data.advertisement.appUuid + const shortUuid = data.advertisement.appUuid.split('-')[0] + debug('Component loss detected for app with uuid ' + shortUuid + ' and key ' + key + ' from app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey) + // When an app goes offline check if we need to keep cote components alive for remaining replicas + unregisterApplication(app, { uuid, shortUuid, key }) + }) // Tell others apps I'm here publishServices(app) @@ -165,38 +194,39 @@ export async function initialize (app) { publishService(app, path) }) }, app.distributionOptions.heartbeatInterval) - debug('Scheduled heartbeat local services publishing for app with uuid ' + app.uuid + ' and key ' + app.distributionKey) + debug('Scheduled heartbeat local services publishing for app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey) } } export function finalize (app) { debug('Finalizing cote') + delete app.remoteApps if (app.serviceRequesters) { Object.getOwnPropertyNames(app.serviceRequesters).forEach(key => { - debug(`Finalizing service requester for remote app with key ${key}`) + debug(`Finalizing service requester for remote app with key ${key} in local app with uuid ${app.shortUuid} and key ${app.distributionKey}`) app.serviceRequesters[key].close() }) delete app.serviceRequesters } if (app.serviceEventsSubscribers) { Object.getOwnPropertyNames(app.serviceEventsSubscribers).forEach(key => { - debug(`Finalizing service event subscriber for remote app with key ${key}`) + debug(`Finalizing service event subscriber for remote app with key ${key} in local app with uuid ${app.shortUuid} and key ${app.distributionKey}`) app.serviceEventsSubscribers[key].close() }) delete app.serviceEventsSubscribers } if (app.serviceSubscriber) { - debug(`Finalizing service subscriber for local app with key ${app.distributionKey}`) + debug(`Finalizing service subscriber for local app with uuid ${app.shortUuid} and key ${app.distributionKey}`) app.serviceSubscriber.close() delete app.serviceSubscriber } if (app.servicePublisher) { - debug(`Finalizing service publisher for local app with key ${app.distributionKey}`) + debug(`Finalizing service publisher for local app with uuid ${app.shortUuid} and key ${app.distributionKey}`) app.servicePublisher.close() delete app.servicePublisher } if (app.serviceResponder) { - debug(`Finalizing service responder for local app with key ${app.distributionKey}`) + debug(`Finalizing service responder for local app with uuid ${app.shortUuid} and key ${app.distributionKey}`) app.serviceResponder.close() delete app.serviceResponder } @@ -208,8 +238,9 @@ export function finalize (app) { export default function init (options = {}) { return function (app) { // We need to uniquely identify the app to avoid infinite loop by registering our own services - // This uuid is also used a partition key in cote unless provided app.uuid = uuid() + // For display purpose + app.shortUuid = app.uuid.split('-')[0] app.coteOptions = Object.assign({ helloInterval: 10000, checkInterval: 20000, diff --git a/lib/publish.js b/lib/publish.js index 8859792..1b5027d 100644 --- a/lib/publish.js +++ b/lib/publish.js @@ -16,12 +16,13 @@ export function publishService (app, path) { if (!service || (typeof service !== 'object')) return if (service.remote) { debugIgnore('Ignoring remote service publication on path ' + path + ' for app with uuid ' + - app.uuid + ' and key ' + app.distributionKey) + app.shortUuid + ' and key ' + app.distributionKey) return } const options = Object.assign(getServiceOptions(service), service.options) const serviceDescriptor = { uuid: app.uuid, + shortUuid: app.shortUuid, key: app.distributionKey, path: stripSlashes(path), events: options.distributedEvents || options.events.concat(DEFAULT_EVENTS), @@ -30,7 +31,7 @@ export function publishService (app, path) { // Skip internal services if (isInternalService(app, serviceDescriptor)) { debugIgnore('Ignoring local service publication on path ' + serviceDescriptor.path + ' for app with uuid ' + - app.uuid + ' and key ' + app.distributionKey) + app.shortUuid + ' and key ' + app.distributionKey) return } // Add distributed service options if any @@ -50,7 +51,7 @@ export function publishService (app, path) { service[EVENT_LISTENERS_KEY][event] = object => { if (app.serviceEventsPublisher) { debug(`Publishing ${event} local service event on path ` + serviceDescriptor.path + - ' for app with uuid ' + app.uuid + ' and key ' + app.distributionKey, object) + ' for app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey, object) app.serviceEventsPublisher.publish(event, Object.assign({ path: serviceDescriptor.path, key: app.distributionKey }, object)) @@ -60,12 +61,12 @@ export function publishService (app, path) { service.on(event, service[EVENT_LISTENERS_KEY][event]) }) debug('Publish callbacks registered for local service events on path ' + serviceDescriptor.path + - ' for app with uuid ' + app.uuid + ' and key ' + app.distributionKey, serviceDescriptor.events) + ' for app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey, serviceDescriptor.events) } // Publish new local service app.servicePublisher.publish('service', serviceDescriptor) debug('Published local service on path ' + serviceDescriptor.path + ' for app with uuid ' + - app.uuid + ' and key ' + app.distributionKey, serviceDescriptor) + app.shortUuid + ' and key ' + app.distributionKey, serviceDescriptor) } export function unpublishService (app, path) { @@ -77,6 +78,7 @@ export function unpublishService (app, path) { const options = Object.assign(getServiceOptions(service), service.options) const serviceDescriptor = { uuid: app.uuid, + shortUuid: app.shortUuid, key: app.distributionKey, path: stripSlashes(path), events: options.distributedEvents || options.events.concat(DEFAULT_EVENTS) @@ -84,7 +86,7 @@ export function unpublishService (app, path) { // Skip internal services if (isInternalService(app, serviceDescriptor)) { debugIgnore('Ignoring local service unpublication on path ' + serviceDescriptor.path + ' for app with uuid ' + - app.uuid + ' and key ' + app.distributionKey) + app.shortUuid + ' and key ' + app.distributionKey) return } // Remove event listeners whenever required and if not already done @@ -95,12 +97,12 @@ export function unpublishService (app, path) { // Untag service so that we will not uninstall listeners twice delete service[EVENT_LISTENERS_KEY] debug('Publish callbacks unregistered for local service events on path ' + serviceDescriptor.path + - ' for app with uuid ' + app.uuid + ' and key ' + app.distributionKey, serviceDescriptor.events) + ' for app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey, serviceDescriptor.events) } // Unpublish removed local service app.servicePublisher.publish('service-removed', serviceDescriptor) debug('Unpublished local service on path ' + serviceDescriptor.path + ' for app with uuid ' + - app.uuid + ' and key ' + app.distributionKey, serviceDescriptor) + app.shortUuid + ' and key ' + app.distributionKey, serviceDescriptor) } export function publishServices (app) { @@ -113,5 +115,5 @@ export function publishServices (app) { // Reset timeout so that next queued publication will be scheduled app.applicationPublicationTimeout = null }, app.distributionOptions.publicationDelay) - debug('Scheduled local services publishing for app with uuid ' + app.uuid + ' and key ' + app.distributionKey) + debug('Scheduled local services publishing for app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey) } diff --git a/lib/register.js b/lib/register.js index 3147e62..ca54644 100644 --- a/lib/register.js +++ b/lib/register.js @@ -1,63 +1,110 @@ import { promisify } from 'util' import makeDebug from 'debug' -import { DEFAULT_EVENTS, isDiscoveredService, getServicePath, getService } from './utils.js' +import { stripSlashes } from '@feathersjs/commons' +import { DEFAULT_EVENTS, COMPONENTS, isDiscoveredService, getServicePath, getService } from './utils.js' +import { publishServices } from './publish.js' import RemoteService from './service.js' const debug = makeDebug('feathers-distributed:register') const debugIgnore = makeDebug('feathers-distributed:ignore') export async function registerApplication (app, applicationDescriptor) { - // Create the request/events manager for remote services only if (applicationDescriptor.uuid === app.uuid) { - debugIgnore('Ignoring service requester/events publisher creation for local app with uuid ' + app.uuid) + debugIgnore('Ignoring service requester/events publisher creation for local app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey) return } + // Create the request/events manager for remote services const key = applicationDescriptor.key - // Already registered + // No app already registered for this key + if (!app.remoteApps[key]) app.remoteApps[key] = new Set() + debug('Registering remote app with uuid ' + applicationDescriptor.shortUuid + ' and key ' + key + ' in local app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey) + const isRegistered = app.remoteApps[key].has(applicationDescriptor.uuid) + // The first time a new application is registered publish services for it + if (!isRegistered) publishServices(app) + app.remoteApps[key].add(applicationDescriptor.uuid) + // Service requester/event subscriber already registered for this key + // as we only need one not one for each app (cote will dispatch) if (app.serviceRequesters[key]) { - debugIgnore('Ignoring already registered remote app with uuid ' + app.uuid + ' and key ' + key) return } - debug('Registering remote app with uuid ' + app.uuid + ' and key ' + key) // Create the request manager to remote services app.serviceRequesters[key] = new app.cote.Requester({ - name: 'feathers services requester', + name: COMPONENTS.SERVICES_REQUESTER, namespace: key, key, requests: ['find', 'get', 'create', 'update', 'patch', 'remove', 'healthcheck'], - timeout: app.distributionOptions.timeout || 20000 + timeout: app.distributionOptions.timeout || 20000, + appUuid: app.uuid, + appDistributionKey: app.distributionKey }, app.coteOptions) - debug('Service requester ready for remote app with uuid ' + applicationDescriptor.uuid + ' and key ' + key + - ' for app with uuid ' + app.uuid + ' and key ' + app.distributionKey) + debug('Service requester ready for remote app with key ' + key + + ' in local app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey) // Wait before instanciating new component to avoid too much concurrency on port allocation await promisify(setTimeout)(app.distributionOptions.componentDelay) // Subscriber to listen to events from other nodes const events = app.distributionOptions.distributedEvents || DEFAULT_EVENTS app.serviceEventsSubscribers[key] = new app.cote.Subscriber({ - name: 'feathers services events subscriber', + name: COMPONENTS.SERVICES_EVENTS_SUBSCRIBER, namespace: key, key, - subscribesTo: events + subscribesTo: events, + appUuid: app.uuid, + appDistributionKey: app.distributionKey }, app.coteOptions) events.forEach(event => { app.serviceEventsSubscribers[key].on(event, object => { - debug(`Dispatching ${event} remote service event on path ${object.path} for app with uuid ${app.uuid} and key ${app.distributionKey}`, object) + debug(`Dispatching ${event} remote service event on path ${object.path} in local app with uuid ${app.shortUuid} and key ${app.distributionKey}`, object) const servicePath = getServicePath(app, object) const service = getService(app, servicePath) // Ensure we don't have any local service with the same name to avoid infinite looping if (service && service.remote) service.emit(event, object) }) }) - debug('Service events subscriber ready for remote app with uuid ' + applicationDescriptor.uuid + ' and key ' + key + - ' for app with uuid ' + app.uuid + ' and key ' + app.distributionKey) + debug('Service events subscriber ready for remote app with key ' + key + + ' in local app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey) +} + +export async function unregisterApplication (app, applicationDescriptor) { + if (applicationDescriptor.uuid === app.uuid) { + debugIgnore('Ignoring service requester/events publisher removal for local app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey) + return + } + // Remove the request/events manager for remote services + const key = applicationDescriptor.key + // Already unregistered + if (!app.remoteApps[key]) { + debugIgnore('Ignoring service requester/events publisher removal as already done for local app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey) + return + } + app.remoteApps[key].delete(applicationDescriptor.uuid) + // Some replicas remain for this distribution key, keep service requester/event subscriber alive for them + if (app.remoteApps[key].size !== 0) { + debugIgnore('Ignoring service requester/events publisher removal as replicas remain for local app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey) + return + } + // Clear service requester/event subscriber as no replicas remains for this key + debug(`Finalizing service requester for remote app with key ${key} as no remaining replica in local app with uuid ${app.shortUuid} and key ${app.distributionKey}`) + app.serviceRequesters[key].close() + delete app.serviceRequesters[key] + debug(`Finalizing service event subscriber for remote app with key ${key} as no remaining replica in local app with uuid ${app.shortUuid} and key ${app.distributionKey}`) + app.serviceEventsSubscribers[key].close() + delete app.serviceEventsSubscribers[key] + Object.getOwnPropertyNames(app.services).forEach(path => { + unregisterService(app, { + uuid: applicationDescriptor.uuid, + shortUuid: applicationDescriptor.shortUuid, + key, + path: stripSlashes(path) + }) + }) } export function registerService (app, serviceDescriptor) { // Do not register our own services if (serviceDescriptor.uuid === app.uuid) { debugIgnore('Ignoring local service registration on path ' + serviceDescriptor.path + ' for app with uuid ' + - app.uuid + ' and key ' + app.distributionKey) + app.shortUuid + ' and key ' + app.distributionKey) return } // Retrieve target service path as it could be different from remote one in case of alias @@ -67,17 +114,17 @@ export function registerService (app, serviceDescriptor) { if (service) { if (service instanceof RemoteService) { debugIgnore('Ignoring already registered service as remote on path ' + servicePath + ' for app with uuid ' + - app.uuid + ' and key ' + app.distributionKey) + app.shortUuid + ' and key ' + app.distributionKey) } else { debugIgnore('Ignoring already registered local service on path ' + servicePath + ' for app with uuid ' + - app.uuid + ' and key ' + app.distributionKey) + app.shortUuid + ' and key ' + app.distributionKey) } return } // Skip services we are not interested into if (!isDiscoveredService(app, serviceDescriptor)) { debugIgnore('Ignoring remote service on path ' + serviceDescriptor.path + ' for app with uuid ' + - app.uuid + ' and key ' + app.distributionKey) + app.shortUuid + ' and key ' + app.distributionKey) return } @@ -97,7 +144,7 @@ export function registerService (app, serviceDescriptor) { if (app.distributionOptions.middlewares.after) args = args.concat(app.distributionOptions.middlewares.after) app.use(...args) debug('Registered remote service on path ' + servicePath + ' for app with uuid ' + - app.uuid + ' and key ' + app.distributionKey, serviceDescriptor) + app.shortUuid + ' and key ' + app.distributionKey, serviceDescriptor) // Register hook object on every remote service if (app.distributionOptions.hooks) { @@ -110,7 +157,7 @@ export function registerService (app, serviceDescriptor) { } }) debug('Registered hooks on remote service on path ' + servicePath + ' for app with uuid ' + - app.uuid + ' and key ' + app.distributionKey) + app.shortUuid + ' and key ' + app.distributionKey) // Dispatch an event internally through node so that async processes can run app.emit('service', Object.assign({}, serviceDescriptor, { path: servicePath })) @@ -123,23 +170,23 @@ export function unregisterService (app, serviceDescriptor) { if (!service) { debugIgnore('Ignoring unregistration of already unregistered service on path ' + servicePath + ' for app with uuid ' + - app.uuid + ' and key ' + app.distributionKey) + app.shortUuid + ' and key ' + app.distributionKey) return } if (!(service instanceof RemoteService)) { debugIgnore('Ignoring unregistration of local service on path ' + servicePath + ' for app with uuid ' + - app.uuid + ' and key ' + app.distributionKey) + app.shortUuid + ' and key ' + app.distributionKey) return } // Skip services we are not interested into if (!isDiscoveredService(app, serviceDescriptor)) { debugIgnore('Ignoring unregistration of remote service on path ' + serviceDescriptor.path + ' for app with uuid ' + - app.uuid + ' and key ' + app.distributionKey) + app.shortUuid + ' and key ' + app.distributionKey) return } app.unuse(servicePath) debug('Unregistered remote service on path ' + servicePath + ' for app with uuid ' + - app.uuid + ' and key ' + app.distributionKey, serviceDescriptor) + app.shortUuid + ' and key ' + app.distributionKey, serviceDescriptor) // Dispatch an event internally through node so that async processes can run app.emit('service-removed', Object.assign({}, serviceDescriptor, { path: servicePath })) diff --git a/lib/utils.js b/lib/utils.js index 28ccac8..84fc0f8 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -5,6 +5,14 @@ const { Unavailable, NotFound } = errors export const DEFAULT_EVENTS = ['created', 'updated', 'patched', 'removed'] export const DEFAULT_METHODS = ['find', 'get', 'create', 'update', 'patch', 'remove'] +export const COMPONENTS = { + SERVICES_SUBSCRIBER: 'feathers services subscriber', + SERVICES_PUBLISHER: 'feathers services publisher', + SERVICES_EVENTS_PUBLISHER: 'feathers services events publisher', + SERVICES_EVENTS_SUBSCRIBER: 'feathers services events subscriber', + SERVICES_REQUESTER: 'feathers services requester', + SERVICES_RESPONDER: 'feathers services responder' +} export function isKoaApp (app) { return typeof app.request === 'object' && typeof app.request.query === 'object' diff --git a/test/index.test.js b/test/index.test.js index a3f37ac..c826deb 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -1,11 +1,9 @@ -import { authenticate, AuthenticationService, JWTStrategy } from '@feathersjs/authentication' -import { LocalStrategy } from '@feathersjs/authentication-local' +import { authenticate } from '@feathersjs/authentication' import auth from '@feathersjs/authentication-client' import restClient from '@feathersjs/rest-client' import socketClient from '@feathersjs/socketio-client' import express from '@feathersjs/express' import feathers from '@feathersjs/feathers' -import socketio from '@feathersjs/socketio' import request from 'superagent' import utils from 'util' import chai, { expect, util, assert } from 'chai' @@ -14,6 +12,7 @@ import spies from 'chai-spies' import * as commonHooks from 'feathers-hooks-common' import { MemoryService } from '@feathersjs/memory' import io from 'socket.io-client' +import { createApp, waitForService, waitForServiceRemoval, channels, clone } from './utils.js' import plugin, { finalize } from '../lib/index.js' class CustomMemoryService extends MemoryService { @@ -41,25 +40,7 @@ let serviceMiddleware = (req, res, next) => next() let appMiddleware = (req, res, next) => res.json({}) let hookFromRemote -function channels (app) { - if (typeof app.channel !== 'function') { - return - } - app.on('connection', connection => { - // console.log('App ' + app.uuid + ' with key ' + app.distributionKey + ' connects client ', connection) - app.channel('all').join(connection) - }) - app.publish((data, context) => { - // console.log('App ' + app.uuid + ' with key ' + app.distributionKey + ' publishes ', data) - return app.channel('all') - }) -} - -function clone (obj) { - return JSON.parse(JSON.stringify(obj)) -} - -describe('feathers-distributed', () => { +describe('feathers-distributed:main', () => { const apps = [] const servers = [] let customServices = [] @@ -78,42 +59,6 @@ describe('feathers-distributed', () => { const service2 = 2 const noEvents = 3 - function createApp (index) { - const app = express(feathers()) - const authService = new AuthenticationService(app) - - app.set('authentication', { - secret: '1234', - entity: 'user', - service: 'users', - entityId: 'id', - authStrategies: ['jwt', 'local'], - local: { - usernameField: 'email', - passwordField: 'password' - }, - jwtOptions: { - header: { typ: 'access' }, - audience: 'https://yourdomain.com', - issuer: 'feathers', - algorithm: 'HS256', - expiresIn: '1d' - } - }) - authService.register('jwt', new JWTStrategy()) - - if (index === gateway) { - authService.register('local', new LocalStrategy()) - } - - app.use(express.json()) - app.configure(socketio()) - app.configure(express.rest()) - app.use('/authentication', authService) - - return app - } - before(async () => { chailint(chai, util) chai.use(spies) @@ -124,7 +69,7 @@ describe('feathers-distributed', () => { const promises = [] for (let i = 0; i < nbApps; i++) { - apps.push(createApp(i)) + apps.push(createApp(i, { authentication: (i === gateway ? ['jwt', 'local'] : ['jwt']) })) apps[i].configure(plugin({ hooks: { before: { all: beforeHook }, after: { all: afterHook } }, middlewares: { after: express.errorHandler() }, @@ -189,7 +134,6 @@ describe('feathers-distributed', () => { apps[i].use(express.notFound()) apps[i].use(express.errorHandler()) servers.push(await apps[i].listen(3030 + i)) - promises.push(servers[i]) } for (let i = 0; i < nbApps; i++) { @@ -223,43 +167,6 @@ describe('feathers-distributed', () => { await utils.promisify(setTimeout)(10000) }) - function waitForService (app, path) { - return new Promise((resolve, reject) => { - app.on('service', data => { - if (data.path === path) { - let service - try { - service = app.service(path) - } catch { - reject(new Error(`Service on ${path} does not exist`)) - return - } - expect(service).toExist() - if (path === 'users') { - expect(service.remoteOptions).toExist() - expect(service.remoteOptions.startId).toExist() - } - resolve(service) - } - }) - }) - } - - function waitForServiceRemoval (app, path) { - return new Promise((resolve, reject) => { - app.on('service-removed', data => { - if (data.path === path) { - try { - app.service(path) - reject(new Error(`Service on ${path} do exists`)) - } catch { - resolve() - } - } - }) - }) - } - it('is ES module compatible', () => { expect(typeof finalize).to.equal('function') expect(typeof plugin).to.equal('function') diff --git a/test/network.test.js b/test/network.test.js new file mode 100644 index 0000000..8e01d15 --- /dev/null +++ b/test/network.test.js @@ -0,0 +1,110 @@ +import express from '@feathersjs/express' +import feathers from '@feathersjs/feathers' +import request from 'superagent' +import utils from 'util' +import chai, { expect, util, assert } from 'chai' +import chailint from 'chai-lint' +import spies from 'chai-spies' +import * as commonHooks from 'feathers-hooks-common' +import { MemoryService } from '@feathersjs/memory' +import io from 'socket.io-client' +import { createApp, waitForService, waitForServiceRemoval, clone } from './utils.js' +import plugin, { finalize } from '../lib/index.js' + +const startId = 2 +const store = { + 0: { content: 'message 0', id: 0 }, + 1: { content: 'message 1', id: 1 } +} + +describe('feathers-distributed:network', () => { + const apps = [] + const servers = [] + const nbApps = 3 + + before(async () => { + chailint(chai, util) + const promises = [] + + for (let i = 0; i < nbApps; i++) { + apps.push(createApp(i, { authentication: false })) + apps[i].configure(plugin({ + middlewares: { after: express.errorHandler() }, + // Distribute only the test service + services: (service) => service.path.endsWith('messages'), + key: (i === 0 ? 'app' : 'messages'), + coteDelay: 2000, + publicationDelay: 2000, + cote: { // Use cote defaults + helloInterval: 2000, + checkInterval: 4000, + nodeTimeout: 5000, + masterTimeout: 6000, + // We need 3 open ports by app + basePort: 10000, + highestPort: 10008 + } + })) + + // Only the first app use distributed services + if (i !== 0) { + apps[i].use('messages', new MemoryService({ store: clone(store), startId })) + const messagesService = apps[i].service('messages') + expect(messagesService).toExist() + promises.push(Promise.resolve(messagesService)) + } else { + // Wait for remote service to be registered + promises.push(waitForService(apps[i], 'messages')) + } + } + + await Promise.all(promises) + + for (let i = 0; i < nbApps; i++) { + // See https://github.com/kalisio/feathers-distributed/issues/3 + // Now all services are registered setup handlers + apps[i].use(express.notFound()) + apps[i].use(express.errorHandler()) + servers.push(await apps[i].listen(3030 + i)) + } + }) + + it('check remote service is accessible', async () => { + const messages = await apps[0].service('messages').find({}) + expect(messages.length > 0).beTrue() + }) + + it('check remote service is accessible on partial failure', async () => { + // Simulate network failure by closing the service subscriber socket + // as this component is used to detect app loss + apps[1].serviceSubscriber.close() + // Wait before cote component has been flagged as unreachable + await utils.promisify(setTimeout)(6000) + const messages = await apps[0].service('messages').find({}) + expect(messages.length > 0).beTrue() + }) + // Let enough time to process + .timeout(10000) + + it('check remote service is not accessible anymore on complete failure', async () => { + // Simulate network failure by closing the service subscriber socket + // as this component is used to detect app loss + apps[2].serviceSubscriber.close() + // Wait before cote component has been flagged as unreachable + await utils.promisify(setTimeout)(6000) + try { + const messages = await apps[0].service('messages').find({}) + assert.fail('accessing messages service should raise an error') + } catch (error) { + expect(error.message).to.equal(`Can not find service 'messages'`) + } + }) + + // Cleanup + after(async () => { + for (let i = 0; i < nbApps; i++) { + await servers[i].close() + finalize(apps[i]) + } + }) +}) diff --git a/test/utils.js b/test/utils.js new file mode 100644 index 0000000..7540e6a --- /dev/null +++ b/test/utils.js @@ -0,0 +1,97 @@ +import feathers from '@feathersjs/feathers' +import { AuthenticationService, JWTStrategy } from '@feathersjs/authentication' +import { LocalStrategy } from '@feathersjs/authentication-local' +import express from '@feathersjs/express' +import socketio from '@feathersjs/socketio' +import { expect } from 'chai' + +export function createApp (index, options = { authentication: ['jwt', 'local'] }) { + const app = express(feathers()) + let authService + if (options.authentication) { + authService = new AuthenticationService(app) + + app.set('authentication', { + secret: '1234', + entity: 'user', + service: 'users', + entityId: 'id', + authStrategies: ['jwt', 'local'], + local: { + usernameField: 'email', + passwordField: 'password' + }, + jwtOptions: { + header: { typ: 'access' }, + audience: 'https://yourdomain.com', + issuer: 'feathers', + algorithm: 'HS256', + expiresIn: '1d' + } + }) + if (options.authentication.includes('jwt')) authService.register('jwt', new JWTStrategy()) + if (options.authentication.includes('local')) authService.register('local', new LocalStrategy()) + } + + app.use(express.json()) + app.configure(socketio()) + app.configure(express.rest()) + if (authService) app.use('/authentication', authService) + + return app +} + +export function waitForService (app, path) { + return new Promise((resolve, reject) => { + app.on('service', data => { + if (data.path === path) { + let service + try { + service = app.service(path) + } catch { + reject(new Error(`Service on ${path} does not exist`)) + return + } + expect(service).toExist() + if (path === 'users') { + expect(service.remoteOptions).toExist() + expect(service.remoteOptions.startId).toExist() + } + resolve(service) + } + }) + }) +} + +export function waitForServiceRemoval (app, path) { + return new Promise((resolve, reject) => { + app.on('service-removed', data => { + if (data.path === path) { + try { + app.service(path) + reject(new Error(`Service on ${path} do exists`)) + } catch { + resolve() + } + } + }) + }) +} + +export function channels (app) { + if (typeof app.channel !== 'function') { + return + } + app.on('connection', connection => { + // console.log('App ' + app.uuid + ' with key ' + app.distributionKey + ' connects client ', connection) + app.channel('all').join(connection) + }) + app.publish((data, context) => { + // console.log('App ' + app.uuid + ' with key ' + app.distributionKey + ' publishes ', data) + return app.channel('all') + }) +} + +export function clone (obj) { + return JSON.parse(JSON.stringify(obj)) +}