From 5c5e1024e71e8021f7b93262aa43c4341e0cdac5 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Fri, 16 Oct 2020 17:28:20 +0700 Subject: [PATCH] send STREAMS_BLOCKED frame when MAX_STREAMS frame allows too few streams --- session_test.go | 2 +- streams_map_outgoing_bidi.go | 6 ++- streams_map_outgoing_generic.go | 6 ++- streams_map_outgoing_generic_test.go | 55 ++++++++++++++++++++++++---- streams_map_outgoing_uni.go | 6 ++- 5 files changed, 64 insertions(+), 11 deletions(-) diff --git a/session_test.go b/session_test.go index 805cf4614..81fdfee0e 100644 --- a/session_test.go +++ b/session_test.go @@ -325,7 +325,7 @@ var _ = Describe("Session", func() { Expect(err).NotTo(HaveOccurred()) }) - It("handles STREAM_ID_BLOCKED frames", func() { + It("handles STREAMS_BLOCKED frames", func() { err := sess.handleFrame(&wire.StreamsBlockedFrame{}, protocol.Encryption1RTT, protocol.ConnectionID{}) Expect(err).NotTo(HaveOccurred()) }) diff --git a/streams_map_outgoing_bidi.go b/streams_map_outgoing_bidi.go index 62524abe4..72d942117 100644 --- a/streams_map_outgoing_bidi.go +++ b/streams_map_outgoing_bidi.go @@ -119,6 +119,8 @@ func (m *outgoingBidiStreamsMap) openStream() streamI { return s } +// maybeSendBlockedFrame queues a STREAMS_BLOCKED frame for the current stream offset, +// if we haven't sent one for this offset yet func (m *outgoingBidiStreamsMap) maybeSendBlockedFrame() { if m.blockedSent { return @@ -172,8 +174,10 @@ func (m *outgoingBidiStreamsMap) SetMaxStream(num protocol.StreamNum) { } m.maxStream = num m.blockedSent = false + if m.maxStream < m.nextStream-1+protocol.StreamNum(len(m.openQueue)) { + m.maybeSendBlockedFrame() + } m.unblockOpenSync() - // TODO(#2826): it might be necessary to send a STREAMS_BLOCKED frame } // unblockOpenSync unblocks the next OpenStreamSync go-routine to open a new stream diff --git a/streams_map_outgoing_generic.go b/streams_map_outgoing_generic.go index ed34be6be..01edd7907 100644 --- a/streams_map_outgoing_generic.go +++ b/streams_map_outgoing_generic.go @@ -117,6 +117,8 @@ func (m *outgoingItemsMap) openStream() item { return s } +// maybeSendBlockedFrame queues a STREAMS_BLOCKED frame for the current stream offset, +// if we haven't sent one for this offset yet func (m *outgoingItemsMap) maybeSendBlockedFrame() { if m.blockedSent { return @@ -170,8 +172,10 @@ func (m *outgoingItemsMap) SetMaxStream(num protocol.StreamNum) { } m.maxStream = num m.blockedSent = false + if m.maxStream < m.nextStream-1+protocol.StreamNum(len(m.openQueue)) { + m.maybeSendBlockedFrame() + } m.unblockOpenSync() - // TODO(#2826): it might be necessary to send a STREAMS_BLOCKED frame } // unblockOpenSync unblocks the next OpenStreamSync go-routine to open a new stream diff --git a/streams_map_outgoing_generic_test.go b/streams_map_outgoing_generic_test.go index f073f10ef..3875f7ce5 100644 --- a/streams_map_outgoing_generic_test.go +++ b/streams_map_outgoing_generic_test.go @@ -333,7 +333,7 @@ var _ = Describe("Streams Map (outgoing)", func() { Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(2))) }) - It("queues a STREAM_ID_BLOCKED frame if no stream can be opened", func() { + It("queues a STREAMS_BLOCKED frame if no stream can be opened", func() { m.SetMaxStream(6) // open the 6 allowed streams for i := 0; i < 6; i++ { @@ -349,19 +349,48 @@ var _ = Describe("Streams Map (outgoing)", func() { Expect(err.Error()).To(Equal(errTooManyOpenStreams.Error())) }) - It("only sends one STREAM_ID_BLOCKED frame for one stream ID", func() { + It("only sends one STREAMS_BLOCKED frame for one stream ID", func() { m.SetMaxStream(1) mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) { Expect(f.(*wire.StreamsBlockedFrame).StreamLimit).To(BeEquivalentTo(1)) }) _, err := m.OpenStream() Expect(err).ToNot(HaveOccurred()) - // try to open a stream twice, but expect only one STREAM_ID_BLOCKED to be sent + // try to open a stream twice, but expect only one STREAMS_BLOCKED to be sent _, err = m.OpenStream() expectTooManyStreamsError(err) _, err = m.OpenStream() expectTooManyStreamsError(err) }) + + It("queues a STREAMS_BLOCKED frame when there more streams waiting for OpenStreamSync than MAX_STREAMS allows", func() { + mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) { + Expect(f.(*wire.StreamsBlockedFrame).StreamLimit).To(BeEquivalentTo(0)) + }) + done := make(chan struct{}, 2) + go func() { + defer GinkgoRecover() + _, err := m.OpenStreamSync(context.Background()) + Expect(err).ToNot(HaveOccurred()) + done <- struct{}{} + }() + go func() { + defer GinkgoRecover() + _, err := m.OpenStreamSync(context.Background()) + Expect(err).ToNot(HaveOccurred()) + done <- struct{}{} + }() + waitForEnqueued(2) + + mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) { + Expect(f.(*wire.StreamsBlockedFrame).StreamLimit).To(BeEquivalentTo(1)) + }) + m.SetMaxStream(1) + Eventually(done).Should(Receive()) + Consistently(done).ShouldNot(Receive()) + m.SetMaxStream(2) + Eventually(done).Should(Receive()) + }) }) Context("randomized tests", func() { @@ -370,8 +399,10 @@ var _ = Describe("Streams Map (outgoing)", func() { const n = 100 fmt.Fprintf(GinkgoWriter, "Opening %d streams concurrently.\n", n) - // TODO(#2826): check stream limits sent in STREAMS_BLOCKED frames - mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes() + var blockedAt []protocol.StreamNum + mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) { + blockedAt = append(blockedAt, f.(*wire.StreamsBlockedFrame).StreamLimit) + }).AnyTimes() done := make(map[int]chan struct{}) for i := 1; i <= n; i++ { c := make(chan struct{}) @@ -388,8 +419,12 @@ var _ = Describe("Streams Map (outgoing)", func() { } var limit int + limits := []protocol.StreamNum{0} for limit < n { limit += rand.Intn(n/5) + 1 + if limit <= n { + limits = append(limits, protocol.StreamNum(limit)) + } fmt.Fprintf(GinkgoWriter, "Setting stream limit to %d.\n", limit) m.SetMaxStream(protocol.StreamNum(limit)) for i := 1; i <= n; i++ { @@ -407,6 +442,7 @@ var _ = Describe("Streams Map (outgoing)", func() { Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(n + 1))) } } + Expect(blockedAt).To(Equal(limits)) }) It("opens streams, when some of them are getting canceled", func() { @@ -414,8 +450,10 @@ var _ = Describe("Streams Map (outgoing)", func() { const n = 100 fmt.Fprintf(GinkgoWriter, "Opening %d streams concurrently.\n", n) - // TODO(#2826): check stream limits sent in STREAMS_BLOCKED frames - mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes() + var blockedAt []protocol.StreamNum + mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) { + blockedAt = append(blockedAt, f.(*wire.StreamsBlockedFrame).StreamLimit) + }).AnyTimes() ctx, cancel := context.WithCancel(context.Background()) streamsToCancel := make(map[protocol.StreamNum]struct{}) // used as a set @@ -463,7 +501,9 @@ var _ = Describe("Streams Map (outgoing)", func() { } var limit int numStreams := n - len(streamsToCancel) + var limits []protocol.StreamNum for limit < numStreams { + limits = append(limits, protocol.StreamNum(limit)) limit += rand.Intn(n/5) + 1 fmt.Fprintf(GinkgoWriter, "Setting stream limit to %d.\n", limit) m.SetMaxStream(protocol.StreamNum(limit)) @@ -483,6 +523,7 @@ var _ = Describe("Streams Map (outgoing)", func() { Expect(streamIDs[i]).To(Equal(i + 1)) } } + Expect(blockedAt).To(Equal(limits)) }) }) }) diff --git a/streams_map_outgoing_uni.go b/streams_map_outgoing_uni.go index bffd21493..3fd1f177e 100644 --- a/streams_map_outgoing_uni.go +++ b/streams_map_outgoing_uni.go @@ -119,6 +119,8 @@ func (m *outgoingUniStreamsMap) openStream() sendStreamI { return s } +// maybeSendBlockedFrame queues a STREAMS_BLOCKED frame for the current stream offset, +// if we haven't sent one for this offset yet func (m *outgoingUniStreamsMap) maybeSendBlockedFrame() { if m.blockedSent { return @@ -172,8 +174,10 @@ func (m *outgoingUniStreamsMap) SetMaxStream(num protocol.StreamNum) { } m.maxStream = num m.blockedSent = false + if m.maxStream < m.nextStream-1+protocol.StreamNum(len(m.openQueue)) { + m.maybeSendBlockedFrame() + } m.unblockOpenSync() - // TODO(#2826): it might be necessary to send a STREAMS_BLOCKED frame } // unblockOpenSync unblocks the next OpenStreamSync go-routine to open a new stream