Skip to content

Commit

Permalink
feat: Improve reliability with faulty apps (closes #129)
Browse files Browse the repository at this point in the history
  • Loading branch information
claustres committed Mar 27, 2024
1 parent 570deba commit 4a8b26a
Show file tree
Hide file tree
Showing 14 changed files with 393 additions and 184 deletions.
4 changes: 1 addition & 3 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ yarn-error.log*
node_modules

# Project
example
coverage
lib
src


1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ This launches a gateway and two replicas of the microservice. Wait a couple of s

The same example is available based on a Docker compose file:
```
cd ./example
// Start
docker-compose up -d
// Stop when you've finished
Expand Down
14 changes: 10 additions & 4 deletions example/docker-compose.yml → docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,24 +1,30 @@
version: '2'
version: '3'

services:
gateway:
build: ./gateway
build:
context: .
dockerfile: ./example/gateway/dockerfile
container_name: gateway
image: feathers/gateway
ports:
- 3030:3030
networks:
- feathers
service1:
build: ./service
build:
context: .
dockerfile: ./example/service/dockerfile
container_name: service1
image: feathers/service1
ports:
- 3031:3031
networks:
- feathers
service2:
build: ./service
build:
context: .
dockerfile: ./example/service/dockerfile
container_name: service2
image: feathers/service2
ports:
Expand Down
8 changes: 5 additions & 3 deletions example/gateway/dockerfile
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
FROM node:12-buster
FROM node:16-bullseye

MAINTAINER Luc Claustres <luc.claustres@kalisio.xyz>

WORKDIR /opt/app
COPY . /opt/app
COPY . /opt/feathers-distributed

WORKDIR /opt/feathers-distributed
RUN yarn install
WORKDIR /opt/feathers-distributed/example/gateway
RUN yarn install

EXPOSE 3030
Expand Down
14 changes: 7 additions & 7 deletions example/gateway/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@
"mocha": "mocha test/ --recursive"
},
"dependencies": {
"@feathersjs/authentication": "^5.0.0-pre.22",
"@feathersjs/authentication-local": "^5.0.0-pre.22",
"@feathersjs/configuration": "^5.0.0-pre.22",
"@feathersjs/errors": "^5.0.0-pre.22",
"@feathersjs/express": "^5.0.0-pre.22",
"@feathersjs/feathers": "^5.0.0-pre.22",
"@feathersjs/socketio": "^5.0.0-pre.22",
"@feathersjs/authentication": "^5.0.8",
"@feathersjs/authentication-local": "^5.0.8",
"@feathersjs/configuration": "^5.0.8",
"@feathersjs/errors": "^5.0.8",
"@feathersjs/express": "^5.0.8",
"@feathersjs/feathers": "^5.0.8",
"@feathersjs/socketio": "^5.0.8",
"body-parser": "^1.18.1",
"compression": "^1.7.0",
"cors": "^2.8.4",
Expand Down
8 changes: 5 additions & 3 deletions example/service/dockerfile
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
FROM node:12-buster
FROM node:16-bullseye

MAINTAINER Luc Claustres <luc.claustres@kalisio.xyz>

WORKDIR /opt/app
COPY . /opt/app
COPY . /opt/feathers-distributed

WORKDIR /opt/feathers-distributed
RUN yarn install
WORKDIR /opt/feathers-distributed/example/service
RUN yarn install

EXPOSE 3031
Expand Down
10 changes: 5 additions & 5 deletions example/service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@
"mocha": "mocha test/ --recursive"
},
"dependencies": {
"@feathersjs/configuration": "^5.0.0-pre.22",
"@feathersjs/errors": "^5.0.0-pre.22",
"@feathersjs/express": "^5.0.0-pre.22",
"@feathersjs/feathers": "^5.0.0-pre.22",
"@feathersjs/socketio": "^5.0.0-pre.22",
"@feathersjs/configuration": "^5.0.8",
"@feathersjs/errors": "^5.0.8",
"@feathersjs/express": "^5.0.8",
"@feathersjs/feathers": "^5.0.8",
"@feathersjs/socketio": "^5.0.8",
"body-parser": "^1.18.1",
"compression": "^1.7.0",
"cors": "^2.8.4",
Expand Down
85 changes: 58 additions & 27 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { promisify } from 'util'
import errors from '@feathersjs/errors'
import makeCote from 'cote'
import { v4 as uuid } from 'uuid'
import makeDebug from 'debug'
import portfinder from 'portfinder'
import { DEFAULT_METHODS, DEFAULT_EVENTS, HealthcheckService } from './utils.js'
import { v4 as uuid } from 'uuid'
import { DEFAULT_METHODS, DEFAULT_EVENTS, COMPONENTS, HealthcheckService } from './utils.js'
import { publishService, unpublishService, publishServices } from './publish.js'
import { registerService, unregisterService, registerApplication } from './register.js'
import { registerService, unregisterService, registerApplication, unregisterApplication } from './register.js'

const { Unavailable } = errors
const debug = makeDebug('feathers-distributed')
Expand All @@ -19,18 +19,23 @@ export async function initialize (app) {
// Placeholder for request/events managers for remote services
app.serviceRequesters = {}
app.serviceEventsSubscribers = {}
// Placeholder for remote app replicas
app.remoteApps = {}

// This subscriber listens to an event each time a remote app service has been registered
app.serviceSubscriber = new app.cote.Subscriber({
name: 'feathers services subscriber',
name: COMPONENTS.SERVICES_SUBSCRIBER,
namespace: 'services',
key: 'services',
subscribesTo: ['service', 'service-removed']
subscribesTo: ['service', 'service-removed'],
appUuid: app.uuid,
appDistributionKey: app.distributionKey
}, app.coteOptions)
debug('Services subscriber ready for app with uuid ' + app.uuid + ' and key ' + app.distributionKey)
debug('Services subscriber ready for app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey)
// When a remote service is declared create the local proxy interface to it
app.serviceSubscriber.on('service', async serviceDescriptor => {
// When a new app pops up create the required proxy to it first
// When a new app pops up ensure the required proxy to it is created first
// Indeed this should be done by new component detection but as it is based on a check interval it might occur later
await registerApplication(app, serviceDescriptor)
registerService(app, serviceDescriptor)
})
Expand All @@ -43,35 +48,41 @@ export async function initialize (app) {
await promisify(setTimeout)(app.distributionOptions.componentDelay)
// This publisher publishes an event each time a local app or service is registered
app.servicePublisher = new app.cote.Publisher({
name: 'feathers services publisher',
name: COMPONENTS.SERVICES_PUBLISHER,
namespace: 'services',
key: 'services',
broadcasts: ['service', 'service-removed']
broadcasts: ['service', 'service-removed'],
appUuid: app.uuid,
appDistributionKey: app.distributionKey
}, app.coteOptions)
debug('Services publisher ready for app with uuid ' + app.uuid + ' and key ' + app.distributionKey)
// Dispatcher of service events to other nodes) {
debug('Services publisher ready for app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey)
// Dispatcher of service events to other nodes
if (app.distributionOptions.publishEvents) {
// Wait before instantiating a new component to avoid too much concurrency on port allocation
await promisify(setTimeout)(app.distributionOptions.componentDelay)
app.serviceEventsPublisher = new app.cote.Publisher({
name: 'feathers service events publisher',
name: COMPONENTS.SERVICES_EVENTS_PUBLISHER,
namespace: app.distributionKey,
key: app.distributionKey,
broadcasts: app.distributionOptions.distributedEvents || DEFAULT_EVENTS
broadcasts: app.distributionOptions.distributedEvents || DEFAULT_EVENTS,
appUuid: app.uuid,
appDistributionKey: app.distributionKey
}, app.coteOptions)
debug('Service events publisher ready for local app with uuid ' + app.uuid + ' and key ' + app.distributionKey)
debug('Service events publisher ready for local app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey)
}
// Wait before instantiating a new component to avoid too much concurrency on port allocation
await promisify(setTimeout)(app.distributionOptions.componentDelay)
// Create the response manager for local services
const methods = app.distributionOptions.distributedMethods || DEFAULT_METHODS
app.serviceResponder = new app.cote.Responder({
name: 'feathers services responder',
name: COMPONENTS.SERVICES_RESPONDER,
namespace: app.distributionKey,
key: app.distributionKey,
requests: methods.concat(['healthcheck'])
requests: methods.concat(['healthcheck']),
appUuid: app.uuid,
appDistributionKey: app.distributionKey
}, app.coteOptions)
debug('Service responder ready for local app with uuid ' + app.uuid + ' and key ' + app.distributionKey)
debug('Service responder ready for local app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey)
debug('Registering listeners for ', methods.concat(['healthcheck']))
// Answer requests from other nodes
if (methods.includes('find')) {
Expand Down Expand Up @@ -151,9 +162,27 @@ export async function initialize (app) {

// Each time a new app pops up we republish local services so that
// service distribution does not depend on the initialization order of the apps
app.servicePublisher.on('cote:added', (data) => { publishServices(app) })
// FIXME: we should manage apps going offline
app.servicePublisher.on('cote:removed', (data) => { })
app.servicePublisher.on('cote:added', (data) => {
// As this event is emitted for all cote components filtering one should be sufficient
if (data.advertisement.name !== COMPONENTS.SERVICES_SUBSCRIBER) return
const key = data.advertisement.appDistributionKey
const uuid = data.advertisement.appUuid
const shortUuid = data.advertisement.appUuid.split('-')[0]
debug('New component detected for app with uuid ' + shortUuid + ' and key ' + key + ' from app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey)
// When a new app pops up create the required proxy to it first
registerApplication(app, { uuid, shortUuid, key })
})
// Manage app going offline
app.servicePublisher.on('cote:removed', (data) => {
// As this event is emitted for all cote components filtering one should be sufficient
if (data.advertisement.name !== COMPONENTS.SERVICES_SUBSCRIBER) return
const key = data.advertisement.appDistributionKey
const uuid = data.advertisement.appUuid
const shortUuid = data.advertisement.appUuid.split('-')[0]
debug('Component loss detected for app with uuid ' + shortUuid + ' and key ' + key + ' from app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey)
// When an app goes offline check if we need to keep cote components alive for remaining replicas
unregisterApplication(app, { uuid, shortUuid, key })
})

// Tell other apps I'm here
publishServices(app)
Expand All @@ -165,38 +194,39 @@ export async function initialize (app) {
publishService(app, path)
})
}, app.distributionOptions.heartbeatInterval)
debug('Scheduled heartbeat local services publishing for app with uuid ' + app.uuid + ' and key ' + app.distributionKey)
debug('Scheduled heartbeat local services publishing for app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey)
}
}

export function finalize (app) {
debug('Finalizing cote')
delete app.remoteApps
if (app.serviceRequesters) {
Object.getOwnPropertyNames(app.serviceRequesters).forEach(key => {
debug(`Finalizing service requester for remote app with key ${key}`)
debug(`Finalizing service requester for remote app with key ${key} in local app with uuid ${app.shortUuid} and key ${app.distributionKey}`)
app.serviceRequesters[key].close()
})
delete app.serviceRequesters
}
if (app.serviceEventsSubscribers) {
Object.getOwnPropertyNames(app.serviceEventsSubscribers).forEach(key => {
debug(`Finalizing service event subscriber for remote app with key ${key}`)
debug(`Finalizing service event subscriber for remote app with key ${key} in local app with uuid ${app.shortUuid} and key ${app.distributionKey}`)
app.serviceEventsSubscribers[key].close()
})
delete app.serviceEventsSubscribers
}
if (app.serviceSubscriber) {
debug(`Finalizing service subscriber for local app with key ${app.distributionKey}`)
debug(`Finalizing service subscriber for local app with uuid ${app.shortUuid} and key ${app.distributionKey}`)
app.serviceSubscriber.close()
delete app.serviceSubscriber
}
if (app.servicePublisher) {
debug(`Finalizing service publisher for local app with key ${app.distributionKey}`)
debug(`Finalizing service publisher for local app with uuid ${app.shortUuid} and key ${app.distributionKey}`)
app.servicePublisher.close()
delete app.servicePublisher
}
if (app.serviceResponder) {
debug(`Finalizing service responder for local app with key ${app.distributionKey}`)
debug(`Finalizing service responder for local app with uuid ${app.shortUuid} and key ${app.distributionKey}`)
app.serviceResponder.close()
delete app.serviceResponder
}
Expand All @@ -208,8 +238,9 @@ export function finalize (app) {
export default function init (options = {}) {
return function (app) {
// We need to uniquely identify the app to avoid infinite loop by registering our own services
// This uuid is also used a partition key in cote unless provided
app.uuid = uuid()
// For display purpose
app.shortUuid = app.uuid.split('-')[0]
app.coteOptions = Object.assign({
helloInterval: 10000,
checkInterval: 20000,
Expand Down
20 changes: 11 additions & 9 deletions lib/publish.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ export function publishService (app, path) {
if (!service || (typeof service !== 'object')) return
if (service.remote) {
debugIgnore('Ignoring remote service publication on path ' + path + ' for app with uuid ' +
app.uuid + ' and key ' + app.distributionKey)
app.shortUuid + ' and key ' + app.distributionKey)
return
}
const options = Object.assign(getServiceOptions(service), service.options)
const serviceDescriptor = {
uuid: app.uuid,
shortUuid: app.shortUuid,
key: app.distributionKey,
path: stripSlashes(path),
events: options.distributedEvents || options.events.concat(DEFAULT_EVENTS),
Expand All @@ -30,7 +31,7 @@ export function publishService (app, path) {
// Skip internal services
if (isInternalService(app, serviceDescriptor)) {
debugIgnore('Ignoring local service publication on path ' + serviceDescriptor.path + ' for app with uuid ' +
app.uuid + ' and key ' + app.distributionKey)
app.shortUuid + ' and key ' + app.distributionKey)
return
}
// Add distributed service options if any
Expand All @@ -50,7 +51,7 @@ export function publishService (app, path) {
service[EVENT_LISTENERS_KEY][event] = object => {
if (app.serviceEventsPublisher) {
debug(`Publishing ${event} local service event on path ` + serviceDescriptor.path +
' for app with uuid ' + app.uuid + ' and key ' + app.distributionKey, object)
' for app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey, object)
app.serviceEventsPublisher.publish(event, Object.assign({
path: serviceDescriptor.path, key: app.distributionKey
}, object))
Expand All @@ -60,12 +61,12 @@ export function publishService (app, path) {
service.on(event, service[EVENT_LISTENERS_KEY][event])
})
debug('Publish callbacks registered for local service events on path ' + serviceDescriptor.path +
' for app with uuid ' + app.uuid + ' and key ' + app.distributionKey, serviceDescriptor.events)
' for app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey, serviceDescriptor.events)
}
// Publish new local service
app.servicePublisher.publish('service', serviceDescriptor)
debug('Published local service on path ' + serviceDescriptor.path + ' for app with uuid ' +
app.uuid + ' and key ' + app.distributionKey, serviceDescriptor)
app.shortUuid + ' and key ' + app.distributionKey, serviceDescriptor)
}

export function unpublishService (app, path) {
Expand All @@ -77,14 +78,15 @@ export function unpublishService (app, path) {
const options = Object.assign(getServiceOptions(service), service.options)
const serviceDescriptor = {
uuid: app.uuid,
shortUuid: app.shortUuid,
key: app.distributionKey,
path: stripSlashes(path),
events: options.distributedEvents || options.events.concat(DEFAULT_EVENTS)
}
// Skip internal services
if (isInternalService(app, serviceDescriptor)) {
debugIgnore('Ignoring local service unpublication on path ' + serviceDescriptor.path + ' for app with uuid ' +
app.uuid + ' and key ' + app.distributionKey)
app.shortUuid + ' and key ' + app.distributionKey)
return
}
// Remove event listeners whenever required and if not already done
Expand All @@ -95,12 +97,12 @@ export function unpublishService (app, path) {
// Untag service so that we will not uninstall listeners twice
delete service[EVENT_LISTENERS_KEY]
debug('Publish callbacks unregistered for local service events on path ' + serviceDescriptor.path +
' for app with uuid ' + app.uuid + ' and key ' + app.distributionKey, serviceDescriptor.events)
' for app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey, serviceDescriptor.events)
}
// Unpublish removed local service
app.servicePublisher.publish('service-removed', serviceDescriptor)
debug('Unpublished local service on path ' + serviceDescriptor.path + ' for app with uuid ' +
app.uuid + ' and key ' + app.distributionKey, serviceDescriptor)
app.shortUuid + ' and key ' + app.distributionKey, serviceDescriptor)
}

export function publishServices (app) {
Expand All @@ -113,5 +115,5 @@ export function publishServices (app) {
// Reset timeout so that next queued publication will be scheduled
app.applicationPublicationTimeout = null
}, app.distributionOptions.publicationDelay)
debug('Scheduled local services publishing for app with uuid ' + app.uuid + ' and key ' + app.distributionKey)
debug('Scheduled local services publishing for app with uuid ' + app.shortUuid + ' and key ' + app.distributionKey)
}
Loading

0 comments on commit 4a8b26a

Please sign in to comment.