garbage collect deleted streams from scheduler in StreamFrameQueue

fixes #176
This commit is contained in:
Marten Seemann
2016-06-10 18:50:02 +07:00
parent fecfc171ae
commit 478e4c95e1
2 changed files with 61 additions and 5 deletions

View File

@@ -78,7 +78,7 @@ func (q *streamFrameQueue) Pop(maxLength protocol.ByteCount) (*frames.StreamFram
for len(q.prioFrames) > 0 {
frame = q.prioFrames[0]
if frame == nil {
if frame == nil { // this happens when a Stream that had prioFrames queued gets deleted
q.prioFrames = q.prioFrames[1:]
continue
}
@@ -148,6 +148,30 @@ func (q *streamFrameQueue) RemoveStream(streamID protocol.StreamID) {
q.activeStreams[i] = 0
}
}
q.garbageCollectActiveStreams()
}
func (q *streamFrameQueue) garbageCollectActiveStreams() {
var j int
var deletedIndex int
for i, str := range q.activeStreams {
if str != 0 {
q.activeStreams[j] = str
j++
} else {
deletedIndex = i
}
}
if len(q.activeStreams) > 0 {
q.activeStreams = q.activeStreams[:len(q.activeStreams)-1]
}
if deletedIndex < q.activeStreamsPosition {
q.activeStreamsPosition--
}
}
// front returns the next element without modifying the queue
@@ -163,10 +187,6 @@ func (q *streamFrameQueue) getNextStream() (protocol.StreamID, error) {
streamID := q.activeStreams[q.activeStreamsPosition]
q.activeStreamsPosition = (q.activeStreamsPosition + 1) % len(q.activeStreams)
if streamID == 0 { // this happens if the stream was deleted
continue
}
frameQueue, ok := q.frameMap[streamID]
if !ok {
return 0, errMapAccess

View File

@@ -423,5 +423,41 @@ var _ = Describe("streamFrameQueue", func() {
Expect(err).ToNot(HaveOccurred())
Expect(frame).To(Equal(frame2))
})
Context("garbage collection of activeStreams", func() {
It("adjusts the activeStreams slice", func() {
queue.activeStreams = []protocol.StreamID{5, 6, 10, 2, 3}
queue.RemoveStream(10)
Expect(queue.activeStreams).To(Equal([]protocol.StreamID{5, 6, 2, 3}))
})
It("garbage collects correctly if there is only one stream", func() {
queue.activeStreams = []protocol.StreamID{10}
queue.RemoveStream(10)
Expect(queue.activeStreams).To(BeEmpty())
Expect(queue.activeStreamsPosition).To(Equal(0))
})
It("does not change the scheduling, when the stream deleted is after the current position in activeStreams", func() {
queue.activeStreams = []protocol.StreamID{5, 6, 10, 2, 3}
queue.activeStreamsPosition = 0 // the next frame would be from Stream 5
queue.RemoveStream(10)
Expect(queue.activeStreamsPosition).To(Equal(0))
})
It("makes sure that scheduling is adjusted, if the stream deleted is before the current position in activeStreams", func() {
queue.activeStreams = []protocol.StreamID{5, 6, 10, 2, 3}
queue.activeStreamsPosition = 3 // the next frame would be from Stream 2
queue.RemoveStream(10)
Expect(queue.activeStreamsPosition).To(Equal(2))
})
It("makes sure that scheduling is adjusted, when a frame from the deleted stream was scheduled", func() {
queue.activeStreams = []protocol.StreamID{5, 6, 10, 2, 3}
queue.activeStreamsPosition = 2 // the next frame would be from Stream 10
queue.RemoveStream(10)
Expect(queue.activeStreamsPosition).To(Equal(2)) // the next frame will be from Stream 2
})
})
})
})