From 8a9d70488a36aab1307495492c4237e3c8c184ef Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 16 Jun 2016 16:00:18 +0700 Subject: [PATCH] respect connection flow control window in StreamFrameQueue --- stream_frame_queue.go | 18 +++++++++++++++--- stream_frame_queue_test.go | 23 ++++++++++++++++++++++- stream_test.go | 18 +++++++++++++----- 3 files changed, 50 insertions(+), 9 deletions(-) diff --git a/stream_frame_queue.go b/stream_frame_queue.go index 7ef2e7c6..284ee49b 100644 --- a/stream_frame_queue.go +++ b/stream_frame_queue.go @@ -148,7 +148,7 @@ func (q *streamFrameQueue) Pop(maxLength protocol.ByteCount) (*frames.StreamFram q.byteLen -= frame.DataLen() - // TODO: find a better solution for identifying streams that don't contribute to connection level flow control + // TODO: don't add retransmission to connection-level flow control q.flowControlManager.AddBytesSent(streamID, frame.DataLen()) q.len-- @@ -261,9 +261,21 @@ func (q *streamFrameQueue) getMaximumFrameDataSize(frame *frames.StreamFrame) (p if err != nil { return 0, err } - if frame.Offset > highestAllowedStreamOffset { // stream level flow control blocked + // stream level flow control blocked + // TODO: shouldn't that be >= + if frame.Offset > highestAllowedStreamOffset { return 0, errStreamFlowControlBlocked } - return highestAllowedStreamOffset - frame.Offset, nil + maxFrameSize := highestAllowedStreamOffset - frame.Offset + + contributes, err := q.flowControlManager.StreamContributesToConnectionFlowControl(frame.StreamID) + if err != nil { + return 0, err + } + if contributes { + maxFrameSize = utils.MinByteCount(maxFrameSize, q.flowControlManager.RemainingConnectionWindowSize()) + } + + return maxFrameSize, nil } diff --git a/stream_frame_queue_test.go b/stream_frame_queue_test.go index bb9c7aa8..6b2553df 100644 --- a/stream_frame_queue_test.go +++ b/stream_frame_queue_test.go @@ -406,7 +406,7 @@ var _ = Describe("streamFrameQueue", func() { Expect(frame.DataLen()).To(Equal(protocol.ByteCount(len))) }) - It("returns a split frame if the whole frame doesn't fit, for non-zero StreamFrame offset", func() { + It("returns a split frame if the whole frame doesn't fit in the stream flow control window, for non-zero StreamFrame offset", func() { frame1.Offset = 2 queue.Push(frame1, false) queue.flowControlManager.(*mockFlowControlHandler).sendWindowSizes[frame1.StreamID] = 4 @@ -415,6 +415,16 @@ var _ = Describe("streamFrameQueue", func() { Expect(frame.DataLen()).To(Equal(protocol.ByteCount(2))) }) + It("returns a split frame if the whole frame doesn't fit in the connection flow control window", func() { + frame1.Offset = 2 + queue.Push(frame1, false) + queue.flowControlManager.(*mockFlowControlHandler).streamsContributing = []protocol.StreamID{frame1.StreamID} + queue.flowControlManager.(*mockFlowControlHandler).remainingConnectionWindowSize = 3 + frame, err := queue.Pop(1000) + Expect(err).ToNot(HaveOccurred()) + Expect(frame.DataLen()).To(Equal(protocol.ByteCount(3))) + }) + It("skips a frame if the stream is flow control blocked", func() { queue.flowControlManager.(*mockFlowControlHandler).sendWindowSizes[frame1.StreamID] = 0 queue.Push(frame1, false) @@ -424,6 +434,17 @@ var _ = Describe("streamFrameQueue", func() { Expect(frame).To(Equal(frame2)) }) + It("skips a frame if the connection is flow control blocked", func() { + queue.flowControlManager.(*mockFlowControlHandler).sendWindowSizes[frame1.StreamID] = 10000 + queue.flowControlManager.(*mockFlowControlHandler).streamsContributing = []protocol.StreamID{frame1.StreamID} + queue.flowControlManager.(*mockFlowControlHandler).remainingConnectionWindowSize = 0 + queue.Push(frame1, false) + queue.Push(frame2, false) + frame, err := queue.Pop(1000) + Expect(err).ToNot(HaveOccurred()) + Expect(frame).To(Equal(frame2)) + }) + It("returns nil if no stream is not flow control blocked", func() { queue.flowControlManager.(*mockFlowControlHandler).sendWindowSizes[frame1.StreamID] = 0 queue.flowControlManager.(*mockFlowControlHandler).sendWindowSizes[frame2.StreamID] = 0 diff --git a/stream_test.go b/stream_test.go index 814de7c6..a1395981 100644 --- a/stream_test.go +++ b/stream_test.go @@ -40,9 +40,12 @@ func (m *mockStreamHandler) queueStreamFrame(f *frames.StreamFrame) error { } type mockFlowControlHandler struct { - sendWindowSizes map[protocol.StreamID]protocol.ByteCount - bytesReadForStream protocol.StreamID - bytesRead protocol.ByteCount + streamsContributing []protocol.StreamID + + sendWindowSizes map[protocol.StreamID]protocol.ByteCount + remainingConnectionWindowSize protocol.ByteCount + bytesReadForStream protocol.StreamID + bytesRead protocol.ByteCount highestReceivedForStream protocol.StreamID highestReceived protocol.ByteCount @@ -88,13 +91,18 @@ func (m *mockFlowControlHandler) SendWindowSize(streamID protocol.StreamID) (pro return m.sendWindowSizes[streamID], nil } func (m *mockFlowControlHandler) RemainingConnectionWindowSize() protocol.ByteCount { - panic("not implemented") + return m.remainingConnectionWindowSize } func (m *mockFlowControlHandler) UpdateWindow(streamID protocol.StreamID, offset protocol.ByteCount) (bool, error) { panic("not implemented") } func (m *mockFlowControlHandler) StreamContributesToConnectionFlowControl(streamID protocol.StreamID) (bool, error) { - panic("not implemented") + for _, id := range m.streamsContributing { + if id == streamID { + return true, nil + } + } + return false, nil } var _ = Describe("Stream", func() {