diff --git a/internal/flowcontrol/base_flow_controller.go b/internal/flowcontrol/base_flow_controller.go index cfdda698e..47b9a5919 100644 --- a/internal/flowcontrol/base_flow_controller.go +++ b/internal/flowcontrol/base_flow_controller.go @@ -60,12 +60,16 @@ func (c *baseFlowController) AddBytesRead(n protocol.ByteCount) { c.bytesRead += n } +func (c *baseFlowController) hasWindowUpdate() bool { + bytesRemaining := c.receiveWindow - c.bytesRead + // update the window when more than the threshold was consumed + return bytesRemaining <= protocol.ByteCount((float64(c.receiveWindowSize) * float64((1 - protocol.WindowUpdateThreshold)))) +} + // getWindowUpdate updates the receive window, if necessary // it returns the new offset func (c *baseFlowController) getWindowUpdate() protocol.ByteCount { - bytesRemaining := c.receiveWindow - c.bytesRead - // update the window when more than the threshold was consumed - if bytesRemaining >= protocol.ByteCount((float64(c.receiveWindowSize) * float64((1 - protocol.WindowUpdateThreshold)))) { + if !c.hasWindowUpdate() { return 0 } diff --git a/internal/flowcontrol/interface.go b/internal/flowcontrol/interface.go index 5b859c686..d77bd9c0f 100644 --- a/internal/flowcontrol/interface.go +++ b/internal/flowcontrol/interface.go @@ -20,6 +20,8 @@ type StreamFlowController interface { // UpdateHighestReceived should be called when a new highest offset is received // final has to be to true if this is the final offset of the stream, as contained in a STREAM frame with FIN bit, and the RST_STREAM frame UpdateHighestReceived(offset protocol.ByteCount, final bool) error + // HasWindowUpdate says if it is necessary to update the window + HasWindowUpdate() bool } // The ConnectionFlowController is the flow controller for the connection. diff --git a/internal/flowcontrol/stream_flow_controller.go b/internal/flowcontrol/stream_flow_controller.go index 36c7539fb..94e1cb9bf 100644 --- a/internal/flowcontrol/stream_flow_controller.go +++ b/internal/flowcontrol/stream_flow_controller.go @@ -109,6 +109,13 @@ func (c *streamFlowController) SendWindowSize() protocol.ByteCount { return window } +func (c *streamFlowController) HasWindowUpdate() bool { + c.mutex.Lock() + hasWindowUpdate := !c.receivedFinalOffset && c.hasWindowUpdate() + c.mutex.Unlock() + return hasWindowUpdate +} + func (c *streamFlowController) GetWindowUpdate() protocol.ByteCount { // don't use defer for unlocking the mutex here, GetWindowUpdate() is called frequently and defer shows up in the profiler c.mutex.Lock() diff --git a/internal/flowcontrol/stream_flow_controller_test.go b/internal/flowcontrol/stream_flow_controller_test.go index cb413f102..191d47337 100644 --- a/internal/flowcontrol/stream_flow_controller_test.go +++ b/internal/flowcontrol/stream_flow_controller_test.go @@ -173,6 +173,14 @@ var _ = Describe("Stream Flow controller", func() { oldWindowSize = controller.receiveWindowSize }) + It("tells if it has window updates", func() { + Expect(controller.HasWindowUpdate()).To(BeFalse()) + controller.AddBytesRead(30) + Expect(controller.HasWindowUpdate()).To(BeTrue()) + Expect(controller.GetWindowUpdate()).ToNot(BeZero()) + Expect(controller.HasWindowUpdate()).To(BeFalse()) + }) + It("tells the connection flow controller when the window was autotuned", func() { oldOffset := controller.bytesRead controller.contributesToConnection = true @@ -200,9 +208,10 @@ var _ = Describe("Stream Flow controller", func() { }) It("doesn't increase the window after a final offset was already received", func() { - controller.AddBytesRead(80) + controller.AddBytesRead(30) err := controller.UpdateHighestReceived(90, true) Expect(err).ToNot(HaveOccurred()) + Expect(controller.HasWindowUpdate()).To(BeFalse()) offset := controller.GetWindowUpdate() Expect(offset).To(BeZero()) }) diff --git a/internal/mocks/stream_flow_controller.go b/internal/mocks/stream_flow_controller.go index bbecadc99..d3450d723 100644 --- a/internal/mocks/stream_flow_controller.go +++ b/internal/mocks/stream_flow_controller.go @@ -66,6 +66,18 @@ func (mr *MockStreamFlowControllerMockRecorder) GetWindowUpdate() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetWindowUpdate", reflect.TypeOf((*MockStreamFlowController)(nil).GetWindowUpdate)) } +// HasWindowUpdate mocks base method +func (m *MockStreamFlowController) HasWindowUpdate() bool { + ret := m.ctrl.Call(m, "HasWindowUpdate") + ret0, _ := ret[0].(bool) + return ret0 +} + +// HasWindowUpdate indicates an expected call of HasWindowUpdate +func (mr *MockStreamFlowControllerMockRecorder) HasWindowUpdate() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasWindowUpdate", reflect.TypeOf((*MockStreamFlowController)(nil).HasWindowUpdate)) +} + // IsNewlyBlocked mocks base method func (m *MockStreamFlowController) IsNewlyBlocked() (bool, protocol.ByteCount) { ret := m.ctrl.Call(m, "IsNewlyBlocked")