Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: move setMaxListeners to @libp2p/interface #2154

Merged
merged 1 commit into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/interface/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
"!dist/test",
"!**/*.tsbuildinfo"
],
"browser": {
"events": "./dist/src/events.browser.js"
},
"exports": {
".": {
"types": "./dist/src/index.d.ts",
Expand Down
2 changes: 2 additions & 0 deletions packages/interface/src/events.browser.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/** Noop for browser compatibility */
export function setMaxListeners (): void {}
11 changes: 11 additions & 0 deletions packages/interface/src/events.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { setMaxListeners as nodeSetMaxListeners } from 'events'

export interface EventCallback<EventType> { (evt: EventType): void }
export interface EventObject<EventType> { handleEvent: EventCallback<EventType> }
export type EventHandler<EventType> = EventCallback<EventType> | EventObject<EventType>
Expand Down Expand Up @@ -117,3 +119,12 @@ export const CustomEvent = globalThis.CustomEvent ?? CustomEventPolyfill

// TODO: remove this in v1
export { TypedEventEmitter as EventEmitter }

// create a setMaxListeners that doesn't break browser usage
export const setMaxListeners: typeof nodeSetMaxListeners = (n, ...eventTargets) => {
try {
nodeSetMaxListeners(n, ...eventTargets)
} catch {
// swallow error, gulp
}
}
3 changes: 1 addition & 2 deletions packages/kad-dht/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
"test:chrome-webworker": "aegir test -t webworker",
"test:firefox": "aegir test -t browser -- --browser firefox",
"test:firefox-webworker": "aegir test -t webworker -- --browser firefox",
"dep-check": "aegir dep-check -i events"
"dep-check": "aegir dep-check"
},
"dependencies": {
"@libp2p/crypto": "^2.0.6",
Expand All @@ -63,7 +63,6 @@
"abortable-iterator": "^5.0.1",
"any-signal": "^4.1.1",
"datastore-core": "^9.0.1",
"events": "^3.3.0",
"hashlru": "^2.3.0",
"interface-datastore": "^8.2.0",
"it-all": "^3.0.2",
Expand Down
8 changes: 2 additions & 6 deletions packages/kad-dht/src/query-self.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { setMaxListeners } from 'events'
import { setMaxListeners } from '@libp2p/interface/events'
import { logger, type Logger } from '@libp2p/logger'
import { anySignal } from 'any-signal'
import length from 'it-length'
Expand Down Expand Up @@ -110,11 +110,7 @@ export class QuerySelf implements Startable {
const signal = anySignal([this.controller.signal, AbortSignal.timeout(this.queryTimeout)])

// this controller will get used for lots of dial attempts so make sure we don't cause warnings to be logged
try {
if (setMaxListeners != null) {
setMaxListeners(Infinity, signal)
}
} catch {} // fails on node < 15.4
setMaxListeners(Infinity, signal)

try {
if (this.routingTable.size === 0) {
Expand Down
21 changes: 4 additions & 17 deletions packages/kad-dht/src/query/manager.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { setMaxListeners } from 'events'
import { AbortError } from '@libp2p/interface/errors'
import { TypedEventEmitter, CustomEvent } from '@libp2p/interface/events'
import { TypedEventEmitter, CustomEvent, setMaxListeners } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import { PeerSet } from '@libp2p/peer-collections'
import { anySignal } from 'any-signal'
Expand Down Expand Up @@ -75,11 +74,7 @@ export class QueryManager implements Startable {
// allow us to stop queries on shut down
this.shutDownController = new AbortController()
// make sure we don't make a lot of noise in the logs
try {
if (setMaxListeners != null) {
setMaxListeners(Infinity, this.shutDownController.signal)
}
} catch {} // fails on node < 15.4
setMaxListeners(Infinity, this.shutDownController.signal)
}

isStarted (): boolean {
Expand Down Expand Up @@ -122,22 +117,14 @@ export class QueryManager implements Startable {

// this signal will get listened to for network requests, etc
// so make sure we don't make a lot of noise in the logs
try {
if (setMaxListeners != null) {
setMaxListeners(Infinity, options.signal)
}
} catch {} // fails on node < 15.4
setMaxListeners(Infinity, options.signal)
}

const signal = anySignal([this.shutDownController.signal, options.signal])

// this signal will get listened to for every invocation of queryFunc
// so make sure we don't make a lot of noise in the logs
try {
if (setMaxListeners != null) {
setMaxListeners(Infinity, signal)
}
} catch {} // fails on node < 15.4
setMaxListeners(Infinity, signal)

const log = logger(`libp2p:kad-dht:${this.lan ? 'lan' : 'wan'}:query:` + uint8ArrayToString(key, 'base58btc'))

Expand Down
3 changes: 1 addition & 2 deletions packages/libp2p/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@
"scripts": {
"clean": "aegir clean",
"lint": "aegir lint",
"dep-check": "aegir dep-check -i events",
"dep-check": "aegir dep-check",
"prepublishOnly": "node scripts/update-version.js && npm run build",
"build": "aegir build",
"generate": "run-s generate:proto:*",
Expand Down Expand Up @@ -139,7 +139,6 @@
"any-signal": "^4.1.1",
"datastore-core": "^9.0.1",
"delay": "^6.0.0",
"events": "^3.3.0",
"interface-datastore": "^8.2.0",
"it-all": "^3.0.2",
"it-drain": "^3.0.2",
Expand Down
12 changes: 3 additions & 9 deletions packages/libp2p/src/autonat/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
* ```
*/

import { setMaxListeners } from 'events'
import { CodeError } from '@libp2p/interface/errors'
import { setMaxListeners } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import { peerIdFromBytes } from '@libp2p/peer-id'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
Expand Down Expand Up @@ -164,10 +164,7 @@ class DefaultAutoNATService implements Startable {

// this controller may be used while dialing lots of peers so prevent MaxListenersExceededWarning
// appearing in the console
try {
// fails on node < 15.4
setMaxListeners?.(Infinity, signal)
} catch {}
setMaxListeners(Infinity, signal)

const ourHosts = this.components.addressManager.getAddresses()
.map(ma => ma.toOptions().host)
Expand Down Expand Up @@ -432,10 +429,7 @@ class DefaultAutoNATService implements Startable {

// this controller may be used while dialing lots of peers so prevent MaxListenersExceededWarning
// appearing in the console
try {
// fails on node < 15.4
setMaxListeners?.(Infinity, signal)
} catch {}
setMaxListeners(Infinity, signal)

const self = this

Expand Down
8 changes: 2 additions & 6 deletions packages/libp2p/src/circuit-relay/server/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { setMaxListeners } from 'events'
import { TypedEventEmitter } from '@libp2p/interface/events'
import { TypedEventEmitter, setMaxListeners } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import { peerIdFromBytes } from '@libp2p/peer-id'
import { RecordEnvelope } from '@libp2p/peer-record'
Expand Down Expand Up @@ -134,10 +133,7 @@ class CircuitRelayServer extends TypedEventEmitter<RelayServerEvents> implements
this.maxOutboundHopStreams = init.maxOutboundHopStreams
this.maxOutboundStopStreams = init.maxOutboundStopStreams ?? defaults.maxOutboundStopStreams

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, this.shutdownController.signal)
} catch { }
setMaxListeners(Infinity, this.shutdownController.signal)

if (init.advertise != null && init.advertise !== false) {
this.advertService = new AdvertService(components, init.advertise === true ? undefined : init.advertise)
Expand Down
7 changes: 2 additions & 5 deletions packages/libp2p/src/connection-manager/dial-queue.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { setMaxListeners } from 'events'
import { AbortError, CodeError } from '@libp2p/interface/errors'
import { setMaxListeners } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import { defaultAddressSort } from '@libp2p/utils/address-sort'
import { type Multiaddr, type Resolver, resolvers } from '@multiformats/multiaddr'
Expand Down Expand Up @@ -96,10 +96,7 @@ export class DialQueue {
this.transportManager = components.transportManager
this.shutDownController = new AbortController()

try {
// This emitter gets listened to a lot
setMaxListeners?.(Infinity, this.shutDownController.signal)
} catch {}
setMaxListeners(Infinity, this.shutDownController.signal)

this.pendingDialCount = components.metrics?.registerMetric('libp2p_dialler_pending_dials')
this.inProgressDialCount = components.metrics?.registerMetric('libp2p_dialler_in_progress_dials')
Expand Down
12 changes: 3 additions & 9 deletions packages/libp2p/src/connection-manager/utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { setMaxListeners } from 'events'
import { setMaxListeners } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import { type AbortOptions, multiaddr, type Multiaddr } from '@multiformats/multiaddr'
import { type ClearableSignal, anySignal } from 'any-signal'
Expand Down Expand Up @@ -55,21 +55,15 @@ export function combineSignals (...signals: Array<AbortSignal | undefined>): Cle

for (const sig of signals) {
if (sig != null) {
try {
// fails on node < 15.4
setMaxListeners?.(Infinity, sig)
} catch { }
setMaxListeners(Infinity, sig)
sigs.push(sig)
}
}

// let any signal abort the dial
const signal = anySignal(sigs)

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, signal)
} catch {}
setMaxListeners(Infinity, signal)

return signal
}
7 changes: 2 additions & 5 deletions packages/libp2p/src/connection/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { setMaxListeners } from 'events'
import { symbol } from '@libp2p/interface/connection'
import { CodeError } from '@libp2p/interface/errors'
import { setMaxListeners } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import type { AbortOptions } from '@libp2p/interface'
import type { Direction, Connection, Stream, ConnectionTimeline, ConnectionStatus, NewStreamOptions } from '@libp2p/interface/connection'
Expand Down Expand Up @@ -152,10 +152,7 @@ export class ConnectionImpl implements Connection {

options.signal = options?.signal ?? AbortSignal.timeout(CLOSE_TIMEOUT)

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, options.signal)
} catch { }
setMaxListeners(Infinity, options.signal)

try {
log.trace('closing all streams')
Expand Down
7 changes: 2 additions & 5 deletions packages/libp2p/src/fetch/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { setMaxListeners } from 'events'
import { CodeError } from '@libp2p/interface/errors'
import { setMaxListeners } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import first from 'it-first'
import * as lp from 'it-length-prefixed'
Expand Down Expand Up @@ -146,10 +146,7 @@ class DefaultFetchService implements Startable, FetchService {
log('using default timeout of %d ms', this.init.timeout)
signal = AbortSignal.timeout(this.init.timeout ?? DEFAULT_TIMEOUT)

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, signal)
} catch {}
setMaxListeners(Infinity, signal)
}

try {
Expand Down
12 changes: 3 additions & 9 deletions packages/libp2p/src/identify/identify.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { setMaxListeners } from 'events'
import { CodeError } from '@libp2p/interface/errors'
import { setMaxListeners } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import { peerIdFromKeys } from '@libp2p/peer-id'
import { RecordEnvelope, PeerRecord } from '@libp2p/peer-record'
Expand Down Expand Up @@ -187,10 +187,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService {

const signal = AbortSignal.timeout(this.timeout)

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, signal)
} catch {}
setMaxListeners(Infinity, signal)

try {
stream = await connection.newStream([this.identifyPushProtocolStr], {
Expand Down Expand Up @@ -345,10 +342,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService {

const signal = AbortSignal.timeout(this.timeout)

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, signal)
} catch {}
setMaxListeners(Infinity, signal)

try {
const publicKey = this.peerId.publicKey ?? new Uint8Array(0)
Expand Down
9 changes: 3 additions & 6 deletions packages/libp2p/src/libp2p.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { setMaxListeners } from 'events'
import { unmarshalPublicKey } from '@libp2p/crypto/keys'
import { type ContentRouting, contentRouting } from '@libp2p/interface/content-routing'
import { CodeError } from '@libp2p/interface/errors'
import { TypedEventEmitter, CustomEvent } from '@libp2p/interface/events'
import { TypedEventEmitter, CustomEvent, setMaxListeners } from '@libp2p/interface/events'
import { peerDiscovery } from '@libp2p/interface/peer-discovery'
import { type PeerRouting, peerRouting } from '@libp2p/interface/peer-routing'
import { DefaultKeyChain } from '@libp2p/keychain'
Expand Down Expand Up @@ -70,10 +69,8 @@ export class Libp2pNode<T extends ServiceMap = Record<string, unknown>> extends
return internalResult || externalResult
}

try {
// This emitter gets listened to a lot
setMaxListeners?.(Infinity, events)
} catch {}
// This emitter gets listened to a lot
setMaxListeners(Infinity, events)

this.#started = false
this.peerId = init.peerId
Expand Down
12 changes: 3 additions & 9 deletions packages/libp2p/src/upgrader.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { setMaxListeners } from 'events'
import { CodeError } from '@libp2p/interface/errors'
import { setMaxListeners } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import * as mss from '@libp2p/multistream-select'
import { peerIdFromString } from '@libp2p/peer-id'
Expand Down Expand Up @@ -170,10 +170,7 @@ export class DefaultUpgrader implements Upgrader {

signal.addEventListener('abort', onAbort, { once: true })

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, signal)
} catch { }
setMaxListeners(Infinity, signal)

try {
if ((await this.components.connectionGater.denyInboundConnection?.(maConn)) === true) {
Expand Down Expand Up @@ -444,10 +441,7 @@ export class DefaultUpgrader implements Upgrader {

options.signal = AbortSignal.timeout(30000)

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, options.signal)
} catch { }
setMaxListeners(Infinity, options.signal)
}

const { stream, protocol } = await mss.select(muxedStream, protocols, options)
Expand Down
Loading