Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions packages/libp2p/src/connection-manager/dial-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export interface PendingDialTarget {
interface DialQueueJobOptions extends PriorityQueueJobOptions, ProgressOptions<OpenConnectionProgressEvents> {
peerId?: PeerId
multiaddrs: Set<string>
force?: boolean
}

interface DialerInit {
Expand Down Expand Up @@ -136,9 +137,12 @@ export class DialQueue {
*/
async dial (peerIdOrMultiaddr: PeerId | Multiaddr | Multiaddr[], options: OpenConnectionOptions = {}): Promise<Connection> {
const { peerId, multiaddrs } = getPeerAddress(peerIdOrMultiaddr)
const { force } = options

if (peerId != null && options.force !== true) {
const existingConnection = findExistingConnection(peerId, this.connections.get(peerId), multiaddrs)
// make sure we don't have an existing connection to any of the addresses we
// are about to dial
if (options.force !== true) {
const existingConnection = findExistingConnection([...this.connections.values()].flat(), multiaddrs, peerId)

if (existingConnection != null) {
this.log('already connected to %a', existingConnection.remoteAddr)
Expand Down Expand Up @@ -209,6 +213,7 @@ export class DialQueue {
peerId,
priority: options.priority ?? DEFAULT_DIAL_PRIORITY,
multiaddrs: new Set(multiaddrs.map(ma => ma.toString())),
force,
signal: options.signal ?? AbortSignal.timeout(this.dialTimeout),
onProgress: options.onProgress
})
Expand Down
18 changes: 11 additions & 7 deletions packages/libp2p/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -557,9 +557,9 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
throw new InvalidPeerIdError('Can not dial self')
}

if (peerId != null && options.force !== true) {
if (options.force !== true) {
this.log('dial %p', peerId)
const existingConnection = findExistingConnection(peerId, this.getConnections(peerId), multiaddrs)
const existingConnection = findExistingConnection(this.getConnections(), multiaddrs, peerId)

if (existingConnection != null) {
this.log('had an existing connection to %p as %a', peerId, existingConnection.remoteAddr)
Expand Down Expand Up @@ -595,12 +595,16 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
trackedConnection = true
}

// make sure we don't already have a connection to this multiaddr
if (options.force !== true && conn.id !== connection.id && conn.remoteAddr.equals(connection.remoteAddr)) {
connection.abort(new InvalidMultiaddrError('Duplicate multiaddr connection'))
// make sure we don't already have a connection to this peer
if (options.force !== true && conn.id !== connection.id && conn.remotePeer.equals(connection.remotePeer)) {
// choose the existing connection if it is either unlimited or both
// the old and the new connection are limited
if (conn.limits == null || (conn.limits != null && connection.limits != null)) {
connection.abort(new InvalidMultiaddrError('Duplicate multiaddr connection'))

// return the existing connection
return conn
// return the existing connection
return conn
}
}
}

Expand Down
35 changes: 31 additions & 4 deletions packages/libp2p/src/connection-manager/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { IpNet } from '@chainsafe/netmask'
import { InvalidParametersError } from '@libp2p/interface'
import { getNetConfig } from '@libp2p/utils'
import { CODE_P2P, multiaddr } from '@multiformats/multiaddr'
import { Circuit } from '@multiformats/multiaddr-matcher'
import type { Connection, AbortOptions, PeerId } from '@libp2p/interface'
import type { Multiaddr } from '@multiformats/multiaddr'
Expand Down Expand Up @@ -105,17 +106,43 @@ export function isDirect (ma: Multiaddr): boolean {
return !Circuit.exactMatch(ma)
}

/**
* Strips the remote peer's peer id from the multiaddr if it is present
*/
function stripPeerId (ma: Multiaddr): Multiaddr {
if (Circuit.exactMatch(ma)) {
return multiaddr(`${ma.toString().split('/p2p-circuit')[0]}/p2p-circuit`)
}

if (ma.getComponents().some(c => c.code === CODE_P2P)) {
return ma.decapsulateCode(CODE_P2P)
}

return ma
}

/**
* If there is an existing non-limited connection to the remote peer return it,
* unless it is indirect and at least one of the passed dial addresses would
* result in a direct connection
*/
export function findExistingConnection (peerId?: PeerId, connections?: Connection[], dialAddresses?: Multiaddr[]): Connection | undefined {
if (peerId == null || connections == null) {
return
}
export function findExistingConnection (connections: Connection[], dialAddresses: Multiaddr[], peerId?: PeerId): Connection | undefined {
// strip PeerId for address comparison if it is present as we may be dialing
// a multiaddr without a peer id
const dialAddressesWithoutPeerId = dialAddresses.map(stripPeerId)

const existingConnection = connections
.filter(conn => {
// check remote peer
if (peerId != null) {
return conn.remotePeer.equals(peerId)
}

// check multiaddr
return dialAddressesWithoutPeerId.some(ma => {
return ma.equals(stripPeerId(conn.remoteAddr))
})
})
.sort((a, b) => {
if (a.direct) {
return -1
Expand Down
92 changes: 92 additions & 0 deletions packages/libp2p/test/connection-manager/dial-queue.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import { generateKeyPair } from '@libp2p/crypto/keys'
import { NotFoundError } from '@libp2p/interface'
import { peerLogger } from '@libp2p/logger'
import { PeerMap } from '@libp2p/peer-collections'
import { peerIdFromPrivateKey } from '@libp2p/peer-id'
import { multiaddr } from '@multiformats/multiaddr'
import { TCP, WebRTC } from '@multiformats/multiaddr-matcher'
Expand Down Expand Up @@ -381,6 +382,97 @@ describe('dial queue', () => {
await expect(dialer.dial(remotePeer)).to.eventually.equal(connection)
})

it('should return new connection when dialing a different multiaddr without a peer id', async () => {
const remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519'))
const ip = multiaddr('/ip4/123.123.123.123')

// same host but different ports
const addr1 = ip.encapsulate('/tcp/123')
const addr2 = ip.encapsulate('/tcp/321')

const existingConnection = stubInterface<Connection>({
limits: undefined,
remotePeer,
remoteAddr: addr1.encapsulate(`/p2p/${remotePeer}`),
status: 'open'
})

const newConnection = stubInterface<Connection>({
limits: undefined,
remotePeer,
remoteAddr: addr2.encapsulate(`/p2p/${remotePeer}`),
status: 'open'
})

const connections = new PeerMap<Connection[]>()
connections.set(remotePeer, [existingConnection])

components.transportManager.dialTransportForMultiaddr.callsFake(ma => stubInterface<Transport>())
components.transportManager.dial.callsFake(async (ma, opts = {}) => newConnection)
dialer = new DialQueue(components, { connections })

await expect(dialer.dial(addr2)).to.eventually.equal(newConnection, 'did not create new connection to different multiaddr')
})

it('should return existing connection when dialing the same multiaddr without a peer id', async () => {
const remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519'))
const addr = multiaddr('/ip4/123.123.123.123/tcp/123')

const existingConnection = stubInterface<Connection>({
limits: undefined,
remotePeer,
remoteAddr: addr.encapsulate(`/p2p/${remotePeer}`),
status: 'open'
})

const newConnection = stubInterface<Connection>({
limits: undefined,
remotePeer,
remoteAddr: addr.encapsulate(`/p2p/${remotePeer}`),
status: 'open'
})

const connections = new PeerMap<Connection[]>()
connections.set(remotePeer, [existingConnection])

components.transportManager.dialTransportForMultiaddr.callsFake(ma => stubInterface<Transport>())
components.transportManager.dial.callsFake(async (ma, opts = {}) => newConnection)
dialer = new DialQueue(components, { connections })

await expect(dialer.dial(addr)).to.eventually.equal(existingConnection, 'did not return existing connection')
})

it('should return new connection when dialing a multiaddr that would result in a direct connection', async () => {
const remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519'))
const relayPeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519'))
const addr = multiaddr(`/ip4/123.123.123.123/tcp/123/p2p/${remotePeer}`)

const existingConnection = stubInterface<Connection>({
limits: {
bytes: 100n
},
remotePeer,
remoteAddr: multiaddr(`/ip4/124.124.124.124/tcp/123/p2p/${relayPeer}/p2p-circuit/p2p/${remotePeer}`),
status: 'open'
})

const newConnection = stubInterface<Connection>({
limits: undefined,
remotePeer,
remoteAddr: addr.encapsulate(`/p2p/${remotePeer}`),
status: 'open'
})

const connections = new PeerMap<Connection[]>()
connections.set(remotePeer, [existingConnection])

components.transportManager.dialTransportForMultiaddr.callsFake(ma => stubInterface<Transport>())
components.transportManager.dial.callsFake(async (ma, opts = {}) => newConnection)
dialer = new DialQueue(components, { connections })

await expect(dialer.dial(addr)).to.eventually.equal(newConnection, 'did not create direct connection where limited connection existed previously')
})

it('should respect user dial signal over default timeout if it is passed', async () => {
const dialTimeout = 10
const userTimeout = 500
Expand Down
88 changes: 88 additions & 0 deletions packages/libp2p/test/connection-manager/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ describe('Connection Manager', () => {
status: 'open'
})
const newConnection = stubInterface<Connection>({
limits: undefined,
remotePeer: targetPeer,
remoteAddr: addr,
status: 'open'
Expand All @@ -421,6 +422,93 @@ describe('Connection Manager', () => {
expect(conn).to.equal(newConnection)
})

it('should return existing connection when dialing a different multiaddr without a peer id that resolves to the same peer', async () => {
connectionManager = new DefaultConnectionManager(components, {
...defaultOptions,
maxIncomingPendingConnections: 1
})
await connectionManager.start()

const remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519'))
const ip = multiaddr('/ip4/123.123.123.123')

// same host but different ports
const addr1 = ip.encapsulate('/tcp/123')
const addr2 = ip.encapsulate('/tcp/321')

const existingConnection = stubInterface<Connection>({
limits: undefined,
remotePeer,
remoteAddr: addr1.encapsulate(`/p2p/${remotePeer}`),
status: 'open'
})

const newConnection = stubInterface<Connection>({
limits: undefined,
remotePeer,
remoteAddr: addr2.encapsulate(`/p2p/${remotePeer}`),
status: 'open'
})

const dialStub = sinon.stub(connectionManager.dialQueue, 'dial')

dialStub.withArgs(addr1)
.resolves(existingConnection)

await expect(connectionManager.openConnection(addr1)).to.eventually.equal(existingConnection, 'did not open first connection')

dialStub.withArgs(addr2)
.resolves(newConnection)

await expect(connectionManager.openConnection(addr2)).to.eventually.equal(existingConnection, 'did not reuse existing connection to same peer with different multiaddr')
expect(newConnection.abort.called).to.be.true('did not abort duplicate connection')
})

it('should return existing limited connection when dialing a different multiaddr results in a connection that is also limited', async () => {
connectionManager = new DefaultConnectionManager(components, {
...defaultOptions,
maxIncomingPendingConnections: 1
})
await connectionManager.start()

const remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519'))

// same host but different ports
const addr1 = multiaddr(`/ip4/123.123.123.123/tcp/123/p2p/16Uiu2HAm2dSCBFxuge46aEt7U1oejtYuBUZXxASHqmcfVmk4gsbx/p2p-circuit/p2p/${remotePeer}`)
const addr2 = multiaddr(`/ip4/123.123.123.123/tcp/321/p2p/16Uiu2HAm2dSCBFxuge46aEt7U1oejtYuBUZXxASHqmcfVmk4gsbx/p2p-circuit/p2p/${remotePeer}`)

const existingConnection = stubInterface<Connection>({
limits: {
bytes: 100n
},
remotePeer,
remoteAddr: addr1.encapsulate(`/p2p/${remotePeer}`),
status: 'open'
})

const newConnection = stubInterface<Connection>({
limits: {
bytes: 100n
},
remotePeer,
remoteAddr: addr2.encapsulate(`/p2p/${remotePeer}`),
status: 'open'
})

const dialStub = sinon.stub(connectionManager.dialQueue, 'dial')

dialStub.withArgs(addr1)
.resolves(existingConnection)

await expect(connectionManager.openConnection(addr1)).to.eventually.equal(existingConnection, 'did not open first connection')

dialStub.withArgs(addr2)
.resolves(newConnection)

await expect(connectionManager.openConnection(addr2)).to.eventually.equal(existingConnection, 'did not reuse existing connection to same peer with different multiaddr')
expect(newConnection.abort.called).to.be.true('did not abort duplicate connection')
})

it('should filter connections on disconnect, removing the closed one', async () => {
connectionManager = new DefaultConnectionManager(components, {
maxConnections: 1000,
Expand Down
Loading
Loading