respect connection flow control window in StreamFrameQueue

This commit is contained in:
Marten Seemann
2016-06-16 16:00:18 +07:00
committed by Lucas Clemente
parent 1b732a4afa
commit 8a9d70488a
3 changed files with 50 additions and 9 deletions

View File

@@ -148,7 +148,7 @@ func (q *streamFrameQueue) Pop(maxLength protocol.ByteCount) (*frames.StreamFram
q.byteLen -= frame.DataLen()
// TODO: find a better solution for identifying streams that don't contribute to connection level flow control
// TODO: don't add retransmission to connection-level flow control
q.flowControlManager.AddBytesSent(streamID, frame.DataLen())
q.len--
@@ -261,9 +261,21 @@ func (q *streamFrameQueue) getMaximumFrameDataSize(frame *frames.StreamFrame) (p
if err != nil {
return 0, err
}
if frame.Offset > highestAllowedStreamOffset { // stream level flow control blocked
// stream level flow control blocked
// TODO: shouldn't that be >=
if frame.Offset > highestAllowedStreamOffset {
return 0, errStreamFlowControlBlocked
}
return highestAllowedStreamOffset - frame.Offset, nil
maxFrameSize := highestAllowedStreamOffset - frame.Offset
contributes, err := q.flowControlManager.StreamContributesToConnectionFlowControl(frame.StreamID)
if err != nil {
return 0, err
}
if contributes {
maxFrameSize = utils.MinByteCount(maxFrameSize, q.flowControlManager.RemainingConnectionWindowSize())
}
return maxFrameSize, nil
}

View File

@@ -406,7 +406,7 @@ var _ = Describe("streamFrameQueue", func() {
Expect(frame.DataLen()).To(Equal(protocol.ByteCount(len)))
})
It("returns a split frame if the whole frame doesn't fit, for non-zero StreamFrame offset", func() {
It("returns a split frame if the whole frame doesn't fit in the stream flow control window, for non-zero StreamFrame offset", func() {
frame1.Offset = 2
queue.Push(frame1, false)
queue.flowControlManager.(*mockFlowControlHandler).sendWindowSizes[frame1.StreamID] = 4
@@ -415,6 +415,16 @@ var _ = Describe("streamFrameQueue", func() {
Expect(frame.DataLen()).To(Equal(protocol.ByteCount(2)))
})
It("returns a split frame if the whole frame doesn't fit in the connection flow control window", func() {
frame1.Offset = 2
queue.Push(frame1, false)
queue.flowControlManager.(*mockFlowControlHandler).streamsContributing = []protocol.StreamID{frame1.StreamID}
queue.flowControlManager.(*mockFlowControlHandler).remainingConnectionWindowSize = 3
frame, err := queue.Pop(1000)
Expect(err).ToNot(HaveOccurred())
Expect(frame.DataLen()).To(Equal(protocol.ByteCount(3)))
})
It("skips a frame if the stream is flow control blocked", func() {
queue.flowControlManager.(*mockFlowControlHandler).sendWindowSizes[frame1.StreamID] = 0
queue.Push(frame1, false)
@@ -424,6 +434,17 @@ var _ = Describe("streamFrameQueue", func() {
Expect(frame).To(Equal(frame2))
})
It("skips a frame if the connection is flow control blocked", func() {
queue.flowControlManager.(*mockFlowControlHandler).sendWindowSizes[frame1.StreamID] = 10000
queue.flowControlManager.(*mockFlowControlHandler).streamsContributing = []protocol.StreamID{frame1.StreamID}
queue.flowControlManager.(*mockFlowControlHandler).remainingConnectionWindowSize = 0
queue.Push(frame1, false)
queue.Push(frame2, false)
frame, err := queue.Pop(1000)
Expect(err).ToNot(HaveOccurred())
Expect(frame).To(Equal(frame2))
})
It("returns nil if no stream is not flow control blocked", func() {
queue.flowControlManager.(*mockFlowControlHandler).sendWindowSizes[frame1.StreamID] = 0
queue.flowControlManager.(*mockFlowControlHandler).sendWindowSizes[frame2.StreamID] = 0

View File

@@ -40,9 +40,12 @@ func (m *mockStreamHandler) queueStreamFrame(f *frames.StreamFrame) error {
}
type mockFlowControlHandler struct {
sendWindowSizes map[protocol.StreamID]protocol.ByteCount
bytesReadForStream protocol.StreamID
bytesRead protocol.ByteCount
streamsContributing []protocol.StreamID
sendWindowSizes map[protocol.StreamID]protocol.ByteCount
remainingConnectionWindowSize protocol.ByteCount
bytesReadForStream protocol.StreamID
bytesRead protocol.ByteCount
highestReceivedForStream protocol.StreamID
highestReceived protocol.ByteCount
@@ -88,13 +91,18 @@ func (m *mockFlowControlHandler) SendWindowSize(streamID protocol.StreamID) (pro
return m.sendWindowSizes[streamID], nil
}
func (m *mockFlowControlHandler) RemainingConnectionWindowSize() protocol.ByteCount {
panic("not implemented")
return m.remainingConnectionWindowSize
}
func (m *mockFlowControlHandler) UpdateWindow(streamID protocol.StreamID, offset protocol.ByteCount) (bool, error) {
panic("not implemented")
}
func (m *mockFlowControlHandler) StreamContributesToConnectionFlowControl(streamID protocol.StreamID) (bool, error) {
panic("not implemented")
for _, id := range m.streamsContributing {
if id == streamID {
return true, nil
}
}
return false, nil
}
var _ = Describe("Stream", func() {