diff --git a/client.go b/client.go index 4f7cc184..955c908e 100644 --- a/client.go +++ b/client.go @@ -171,9 +171,9 @@ func (c *client) dialTLS() error { ConnectionFlowControlWindow: protocol.ReceiveConnectionFlowControlWindow, IdleTimeout: c.config.IdleTimeout, OmitConnectionID: c.config.RequestConnectionIDOmission, - // TODO(#1150): set reasonable limits - MaxBidiStreamID: 0xffffffff, - MaxUniStreamID: 0xffffffff, + // TODO(#523): make these values configurable + MaxBidiStreamID: protocol.MaxBidiStreamID(protocol.MaxIncomingStreams, protocol.PerspectiveClient), + MaxUniStreamID: protocol.MaxUniStreamID(protocol.MaxIncomingStreams, protocol.PerspectiveClient), } csc := handshake.NewCryptoStreamConn(nil) extHandler := handshake.NewExtensionHandlerClient(params, c.initialVersion, c.config.Versions, c.version) diff --git a/internal/protocol/protocol.go b/internal/protocol/protocol.go index 16229836..0901b19e 100644 --- a/internal/protocol/protocol.go +++ b/internal/protocol/protocol.go @@ -55,9 +55,6 @@ func (t PacketType) String() string { // A ConnectionID in QUIC type ConnectionID uint64 -// A StreamID in QUIC -type StreamID uint64 - // A ByteCount in QUIC type ByteCount uint64 diff --git a/internal/protocol/stream_id.go b/internal/protocol/stream_id.go new file mode 100644 index 00000000..a0dced0c --- /dev/null +++ b/internal/protocol/stream_id.go @@ -0,0 +1,36 @@ +package protocol + +// A StreamID in QUIC +type StreamID uint64 + +// MaxBidiStreamID is the highest stream ID that the peer is allowed to open, +// when it is allowed to open numStreams bidirectional streams. +// It is only valid for IETF QUIC. +func MaxBidiStreamID(numStreams int, pers Perspective) StreamID { + if numStreams == 0 { + return 0 + } + var first StreamID + if pers == PerspectiveClient { + first = 1 + } else { + first = 4 + } + return first + 4*StreamID(numStreams-1) +} + +// MaxUniStreamID is the highest stream ID that the peer is allowed to open, +// when it is allowed to open numStreams unidirectional streams. +// It is only valid for IETF QUIC. +func MaxUniStreamID(numStreams int, pers Perspective) StreamID { + if numStreams == 0 { + return 0 + } + var first StreamID + if pers == PerspectiveClient { + first = 3 + } else { + first = 2 + } + return first + 4*StreamID(numStreams-1) +} diff --git a/internal/protocol/stream_id_test.go b/internal/protocol/stream_id_test.go new file mode 100644 index 00000000..cca4f928 --- /dev/null +++ b/internal/protocol/stream_id_test.go @@ -0,0 +1,42 @@ +package protocol + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Stream ID", func() { + Context("bidirectional streams", func() { + It("doesn't allow any", func() { + Expect(MaxBidiStreamID(0, PerspectiveClient)).To(Equal(StreamID(0))) + Expect(MaxBidiStreamID(0, PerspectiveServer)).To(Equal(StreamID(0))) + }) + + It("allows one", func() { + Expect(MaxBidiStreamID(1, PerspectiveClient)).To(Equal(StreamID(1))) + Expect(MaxBidiStreamID(1, PerspectiveServer)).To(Equal(StreamID(4))) + }) + + It("allows many", func() { + Expect(MaxBidiStreamID(100, PerspectiveClient)).To(Equal(StreamID(397))) + Expect(MaxBidiStreamID(100, PerspectiveServer)).To(Equal(StreamID(400))) + }) + }) + + Context("unidirectional streams", func() { + It("doesn't allow any", func() { + Expect(MaxUniStreamID(0, PerspectiveClient)).To(Equal(StreamID(0))) + Expect(MaxUniStreamID(0, PerspectiveServer)).To(Equal(StreamID(0))) + }) + + It("allows one", func() { + Expect(MaxUniStreamID(1, PerspectiveClient)).To(Equal(StreamID(3))) + Expect(MaxUniStreamID(1, PerspectiveServer)).To(Equal(StreamID(2))) + }) + + It("allows many", func() { + Expect(MaxUniStreamID(100, PerspectiveClient)).To(Equal(StreamID(399))) + Expect(MaxUniStreamID(100, PerspectiveServer)).To(Equal(StreamID(398))) + }) + }) +}) diff --git a/server_tls.go b/server_tls.go index 1648459d..c45de35b 100644 --- a/server_tls.go +++ b/server_tls.go @@ -67,9 +67,9 @@ func newServerTLS( StreamFlowControlWindow: protocol.ReceiveStreamFlowControlWindow, ConnectionFlowControlWindow: protocol.ReceiveConnectionFlowControlWindow, IdleTimeout: config.IdleTimeout, - // TODO(#1150): set reasonable limits - MaxBidiStreamID: 0xffffffff, - MaxUniStreamID: 0xffffffff, + // TODO(#523): make these values configurable + MaxBidiStreamID: protocol.MaxBidiStreamID(protocol.MaxIncomingStreams, protocol.PerspectiveServer), + MaxUniStreamID: protocol.MaxUniStreamID(protocol.MaxIncomingStreams, protocol.PerspectiveServer), }, } s.newMintConn = s.newMintConnImpl diff --git a/streams_map.go b/streams_map.go index 6cfd45a8..8287c83b 100644 --- a/streams_map.go +++ b/streams_map.go @@ -2,7 +2,6 @@ package quic import ( "fmt" - "math" "github.com/lucas-clemente/quic-go/internal/flowcontrol" "github.com/lucas-clemente/quic-go/internal/handshake" @@ -66,11 +65,23 @@ func newStreamsMap( return newReceiveStream(id, m.sender, m.newFlowController(id), version) } m.outgoingBidiStreams = newOutgoingBidiStreamsMap(firstOutgoingBidiStream, newBidiStream) - // TODO(#1150): use a reasonable stream limit - m.incomingBidiStreams = newIncomingBidiStreamsMap(firstIncomingBidiStream, protocol.StreamID(math.MaxUint32), newBidiStream) + // TODO(#523): make these values configurable + m.incomingBidiStreams = newIncomingBidiStreamsMap( + firstIncomingBidiStream, + protocol.MaxBidiStreamID(protocol.MaxIncomingStreams, perspective), + protocol.MaxIncomingStreams, + sender.queueControlFrame, + newBidiStream, + ) m.outgoingUniStreams = newOutgoingUniStreamsMap(firstOutgoingUniStream, newUniSendStream) - // TODO(#1150): use a reasonable stream limit - m.incomingUniStreams = newIncomingUniStreamsMap(firstIncomingUniStream, protocol.StreamID(math.MaxUint32), newUniReceiveStream) + // TODO(#523): make these values configurable + m.incomingUniStreams = newIncomingUniStreamsMap( + firstIncomingUniStream, + protocol.MaxUniStreamID(protocol.MaxIncomingStreams, perspective), + protocol.MaxIncomingStreams, + sender.queueControlFrame, + newUniReceiveStream, + ) return m } diff --git a/streams_map_incoming_bidi.go b/streams_map_incoming_bidi.go index abd446d4..8a35f044 100644 --- a/streams_map_incoming_bidi.go +++ b/streams_map_incoming_bidi.go @@ -9,6 +9,7 @@ import ( "sync" "github.com/lucas-clemente/quic-go/internal/protocol" + "github.com/lucas-clemente/quic-go/internal/wire" ) type incomingBidiStreamsMap struct { @@ -20,21 +21,28 @@ type incomingBidiStreamsMap struct { nextStream protocol.StreamID // the next stream that will be returned by AcceptStream() highestStream protocol.StreamID // the highest stream that the peer openend maxStream protocol.StreamID // the highest stream that the peer is allowed to open - newStream func(protocol.StreamID) streamI + maxNumStreams int // maximum number of streams + + newStream func(protocol.StreamID) streamI + queueMaxStreamID func(*wire.MaxStreamIDFrame) closeErr error } func newIncomingBidiStreamsMap( nextStream protocol.StreamID, - maxStream protocol.StreamID, + initialMaxStreamID protocol.StreamID, + maxNumStreams int, + queueControlFrame func(wire.Frame), newStream func(protocol.StreamID) streamI, ) *incomingBidiStreamsMap { m := &incomingBidiStreamsMap{ - streams: make(map[protocol.StreamID]streamI), - nextStream: nextStream, - maxStream: maxStream, - newStream: newStream, + streams: make(map[protocol.StreamID]streamI), + nextStream: nextStream, + maxStream: initialMaxStreamID, + maxNumStreams: maxNumStreams, + newStream: newStream, + queueMaxStreamID: func(f *wire.MaxStreamIDFrame) { queueControlFrame(f) }, } m.cond.L = &m.mutex return m @@ -99,6 +107,11 @@ func (m *incomingBidiStreamsMap) DeleteStream(id protocol.StreamID) error { return fmt.Errorf("Tried to delete unknown stream %d", id) } delete(m.streams, id) + // queue a MAX_STREAM_ID frame, giving the peer the option to open a new stream + if numNewStreams := m.maxNumStreams - len(m.streams); numNewStreams > 0 { + m.maxStream = m.highestStream + protocol.StreamID(numNewStreams*4) + m.queueMaxStreamID(&wire.MaxStreamIDFrame{StreamID: m.maxStream}) + } return nil } diff --git a/streams_map_incoming_generic.go b/streams_map_incoming_generic.go index 44811f67..830b690d 100644 --- a/streams_map_incoming_generic.go +++ b/streams_map_incoming_generic.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/lucas-clemente/quic-go/internal/protocol" + "github.com/lucas-clemente/quic-go/internal/wire" ) //go:generate genny -in $GOFILE -out streams_map_incoming_bidi.go gen "item=streamI Item=BidiStream" @@ -18,21 +19,28 @@ type incomingItemsMap struct { nextStream protocol.StreamID // the next stream that will be returned by AcceptStream() highestStream protocol.StreamID // the highest stream that the peer openend maxStream protocol.StreamID // the highest stream that the peer is allowed to open - newStream func(protocol.StreamID) item + maxNumStreams int // maximum number of streams + + newStream func(protocol.StreamID) item + queueMaxStreamID func(*wire.MaxStreamIDFrame) closeErr error } func newIncomingItemsMap( nextStream protocol.StreamID, - maxStream protocol.StreamID, + initialMaxStreamID protocol.StreamID, + maxNumStreams int, + queueControlFrame func(wire.Frame), newStream func(protocol.StreamID) item, ) *incomingItemsMap { m := &incomingItemsMap{ - streams: make(map[protocol.StreamID]item), - nextStream: nextStream, - maxStream: maxStream, - newStream: newStream, + streams: make(map[protocol.StreamID]item), + nextStream: nextStream, + maxStream: initialMaxStreamID, + maxNumStreams: maxNumStreams, + newStream: newStream, + queueMaxStreamID: func(f *wire.MaxStreamIDFrame) { queueControlFrame(f) }, } m.cond.L = &m.mutex return m @@ -97,6 +105,11 @@ func (m *incomingItemsMap) DeleteStream(id protocol.StreamID) error { return fmt.Errorf("Tried to delete unknown stream %d", id) } delete(m.streams, id) + // queue a MAX_STREAM_ID frame, giving the peer the option to open a new stream + if numNewStreams := m.maxNumStreams - len(m.streams); numNewStreams > 0 { + m.maxStream = m.highestStream + protocol.StreamID(numNewStreams*4) + m.queueMaxStreamID(&wire.MaxStreamIDFrame{StreamID: m.maxStream}) + } return nil } diff --git a/streams_map_incoming_generic_test.go b/streams_map_incoming_generic_test.go index ad73df45..5d66fcd1 100644 --- a/streams_map_incoming_generic_test.go +++ b/streams_map_incoming_generic_test.go @@ -4,7 +4,9 @@ import ( "errors" "fmt" + "github.com/golang/mock/gomock" "github.com/lucas-clemente/quic-go/internal/protocol" + "github.com/lucas-clemente/quic-go/internal/wire" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -12,14 +14,16 @@ import ( var _ = Describe("Streams Map (outgoing)", func() { const ( - firstNewStream protocol.StreamID = 20 - maxStream protocol.StreamID = firstNewStream + 4*100 + firstNewStream protocol.StreamID = 20 + maxNumStreams int = 10 + initialMaxStream protocol.StreamID = firstNewStream + 4*protocol.StreamID(maxNumStreams-1) ) var ( m *incomingItemsMap newItem func(id protocol.StreamID) item newItemCounter int + mockSender *MockStreamSender ) BeforeEach(func() { @@ -28,7 +32,8 @@ var _ = Describe("Streams Map (outgoing)", func() { newItemCounter++ return id } - m = newIncomingItemsMap(firstNewStream, maxStream, newItem) + mockSender = NewMockStreamSender(mockCtrl) + m = newIncomingItemsMap(firstNewStream, initialMaxStream, maxNumStreams, mockSender.queueControlFrame, newItem) }) It("opens all streams up to the id on GetOrOpenStream", func() { @@ -59,14 +64,14 @@ var _ = Describe("Streams Map (outgoing)", func() { }) It("allows opening the maximum stream ID", func() { - str, err := m.GetOrOpenStream(maxStream) + str, err := m.GetOrOpenStream(initialMaxStream) Expect(err).ToNot(HaveOccurred()) - Expect(str).To(Equal(maxStream)) + Expect(str).To(Equal(initialMaxStream)) }) It("errors when trying to get a stream ID higher than the maximum", func() { - _, err := m.GetOrOpenStream(maxStream + 4) - Expect(err).To(MatchError(fmt.Errorf("peer tried to open stream %d (current limit: %d)", maxStream+4, maxStream))) + _, err := m.GetOrOpenStream(initialMaxStream + 4) + Expect(err).To(MatchError(fmt.Errorf("peer tried to open stream %d (current limit: %d)", initialMaxStream+4, initialMaxStream))) }) It("blocks AcceptStream until a new stream is available", func() { @@ -106,6 +111,7 @@ var _ = Describe("Streams Map (outgoing)", func() { }) It("deletes streams", func() { + mockSender.EXPECT().queueControlFrame(gomock.Any()) _, err := m.GetOrOpenStream(20) Expect(err).ToNot(HaveOccurred()) err = m.DeleteStream(20) @@ -119,4 +125,14 @@ var _ = Describe("Streams Map (outgoing)", func() { err := m.DeleteStream(1337) Expect(err).To(MatchError("Tried to delete unknown stream 1337")) }) + + It("sends MAX_STREAM_ID frames when streams are deleted", func() { + // open a bunch of streams + _, err := m.GetOrOpenStream(firstNewStream + 4*4) + Expect(err).ToNot(HaveOccurred()) + mockSender.EXPECT().queueControlFrame(&wire.MaxStreamIDFrame{StreamID: initialMaxStream + 4}) + Expect(m.DeleteStream(firstNewStream + 4)).To(Succeed()) + mockSender.EXPECT().queueControlFrame(&wire.MaxStreamIDFrame{StreamID: initialMaxStream + 8}) + Expect(m.DeleteStream(firstNewStream + 3*4)).To(Succeed()) + }) }) diff --git a/streams_map_incoming_uni.go b/streams_map_incoming_uni.go index 2dd2deeb..9091d635 100644 --- a/streams_map_incoming_uni.go +++ b/streams_map_incoming_uni.go @@ -9,6 +9,7 @@ import ( "sync" "github.com/lucas-clemente/quic-go/internal/protocol" + "github.com/lucas-clemente/quic-go/internal/wire" ) type incomingUniStreamsMap struct { @@ -20,21 +21,28 @@ type incomingUniStreamsMap struct { nextStream protocol.StreamID // the next stream that will be returned by AcceptStream() highestStream protocol.StreamID // the highest stream that the peer openend maxStream protocol.StreamID // the highest stream that the peer is allowed to open - newStream func(protocol.StreamID) receiveStreamI + maxNumStreams int // maximum number of streams + + newStream func(protocol.StreamID) receiveStreamI + queueMaxStreamID func(*wire.MaxStreamIDFrame) closeErr error } func newIncomingUniStreamsMap( nextStream protocol.StreamID, - maxStream protocol.StreamID, + initialMaxStreamID protocol.StreamID, + maxNumStreams int, + queueControlFrame func(wire.Frame), newStream func(protocol.StreamID) receiveStreamI, ) *incomingUniStreamsMap { m := &incomingUniStreamsMap{ - streams: make(map[protocol.StreamID]receiveStreamI), - nextStream: nextStream, - maxStream: maxStream, - newStream: newStream, + streams: make(map[protocol.StreamID]receiveStreamI), + nextStream: nextStream, + maxStream: initialMaxStreamID, + maxNumStreams: maxNumStreams, + newStream: newStream, + queueMaxStreamID: func(f *wire.MaxStreamIDFrame) { queueControlFrame(f) }, } m.cond.L = &m.mutex return m @@ -99,6 +107,11 @@ func (m *incomingUniStreamsMap) DeleteStream(id protocol.StreamID) error { return fmt.Errorf("Tried to delete unknown stream %d", id) } delete(m.streams, id) + // queue a MAX_STREAM_ID frame, giving the peer the option to open a new stream + if numNewStreams := m.maxNumStreams - len(m.streams); numNewStreams > 0 { + m.maxStream = m.highestStream + protocol.StreamID(numNewStreams*4) + m.queueMaxStreamID(&wire.MaxStreamIDFrame{StreamID: m.maxStream}) + } return nil } diff --git a/streams_map_test.go b/streams_map_test.go index cf8b655b..a9901d8c 100644 --- a/streams_map_test.go +++ b/streams_map_test.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" + "github.com/golang/mock/gomock" "github.com/lucas-clemente/quic-go/internal/flowcontrol" "github.com/lucas-clemente/quic-go/internal/handshake" "github.com/lucas-clemente/quic-go/internal/mocks" @@ -50,7 +51,10 @@ var _ = Describe("Streams Map (for IETF QUIC)", func() { } Context(perspective.String(), func() { - var m *streamsMap + var ( + m *streamsMap + mockSender *MockStreamSender + ) allowUnlimitedStreams := func() { m.UpdateLimits(&handshake.TransportParameters{ @@ -60,7 +64,8 @@ var _ = Describe("Streams Map (for IETF QUIC)", func() { } BeforeEach(func() { - m = newStreamsMap(nil, newFlowController, perspective, versionIETFFrames).(*streamsMap) + mockSender = NewMockStreamSender(mockCtrl) + m = newStreamsMap(mockSender, newFlowController, perspective, versionIETFFrames).(*streamsMap) }) Context("opening", func() { @@ -111,6 +116,7 @@ var _ = Describe("Streams Map (for IETF QUIC)", func() { Context("deleting", func() { BeforeEach(func() { + mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes() allowUnlimitedStreams() }) @@ -306,6 +312,26 @@ var _ = Describe("Streams Map (for IETF QUIC)", func() { }) }) + Context("sending MAX_STREAM_ID frames", func() { + It("sends MAX_STREAM_ID frames for bidirectional streams", func() { + _, err := m.GetOrOpenReceiveStream(ids.firstIncomingBidiStream + 4*10) + Expect(err).ToNot(HaveOccurred()) + mockSender.EXPECT().queueControlFrame(&wire.MaxStreamIDFrame{ + StreamID: protocol.MaxBidiStreamID(protocol.MaxIncomingStreams, perspective) + 4, + }) + Expect(m.DeleteStream(ids.firstIncomingBidiStream)).To(Succeed()) + }) + + It("sends MAX_STREAM_ID frames for unidirectional streams", func() { + _, err := m.GetOrOpenReceiveStream(ids.firstIncomingUniStream + 4*10) + Expect(err).ToNot(HaveOccurred()) + mockSender.EXPECT().queueControlFrame(&wire.MaxStreamIDFrame{ + StreamID: protocol.MaxUniStreamID(protocol.MaxIncomingStreams, perspective) + 4, + }) + Expect(m.DeleteStream(ids.firstIncomingUniStream)).To(Succeed()) + }) + }) + It("closes", func() { testErr := errors.New("test error") m.CloseWithError(testErr)