From 08008b680ed29912e8c9b530bf91ff6637f8d6b4 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Tue, 2 Oct 2018 16:46:25 -0700 Subject: [PATCH] make the incoming streams map work with stream 0 Stream 0 is now a valid stream ID used for application data, so the streams map must be able to accept this stream. --- streams_map_incoming_bidi.go | 43 ++++++++++++---------------- streams_map_incoming_generic.go | 43 ++++++++++++---------------- streams_map_incoming_generic_test.go | 18 ++++++++++++ streams_map_incoming_uni.go | 43 ++++++++++++---------------- 4 files changed, 75 insertions(+), 72 deletions(-) diff --git a/streams_map_incoming_bidi.go b/streams_map_incoming_bidi.go index 317f5e23..37e75351 100644 --- a/streams_map_incoming_bidi.go +++ b/streams_map_incoming_bidi.go @@ -18,10 +18,10 @@ type incomingBidiStreamsMap struct { streams map[protocol.StreamID]streamI - nextStream protocol.StreamID // the next stream that will be returned by AcceptStream() - highestStream protocol.StreamID // the highest stream that the peer openend - maxStream protocol.StreamID // the highest stream that the peer is allowed to open - maxNumStreams int // maximum number of streams + nextStreamToAccept protocol.StreamID // the next stream that will be returned by AcceptStream() + nextStreamToOpen protocol.StreamID // the highest stream that the peer openend + maxStream protocol.StreamID // the highest stream that the peer is allowed to open + maxNumStreams int // maximum number of streams newStream func(protocol.StreamID) streamI queueMaxStreamID func(*wire.MaxStreamIDFrame) @@ -30,19 +30,20 @@ type incomingBidiStreamsMap struct { } func newIncomingBidiStreamsMap( - nextStream protocol.StreamID, + nextStreamToAccept protocol.StreamID, initialMaxStreamID protocol.StreamID, maxNumStreams int, queueControlFrame func(wire.Frame), newStream func(protocol.StreamID) streamI, ) *incomingBidiStreamsMap { m := &incomingBidiStreamsMap{ - streams: make(map[protocol.StreamID]streamI), - nextStream: nextStream, - maxStream: initialMaxStreamID, - maxNumStreams: maxNumStreams, - newStream: newStream, - queueMaxStreamID: func(f *wire.MaxStreamIDFrame) { queueControlFrame(f) }, + streams: make(map[protocol.StreamID]streamI), + nextStreamToAccept: nextStreamToAccept, + nextStreamToOpen: nextStreamToAccept, + maxStream: initialMaxStreamID, + maxNumStreams: maxNumStreams, + newStream: newStream, + queueMaxStreamID: func(f *wire.MaxStreamIDFrame) { queueControlFrame(f) }, } m.cond.L = &m.mutex return m @@ -58,13 +59,13 @@ func (m *incomingBidiStreamsMap) AcceptStream() (streamI, error) { if m.closeErr != nil { return nil, m.closeErr } - str, ok = m.streams[m.nextStream] + str, ok = m.streams[m.nextStreamToAccept] if ok { break } m.cond.Wait() } - m.nextStream += 4 + m.nextStreamToAccept += 4 return str, nil } @@ -77,7 +78,7 @@ func (m *incomingBidiStreamsMap) GetOrOpenStream(id protocol.StreamID) (streamI, // if the id is smaller than the highest we accepted // * this stream exists in the map, and we can return it, or // * this stream was already closed, then we can return the nil - if id <= m.highestStream { + if id < m.nextStreamToOpen { s := m.streams[id] m.mutex.RUnlock() return s, nil @@ -88,17 +89,11 @@ func (m *incomingBidiStreamsMap) GetOrOpenStream(id protocol.StreamID) (streamI, // no need to check the two error conditions from above again // * maxStream can only increase, so if the id was valid before, it definitely is valid now // * highestStream is only modified by this function - var start protocol.StreamID - if m.highestStream == 0 { - start = m.nextStream - } else { - start = m.highestStream + 4 - } - for newID := start; newID <= id; newID += 4 { + for newID := m.nextStreamToOpen; newID <= id; newID += 4 { m.streams[newID] = m.newStream(newID) m.cond.Signal() } - m.highestStream = id + m.nextStreamToOpen = id + 4 s := m.streams[id] m.mutex.Unlock() return s, nil @@ -113,8 +108,8 @@ func (m *incomingBidiStreamsMap) DeleteStream(id protocol.StreamID) error { } delete(m.streams, id) // queue a MAX_STREAM_ID frame, giving the peer the option to open a new stream - if numNewStreams := m.maxNumStreams - len(m.streams); numNewStreams > 0 { - m.maxStream = m.highestStream + protocol.StreamID(numNewStreams*4) + if numNewStreams := m.maxNumStreams - len(m.streams) - 1; numNewStreams > 0 { + m.maxStream = m.nextStreamToOpen + protocol.StreamID(numNewStreams*4) m.queueMaxStreamID(&wire.MaxStreamIDFrame{StreamID: m.maxStream}) } return nil diff --git a/streams_map_incoming_generic.go b/streams_map_incoming_generic.go index 58f1ccbe..e8f3e562 100644 --- a/streams_map_incoming_generic.go +++ b/streams_map_incoming_generic.go @@ -16,10 +16,10 @@ type incomingItemsMap struct { streams map[protocol.StreamID]item - nextStream protocol.StreamID // the next stream that will be returned by AcceptStream() - highestStream protocol.StreamID // the highest stream that the peer openend - maxStream protocol.StreamID // the highest stream that the peer is allowed to open - maxNumStreams int // maximum number of streams + nextStreamToAccept protocol.StreamID // the next stream that will be returned by AcceptStream() + nextStreamToOpen protocol.StreamID // the highest stream that the peer openend + maxStream protocol.StreamID // the highest stream that the peer is allowed to open + maxNumStreams int // maximum number of streams newStream func(protocol.StreamID) item queueMaxStreamID func(*wire.MaxStreamIDFrame) @@ -28,19 +28,20 @@ type incomingItemsMap struct { } func newIncomingItemsMap( - nextStream protocol.StreamID, + nextStreamToAccept protocol.StreamID, initialMaxStreamID protocol.StreamID, maxNumStreams int, queueControlFrame func(wire.Frame), newStream func(protocol.StreamID) item, ) *incomingItemsMap { m := &incomingItemsMap{ - streams: make(map[protocol.StreamID]item), - nextStream: nextStream, - maxStream: initialMaxStreamID, - maxNumStreams: maxNumStreams, - newStream: newStream, - queueMaxStreamID: func(f *wire.MaxStreamIDFrame) { queueControlFrame(f) }, + streams: make(map[protocol.StreamID]item), + nextStreamToAccept: nextStreamToAccept, + nextStreamToOpen: nextStreamToAccept, + maxStream: initialMaxStreamID, + maxNumStreams: maxNumStreams, + newStream: newStream, + queueMaxStreamID: func(f *wire.MaxStreamIDFrame) { queueControlFrame(f) }, } m.cond.L = &m.mutex return m @@ -56,13 +57,13 @@ func (m *incomingItemsMap) AcceptStream() (item, error) { if m.closeErr != nil { return nil, m.closeErr } - str, ok = m.streams[m.nextStream] + str, ok = m.streams[m.nextStreamToAccept] if ok { break } m.cond.Wait() } - m.nextStream += 4 + m.nextStreamToAccept += 4 return str, nil } @@ -75,7 +76,7 @@ func (m *incomingItemsMap) GetOrOpenStream(id protocol.StreamID) (item, error) { // if the id is smaller than the highest we accepted // * this stream exists in the map, and we can return it, or // * this stream was already closed, then we can return the nil - if id <= m.highestStream { + if id < m.nextStreamToOpen { s := m.streams[id] m.mutex.RUnlock() return s, nil @@ -86,17 +87,11 @@ func (m *incomingItemsMap) GetOrOpenStream(id protocol.StreamID) (item, error) { // no need to check the two error conditions from above again // * maxStream can only increase, so if the id was valid before, it definitely is valid now // * highestStream is only modified by this function - var start protocol.StreamID - if m.highestStream == 0 { - start = m.nextStream - } else { - start = m.highestStream + 4 - } - for newID := start; newID <= id; newID += 4 { + for newID := m.nextStreamToOpen; newID <= id; newID += 4 { m.streams[newID] = m.newStream(newID) m.cond.Signal() } - m.highestStream = id + m.nextStreamToOpen = id + 4 s := m.streams[id] m.mutex.Unlock() return s, nil @@ -111,8 +106,8 @@ func (m *incomingItemsMap) DeleteStream(id protocol.StreamID) error { } delete(m.streams, id) // queue a MAX_STREAM_ID frame, giving the peer the option to open a new stream - if numNewStreams := m.maxNumStreams - len(m.streams); numNewStreams > 0 { - m.maxStream = m.highestStream + protocol.StreamID(numNewStreams*4) + if numNewStreams := m.maxNumStreams - len(m.streams) - 1; numNewStreams > 0 { + m.maxStream = m.nextStreamToOpen + protocol.StreamID(numNewStreams*4) m.queueMaxStreamID(&wire.MaxStreamIDFrame{StreamID: m.maxStream}) } return nil diff --git a/streams_map_incoming_generic_test.go b/streams_map_incoming_generic_test.go index e9d8dbb4..5aba8506 100644 --- a/streams_map_incoming_generic_test.go +++ b/streams_map_incoming_generic_test.go @@ -103,6 +103,24 @@ var _ = Describe("Streams Map (incoming)", func() { Expect(acceptedStr.(*mockGenericStream).id).To(Equal(firstNewStream)) }) + It("works with stream 0", func() { + m = newIncomingItemsMap(0, 1000, 1000, mockSender.queueControlFrame, newItem) + strChan := make(chan item) + go func() { + defer GinkgoRecover() + str, err := m.AcceptStream() + Expect(err).ToNot(HaveOccurred()) + strChan <- str + }() + Consistently(strChan).ShouldNot(Receive()) + str, err := m.GetOrOpenStream(0) + Expect(err).ToNot(HaveOccurred()) + Expect(str.(*mockGenericStream).id).To(BeZero()) + var acceptedStr item + Eventually(strChan).Should(Receive(&acceptedStr)) + Expect(acceptedStr.(*mockGenericStream).id).To(BeZero()) + }) + It("unblocks AcceptStream when it is closed", func() { testErr := errors.New("test error") done := make(chan struct{}) diff --git a/streams_map_incoming_uni.go b/streams_map_incoming_uni.go index 8e775aac..bec6f31c 100644 --- a/streams_map_incoming_uni.go +++ b/streams_map_incoming_uni.go @@ -18,10 +18,10 @@ type incomingUniStreamsMap struct { streams map[protocol.StreamID]receiveStreamI - nextStream protocol.StreamID // the next stream that will be returned by AcceptStream() - highestStream protocol.StreamID // the highest stream that the peer openend - maxStream protocol.StreamID // the highest stream that the peer is allowed to open - maxNumStreams int // maximum number of streams + nextStreamToAccept protocol.StreamID // the next stream that will be returned by AcceptStream() + nextStreamToOpen protocol.StreamID // the highest stream that the peer openend + maxStream protocol.StreamID // the highest stream that the peer is allowed to open + maxNumStreams int // maximum number of streams newStream func(protocol.StreamID) receiveStreamI queueMaxStreamID func(*wire.MaxStreamIDFrame) @@ -30,19 +30,20 @@ type incomingUniStreamsMap struct { } func newIncomingUniStreamsMap( - nextStream protocol.StreamID, + nextStreamToAccept protocol.StreamID, initialMaxStreamID protocol.StreamID, maxNumStreams int, queueControlFrame func(wire.Frame), newStream func(protocol.StreamID) receiveStreamI, ) *incomingUniStreamsMap { m := &incomingUniStreamsMap{ - streams: make(map[protocol.StreamID]receiveStreamI), - nextStream: nextStream, - maxStream: initialMaxStreamID, - maxNumStreams: maxNumStreams, - newStream: newStream, - queueMaxStreamID: func(f *wire.MaxStreamIDFrame) { queueControlFrame(f) }, + streams: make(map[protocol.StreamID]receiveStreamI), + nextStreamToAccept: nextStreamToAccept, + nextStreamToOpen: nextStreamToAccept, + maxStream: initialMaxStreamID, + maxNumStreams: maxNumStreams, + newStream: newStream, + queueMaxStreamID: func(f *wire.MaxStreamIDFrame) { queueControlFrame(f) }, } m.cond.L = &m.mutex return m @@ -58,13 +59,13 @@ func (m *incomingUniStreamsMap) AcceptStream() (receiveStreamI, error) { if m.closeErr != nil { return nil, m.closeErr } - str, ok = m.streams[m.nextStream] + str, ok = m.streams[m.nextStreamToAccept] if ok { break } m.cond.Wait() } - m.nextStream += 4 + m.nextStreamToAccept += 4 return str, nil } @@ -77,7 +78,7 @@ func (m *incomingUniStreamsMap) GetOrOpenStream(id protocol.StreamID) (receiveSt // if the id is smaller than the highest we accepted // * this stream exists in the map, and we can return it, or // * this stream was already closed, then we can return the nil - if id <= m.highestStream { + if id < m.nextStreamToOpen { s := m.streams[id] m.mutex.RUnlock() return s, nil @@ -88,17 +89,11 @@ func (m *incomingUniStreamsMap) GetOrOpenStream(id protocol.StreamID) (receiveSt // no need to check the two error conditions from above again // * maxStream can only increase, so if the id was valid before, it definitely is valid now // * highestStream is only modified by this function - var start protocol.StreamID - if m.highestStream == 0 { - start = m.nextStream - } else { - start = m.highestStream + 4 - } - for newID := start; newID <= id; newID += 4 { + for newID := m.nextStreamToOpen; newID <= id; newID += 4 { m.streams[newID] = m.newStream(newID) m.cond.Signal() } - m.highestStream = id + m.nextStreamToOpen = id + 4 s := m.streams[id] m.mutex.Unlock() return s, nil @@ -113,8 +108,8 @@ func (m *incomingUniStreamsMap) DeleteStream(id protocol.StreamID) error { } delete(m.streams, id) // queue a MAX_STREAM_ID frame, giving the peer the option to open a new stream - if numNewStreams := m.maxNumStreams - len(m.streams); numNewStreams > 0 { - m.maxStream = m.highestStream + protocol.StreamID(numNewStreams*4) + if numNewStreams := m.maxNumStreams - len(m.streams) - 1; numNewStreams > 0 { + m.maxStream = m.nextStreamToOpen + protocol.StreamID(numNewStreams*4) m.queueMaxStreamID(&wire.MaxStreamIDFrame{StreamID: m.maxStream}) } return nil