Skip to content

Commit 58c9fb9

Browse files
authored
feat: select early muxer (#482)
Implements early multiplexer negotiation to save multistream-select round trips when a connection is opened. Spec: https://github.com/libp2p/specs/blob/6d38f88f7b2d16b0e4489298bcd0737a6d704f7e/connections/inlined-muxer-negotiation.md#multiplexer-negotiation-over-noise
1 parent 0029324 commit 58c9fb9

File tree

10 files changed

+1882
-590
lines changed

10 files changed

+1882
-590
lines changed

package-lock.json

Lines changed: 1644 additions & 547 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@
163163
"@noble/ciphers": "^1.1.3",
164164
"@noble/curves": "^1.1.0",
165165
"@noble/hashes": "^1.3.1",
166-
"it-length-prefixed": "^9.0.1",
166+
"it-length-prefixed": "^10.0.1",
167167
"it-length-prefixed-stream": "^1.0.0",
168168
"it-pair": "^2.0.6",
169169
"it-pipe": "^3.0.1",
@@ -194,7 +194,8 @@
194194
"multiformats": "^13.2.2",
195195
"p-defer": "^4.0.0",
196196
"protons": "^7.6.0",
197-
"sinon": "^19.0.2"
197+
"sinon": "^19.0.2",
198+
"sinon-ts": "^2.0.0"
198199
},
199200
"browser": {
200201
"./dist/src/crypto/index.js": "./dist/src/crypto/index.browser.js"

src/index.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,16 +37,16 @@
3737
*/
3838

3939
import { Noise } from './noise.js'
40-
import type { NoiseInit } from './noise.js'
41-
import type { NoiseExtensions } from './proto/payload.js'
42-
import type { ComponentLogger, ConnectionEncrypter, Metrics, PeerId, PrivateKey } from '@libp2p/interface'
40+
import type { NoiseInit, NoiseExtensions } from './noise.js'
41+
import type { ComponentLogger, ConnectionEncrypter, Metrics, PeerId, PrivateKey, Upgrader } from '@libp2p/interface'
4342
export type { ICryptoInterface } from './crypto.js'
4443
export { pureJsCrypto } from './crypto/js.js'
4544

4645
export interface NoiseComponents {
4746
peerId: PeerId
4847
privateKey: PrivateKey
4948
logger: ComponentLogger
49+
upgrader: Upgrader
5050
metrics?: Metrics
5151
}
5252

src/noise.ts

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { publicKeyFromProtobuf } from '@libp2p/crypto/keys'
2-
import { serviceCapabilities } from '@libp2p/interface'
2+
import { InvalidCryptoExchangeError, serviceCapabilities } from '@libp2p/interface'
33
import { peerIdFromPublicKey } from '@libp2p/peer-id'
44
import { decode } from 'it-length-prefixed'
55
import { lpStream, type LengthPrefixedStream } from 'it-length-prefixed-stream'
@@ -14,18 +14,21 @@ import { type MetricsRegistry, registerMetrics } from './metrics.js'
1414
import { performHandshakeInitiator, performHandshakeResponder } from './performHandshake.js'
1515
import { decryptStream, encryptStream } from './streaming.js'
1616
import type { NoiseComponents } from './index.js'
17-
import type { NoiseExtensions } from './proto/payload.js'
1817
import type { HandshakeResult, ICrypto, INoiseConnection, KeyPair } from './types.js'
19-
import type { MultiaddrConnection, SecuredConnection, PeerId, PrivateKey, PublicKey, AbortOptions } from '@libp2p/interface'
18+
import type { MultiaddrConnection, SecuredConnection, PeerId, PrivateKey, PublicKey, AbortOptions, StreamMuxerFactory } from '@libp2p/interface'
2019
import type { Duplex } from 'it-stream-types'
2120
import type { Uint8ArrayList } from 'uint8arraylist'
2221

22+
export interface NoiseExtensions {
23+
webtransportCerthashes: Uint8Array[]
24+
}
25+
2326
export interface NoiseInit {
2427
/**
2528
* x25519 private key, reuse for faster handshakes
2629
*/
2730
staticNoiseKey?: Uint8Array
28-
extensions?: NoiseExtensions
31+
extensions?: Partial<NoiseExtensions>
2932
crypto?: ICryptoInterface
3033
prologueBytes?: Uint8Array
3134
}
@@ -47,7 +50,10 @@ export class Noise implements INoiseConnection {
4750
this.components = components
4851
const _crypto = crypto ?? defaultCrypto
4952
this.crypto = wrapCrypto(_crypto)
50-
this.extensions = extensions
53+
this.extensions = {
54+
webtransportCerthashes: [],
55+
...extensions
56+
}
5157
this.metrics = metrics ? registerMetrics(metrics) : undefined
5258

5359
if (staticNoiseKey) {
@@ -100,7 +106,30 @@ export class Noise implements INoiseConnection {
100106
return {
101107
conn: connection,
102108
remoteExtensions: handshake.payload.extensions,
103-
remotePeer: peerIdFromPublicKey(publicKey)
109+
remotePeer: peerIdFromPublicKey(publicKey),
110+
streamMuxer: this.getStreamMuxer(handshake.payload.extensions?.streamMuxers)
111+
}
112+
}
113+
114+
private getStreamMuxer (protocols?: string[]): StreamMuxerFactory | undefined {
115+
if (protocols == null) {
116+
return
117+
}
118+
119+
const streamMuxers = this.components.upgrader.getStreamMuxers()
120+
121+
if (streamMuxers != null) {
122+
for (const protocol of protocols) {
123+
const streamMuxer = streamMuxers.get(protocol)
124+
125+
if (streamMuxer != null) {
126+
return streamMuxer
127+
}
128+
}
129+
}
130+
131+
if (protocols.length) {
132+
throw new InvalidCryptoExchangeError('Early muxer negotiation was requested but the initiator and responder had no common muxers')
104133
}
105134
}
106135

@@ -138,7 +167,8 @@ export class Noise implements INoiseConnection {
138167
return {
139168
conn: connection,
140169
remoteExtensions: handshake.payload.extensions,
141-
remotePeer: peerIdFromPublicKey(publicKey)
170+
remotePeer: peerIdFromPublicKey(publicKey),
171+
streamMuxer: this.getStreamMuxer(handshake.payload.extensions?.streamMuxers)
142172
}
143173
}
144174

@@ -162,7 +192,11 @@ export class Noise implements INoiseConnection {
162192
crypto: this.crypto,
163193
prologue: this.prologue,
164194
s: this.staticKey,
165-
extensions: this.extensions
195+
extensions: {
196+
streamMuxers: [...this.components.upgrader.getStreamMuxers().keys()],
197+
webtransportCerthashes: [],
198+
...this.extensions
199+
}
166200
}, options)
167201
this.metrics?.xxHandshakeSuccesses.increment()
168202
} catch (e: unknown) {
@@ -192,7 +226,11 @@ export class Noise implements INoiseConnection {
192226
crypto: this.crypto,
193227
prologue: this.prologue,
194228
s: this.staticKey,
195-
extensions: this.extensions
229+
extensions: {
230+
streamMuxers: [...this.components.upgrader.getStreamMuxers().keys()],
231+
webtransportCerthashes: [],
232+
...this.extensions
233+
}
196234
}, options)
197235
this.metrics?.xxHandshakeSuccesses.increment()
198236
} catch (e: unknown) {

src/proto/payload.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ syntax = "proto3";
22

33
message NoiseExtensions {
44
repeated bytes webtransport_certhashes = 1;
5+
repeated string stream_muxers = 2;
56
}
67

78
message NoiseHandshakePayload {
89
bytes identity_key = 1;
910
bytes identity_sig = 2;
1011
optional NoiseExtensions extensions = 4;
11-
}
12+
}

