diff --git a/client.go b/client.go index fba3daee3..610cfee29 100644 --- a/client.go +++ b/client.go @@ -132,6 +132,18 @@ func populateClientConfig(config *Config) *Config { if maxReceiveConnectionFlowControlWindow == 0 { maxReceiveConnectionFlowControlWindow = protocol.DefaultMaxReceiveConnectionFlowControlWindowClient } + maxIncomingStreams := config.MaxIncomingStreams + if maxIncomingStreams == 0 { + maxIncomingStreams = protocol.DefaultMaxIncomingStreams + } else if maxIncomingStreams < 0 { + maxIncomingStreams = 0 + } + maxIncomingUniStreams := config.MaxIncomingUniStreams + if maxIncomingUniStreams == 0 { + maxIncomingUniStreams = protocol.DefaultMaxIncomingUniStreams + } else if maxIncomingUniStreams < 0 { + maxIncomingUniStreams = 0 + } return &Config{ Versions: versions, @@ -140,7 +152,9 @@ func populateClientConfig(config *Config) *Config { RequestConnectionIDOmission: config.RequestConnectionIDOmission, MaxReceiveStreamFlowControlWindow: maxReceiveStreamFlowControlWindow, MaxReceiveConnectionFlowControlWindow: maxReceiveConnectionFlowControlWindow, - KeepAlive: config.KeepAlive, + MaxIncomingStreams: maxIncomingStreams, + MaxIncomingUniStreams: maxIncomingUniStreams, + KeepAlive: config.KeepAlive, } } @@ -171,9 +185,8 @@ func (c *client) dialTLS() error { ConnectionFlowControlWindow: protocol.ReceiveConnectionFlowControlWindow, IdleTimeout: c.config.IdleTimeout, OmitConnectionID: c.config.RequestConnectionIDOmission, - // TODO(#523): make these values configurable - MaxBidiStreamID: protocol.MaxBidiStreamID(protocol.MaxIncomingStreams, protocol.PerspectiveClient), - MaxUniStreamID: protocol.MaxUniStreamID(protocol.MaxIncomingStreams, protocol.PerspectiveClient), + MaxBidiStreamID: protocol.MaxBidiStreamID(c.config.MaxIncomingStreams, protocol.PerspectiveClient), + MaxUniStreamID: protocol.MaxUniStreamID(c.config.MaxIncomingUniStreams, protocol.PerspectiveClient), } csc := handshake.NewCryptoStreamConn(nil) extHandler := handshake.NewExtensionHandlerClient(params, c.initialVersion, c.config.Versions, c.version) diff --git a/client_test.go b/client_test.go index a955b5ea0..2c2f88bdc 100644 --- a/client_test.go +++ b/client_test.go @@ -206,11 +206,35 @@ var _ = Describe("Client", func() { HandshakeTimeout: 1337 * time.Minute, IdleTimeout: 42 * time.Hour, RequestConnectionIDOmission: true, + MaxIncomingStreams: 1234, + MaxIncomingUniStreams: 4321, } c := populateClientConfig(config) Expect(c.HandshakeTimeout).To(Equal(1337 * time.Minute)) Expect(c.IdleTimeout).To(Equal(42 * time.Hour)) Expect(c.RequestConnectionIDOmission).To(BeTrue()) + Expect(c.MaxIncomingStreams).To(Equal(1234)) + Expect(c.MaxIncomingUniStreams).To(Equal(4321)) + }) + + It("disables bidirectional streams", func() { + config := &Config{ + MaxIncomingStreams: -1, + MaxIncomingUniStreams: 4321, + } + c := populateClientConfig(config) + Expect(c.MaxIncomingStreams).To(BeZero()) + Expect(c.MaxIncomingUniStreams).To(Equal(4321)) + }) + + It("disables unidirectional streams", func() { + config := &Config{ + MaxIncomingStreams: 1234, + MaxIncomingUniStreams: -1, + } + c := populateClientConfig(config) + Expect(c.MaxIncomingStreams).To(Equal(1234)) + Expect(c.MaxIncomingUniStreams).To(BeZero()) }) It("fills in default values if options are not set in the Config", func() { diff --git a/integrationtests/chrome/chrome_test.go b/integrationtests/chrome/chrome_test.go index c2441c4db..1465b61d1 100644 --- a/integrationtests/chrome/chrome_test.go +++ b/integrationtests/chrome/chrome_test.go @@ -54,7 +54,7 @@ var _ = Describe("Chrome tests", func() { }) It("uploads many small files", func() { - num := protocol.MaxIncomingStreams + 20 + num := protocol.DefaultMaxIncomingStreams + 20 chromeTest( version, fmt.Sprintf("https://quic.clemente.io/uploadtest?num=%d&len=%d", num, dataLen), diff --git a/integrationtests/self/stream_test.go b/integrationtests/self/stream_test.go index 234c3a980..58852d021 100644 --- a/integrationtests/self/stream_test.go +++ b/integrationtests/self/stream_test.go @@ -24,13 +24,16 @@ var _ = Describe("Stream tests", func() { qconf *quic.Config ) - for _, v := range []protocol.VersionNumber{protocol.Version39, protocol.VersionTLS} { + for _, v := range []protocol.VersionNumber{protocol.VersionTLS} { version := v Context(fmt.Sprintf("with QUIC %s", version), func() { BeforeEach(func() { var err error - qconf = &quic.Config{Versions: []protocol.VersionNumber{version}} + qconf = &quic.Config{ + Versions: []protocol.VersionNumber{version}, + MaxIncomingStreams: 0, + } server, err = quic.ListenAddr("localhost:0", testdata.GetTLSConfig(), qconf) Expect(err).ToNot(HaveOccurred()) serverAddr = fmt.Sprintf("quic.clemente.io:%d", server.Addr().(*net.UDPAddr).Port) diff --git a/interface.go b/interface.go index aa8320799..e0a72cb43 100644 --- a/interface.go +++ b/interface.go @@ -174,6 +174,14 @@ type Config struct { // MaxReceiveConnectionFlowControlWindow is the connection-level flow control window for receiving data. // If this value is zero, it will default to 1.5 MB for the server and 15 MB for the client. MaxReceiveConnectionFlowControlWindow uint64 + // MaxIncomingStreams is the maximum number of concurrent bidirectional streams that a peer is allowed to open. + // If not set, it will default to 100. + // If set to a negative value, it doesn't allow any bidirectional streams. + MaxIncomingStreams int + // MaxIncomingUniStreams is the maximum number of concurrent unidirectional streams that a peer is allowed to open. + // If not set, it will default to 100. + // If set to a negative value, it doesn't allow any unidirectional streams. + MaxIncomingUniStreams int // KeepAlive defines whether this peer will periodically send PING frames to keep the connection alive. KeepAlive bool } diff --git a/internal/handshake/tls_extension_handler_client.go b/internal/handshake/tls_extension_handler_client.go index 431c62b3a..d45d88b17 100644 --- a/internal/handshake/tls_extension_handler_client.go +++ b/internal/handshake/tls_extension_handler_client.go @@ -3,7 +3,6 @@ package handshake import ( "errors" "fmt" - "math" "github.com/lucas-clemente/quic-go/qerr" @@ -121,8 +120,6 @@ func (h *extensionHandlerClient) Receive(hType mint.HandshakeType, el *mint.Exte if err != nil { return err } - // TODO(#878): remove this when implementing the MAX_STREAM_ID frame - params.MaxStreams = math.MaxUint32 h.paramsChan <- *params return nil } diff --git a/internal/protocol/server_parameters.go b/internal/protocol/server_parameters.go index 61e5a2dfc..dbbce7b70 100644 --- a/internal/protocol/server_parameters.go +++ b/internal/protocol/server_parameters.go @@ -59,8 +59,11 @@ const ConnectionFlowControlMultiplier = 1.5 // WindowUpdateThreshold is the fraction of the receive window that has to be consumed before an higher offset is advertised to the client const WindowUpdateThreshold = 0.25 -// MaxIncomingStreams is the maximum number of streams that a peer may open -const MaxIncomingStreams = 100 +// DefaultMaxIncomingStreams is the maximum number of streams that a peer may open +const DefaultMaxIncomingStreams = 100 + +// DefaultMaxIncomingUniStreams is the maximum number of unidirectional streams that a peer may open +const DefaultMaxIncomingUniStreams = 100 // MaxStreamsMultiplier is the slack the client is allowed for the maximum number of streams per connection, needed e.g. when packets are out of order or dropped. The minimum of this procentual increase and the absolute increment specified by MaxStreamsMinimumIncrement is used. const MaxStreamsMultiplier = 1.1 @@ -70,7 +73,7 @@ const MaxStreamsMinimumIncrement = 10 // MaxNewStreamIDDelta is the maximum difference between and a newly opened Stream and the highest StreamID that a client has ever opened // note that the number of streams is half this value, since the client can only open streams with open StreamID -const MaxNewStreamIDDelta = 4 * MaxIncomingStreams +const MaxNewStreamIDDelta = 4 * DefaultMaxIncomingStreams // MaxSessionUnprocessedPackets is the max number of packets stored in each session that are not yet processed. const MaxSessionUnprocessedPackets = DefaultMaxCongestionWindow diff --git a/server.go b/server.go index 26e153d5d..53d56a4ca 100644 --- a/server.go +++ b/server.go @@ -198,6 +198,18 @@ func populateServerConfig(config *Config) *Config { if maxReceiveConnectionFlowControlWindow == 0 { maxReceiveConnectionFlowControlWindow = protocol.DefaultMaxReceiveConnectionFlowControlWindowServer } + maxIncomingStreams := config.MaxIncomingStreams + if maxIncomingStreams == 0 { + maxIncomingStreams = protocol.DefaultMaxIncomingStreams + } else if maxIncomingStreams < 0 { + maxIncomingStreams = 0 + } + maxIncomingUniStreams := config.MaxIncomingUniStreams + if maxIncomingUniStreams == 0 { + maxIncomingUniStreams = protocol.DefaultMaxIncomingUniStreams + } else if maxIncomingUniStreams < 0 { + maxIncomingUniStreams = 0 + } return &Config{ Versions: versions, @@ -207,6 +219,8 @@ func populateServerConfig(config *Config) *Config { KeepAlive: config.KeepAlive, MaxReceiveStreamFlowControlWindow: maxReceiveStreamFlowControlWindow, MaxReceiveConnectionFlowControlWindow: maxReceiveConnectionFlowControlWindow, + MaxIncomingStreams: maxIncomingStreams, + MaxIncomingUniStreams: maxIncomingUniStreams, } } diff --git a/server_test.go b/server_test.go index f81c3dead..b8aa17536 100644 --- a/server_test.go +++ b/server_test.go @@ -124,6 +124,42 @@ var _ = Describe("Server", func() { firstPacket = append(firstPacket, bytes.Repeat([]byte{0}, protocol.MinClientHelloSize)...) // add padding }) + It("setups with the right values", func() { + config := &Config{ + HandshakeTimeout: 1337 * time.Minute, + IdleTimeout: 42 * time.Hour, + RequestConnectionIDOmission: true, + MaxIncomingStreams: 1234, + MaxIncomingUniStreams: 4321, + } + c := populateServerConfig(config) + Expect(c.HandshakeTimeout).To(Equal(1337 * time.Minute)) + Expect(c.IdleTimeout).To(Equal(42 * time.Hour)) + Expect(c.RequestConnectionIDOmission).To(BeFalse()) + Expect(c.MaxIncomingStreams).To(Equal(1234)) + Expect(c.MaxIncomingUniStreams).To(Equal(4321)) + }) + + It("disables bidirectional streams", func() { + config := &Config{ + MaxIncomingStreams: -1, + MaxIncomingUniStreams: 4321, + } + c := populateServerConfig(config) + Expect(c.MaxIncomingStreams).To(BeZero()) + Expect(c.MaxIncomingUniStreams).To(Equal(4321)) + }) + + It("disables unidirectional streams", func() { + config := &Config{ + MaxIncomingStreams: 1234, + MaxIncomingUniStreams: -1, + } + c := populateServerConfig(config) + Expect(c.MaxIncomingStreams).To(Equal(1234)) + Expect(c.MaxIncomingUniStreams).To(BeZero()) + }) + It("returns the address", func() { conn.addr = &net.UDPAddr{ IP: net.IPv4(192, 168, 13, 37), diff --git a/server_tls.go b/server_tls.go index a6d974df0..a3b63d835 100644 --- a/server_tls.go +++ b/server_tls.go @@ -72,9 +72,8 @@ func newServerTLS( StreamFlowControlWindow: protocol.ReceiveStreamFlowControlWindow, ConnectionFlowControlWindow: protocol.ReceiveConnectionFlowControlWindow, IdleTimeout: config.IdleTimeout, - // TODO(#523): make these values configurable - MaxBidiStreamID: protocol.MaxBidiStreamID(protocol.MaxIncomingStreams, protocol.PerspectiveServer), - MaxUniStreamID: protocol.MaxUniStreamID(protocol.MaxIncomingStreams, protocol.PerspectiveServer), + MaxBidiStreamID: protocol.MaxBidiStreamID(config.MaxIncomingStreams, protocol.PerspectiveServer), + MaxUniStreamID: protocol.MaxUniStreamID(config.MaxIncomingUniStreams, protocol.PerspectiveServer), }, } s.newMintConn = s.newMintConnImpl diff --git a/session.go b/session.go index 681444976..72efb34f0 100644 --- a/session.go +++ b/session.go @@ -157,7 +157,7 @@ func newSession( transportParams := &handshake.TransportParameters{ StreamFlowControlWindow: protocol.ReceiveStreamFlowControlWindow, ConnectionFlowControlWindow: protocol.ReceiveConnectionFlowControlWindow, - MaxStreams: protocol.MaxIncomingStreams, + MaxStreams: protocol.DefaultMaxIncomingStreams, IdleTimeout: s.config.IdleTimeout, } cs, err := newCryptoSetup( @@ -205,7 +205,7 @@ var newClientSession = func( transportParams := &handshake.TransportParameters{ StreamFlowControlWindow: protocol.ReceiveStreamFlowControlWindow, ConnectionFlowControlWindow: protocol.ReceiveConnectionFlowControlWindow, - MaxStreams: protocol.MaxIncomingStreams, + MaxStreams: protocol.DefaultMaxIncomingStreams, IdleTimeout: s.config.IdleTimeout, OmitConnectionID: s.config.RequestConnectionIDOmission, } @@ -330,7 +330,7 @@ func (s *session) postSetup(initialPacketNumber protocol.PacketNumber) error { s.receivedPacketHandler = ackhandler.NewReceivedPacketHandler(s.version) if s.version.UsesTLS() { - s.streamsMap = newStreamsMap(s, s.newFlowController, s.perspective, s.version) + s.streamsMap = newStreamsMap(s, s.newFlowController, s.config.MaxIncomingStreams, s.config.MaxIncomingUniStreams, s.perspective, s.version) } else { s.streamsMap = newStreamsMapLegacy(s.newStream, s.perspective) } diff --git a/streams_map.go b/streams_map.go index c3ce2ef4d..941fe9549 100644 --- a/streams_map.go +++ b/streams_map.go @@ -35,6 +35,8 @@ var _ streamManager = &streamsMap{} func newStreamsMap( sender streamSender, newFlowController func(protocol.StreamID) flowcontrol.StreamFlowController, + maxIncomingStreams int, + maxIncomingUniStreams int, perspective protocol.Perspective, version protocol.VersionNumber, ) streamManager { @@ -69,11 +71,10 @@ func newStreamsMap( newBidiStream, sender.queueControlFrame, ) - // TODO(#523): make these values configurable m.incomingBidiStreams = newIncomingBidiStreamsMap( firstIncomingBidiStream, - protocol.MaxBidiStreamID(protocol.MaxIncomingStreams, perspective), - protocol.MaxIncomingStreams, + protocol.MaxBidiStreamID(maxIncomingStreams, perspective), + maxIncomingStreams, sender.queueControlFrame, newBidiStream, ) @@ -82,11 +83,10 @@ func newStreamsMap( newUniSendStream, sender.queueControlFrame, ) - // TODO(#523): make these values configurable m.incomingUniStreams = newIncomingUniStreamsMap( firstIncomingUniStream, - protocol.MaxUniStreamID(protocol.MaxIncomingStreams, perspective), - protocol.MaxIncomingStreams, + protocol.MaxUniStreamID(maxIncomingUniStreams, perspective), + maxIncomingUniStreams, sender.queueControlFrame, newUniReceiveStream, ) diff --git a/streams_map_legacy.go b/streams_map_legacy.go index 11be7efd4..262499129 100644 --- a/streams_map_legacy.go +++ b/streams_map_legacy.go @@ -41,7 +41,7 @@ var errMapAccess = errors.New("streamsMap: Error accessing the streams map") func newStreamsMapLegacy(newStream func(protocol.StreamID) streamI, pers protocol.Perspective) streamManager { // add some tolerance to the maximum incoming streams value - maxStreams := uint32(protocol.MaxIncomingStreams) + maxStreams := uint32(protocol.DefaultMaxIncomingStreams) maxIncomingStreams := utils.MaxUint32( maxStreams+protocol.MaxStreamsMinimumIncrement, uint32(float64(maxStreams)*float64(protocol.MaxStreamsMultiplier)), diff --git a/streams_map_test.go b/streams_map_test.go index b53d7d652..29f6134a7 100644 --- a/streams_map_test.go +++ b/streams_map_test.go @@ -56,6 +56,11 @@ var _ = Describe("Streams Map (for IETF QUIC)", func() { mockSender *MockStreamSender ) + const ( + maxBidiStreams = 111 + maxUniStreams = 222 + ) + allowUnlimitedStreams := func() { m.UpdateLimits(&handshake.TransportParameters{ MaxBidiStreamID: 0xffffffff, @@ -65,7 +70,7 @@ var _ = Describe("Streams Map (for IETF QUIC)", func() { BeforeEach(func() { mockSender = NewMockStreamSender(mockCtrl) - m = newStreamsMap(mockSender, newFlowController, perspective, versionIETFFrames).(*streamsMap) + m = newStreamsMap(mockSender, newFlowController, maxBidiStreams, maxUniStreams, perspective, versionIETFFrames).(*streamsMap) }) Context("opening", func() { @@ -325,7 +330,7 @@ var _ = Describe("Streams Map (for IETF QUIC)", func() { _, err := m.GetOrOpenReceiveStream(ids.firstIncomingBidiStream + 4*10) Expect(err).ToNot(HaveOccurred()) mockSender.EXPECT().queueControlFrame(&wire.MaxStreamIDFrame{ - StreamID: protocol.MaxBidiStreamID(protocol.MaxIncomingStreams, perspective) + 4, + StreamID: protocol.MaxBidiStreamID(maxBidiStreams, perspective) + 4, }) Expect(m.DeleteStream(ids.firstIncomingBidiStream)).To(Succeed()) }) @@ -334,7 +339,7 @@ var _ = Describe("Streams Map (for IETF QUIC)", func() { _, err := m.GetOrOpenReceiveStream(ids.firstIncomingUniStream + 4*10) Expect(err).ToNot(HaveOccurred()) mockSender.EXPECT().queueControlFrame(&wire.MaxStreamIDFrame{ - StreamID: protocol.MaxUniStreamID(protocol.MaxIncomingStreams, perspective) + 4, + StreamID: protocol.MaxUniStreamID(maxUniStreams, perspective) + 4, }) Expect(m.DeleteStream(ids.firstIncomingUniStream)).To(Succeed()) })