From ddc886be7a30a521369444eb4ff76caf5c6122f5 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Wed, 29 Jan 2020 11:14:29 +0700 Subject: [PATCH] remove inactive streams from the window update queue --- window_update_queue.go | 2 +- window_update_queue_test.go | 26 ++++++++++++++++++++++++-- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/window_update_queue.go b/window_update_queue.go index 64b912a3f..607b72d7e 100644 --- a/window_update_queue.go +++ b/window_update_queue.go @@ -53,6 +53,7 @@ func (q *windowUpdateQueue) QueueAll() { } // queue all stream-level window updates for id := range q.queue { + delete(q.queue, id) str, err := q.streamGetter.GetOrOpenReceiveStream(id) if err != nil || str == nil { // the stream can be nil if it was completed before dequeing the window update continue @@ -65,7 +66,6 @@ func (q *windowUpdateQueue) QueueAll() { StreamID: id, ByteOffset: offset, }) - delete(q.queue, id) } q.mutex.Unlock() } diff --git a/window_update_queue_test.go b/window_update_queue_test.go index 76a6811b0..727565f37 100644 --- a/window_update_queue_test.go +++ b/window_update_queue_test.go @@ -52,8 +52,18 @@ var _ = Describe("Window Update Queue", func() { }) It("doesn't queue a MAX_STREAM_DATA for a closed stream", func() { - streamGetter.EXPECT().GetOrOpenReceiveStream(protocol.StreamID(12)).Return(nil, nil) q.AddStream(12) + streamGetter.EXPECT().GetOrOpenReceiveStream(protocol.StreamID(12)).Return(nil, nil) + q.QueueAll() + Expect(queuedFrames).To(BeEmpty()) + }) + + It("removes closed streams from the queue", func() { + q.AddStream(12) + streamGetter.EXPECT().GetOrOpenReceiveStream(protocol.StreamID(12)).Return(nil, nil) + q.QueueAll() + Expect(queuedFrames).To(BeEmpty()) + // don't EXPECT any further calls to GetOrOpenReceiveStream q.QueueAll() Expect(queuedFrames).To(BeEmpty()) }) @@ -61,8 +71,20 @@ var _ = Describe("Window Update Queue", func() { It("doesn't queue a MAX_STREAM_DATA if the flow controller returns an offset of 0", func() { stream5 := NewMockStreamI(mockCtrl) stream5.EXPECT().getWindowUpdate().Return(protocol.ByteCount(0)) - streamGetter.EXPECT().GetOrOpenReceiveStream(protocol.StreamID(5)).Return(stream5, nil) q.AddStream(5) + streamGetter.EXPECT().GetOrOpenReceiveStream(protocol.StreamID(5)).Return(stream5, nil) + q.QueueAll() + Expect(queuedFrames).To(BeEmpty()) + }) + + It("removes streams for which the flow controller returns an offset of 0 from the queue", func() { + stream5 := NewMockStreamI(mockCtrl) + stream5.EXPECT().getWindowUpdate().Return(protocol.ByteCount(0)) + q.AddStream(5) + streamGetter.EXPECT().GetOrOpenReceiveStream(protocol.StreamID(5)).Return(stream5, nil) + q.QueueAll() + Expect(queuedFrames).To(BeEmpty()) + // don't EXPECT any further calls to GetOrOpenReveiveStream and to getWindowUpdate q.QueueAll() Expect(queuedFrames).To(BeEmpty()) })