src/proto/payload.ts

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@
44
/* eslint-disable @typescript-eslint/no-unnecessary-boolean-literal-compare */
55
/* eslint-disable @typescript-eslint/no-empty-interface */
66

7-
import { type Codec, decodeMessage, encodeMessage, message } from 'protons-runtime'
7+
import { type Codec, decodeMessage, type DecodeOptions, encodeMessage, MaxLengthError, message } from 'protons-runtime'
88
import { alloc as uint8ArrayAlloc } from 'uint8arrays/alloc'
99
import type { Uint8ArrayList } from 'uint8arraylist'
1010

1111
export interface NoiseExtensions {
1212
webtransportCerthashes: Uint8Array[]
13+
streamMuxers: string[]
1314
}
1415

1516
export namespace NoiseExtensions {
@@ -29,12 +30,20 @@ export namespace NoiseExtensions {
2930
}
3031
}
3132

33+
if (obj.streamMuxers != null) {
34+
for (const value of obj.streamMuxers) {
35+
w.uint32(18)
36+
w.string(value)
37+
}
38+
}
39+
3240
if (opts.lengthDelimited !== false) {
3341
w.ldelim()
3442
}
35-
}, (reader, length) => {
43+
}, (reader, length, opts = {}) => {
3644
const obj: any = {
37-
webtransportCerthashes: []
45+
webtransportCerthashes: [],
46+
streamMuxers: []
3847
}
3948

4049
const end = length == null ? reader.len : reader.pos + length
@@ -44,9 +53,21 @@ export namespace NoiseExtensions {
4453

4554
switch (tag >>> 3) {
4655
case 1: {
56+
if (opts.limits?.webtransportCerthashes != null && obj.webtransportCerthashes.length === opts.limits.webtransportCerthashes) {
57+
throw new MaxLengthError('Decode error - map field "webtransportCerthashes" had too many elements')
58+
}
59+
4760
obj.webtransportCerthashes.push(reader.bytes())
4861
break
4962
}
63+
case 2: {
64+
if (opts.limits?.streamMuxers != null && obj.streamMuxers.length === opts.limits.streamMuxers) {
65+
throw new MaxLengthError('Decode error - map field "streamMuxers" had too many elements')
66+
}
67+
68+
obj.streamMuxers.push(reader.string())
69+
break
70+
}
5071
default: {
5172
reader.skipType(tag & 7)
5273
break
@@ -65,8 +86,8 @@ export namespace NoiseExtensions {
6586
return encodeMessage(obj, NoiseExtensions.codec())
6687
}
6788

68-
export const decode = (buf: Uint8Array | Uint8ArrayList): NoiseExtensions => {
69-
return decodeMessage(buf, NoiseExtensions.codec())
89+
export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions<NoiseExtensions>): NoiseExtensions => {
90+
return decodeMessage(buf, NoiseExtensions.codec(), opts)
7091
}
7192
}
7293

@@ -104,7 +125,7 @@ export namespace NoiseHandshakePayload {
104125
if (opts.lengthDelimited !== false) {
105126
w.ldelim()
106127
}
107-
}, (reader, length) => {
128+
}, (reader, length, opts = {}) => {
108129
const obj: any = {
109130
identityKey: uint8ArrayAlloc(0),
110131
identitySig: uint8ArrayAlloc(0)
@@ -125,7 +146,9 @@ export namespace NoiseHandshakePayload {
125146
break
126147
}
127148
case 4: {
128-
obj.extensions = NoiseExtensions.codec().decode(reader, reader.uint32())
149+
obj.extensions = NoiseExtensions.codec().decode(reader, reader.uint32(), {
150+
limits: opts.limits?.extensions
151+
})
129152
break
130153
}
131154
default: {
@@ -146,7 +169,7 @@ export namespace NoiseHandshakePayload {
146169
return encodeMessage(obj, NoiseHandshakePayload.codec())
147170
}
148171

149-
export const decode = (buf: Uint8Array | Uint8ArrayList): NoiseHandshakePayload => {
150-
return decodeMessage(buf, NoiseHandshakePayload.codec())
172+
export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions<NoiseHandshakePayload>): NoiseHandshakePayload => {
173+
return decodeMessage(buf, NoiseHandshakePayload.codec(), opts)
151174
}
152175
}

src/types.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,8 @@ export interface KeyPair {
8181
privateKey: Uint8Array
8282
}
8383

84-
export interface INoiseConnection extends ConnectionEncrypter<NoiseExtensions> { }
84+
export interface INoiseExtensions {
85+
webtransportCerthashes: Uint8Array[]
86+
}
87+
88+
export interface INoiseConnection extends ConnectionEncrypter<INoiseExtensions> { }

test/compliance.spec.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ import { generateKeyPair } from '@libp2p/crypto/keys'
22
import tests from '@libp2p/interface-compliance-tests/connection-encryption'
33
import { defaultLogger } from '@libp2p/logger'
44
import { peerIdFromPrivateKey } from '@libp2p/peer-id'
5+
import { stubInterface } from 'sinon-ts'
56
import { Noise } from '../src/noise.js'
7+
import type { Upgrader } from '@libp2p/interface'
68

79
describe('spec compliance tests', function () {
810
tests({
@@ -13,7 +15,10 @@ describe('spec compliance tests', function () {
1315
return new Noise({
1416
privateKey,
1517
peerId,
16-
logger: defaultLogger()
18+
logger: defaultLogger(),
19+
upgrader: stubInterface<Upgrader>({
20+
getStreamMuxers: () => new Map()
21+
})
1722
})
1823
},
1924
async teardown () {}

test/index.spec.ts

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@ import { expect } from 'aegir/chai'
55
import { lpStream } from 'it-length-prefixed-stream'
66
import { duplexPair } from 'it-pair/duplex'
77
import sinon from 'sinon'
8+
import { stubInterface } from 'sinon-ts'
89
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
910
import { noise } from '../src/index.js'
1011
import { Noise } from '../src/noise.js'
11-
import type { Metrics } from '@libp2p/interface'
12+
import type { Metrics, Upgrader } from '@libp2p/interface'
1213
import type { Uint8ArrayList } from 'uint8arraylist'
1314

1415
function createCounterSpy (): ReturnType<typeof sinon.spy> {
@@ -26,7 +27,10 @@ describe('Index', () => {
2627
const noiseInstance = noise()({
2728
privateKey,
2829
peerId,
29-
logger: defaultLogger()
30+
logger: defaultLogger(),
31+
upgrader: stubInterface<Upgrader>({
32+
getStreamMuxers: () => new Map()
33+
})
3034
})
3135
expect(noiseInstance.protocol).to.equal('/noise')
3236
expect(typeof (noiseInstance.secureInbound)).to.equal('function')
@@ -49,15 +53,21 @@ describe('Index', () => {
4953
privateKey: privateKeyInit,
5054
peerId: peerIdInit,
5155
logger: defaultLogger(),
52-
metrics: metrics as any as Metrics
56+
metrics: metrics as any as Metrics,
57+
upgrader: stubInterface<Upgrader>({
58+
getStreamMuxers: () => new Map()
59+
})
5360
})
5461

5562
const privateKeyResp = await generateKeyPair('Ed25519')
5663
const peerIdResp = peerIdFromPrivateKey(privateKeyResp)
5764
const noiseResp = new Noise({
5865
privateKey: privateKeyResp,
5966
peerId: peerIdResp,
60-
logger: defaultLogger()
67+
logger: defaultLogger(),
68+
upgrader: stubInterface<Upgrader>({
69+
getStreamMuxers: () => new Map()
70+
})
6171
})
6272

6373
const [inboundConnection, outboundConnection] = duplexPair<Uint8Array | Uint8ArrayList>()

0 commit comments

Comments
 (0)