From 5fc2e1203811b66f2e8c1124bae29e120f5c2aeb Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 11 Oct 2018 14:45:45 +0100 Subject: [PATCH] make the outgoing streams map work with stream 0 Stream 0 is now a valid stream ID used for application data, so the streams map must be able to (block on) opening this stream. --- streams_map_outgoing_bidi.go | 6 ++++-- streams_map_outgoing_generic.go | 6 ++++-- streams_map_outgoing_generic_test.go | 17 +++++++++++++++++ streams_map_outgoing_uni.go | 6 ++++-- 4 files changed, 29 insertions(+), 6 deletions(-) diff --git a/streams_map_outgoing_bidi.go b/streams_map_outgoing_bidi.go index ea9f47e6..ce3cd8c1 100644 --- a/streams_map_outgoing_bidi.go +++ b/streams_map_outgoing_bidi.go @@ -21,6 +21,7 @@ type outgoingBidiStreamsMap struct { nextStream protocol.StreamID // stream ID of the stream returned by OpenStream(Sync) maxStream protocol.StreamID // the maximum stream ID we're allowed to open + maxStreamSet bool // was maxStream set. If not, it's not possible to any stream (also works for stream 0) highestBlocked protocol.StreamID // the highest stream ID that we queued a STREAM_ID_BLOCKED frame for newStream func(protocol.StreamID) streamI @@ -71,7 +72,7 @@ func (m *outgoingBidiStreamsMap) openStreamImpl() (streamI, error) { if m.closeErr != nil { return nil, m.closeErr } - if m.nextStream > m.maxStream { + if !m.maxStreamSet || m.nextStream > m.maxStream { if m.maxStream == 0 || m.highestBlocked < m.maxStream { m.queueStreamIDBlocked(&wire.StreamIDBlockedFrame{StreamID: m.maxStream}) m.highestBlocked = m.maxStream @@ -108,8 +109,9 @@ func (m *outgoingBidiStreamsMap) DeleteStream(id protocol.StreamID) error { func (m *outgoingBidiStreamsMap) SetMaxStream(id protocol.StreamID) { m.mutex.Lock() - if id > m.maxStream { + if !m.maxStreamSet || id > m.maxStream { m.maxStream = id + m.maxStreamSet = true m.cond.Broadcast() } m.mutex.Unlock() diff --git a/streams_map_outgoing_generic.go b/streams_map_outgoing_generic.go index f4b3eb61..5df1d685 100644 --- a/streams_map_outgoing_generic.go +++ b/streams_map_outgoing_generic.go @@ -19,6 +19,7 @@ type outgoingItemsMap struct { nextStream protocol.StreamID // stream ID of the stream returned by OpenStream(Sync) maxStream protocol.StreamID // the maximum stream ID we're allowed to open + maxStreamSet bool // was maxStream set. If not, it's not possible to any stream (also works for stream 0) highestBlocked protocol.StreamID // the highest stream ID that we queued a STREAM_ID_BLOCKED frame for newStream func(protocol.StreamID) item @@ -69,7 +70,7 @@ func (m *outgoingItemsMap) openStreamImpl() (item, error) { if m.closeErr != nil { return nil, m.closeErr } - if m.nextStream > m.maxStream { + if !m.maxStreamSet || m.nextStream > m.maxStream { if m.maxStream == 0 || m.highestBlocked < m.maxStream { m.queueStreamIDBlocked(&wire.StreamIDBlockedFrame{StreamID: m.maxStream}) m.highestBlocked = m.maxStream @@ -106,8 +107,9 @@ func (m *outgoingItemsMap) DeleteStream(id protocol.StreamID) error { func (m *outgoingItemsMap) SetMaxStream(id protocol.StreamID) { m.mutex.Lock() - if id > m.maxStream { + if !m.maxStreamSet || id > m.maxStream { m.maxStream = id + m.maxStreamSet = true m.cond.Broadcast() } m.mutex.Unlock() diff --git a/streams_map_outgoing_generic_test.go b/streams_map_outgoing_generic_test.go index c8426899..231f8cfe 100644 --- a/streams_map_outgoing_generic_test.go +++ b/streams_map_outgoing_generic_test.go @@ -122,6 +122,23 @@ var _ = Describe("Streams Map (outgoing)", func() { Eventually(done).Should(BeClosed()) }) + It("works with stream 0", func() { + m = newOutgoingItemsMap(0, newItem, mockSender.queueControlFrame) + mockSender.EXPECT().queueControlFrame(&wire.StreamIDBlockedFrame{StreamID: 0}) + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + str, err := m.OpenStreamSync() + Expect(err).ToNot(HaveOccurred()) + Expect(str.(*mockGenericStream).id).To(BeZero()) + close(done) + }() + + Consistently(done).ShouldNot(BeClosed()) + m.SetMaxStream(0) + Eventually(done).Should(BeClosed()) + }) + It("stops opening synchronously when it is closed", func() { mockSender.EXPECT().queueControlFrame(gomock.Any()) testErr := errors.New("test error") diff --git a/streams_map_outgoing_uni.go b/streams_map_outgoing_uni.go index 6ad0348c..34243887 100644 --- a/streams_map_outgoing_uni.go +++ b/streams_map_outgoing_uni.go @@ -21,6 +21,7 @@ type outgoingUniStreamsMap struct { nextStream protocol.StreamID // stream ID of the stream returned by OpenStream(Sync) maxStream protocol.StreamID // the maximum stream ID we're allowed to open + maxStreamSet bool // was maxStream set. If not, it's not possible to any stream (also works for stream 0) highestBlocked protocol.StreamID // the highest stream ID that we queued a STREAM_ID_BLOCKED frame for newStream func(protocol.StreamID) sendStreamI @@ -71,7 +72,7 @@ func (m *outgoingUniStreamsMap) openStreamImpl() (sendStreamI, error) { if m.closeErr != nil { return nil, m.closeErr } - if m.nextStream > m.maxStream { + if !m.maxStreamSet || m.nextStream > m.maxStream { if m.maxStream == 0 || m.highestBlocked < m.maxStream { m.queueStreamIDBlocked(&wire.StreamIDBlockedFrame{StreamID: m.maxStream}) m.highestBlocked = m.maxStream @@ -108,8 +109,9 @@ func (m *outgoingUniStreamsMap) DeleteStream(id protocol.StreamID) error { func (m *outgoingUniStreamsMap) SetMaxStream(id protocol.StreamID) { m.mutex.Lock() - if id > m.maxStream { + if !m.maxStreamSet || id > m.maxStream { m.maxStream = id + m.maxStreamSet = true m.cond.Broadcast() } m.mutex.Unlock()