diff --git a/transports/nats.js b/transports/nats.js index 200978d..a53c70a 100644 --- a/transports/nats.js +++ b/transports/nats.js @@ -1,65 +1,6 @@ const highland = require('highland') const lodash = require('lodash') -const Natss = require('nats-transport') module.exports = async config => { - const publishers = new Map() - const subscribers = new Map() - const transport = await Natss(config) - // const sub = transport.subscribe('test') - // sub.resume() - // const pub = transport.publish('test') - - function publish(service, channel) { - const id = [service, channel].join('.') - if (publishers.has(id)) return publishers.get(id) - const pub = transport.publish(id) - publishers.set(id, pub) - - return pub - - // stream - // .map(x=>{ - // x.service = service - // return x - // }) - // // .doto(x=>{ - // // console.log('publishing',x) - // // }) - // .pipe(pub) - - // return stream - } - - function subscribe(service, channel) { - const id = [service, channel].join('.') - const sub = transport.subscribe(id) - if (subscribers.has(id)) return subscribers.get(id).observe() - subscribers.set(id, sub) - return sub - } - - // function publish(service,channel){ - // const id = [service,channel].join('.') - // console.log('natss publisher transport',id) - // if(publishers.has(id)) return publishers.get(id) - // const pub = transport.publish(id) - // publishers.set(id,pub) - // return pub - // } - - // function subscribe(service,channel){ - // const id = [service,channel].join('.') - // console.log('natss susbscribe transport',id) - // if(subscribers.has(id)) return subscribers.get(id) - // const sub = transport.subscribe(id) - // sub.resume() - // subscribers.set(id,sub) - // return sub - // } - - return { - publish, - subscribe, - } + throw new Error('Nats transport not implemented') } diff --git a/transports/natss.js b/transports/natss.js index 378d2a3..18a5334 100644 --- a/transports/natss.js +++ b/transports/natss.js @@ -37,11 +37,12 @@ function Natss(config,emit=x=>x){ function subscribe(channel, durableName) { assert(channel, 'requires channel') var opts = stan.subscriptionOptions() - opts.setStartTime(now) if (durableName) { opts.setDeliverAllAvailable() opts.setDurableName(durableName) + }else{ + opts.setStartTime(now) } const sub = stan.subscribe(channel, opts) @@ -113,6 +114,7 @@ module.exports = async config => { return { publish, subscribe, + transport, } } diff --git a/transports/test.js b/transports/test.js index 073a504..4aded6f 100644 --- a/transports/test.js +++ b/transports/test.js @@ -1,3 +1,5 @@ +require('dotenv').config() +const config = require('openenv')(process.env) const test = require('tape') const Local = require('./local') const Natss = require('./natss') @@ -20,14 +22,26 @@ test('transport',t=>{ }) }) t.test('natss',t=>{ + let server, client t.test('init',async t=>{ - transport = await Natss({ - durableName:'test', - clusterid:'test-cluster', - clientid:'test', - url:'localhost', + server = await Natss({...config.natss,clientid:'test-server'}) + t.ok(server) + t.end() + }) + t.test('publish',async t=>{ + await server.publish('a','test').write('hello') + t.end() + }) + t.test('subscribe',async t=>{ + t.plan(1) + client = await Natss({...config.natss,durableName:'test',clientid:'test-client'}) + await client.subscribe('a','test').each(x=>{ + t.equal(x,'hello') }) - t.ok(transport) + }) + t.test('close',async t=>{ + await client.transport.disconnect() + await server.transport.disconnect() t.end() }) }) diff --git a/yarn.lock b/yarn.lock index 75db43c..00d4e0c 100644 --- a/yarn.lock +++ b/yarn.lock @@ -237,9 +237,12 @@ object-inspect@^1.7.0, object-inspect@~1.7.0: integrity sha512-a7pEHdh1xKIAgTySUGgLMx/xwDZskN1Ud6egYYN3EdRW4ZMPNEDUTF+hwy2LUC+Bl+SyLXANnwz/jyh/qutKUw== object-is@^1.0.1: - version "1.0.2" - resolved "https://registry.yarnpkg.com/object-is/-/object-is-1.0.2.tgz#6b80eb84fe451498f65007982f035a5b445edec4" - integrity sha512-Epah+btZd5wrrfjkJZq1AOB9O6OxUQto45hzFd7lXGrpHPGE0W1k+426yrZV+k6NJOzLNNW/nVsmZdIWsAqoOQ== + version "1.1.2" + resolved "https://registry.yarnpkg.com/object-is/-/object-is-1.1.2.tgz#c5d2e87ff9e119f78b7a088441519e2eec1573b6" + integrity sha512-5lHCz+0uufF6wZ7CRFWJN3hp8Jqblpgve06U5CMQ3f//6iDjPr2PEo9MWCjEssDsa+UZEL4PkFpr+BMop6aKzQ== + dependencies: + define-properties "^1.1.3" + es-abstract "^1.17.5" object-keys@^1.0.11, object-keys@^1.0.12, object-keys@^1.1.1: version "1.1.1" @@ -317,9 +320,9 @@ string.prototype.trim@~1.2.1: function-bind "^1.1.1" string.prototype.trimend@^1.0.0: - version "1.0.0" - resolved "https://registry.yarnpkg.com/string.prototype.trimend/-/string.prototype.trimend-1.0.0.tgz#ee497fd29768646d84be2c9b819e292439614373" - integrity sha512-EEJnGqa/xNfIg05SxiPSqRS7S9qwDhYts1TSLR1BQfYUfPe1stofgGKvwERK9+9yf+PpfBMlpBaCHucXGPQfUA== + version "1.0.1" + resolved "https://registry.yarnpkg.com/string.prototype.trimend/-/string.prototype.trimend-1.0.1.tgz#85812a6b847ac002270f5808146064c995fb6913" + integrity sha512-LRPxFUaTtpqYsTeNKaFOw3R4bxIzWOnbQ837QfBylo8jIxtcbK/A/sMV7Q+OAV/vWo+7s25pOE10KYSjaSO06g== dependencies: define-properties "^1.1.3" es-abstract "^1.17.5" @@ -343,9 +346,9 @@ string.prototype.trimright@^2.1.1: string.prototype.trimend "^1.0.0" string.prototype.trimstart@^1.0.0: - version "1.0.0" - resolved "https://registry.yarnpkg.com/string.prototype.trimstart/-/string.prototype.trimstart-1.0.0.tgz#afe596a7ce9de905496919406c9734845f01a2f2" - integrity sha512-iCP8g01NFYiiBOnwG1Xc3WZLyoo+RuBymwIlWncShXDDJYWN6DbnM3odslBJdgCdRlq94B5s63NWAZlcn2CS4w== + version "1.0.1" + resolved "https://registry.yarnpkg.com/string.prototype.trimstart/-/string.prototype.trimstart-1.0.1.tgz#14af6d9f34b053f7cfc89b72f8f2ee14b9039a54" + integrity sha512-XxZn+QpvrBI1FOcg6dIpxUPgWCPuNXvMD72aaRaUQv1eD4e/Qy8i/hFTe0BUmD60p/QA6bh1avmuPTfNjqVWRw== dependencies: define-properties "^1.1.3" es-abstract "^1.17.5"