From 478e4c95e16ca6f85d5e862a43f9d5c53dc9b596 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Fri, 10 Jun 2016 18:50:02 +0700 Subject: [PATCH] garbage collect deleted streams from scheduler in StreamFrameQueue fixes #176 --- stream_frame_queue.go | 30 +++++++++++++++++++++++++----- stream_frame_queue_test.go | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 5 deletions(-) diff --git a/stream_frame_queue.go b/stream_frame_queue.go index 1070bed2..ca9d8241 100644 --- a/stream_frame_queue.go +++ b/stream_frame_queue.go @@ -78,7 +78,7 @@ func (q *streamFrameQueue) Pop(maxLength protocol.ByteCount) (*frames.StreamFram for len(q.prioFrames) > 0 { frame = q.prioFrames[0] - if frame == nil { + if frame == nil { // this happens when a Stream that had prioFrames queued gets deleted q.prioFrames = q.prioFrames[1:] continue } @@ -148,6 +148,30 @@ func (q *streamFrameQueue) RemoveStream(streamID protocol.StreamID) { q.activeStreams[i] = 0 } } + + q.garbageCollectActiveStreams() +} + +func (q *streamFrameQueue) garbageCollectActiveStreams() { + var j int + var deletedIndex int + + for i, str := range q.activeStreams { + if str != 0 { + q.activeStreams[j] = str + j++ + } else { + deletedIndex = i + } + } + + if len(q.activeStreams) > 0 { + q.activeStreams = q.activeStreams[:len(q.activeStreams)-1] + } + + if deletedIndex < q.activeStreamsPosition { + q.activeStreamsPosition-- + } } // front returns the next element without modifying the queue @@ -163,10 +187,6 @@ func (q *streamFrameQueue) getNextStream() (protocol.StreamID, error) { streamID := q.activeStreams[q.activeStreamsPosition] q.activeStreamsPosition = (q.activeStreamsPosition + 1) % len(q.activeStreams) - if streamID == 0 { // this happens if the stream was deleted - continue - } - frameQueue, ok := q.frameMap[streamID] if !ok { return 0, errMapAccess diff --git a/stream_frame_queue_test.go b/stream_frame_queue_test.go index d34a1e8a..96aa7c87 100644 --- a/stream_frame_queue_test.go +++ b/stream_frame_queue_test.go @@ -423,5 +423,41 @@ var _ = Describe("streamFrameQueue", func() { Expect(err).ToNot(HaveOccurred()) Expect(frame).To(Equal(frame2)) }) + + Context("garbage collection of activeStreams", func() { + It("adjusts the activeStreams slice", func() { + queue.activeStreams = []protocol.StreamID{5, 6, 10, 2, 3} + queue.RemoveStream(10) + Expect(queue.activeStreams).To(Equal([]protocol.StreamID{5, 6, 2, 3})) + }) + + It("garbage collects correctly if there is only one stream", func() { + queue.activeStreams = []protocol.StreamID{10} + queue.RemoveStream(10) + Expect(queue.activeStreams).To(BeEmpty()) + Expect(queue.activeStreamsPosition).To(Equal(0)) + }) + + It("does not change the scheduling, when the stream deleted is after the current position in activeStreams", func() { + queue.activeStreams = []protocol.StreamID{5, 6, 10, 2, 3} + queue.activeStreamsPosition = 0 // the next frame would be from Stream 5 + queue.RemoveStream(10) + Expect(queue.activeStreamsPosition).To(Equal(0)) + }) + + It("makes sure that scheduling is adjusted, if the stream deleted is before the current position in activeStreams", func() { + queue.activeStreams = []protocol.StreamID{5, 6, 10, 2, 3} + queue.activeStreamsPosition = 3 // the next frame would be from Stream 2 + queue.RemoveStream(10) + Expect(queue.activeStreamsPosition).To(Equal(2)) + }) + + It("makes sure that scheduling is adjusted, when a frame from the deleted stream was scheduled", func() { + queue.activeStreams = []protocol.StreamID{5, 6, 10, 2, 3} + queue.activeStreamsPosition = 2 // the next frame would be from Stream 10 + queue.RemoveStream(10) + Expect(queue.activeStreamsPosition).To(Equal(2)) // the next frame will be from Stream 2 + }) + }) }) })