diff --git a/internal/flowcontrol/connection_flow_controller.go b/internal/flowcontrol/connection_flow_controller.go index e4dc4edc8..23e6d6786 100644 --- a/internal/flowcontrol/connection_flow_controller.go +++ b/internal/flowcontrol/connection_flow_controller.go @@ -54,7 +54,12 @@ func (c *connectionFlowController) IncrementHighestReceived(increment protocol.B return nil } -func (c *connectionFlowController) MaybeQueueWindowUpdate() { +func (c *connectionFlowController) AddBytesRead(n protocol.ByteCount) { + c.baseFlowController.AddBytesRead(n) + c.maybeQueueWindowUpdate() +} + +func (c *connectionFlowController) maybeQueueWindowUpdate() { c.mutex.Lock() hasWindowUpdate := c.hasWindowUpdate() c.mutex.Unlock() diff --git a/internal/flowcontrol/connection_flow_controller_test.go b/internal/flowcontrol/connection_flow_controller_test.go index 91edab427..b344b89ed 100644 --- a/internal/flowcontrol/connection_flow_controller_test.go +++ b/internal/flowcontrol/connection_flow_controller_test.go @@ -23,6 +23,7 @@ var _ = Describe("Connection Flow controller", func() { } BeforeEach(func() { + queuedWindowUpdate = false controller = &connectionFlowController{} controller.rttStats = &congestion.RTTStats{} controller.logger = utils.DefaultLogger @@ -58,14 +59,13 @@ var _ = Describe("Connection Flow controller", func() { }) It("queues window updates", func() { - controller.MaybeQueueWindowUpdate() + controller.AddBytesRead(1) Expect(queuedWindowUpdate).To(BeFalse()) - controller.AddBytesRead(30) - controller.MaybeQueueWindowUpdate() + controller.AddBytesRead(29) Expect(queuedWindowUpdate).To(BeTrue()) Expect(controller.GetWindowUpdate()).ToNot(BeZero()) queuedWindowUpdate = false - controller.MaybeQueueWindowUpdate() + controller.AddBytesRead(1) Expect(queuedWindowUpdate).To(BeFalse()) }) diff --git a/internal/flowcontrol/interface.go b/internal/flowcontrol/interface.go index 9f07c887a..378257bab 100644 --- a/internal/flowcontrol/interface.go +++ b/internal/flowcontrol/interface.go @@ -10,7 +10,6 @@ type flowController interface { // for receiving AddBytesRead(protocol.ByteCount) GetWindowUpdate() protocol.ByteCount // returns 0 if no update is necessary - MaybeQueueWindowUpdate() // queues a window update, if necessary IsNewlyBlocked() (bool, protocol.ByteCount) } diff --git a/internal/flowcontrol/stream_flow_controller.go b/internal/flowcontrol/stream_flow_controller.go index 5e58b5662..6062eec49 100644 --- a/internal/flowcontrol/stream_flow_controller.go +++ b/internal/flowcontrol/stream_flow_controller.go @@ -89,6 +89,7 @@ func (c *streamFlowController) UpdateHighestReceived(byteOffset protocol.ByteCou func (c *streamFlowController) AddBytesRead(n protocol.ByteCount) { c.baseFlowController.AddBytesRead(n) + c.maybeQueueWindowUpdate() c.connection.AddBytesRead(n) } @@ -101,14 +102,13 @@ func (c *streamFlowController) SendWindowSize() protocol.ByteCount { return utils.MinByteCount(c.baseFlowController.sendWindowSize(), c.connection.SendWindowSize()) } -func (c *streamFlowController) MaybeQueueWindowUpdate() { +func (c *streamFlowController) maybeQueueWindowUpdate() { c.mutex.Lock() hasWindowUpdate := !c.receivedFinalOffset && c.hasWindowUpdate() c.mutex.Unlock() if hasWindowUpdate { c.queueWindowUpdate() } - c.connection.MaybeQueueWindowUpdate() } func (c *streamFlowController) GetWindowUpdate() protocol.ByteCount { diff --git a/internal/flowcontrol/stream_flow_controller_test.go b/internal/flowcontrol/stream_flow_controller_test.go index 35e1d143e..f77d759ce 100644 --- a/internal/flowcontrol/stream_flow_controller_test.go +++ b/internal/flowcontrol/stream_flow_controller_test.go @@ -13,18 +13,16 @@ import ( var _ = Describe("Stream Flow controller", func() { var ( - controller *streamFlowController - queuedWindowUpdate bool - queuedConnWindowUpdate bool + controller *streamFlowController + queuedWindowUpdate bool ) BeforeEach(func() { queuedWindowUpdate = false - queuedConnWindowUpdate = false rttStats := &congestion.RTTStats{} controller = &streamFlowController{ streamID: 10, - connection: NewConnectionFlowController(1000, 1000, func() { queuedConnWindowUpdate = true }, rttStats, utils.DefaultLogger).(*connectionFlowController), + connection: NewConnectionFlowController(1000, 1000, func() {}, rttStats, utils.DefaultLogger).(*connectionFlowController), } controller.maxReceiveWindowSize = 10000 controller.rttStats = rttStats @@ -57,7 +55,6 @@ var _ = Describe("Stream Flow controller", func() { cc := NewConnectionFlowController(0, 0, nil, nil, utils.DefaultLogger) fc := NewStreamFlowController(5, cc, receiveWindow, maxReceiveWindow, sendWindow, queueWindowUpdate, rttStats, utils.DefaultLogger).(*streamFlowController) fc.AddBytesRead(receiveWindow) - fc.MaybeQueueWindowUpdate() Expect(queued).To(BeTrue()) }) }) @@ -179,25 +176,16 @@ var _ = Describe("Stream Flow controller", func() { }) It("queues window updates", func() { - controller.MaybeQueueWindowUpdate() + controller.AddBytesRead(1) Expect(queuedWindowUpdate).To(BeFalse()) - controller.AddBytesRead(30) - controller.MaybeQueueWindowUpdate() + controller.AddBytesRead(29) Expect(queuedWindowUpdate).To(BeTrue()) Expect(controller.GetWindowUpdate()).ToNot(BeZero()) queuedWindowUpdate = false - controller.MaybeQueueWindowUpdate() + controller.AddBytesRead(1) Expect(queuedWindowUpdate).To(BeFalse()) }) - It("queues connection-level window updates", func() { - controller.MaybeQueueWindowUpdate() - Expect(queuedConnWindowUpdate).To(BeFalse()) - controller.AddBytesRead(60) - controller.MaybeQueueWindowUpdate() - Expect(queuedConnWindowUpdate).To(BeTrue()) - }) - It("tells the connection flow controller when the window was autotuned", func() { oldOffset := controller.bytesRead setRtt(scaleDuration(20 * time.Millisecond)) @@ -211,10 +199,8 @@ var _ = Describe("Stream Flow controller", func() { }) It("doesn't increase the window after a final offset was already received", func() { + Expect(controller.UpdateHighestReceived(90, true)).To(Succeed()) controller.AddBytesRead(30) - err := controller.UpdateHighestReceived(90, true) - Expect(err).ToNot(HaveOccurred()) - controller.MaybeQueueWindowUpdate() Expect(queuedWindowUpdate).To(BeFalse()) offset := controller.GetWindowUpdate() Expect(offset).To(BeZero()) diff --git a/internal/mocks/connection_flow_controller.go b/internal/mocks/connection_flow_controller.go index 1a47362b9..ae10e785f 100644 --- a/internal/mocks/connection_flow_controller.go +++ b/internal/mocks/connection_flow_controller.go @@ -79,16 +79,6 @@ func (mr *MockConnectionFlowControllerMockRecorder) IsNewlyBlocked() *gomock.Cal return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsNewlyBlocked", reflect.TypeOf((*MockConnectionFlowController)(nil).IsNewlyBlocked)) } -// MaybeQueueWindowUpdate mocks base method -func (m *MockConnectionFlowController) MaybeQueueWindowUpdate() { - m.ctrl.Call(m, "MaybeQueueWindowUpdate") -} - -// MaybeQueueWindowUpdate indicates an expected call of MaybeQueueWindowUpdate -func (mr *MockConnectionFlowControllerMockRecorder) MaybeQueueWindowUpdate() *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaybeQueueWindowUpdate", reflect.TypeOf((*MockConnectionFlowController)(nil).MaybeQueueWindowUpdate)) -} - // SendWindowSize mocks base method func (m *MockConnectionFlowController) SendWindowSize() protocol.ByteCount { ret := m.ctrl.Call(m, "SendWindowSize") diff --git a/internal/mocks/stream_flow_controller.go b/internal/mocks/stream_flow_controller.go index 955f55096..65e9d5f7e 100644 --- a/internal/mocks/stream_flow_controller.go +++ b/internal/mocks/stream_flow_controller.go @@ -34,6 +34,16 @@ func (m *MockStreamFlowController) EXPECT() *MockStreamFlowControllerMockRecorde return m.recorder } +// Abandon mocks base method +func (m *MockStreamFlowController) Abandon() { + m.ctrl.Call(m, "Abandon") +} + +// Abandon indicates an expected call of Abandon +func (mr *MockStreamFlowControllerMockRecorder) Abandon() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Abandon", reflect.TypeOf((*MockStreamFlowController)(nil).Abandon)) +} + // AddBytesRead mocks base method func (m *MockStreamFlowController) AddBytesRead(arg0 protocol.ByteCount) { m.ctrl.Call(m, "AddBytesRead", arg0) @@ -79,16 +89,6 @@ func (mr *MockStreamFlowControllerMockRecorder) IsNewlyBlocked() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsNewlyBlocked", reflect.TypeOf((*MockStreamFlowController)(nil).IsNewlyBlocked)) } -// MaybeQueueWindowUpdate mocks base method -func (m *MockStreamFlowController) MaybeQueueWindowUpdate() { - m.ctrl.Call(m, "MaybeQueueWindowUpdate") -} - -// MaybeQueueWindowUpdate indicates an expected call of MaybeQueueWindowUpdate -func (mr *MockStreamFlowControllerMockRecorder) MaybeQueueWindowUpdate() *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaybeQueueWindowUpdate", reflect.TypeOf((*MockStreamFlowController)(nil).MaybeQueueWindowUpdate)) -} - // SendWindowSize mocks base method func (m *MockStreamFlowController) SendWindowSize() protocol.ByteCount { ret := m.ctrl.Call(m, "SendWindowSize") diff --git a/receive_stream.go b/receive_stream.go index 175125202..4aa76b537 100644 --- a/receive_stream.go +++ b/receive_stream.go @@ -172,8 +172,6 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err if !s.resetRemotely { s.flowController.AddBytesRead(protocol.ByteCount(m)) } - // increase the flow control window, if necessary - s.flowController.MaybeQueueWindowUpdate() if s.readPosInFrame >= len(s.currentFrame) && s.currentFrameIsLast { s.finRead = true diff --git a/receive_stream_test.go b/receive_stream_test.go index 7707aba2e..5b539a860 100644 --- a/receive_stream_test.go +++ b/receive_stream_test.go @@ -43,7 +43,6 @@ var _ = Describe("Receive Stream", func() { It("reads a single STREAM frame", func() { mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false) mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4)) - mockFC.EXPECT().MaybeQueueWindowUpdate() frame := wire.StreamFrame{ Offset: 0, Data: []byte{0xDE, 0xAD, 0xBE, 0xEF}, @@ -61,7 +60,6 @@ var _ = Describe("Receive Stream", func() { mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false) mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)) mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)) - mockFC.EXPECT().MaybeQueueWindowUpdate().Times(2) frame := wire.StreamFrame{ Offset: 0, Data: []byte{0xDE, 0xAD, 0xBE, 0xEF}, @@ -83,7 +81,6 @@ var _ = Describe("Receive Stream", func() { mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false) mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false) mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2) - mockFC.EXPECT().MaybeQueueWindowUpdate().Times(2) frame1 := wire.StreamFrame{ Offset: 0, Data: []byte{0xDE, 0xAD}, @@ -107,7 +104,6 @@ var _ = Describe("Receive Stream", func() { mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false) mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false) mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2) - mockFC.EXPECT().MaybeQueueWindowUpdate().Times(2) frame1 := wire.StreamFrame{ Offset: 0, Data: []byte{0xDE, 0xAD}, @@ -130,7 +126,6 @@ var _ = Describe("Receive Stream", func() { It("waits until data is available", func() { mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false) mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)) - mockFC.EXPECT().MaybeQueueWindowUpdate() go func() { defer GinkgoRecover() frame := wire.StreamFrame{Data: []byte{0xDE, 0xAD}} @@ -148,7 +143,6 @@ var _ = Describe("Receive Stream", func() { mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false) mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false) mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2) - mockFC.EXPECT().MaybeQueueWindowUpdate().Times(2) frame1 := wire.StreamFrame{ Offset: 2, Data: []byte{0xBE, 0xEF}, @@ -173,7 +167,6 @@ var _ = Describe("Receive Stream", func() { mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false) mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false) mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2) - mockFC.EXPECT().MaybeQueueWindowUpdate().Times(2) frame1 := wire.StreamFrame{ Offset: 0, Data: []byte{0xDE, 0xAD}, @@ -204,7 +197,6 @@ var _ = Describe("Receive Stream", func() { mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(6), false) mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)) mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4)) - mockFC.EXPECT().MaybeQueueWindowUpdate().Times(2) frame1 := wire.StreamFrame{ Offset: 0, Data: []byte("foob"), @@ -337,7 +329,6 @@ var _ = Describe("Receive Stream", func() { It("returns EOFs", func() { mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), true) mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4)) - mockFC.EXPECT().MaybeQueueWindowUpdate() str.handleStreamFrame(&wire.StreamFrame{ Offset: 0, Data: []byte{0xDE, 0xAD, 0xBE, 0xEF}, @@ -358,7 +349,6 @@ var _ = Describe("Receive Stream", func() { mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false) mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), true) mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2) - mockFC.EXPECT().MaybeQueueWindowUpdate().Times(2) frame1 := wire.StreamFrame{ Offset: 2, Data: []byte{0xBE, 0xEF}, @@ -386,7 +376,6 @@ var _ = Describe("Receive Stream", func() { It("returns EOFs with partial read", func() { mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), true) mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)) - mockFC.EXPECT().MaybeQueueWindowUpdate() err := str.handleStreamFrame(&wire.StreamFrame{ Offset: 0, Data: []byte{0xde, 0xad}, @@ -404,7 +393,6 @@ var _ = Describe("Receive Stream", func() { It("handles immediate FINs", func() { mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(0), true) mockFC.EXPECT().AddBytesRead(protocol.ByteCount(0)) - mockFC.EXPECT().MaybeQueueWindowUpdate() err := str.handleStreamFrame(&wire.StreamFrame{ Offset: 0, FinBit: true, @@ -421,7 +409,6 @@ var _ = Describe("Receive Stream", func() { It("closes when CloseRemote is called", func() { mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(0), true) mockFC.EXPECT().AddBytesRead(protocol.ByteCount(0)) - mockFC.EXPECT().MaybeQueueWindowUpdate() str.CloseRemote(0) mockSender.EXPECT().onStreamCompleted(streamID) b := make([]byte, 8) @@ -497,7 +484,6 @@ var _ = Describe("Receive Stream", func() { It("doesn't send a RESET_STREAM frame, if the FIN was already read", func() { mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(6), true) mockFC.EXPECT().AddBytesRead(protocol.ByteCount(6)) - mockFC.EXPECT().MaybeQueueWindowUpdate() // no calls to mockSender.queueControlFrame err := str.handleStreamFrame(&wire.StreamFrame{ StreamID: streamID,