From 92aa9c2b138c61d0d026094875c4b193b81f147d Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Fri, 10 Feb 2017 19:14:38 +0700 Subject: [PATCH] remove obsolete garbage collection from streamsMap --- streams_map.go | 42 +++------------------ streams_map_test.go | 92 ++------------------------------------------- 2 files changed, 9 insertions(+), 125 deletions(-) diff --git a/streams_map.go b/streams_map.go index dac409a5..0517685d 100644 --- a/streams_map.go +++ b/streams_map.go @@ -17,23 +17,19 @@ type streamsMap struct { connectionParameters handshake.ConnectionParametersManager streams map[protocol.StreamID]*stream - // TODO: remove this - openStreams []protocol.StreamID + // needed for round-robin scheduling + openStreams []protocol.StreamID + roundRobinIndex uint32 nextStream protocol.StreamID // StreamID of the next Stream that will be returned by OpenStream() highestStreamOpenedByPeer protocol.StreamID - // TODO: remove this - streamsOpenedAfterLastGarbageCollect int - newStream newStreamLambda maxOutgoingStreams uint32 numOutgoingStreams uint32 maxIncomingStreams uint32 numIncomingStreams uint32 - - roundRobinIndex uint32 } type streamLambda func(*stream) (bool, error) @@ -98,12 +94,6 @@ func (m *streamsMap) GetOrOpenStream(id protocol.StreamID) (*stream, error) { sid -= 2 } - // maybe trigger garbage collection of streams map - m.streamsOpenedAfterLastGarbageCollect++ - if m.streamsOpenedAfterLastGarbageCollect%protocol.MaxNewStreamIDDelta == 0 { - m.garbageCollectClosedStreams() - } - return m.streams[id], nil } @@ -194,7 +184,7 @@ func (m *streamsMap) RoundRobinIterate(fn streamLambda) error { m.mutex.Lock() defer m.mutex.Unlock() - numStreams := uint32(len(m.openStreams)) + numStreams := uint32(len(m.streams)) startIndex := m.roundRobinIndex for _, i := range []protocol.StreamID{1, 3} { @@ -209,7 +199,6 @@ func (m *streamsMap) RoundRobinIterate(fn streamLambda) error { for i := uint32(0); i < numStreams; i++ { streamID := m.openStreams[(i+startIndex)%numStreams] - if streamID == 1 || streamID == 3 { continue } @@ -231,9 +220,6 @@ func (m *streamsMap) iterateFunc(streamID protocol.StreamID, fn streamLambda) (b if !ok { return true, errMapAccess } - if str == nil { - return false, fmt.Errorf("BUG: Stream %d is closed, but still in openStreams map", streamID) - } return fn(str) } @@ -245,7 +231,6 @@ func (m *streamsMap) putStream(s *stream) error { m.streams[id] = s m.openStreams = append(m.openStreams, id) - return nil } @@ -256,7 +241,6 @@ func (m *streamsMap) RemoveStream(id protocol.StreamID) error { return fmt.Errorf("attempted to remove non-existing stream: %d", id) } - m.streams[id] = nil if id%2 == 0 { m.numOutgoingStreams-- } else { @@ -275,22 +259,6 @@ func (m *streamsMap) RemoveStream(id protocol.StreamID) error { } } + delete(m.streams, id) return nil } - -// garbageCollectClosedStreams deletes nil values in the streams if they are smaller than protocol.MaxNewStreamIDDelta than the highest stream opened by the client -// note that this garbage collection is relatively expensive, since it iterates over the whole streams map. It should not be called every time a stream is openend or closed -func (m *streamsMap) garbageCollectClosedStreams() { - for id, str := range m.streams { - if str != nil { - continue - } - - // server-side streams can be gargage collected immediately - // client-side streams need to be kept as nils in the streams map for a bit longer, in order to prevent a client from reopening closed streams - if id%2 == 0 || id+protocol.MaxNewStreamIDDelta <= m.highestStreamOpenedByPeer { - delete(m.streams, id) - } - } - m.streamsOpenedAfterLastGarbageCollect = 0 -} diff --git a/streams_map_test.go b/streams_map_test.go index c94e06d1..9277e6b7 100644 --- a/streams_map_test.go +++ b/streams_map_test.go @@ -71,6 +71,10 @@ var _ = Describe("Streams Map", func() { } }) + AfterEach(func() { + Expect(m.openStreams).To(HaveLen(len(m.streams))) + }) + Context("getting and creating streams", func() { Context("as a server", func() { BeforeEach(func() { @@ -262,94 +266,6 @@ var _ = Describe("Streams Map", func() { setNewStreamsMap(protocol.PerspectiveServer) }) - // TODO: remove when removing the openStreams slice - Context("DoS mitigation", func() { - It("opens and closes a lot of streams", func() { - for i := 1; i < 2*protocol.MaxNewStreamIDDelta; i += 2 { - streamID := protocol.StreamID(i) - _, err := m.GetOrOpenStream(streamID) - Expect(m.highestStreamOpenedByPeer).To(Equal(streamID)) - Expect(err).NotTo(HaveOccurred()) - err = m.RemoveStream(streamID) - Expect(err).NotTo(HaveOccurred()) - } - }) - - PIt("prevents opening of streams with very low StreamIDs, if higher streams have already been opened", func() { - for i := 1; i < protocol.MaxNewStreamIDDelta+14; i += 2 { - if i == 11 || i == 13 { - continue - } - streamID := protocol.StreamID(i) - _, err := m.GetOrOpenStream(streamID) - Expect(err).NotTo(HaveOccurred()) - err = m.RemoveStream(streamID) - Expect(err).NotTo(HaveOccurred()) - } - Expect(m.highestStreamOpenedByPeer).To(Equal(protocol.StreamID(protocol.MaxNewStreamIDDelta + 13))) - _, err := m.GetOrOpenStream(11) - Expect(err).To(MatchError("InvalidStreamID: attempted to open stream 11, which is a lot smaller than the highest opened stream, 413")) - _, err = m.GetOrOpenStream(13) - Expect(err).ToNot(HaveOccurred()) - }) - - It("garbage-collects closed streams", func() { - for i := 1; i < 4*protocol.MaxNewStreamIDDelta; i += 2 { - streamID := protocol.StreamID(i) - _, err := m.GetOrOpenStream(streamID) - Expect(m.highestStreamOpenedByPeer).To(Equal(streamID)) - Expect(err).NotTo(HaveOccurred()) - err = m.RemoveStream(streamID) - Expect(err).NotTo(HaveOccurred()) - } - m.garbageCollectClosedStreams() - for i := 1; i < 3*protocol.MaxNewStreamIDDelta; i += 2 { - Expect(m.streams).ToNot(HaveKey(protocol.StreamID(i))) - } - for i := 3*protocol.MaxNewStreamIDDelta + 1; i < 4*protocol.MaxNewStreamIDDelta; i += 2 { - Expect(m.streams).To(HaveKey(protocol.StreamID(i))) - } - }) - - It("does not garbage-collects open streams", func() { - for i := 1; i < 1002; i += 2 { - streamID := protocol.StreamID(i) - _, err := m.GetOrOpenStream(streamID) - Expect(m.highestStreamOpenedByPeer).To(Equal(streamID)) - Expect(err).NotTo(HaveOccurred()) - if streamID != 23 { - err = m.RemoveStream(streamID) - Expect(err).NotTo(HaveOccurred()) - } - } - lengthBefore := len(m.streams) - m.garbageCollectClosedStreams() - Expect(len(m.streams)).To(BeNumerically("<", lengthBefore)) - Expect(m.streams).To(HaveKey(protocol.StreamID(23))) - Expect(m.streams[23]).ToNot(BeNil()) - }) - - It("runs garbage-collection after a bunch of streams have been opened", func() { - numGarbageCollections := 0 - numSavedStreams := 0 - for i := 1; i < 4*protocol.MaxNewStreamIDDelta; i += 2 { - streamID := protocol.StreamID(i) - _, err := m.GetOrOpenStream(streamID) - Expect(m.highestStreamOpenedByPeer).To(Equal(streamID)) - Expect(err).NotTo(HaveOccurred()) - err = m.RemoveStream(streamID) - Expect(err).NotTo(HaveOccurred()) - if len(m.streams) != numSavedStreams+1 { - numGarbageCollections++ - } - numSavedStreams = len(m.streams) - } - Expect(numGarbageCollections).ToNot(BeZero()) - Expect(numGarbageCollections).To(BeNumerically("<", 4)) - Expect(len(m.streams)).To(BeNumerically("<", 2*protocol.MaxNewStreamIDDelta)) - }) - }) - Context("deleting streams", func() { BeforeEach(func() { for i := 1; i <= 5; i++ {