From b8ea5c798155950fb5bbfdd06cad1939c9355878 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 1 Aug 2024 13:08:09 -0700 Subject: [PATCH] simplify generation of STREAM_DATA_BLOCKED frames (#4608) The stream always gets blocked at the current write offset. There's no need to return this offset from the flow controller. --- internal/flowcontrol/interface.go | 3 +- .../flowcontrol/stream_flow_controller.go | 5 ++ internal/mocks/stream_flow_controller.go | 13 ++-- send_stream.go | 4 +- send_stream_test.go | 61 +++++-------------- 5 files changed, 31 insertions(+), 55 deletions(-) diff --git a/internal/flowcontrol/interface.go b/internal/flowcontrol/interface.go index befd5c126..57d12a95e 100644 --- a/internal/flowcontrol/interface.go +++ b/internal/flowcontrol/interface.go @@ -9,7 +9,6 @@ type flowController interface { AddBytesSent(protocol.ByteCount) // for receiving GetWindowUpdate() protocol.ByteCount // returns 0 if no update is necessary - IsNewlyBlocked() (bool, protocol.ByteCount) } // A StreamFlowController is a flow controller for a QUIC stream. @@ -23,6 +22,7 @@ type StreamFlowController interface { // Abandon is called when reading from the stream is aborted early, // and there won't be any further calls to AddBytesRead. Abandon() + IsNewlyBlocked() bool } // The ConnectionFlowController is the flow controller for the connection. @@ -30,6 +30,7 @@ type ConnectionFlowController interface { flowController AddBytesRead(protocol.ByteCount) Reset() error + IsNewlyBlocked() (bool, protocol.ByteCount) } type connectionFlowControllerI interface { diff --git a/internal/flowcontrol/stream_flow_controller.go b/internal/flowcontrol/stream_flow_controller.go index 6b432c439..2d58351cb 100644 --- a/internal/flowcontrol/stream_flow_controller.go +++ b/internal/flowcontrol/stream_flow_controller.go @@ -121,6 +121,11 @@ func (c *streamFlowController) SendWindowSize() protocol.ByteCount { return min(c.baseFlowController.sendWindowSize(), c.connection.SendWindowSize()) } +func (c *streamFlowController) IsNewlyBlocked() bool { + blocked, _ := c.baseFlowController.IsNewlyBlocked() + return blocked +} + func (c *streamFlowController) shouldQueueWindowUpdate() bool { return !c.receivedFinalOffset && c.hasWindowUpdate() } diff --git a/internal/mocks/stream_flow_controller.go b/internal/mocks/stream_flow_controller.go index 3b63680cc..b5ca05c18 100644 --- a/internal/mocks/stream_flow_controller.go +++ b/internal/mocks/stream_flow_controller.go @@ -188,12 +188,11 @@ func (c *MockStreamFlowControllerGetWindowUpdateCall) DoAndReturn(f func() proto } // IsNewlyBlocked mocks base method. -func (m *MockStreamFlowController) IsNewlyBlocked() (bool, protocol.ByteCount) { +func (m *MockStreamFlowController) IsNewlyBlocked() bool { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "IsNewlyBlocked") ret0, _ := ret[0].(bool) - ret1, _ := ret[1].(protocol.ByteCount) - return ret0, ret1 + return ret0 } // IsNewlyBlocked indicates an expected call of IsNewlyBlocked. @@ -209,19 +208,19 @@ type MockStreamFlowControllerIsNewlyBlockedCall struct { } // Return rewrite *gomock.Call.Return -func (c *MockStreamFlowControllerIsNewlyBlockedCall) Return(arg0 bool, arg1 protocol.ByteCount) *MockStreamFlowControllerIsNewlyBlockedCall { - c.Call = c.Call.Return(arg0, arg1) +func (c *MockStreamFlowControllerIsNewlyBlockedCall) Return(arg0 bool) *MockStreamFlowControllerIsNewlyBlockedCall { + c.Call = c.Call.Return(arg0) return c } // Do rewrite *gomock.Call.Do -func (c *MockStreamFlowControllerIsNewlyBlockedCall) Do(f func() (bool, protocol.ByteCount)) *MockStreamFlowControllerIsNewlyBlockedCall { +func (c *MockStreamFlowControllerIsNewlyBlockedCall) Do(f func() bool) *MockStreamFlowControllerIsNewlyBlockedCall { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *MockStreamFlowControllerIsNewlyBlockedCall) DoAndReturn(f func() (bool, protocol.ByteCount)) *MockStreamFlowControllerIsNewlyBlockedCall { +func (c *MockStreamFlowControllerIsNewlyBlockedCall) DoAndReturn(f func() bool) *MockStreamFlowControllerIsNewlyBlockedCall { c.Call = c.Call.DoAndReturn(f) return c } diff --git a/send_stream.go b/send_stream.go index 2364c8d21..c89424e41 100644 --- a/send_stream.go +++ b/send_stream.go @@ -262,10 +262,10 @@ func (s *sendStream) popNewOrRetransmittedStreamFrame(maxBytes protocol.ByteCoun sendWindow := s.flowController.SendWindowSize() if sendWindow == 0 { - if isBlocked, offset := s.flowController.IsNewlyBlocked(); isBlocked { + if s.flowController.IsNewlyBlocked() { s.sender.queueControlFrame(&wire.StreamDataBlockedFrame{ StreamID: s.streamID, - MaximumStreamData: offset, + MaximumStreamData: s.writeOffset, }) return nil, false } diff --git a/send_stream_test.go b/send_stream_test.go index 7d0443f73..4c8df1c4e 100644 --- a/send_stream_test.go +++ b/send_stream_test.go @@ -324,56 +324,27 @@ var _ = Describe("Send Stream", func() { Context("flow control blocking", func() { It("queues a BLOCKED frame if the stream is flow control blocked", func() { - mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(0)) - mockFC.EXPECT().IsNewlyBlocked().Return(true, protocol.ByteCount(12)) - mockSender.EXPECT().queueControlFrame(&wire.StreamDataBlockedFrame{ - StreamID: streamID, - MaximumStreamData: 12, - }) - done := make(chan struct{}) - go func() { - defer GinkgoRecover() - defer close(done) - mockSender.EXPECT().onHasStreamData(streamID, str) - _, err := str.Write([]byte("foobar")) - Expect(err).ToNot(HaveOccurred()) - }() - waitForWrite() - _, ok, hasMoreData := str.popStreamFrame(1000, protocol.Version1) - Expect(ok).To(BeFalse()) - Expect(hasMoreData).To(BeFalse()) - // make the Write go routine return - str.closeForShutdown(nil) - Eventually(done).Should(BeClosed()) - }) - - It("says that it doesn't have any more data, when it is flow control blocked", func() { - done := make(chan struct{}) - go func() { - defer GinkgoRecover() - defer close(done) - mockSender.EXPECT().onHasStreamData(streamID, str) - _, err := str.Write([]byte("foobar")) - Expect(err).ToNot(HaveOccurred()) - }() - waitForWrite() - - // first pop a STREAM frame of the maximum size allowed by flow control mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(3)) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(3)) - f, ok, hasMoreData := str.popStreamFrame(expectedFrameHeaderLen(0)+3, protocol.Version1) - Expect(ok).To(BeTrue()) - Expect(f).ToNot(BeNil()) - Expect(hasMoreData).To(BeTrue()) - - // try to pop again, this time noticing that we're blocked - mockFC.EXPECT().SendWindowSize() - // don't use offset 3 here, to make sure the BLOCKED frame contains the number returned by the flow controller - mockFC.EXPECT().IsNewlyBlocked().Return(true, protocol.ByteCount(10)) + mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(0)) + mockFC.EXPECT().IsNewlyBlocked().Return(true) mockSender.EXPECT().queueControlFrame(&wire.StreamDataBlockedFrame{ StreamID: streamID, - MaximumStreamData: 10, + MaximumStreamData: 3, }) + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + defer close(done) + mockSender.EXPECT().onHasStreamData(streamID, str) + _, err := str.Write([]byte("foobar")) + Expect(err).ToNot(HaveOccurred()) + }() + waitForWrite() + f, ok, hasMoreData := str.popStreamFrame(1000, protocol.Version1) + Expect(ok).To(BeTrue()) + Expect(hasMoreData).To(BeTrue()) + Expect(f.Frame.Data).To(HaveLen(3)) _, ok, hasMoreData = str.popStreamFrame(1000, protocol.Version1) Expect(ok).To(BeFalse()) Expect(hasMoreData).To(BeFalse())