Skip to content

Commit

Permalink
fix(natss,nats): add natss test, throw error on nats init
Browse files Browse the repository at this point in the history
  • Loading branch information
daywiss committed Apr 16, 2020
1 parent ff4d416 commit ee58d9f
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 76 deletions.
61 changes: 1 addition & 60 deletions transports/nats.js
Original file line number Diff line number Diff line change
@@ -1,65 +1,6 @@
const highland = require('highland')
const lodash = require('lodash')
const Natss = require('nats-transport')

module.exports = async config => {
const publishers = new Map()
const subscribers = new Map()
const transport = await Natss(config)
// const sub = transport.subscribe('test')
// sub.resume()
// const pub = transport.publish('test')

function publish(service, channel) {
const id = [service, channel].join('.')
if (publishers.has(id)) return publishers.get(id)
const pub = transport.publish(id)
publishers.set(id, pub)

return pub

// stream
// .map(x=>{
// x.service = service
// return x
// })
// // .doto(x=>{
// // console.log('publishing',x)
// // })
// .pipe(pub)

// return stream
}

function subscribe(service, channel) {
const id = [service, channel].join('.')
const sub = transport.subscribe(id)
if (subscribers.has(id)) return subscribers.get(id).observe()
subscribers.set(id, sub)
return sub
}

// function publish(service,channel){
// const id = [service,channel].join('.')
// console.log('natss publisher transport',id)
// if(publishers.has(id)) return publishers.get(id)
// const pub = transport.publish(id)
// publishers.set(id,pub)
// return pub
// }

// function subscribe(service,channel){
// const id = [service,channel].join('.')
// console.log('natss susbscribe transport',id)
// if(subscribers.has(id)) return subscribers.get(id)
// const sub = transport.subscribe(id)
// sub.resume()
// subscribers.set(id,sub)
// return sub
// }

return {
publish,
subscribe,
}
throw new Error('Nats transport not implemented')
}
4 changes: 3 additions & 1 deletion transports/natss.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ function Natss(config,emit=x=>x){
function subscribe(channel, durableName) {
assert(channel, 'requires channel')
var opts = stan.subscriptionOptions()
opts.setStartTime(now)

if (durableName) {
opts.setDeliverAllAvailable()
opts.setDurableName(durableName)
}else{
opts.setStartTime(now)
}

const sub = stan.subscribe(channel, opts)
Expand Down Expand Up @@ -113,6 +114,7 @@ module.exports = async config => {
return {
publish,
subscribe,
transport,
}
}

Expand Down
26 changes: 20 additions & 6 deletions transports/test.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require('dotenv').config()
const config = require('openenv')(process.env)
const test = require('tape')
const Local = require('./local')
const Natss = require('./natss')
Expand All @@ -20,14 +22,26 @@ test('transport',t=>{
})
})
t.test('natss',t=>{
let server, client
t.test('init',async t=>{
transport = await Natss({
durableName:'test',
clusterid:'test-cluster',
clientid:'test',
url:'localhost',
server = await Natss({...config.natss,clientid:'test-server'})
t.ok(server)
t.end()
})
t.test('publish',async t=>{
await server.publish('a','test').write('hello')
t.end()
})
t.test('subscribe',async t=>{
t.plan(1)
client = await Natss({...config.natss,durableName:'test',clientid:'test-client'})
await client.subscribe('a','test').each(x=>{
t.equal(x,'hello')
})
t.ok(transport)
})
t.test('close',async t=>{
await client.transport.disconnect()
await server.transport.disconnect()
t.end()
})
})
Expand Down
21 changes: 12 additions & 9 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,12 @@ object-inspect@^1.7.0, object-inspect@~1.7.0:
integrity sha512-a7pEHdh1xKIAgTySUGgLMx/xwDZskN1Ud6egYYN3EdRW4ZMPNEDUTF+hwy2LUC+Bl+SyLXANnwz/jyh/qutKUw==

object-is@^1.0.1:
version "1.0.2"
resolved "https://registry.yarnpkg.com/object-is/-/object-is-1.0.2.tgz#6b80eb84fe451498f65007982f035a5b445edec4"
integrity sha512-Epah+btZd5wrrfjkJZq1AOB9O6OxUQto45hzFd7lXGrpHPGE0W1k+426yrZV+k6NJOzLNNW/nVsmZdIWsAqoOQ==
version "1.1.2"
resolved "https://registry.yarnpkg.com/object-is/-/object-is-1.1.2.tgz#c5d2e87ff9e119f78b7a088441519e2eec1573b6"
integrity sha512-5lHCz+0uufF6wZ7CRFWJN3hp8Jqblpgve06U5CMQ3f//6iDjPr2PEo9MWCjEssDsa+UZEL4PkFpr+BMop6aKzQ==
dependencies:
define-properties "^1.1.3"
es-abstract "^1.17.5"

object-keys@^1.0.11, object-keys@^1.0.12, object-keys@^1.1.1:
version "1.1.1"
Expand Down Expand Up @@ -317,9 +320,9 @@ string.prototype.trim@~1.2.1:
function-bind "^1.1.1"

string.prototype.trimend@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/string.prototype.trimend/-/string.prototype.trimend-1.0.0.tgz#ee497fd29768646d84be2c9b819e292439614373"
integrity sha512-EEJnGqa/xNfIg05SxiPSqRS7S9qwDhYts1TSLR1BQfYUfPe1stofgGKvwERK9+9yf+PpfBMlpBaCHucXGPQfUA==
version "1.0.1"
resolved "https://registry.yarnpkg.com/string.prototype.trimend/-/string.prototype.trimend-1.0.1.tgz#85812a6b847ac002270f5808146064c995fb6913"
integrity sha512-LRPxFUaTtpqYsTeNKaFOw3R4bxIzWOnbQ837QfBylo8jIxtcbK/A/sMV7Q+OAV/vWo+7s25pOE10KYSjaSO06g==
dependencies:
define-properties "^1.1.3"
es-abstract "^1.17.5"
Expand All @@ -343,9 +346,9 @@ string.prototype.trimright@^2.1.1:
string.prototype.trimend "^1.0.0"

string.prototype.trimstart@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/string.prototype.trimstart/-/string.prototype.trimstart-1.0.0.tgz#afe596a7ce9de905496919406c9734845f01a2f2"
integrity sha512-iCP8g01NFYiiBOnwG1Xc3WZLyoo+RuBymwIlWncShXDDJYWN6DbnM3odslBJdgCdRlq94B5s63NWAZlcn2CS4w==
version "1.0.1"
resolved "https://registry.yarnpkg.com/string.prototype.trimstart/-/string.prototype.trimstart-1.0.1.tgz#14af6d9f34b053f7cfc89b72f8f2ee14b9039a54"
integrity sha512-XxZn+QpvrBI1FOcg6dIpxUPgWCPuNXvMD72aaRaUQv1eD4e/Qy8i/hFTe0BUmD60p/QA6bh1avmuPTfNjqVWRw==
dependencies:
define-properties "^1.1.3"
es-abstract "^1.17.5"
Expand Down

0 comments on commit ee58d9f

Please sign in to comment.