diff --git a/flowcontrol/flow_control_manager.go b/flowcontrol/flow_control_manager.go index 3e5a7113..2574888b 100644 --- a/flowcontrol/flow_control_manager.go +++ b/flowcontrol/flow_control_manager.go @@ -144,12 +144,27 @@ func (f *flowControlManager) AddBytesRead(streamID protocol.StreamID, n protocol func (f *flowControlManager) GetWindowUpdates() (res []WindowUpdate) { f.mutex.Lock() defer f.mutex.Unlock() + connFlowController := f.streamFlowController[0] + + // get WindowUpdates for streams for id, fc := range f.streamFlowController { - if necessary, offset := fc.MaybeUpdateWindow(); necessary { + if id == 0 { // connection-level updates are dealt with later + continue + } + if necessary, newIncrement, offset := fc.MaybeUpdateWindow(); necessary { res = append(res, WindowUpdate{StreamID: id, Offset: offset}) + contributes, _ := f.contributesToConnectionFlowControl[id] + if contributes && newIncrement != 0 { + connFlowController.EnsureMinimumWindowIncrement(protocol.ByteCount(float64(newIncrement) * protocol.ConnectionFlowControlMultiplier)) + } } } - return res + // get a WindowUpdate for the connection + if necessary, _, offset := connFlowController.MaybeUpdateWindow(); necessary { + res = append(res, WindowUpdate{StreamID: 0, Offset: offset}) + } + + return } func (f *flowControlManager) GetReceiveWindow(streamID protocol.StreamID) (protocol.ByteCount, error) { diff --git a/flowcontrol/flow_control_manager_test.go b/flowcontrol/flow_control_manager_test.go index 03909653..420b92de 100644 --- a/flowcontrol/flow_control_manager_test.go +++ b/flowcontrol/flow_control_manager_test.go @@ -1,6 +1,8 @@ package flowcontrol import ( + "time" + "github.com/lucas-clemente/quic-go/congestion" "github.com/lucas-clemente/quic-go/handshake" "github.com/lucas-clemente/quic-go/protocol" @@ -15,8 +17,10 @@ var _ = Describe("Flow Control Manager", func() { BeforeEach(func() { cpm = &mockConnectionParametersManager{ - receiveStreamFlowControlWindow: 100, - receiveConnectionFlowControlWindow: 200, + receiveStreamFlowControlWindow: 100, + receiveConnectionFlowControlWindow: 200, + maxReceiveStreamFlowControlWindow: 9999999, + maxReceiveConnectionFlowControlWindow: 9999999, } fcm = NewFlowControlManager(cpm, &congestion.RTTStats{}).(*flowControlManager) }) @@ -112,6 +116,14 @@ var _ = Describe("Flow Control Manager", func() { }) Context("window updates", func() { + // update the congestion such that it returns a given value for the smoothed RTT + setRtt := func(t time.Duration) { + for _, controller := range fcm.streamFlowController { + controller.rttStats.UpdateRTT(t, 0, time.Now()) + Expect(controller.rttStats.SmoothedRTT()).To(Equal(t)) // make sure it worked + } + } + It("gets stream level window updates", func() { err := fcm.UpdateHighestReceived(4, 100) Expect(err).ToNot(HaveOccurred()) @@ -119,8 +131,7 @@ var _ = Describe("Flow Control Manager", func() { Expect(err).ToNot(HaveOccurred()) updates := fcm.GetWindowUpdates() Expect(updates).To(HaveLen(1)) - Expect(updates[0].StreamID).To(Equal(protocol.StreamID(4))) - Expect(updates[0].Offset).To(Equal(protocol.ByteCount(190))) + Expect(updates[0]).To(Equal(WindowUpdate{StreamID: 4, Offset: 190})) }) It("gets connection level window updates", func() { @@ -141,6 +152,33 @@ var _ = Describe("Flow Control Manager", func() { err := fcm.AddBytesRead(17, 1000) Expect(err).To(MatchError(errMapAccess)) }) + + It("increases the connection-level window, when a stream window was increased by autotuning", func() { + setRtt(10 * time.Millisecond) + fcm.streamFlowController[4].lastWindowUpdateTime = time.Now().Add(-1 * time.Millisecond) + err := fcm.UpdateHighestReceived(4, 100) + Expect(err).ToNot(HaveOccurred()) + err = fcm.AddBytesRead(4, 90) + Expect(err).ToNot(HaveOccurred()) + updates := fcm.GetWindowUpdates() + Expect(updates).To(HaveLen(2)) + connLevelIncrement := protocol.ByteCount(protocol.ConnectionFlowControlMultiplier * 200) // 300 + Expect(updates).To(ContainElement(WindowUpdate{StreamID: 4, Offset: 290})) + Expect(updates).To(ContainElement(WindowUpdate{StreamID: 0, Offset: 90 + connLevelIncrement})) + }) + + It("doesn't increase the connection-level window, when a non-contributing stream window was increased by autotuning", func() { + setRtt(10 * time.Millisecond) + fcm.streamFlowController[1].lastWindowUpdateTime = time.Now().Add(-1 * time.Millisecond) + err := fcm.UpdateHighestReceived(1, 100) + Expect(err).ToNot(HaveOccurred()) + err = fcm.AddBytesRead(1, 90) + Expect(err).ToNot(HaveOccurred()) + updates := fcm.GetWindowUpdates() + Expect(updates).To(HaveLen(1)) + Expect(updates).To(ContainElement(WindowUpdate{StreamID: 1, Offset: 290})) + // the only window update is for stream 1, thus there's no connection-level window update + }) }) }) diff --git a/flowcontrol/flow_controller.go b/flowcontrol/flow_controller.go index 0a83cc56..247bf097 100644 --- a/flowcontrol/flow_controller.go +++ b/flowcontrol/flow_controller.go @@ -116,20 +116,28 @@ func (c *flowController) AddBytesRead(n protocol.ByteCount) { c.bytesRead += n } -// MaybeUpdateWindow determines if it is necessary to send a WindowUpdate -// if so, it returns true and the offset of the window -func (c *flowController) MaybeUpdateWindow() (bool, protocol.ByteCount) { +// MaybeUpdateWindow updates the receive window, if necessary +// if the receive window increment is changed, the new value is returned, otherwise a 0 +// the last return value is the new offset of the receive window +func (c *flowController) MaybeUpdateWindow() (bool, protocol.ByteCount /* new increment */, protocol.ByteCount /* new offset */) { diff := c.receiveWindow - c.bytesRead // Chromium implements the same threshold if diff < (c.receiveWindowIncrement / 2) { + var newWindowIncrement protocol.ByteCount + oldWindowIncrement := c.receiveWindowIncrement + c.maybeAdjustWindowIncrement() + if c.receiveWindowIncrement != oldWindowIncrement { + newWindowIncrement = c.receiveWindowIncrement + } + c.lastWindowUpdateTime = time.Now() c.receiveWindow = c.bytesRead + c.receiveWindowIncrement - return true, c.receiveWindow + return true, newWindowIncrement, c.receiveWindow } - return false, 0 + return false, 0, 0 } // maybeAdjustWindowIncrement increases the receiveWindowIncrement if we're sending WindowUpdates too often @@ -164,6 +172,15 @@ func (c *flowController) maybeAdjustWindowIncrement() { } } +// EnsureMinimumWindowIncrement sets a minimum window increment +// it is intended be used for the connection-level flow controller +// it should make sure that the connection-level window is increased when a stream-level window grows +func (c *flowController) EnsureMinimumWindowIncrement(inc protocol.ByteCount) { + if inc > c.receiveWindowIncrement { + c.receiveWindowIncrement = utils.MinByteCount(inc, c.maxReceiveWindowIncrement) + } +} + func (c *flowController) CheckFlowControlViolation() bool { if c.highestReceived > c.receiveWindow { return true diff --git a/flowcontrol/flow_controller_test.go b/flowcontrol/flow_controller_test.go index 72f6b749..47c3ab14 100644 --- a/flowcontrol/flow_controller_test.go +++ b/flowcontrol/flow_controller_test.go @@ -197,7 +197,7 @@ var _ = Describe("Flow controller", func() { controller.lastWindowUpdateTime = time.Now().Add(-time.Hour) readPosition := receiveWindow - receiveWindowIncrement/2 + 1 controller.bytesRead = readPosition - updateNecessary, offset := controller.MaybeUpdateWindow() + updateNecessary, _, offset := controller.MaybeUpdateWindow() Expect(updateNecessary).To(BeTrue()) Expect(offset).To(Equal(readPosition + receiveWindowIncrement)) Expect(controller.receiveWindow).To(Equal(readPosition + receiveWindowIncrement)) @@ -209,7 +209,7 @@ var _ = Describe("Flow controller", func() { controller.lastWindowUpdateTime = lastWindowUpdateTime readPosition := receiveWindow - receiveWindow/2 - 1 controller.bytesRead = readPosition - updateNecessary, _ := controller.MaybeUpdateWindow() + updateNecessary, _, _ := controller.MaybeUpdateWindow() Expect(updateNecessary).To(BeFalse()) Expect(controller.lastWindowUpdateTime).To(Equal(lastWindowUpdateTime)) }) @@ -306,6 +306,46 @@ var _ = Describe("Flow controller", func() { controller.maybeAdjustWindowIncrement() Expect(controller.receiveWindowIncrement).To(Equal(controller.maxReceiveWindowIncrement)) // 3000 }) + + It("returns the new increment when updating the window", func() { + setRtt(10 * time.Millisecond) + controller.bytesRead = 9900 // receive window is 10000 + controller.lastWindowUpdateTime = time.Now().Add(-19 * time.Millisecond) + necessary, newIncrement, offset := controller.MaybeUpdateWindow() + Expect(necessary).To(BeTrue()) + Expect(newIncrement).To(Equal(2 * oldIncrement)) + Expect(controller.receiveWindowIncrement).To(Equal(newIncrement)) + Expect(offset).To(Equal(protocol.ByteCount(9900 + newIncrement))) + }) + + It("only returns the increment if it was increased", func() { + setRtt(10 * time.Millisecond) + controller.bytesRead = 9900 // receive window is 10000 + controller.lastWindowUpdateTime = time.Now().Add(-21 * time.Millisecond) + necessary, newIncrement, offset := controller.MaybeUpdateWindow() + Expect(necessary).To(BeTrue()) + Expect(newIncrement).To(BeZero()) + Expect(controller.receiveWindowIncrement).To(Equal(oldIncrement)) + Expect(offset).To(Equal(protocol.ByteCount(9900 + oldIncrement))) + }) + + Context("setting the minimum increment", func() { + It("sets the minimum window increment", func() { + controller.EnsureMinimumWindowIncrement(1000) + Expect(controller.receiveWindowIncrement).To(Equal(protocol.ByteCount(1000))) + }) + + It("doesn't reduce the window increment", func() { + controller.EnsureMinimumWindowIncrement(1) + Expect(controller.receiveWindowIncrement).To(Equal(oldIncrement)) + }) + + It("doens't increase the increment beyong the maxReceiveWindowIncrement", func() { + max := controller.maxReceiveWindowIncrement + controller.EnsureMinimumWindowIncrement(2 * max) + Expect(controller.receiveWindowIncrement).To(Equal(max)) + }) + }) }) }) }) diff --git a/protocol/server_parameters.go b/protocol/server_parameters.go index 11981823..d7173045 100644 --- a/protocol/server_parameters.go +++ b/protocol/server_parameters.go @@ -40,6 +40,10 @@ const MaxReceiveStreamFlowControlWindowClient ByteCount = 6 * (1 << 20) // 6 MB // This is the value that Google servers are using const MaxReceiveConnectionFlowControlWindowClient ByteCount = 15 * (1 << 20) // 15 MB +// ConnectionFlowControlMultiplier determines how much larger the connection flow control windows needs to be relative to any stream's flow control window +// This is the value that Chromium is using +const ConnectionFlowControlMultiplier = 1.5 + // MaxStreamsPerConnection is the maximum value accepted for the number of streams per connection const MaxStreamsPerConnection = 100