From f7526b9883830944fcd18cd8071a031232fda9b9 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 21 Dec 2017 15:29:47 +0700 Subject: [PATCH] rewrite flow control auto-tuning --- internal/flowcontrol/base_flow_controller.go | 31 +++--- .../flowcontrol/base_flow_controller_test.go | 101 +++++++++++------- .../flowcontrol/connection_flow_controller.go | 3 +- .../connection_flow_controller_test.go | 38 ++++--- .../stream_flow_controller_test.go | 23 ++-- 5 files changed, 118 insertions(+), 78 deletions(-) diff --git a/internal/flowcontrol/base_flow_controller.go b/internal/flowcontrol/base_flow_controller.go index fac9e194..cfdda698 100644 --- a/internal/flowcontrol/base_flow_controller.go +++ b/internal/flowcontrol/base_flow_controller.go @@ -22,8 +22,10 @@ type baseFlowController struct { receiveWindow protocol.ByteCount receiveWindowSize protocol.ByteCount maxReceiveWindowSize protocol.ByteCount - lastWindowUpdateTime time.Time - rttStats *congestion.RTTStats + + epochStartTime time.Time + epochStartOffset protocol.ByteCount + rttStats *congestion.RTTStats } func (c *baseFlowController) AddBytesSent(n protocol.ByteCount) { @@ -53,7 +55,7 @@ func (c *baseFlowController) AddBytesRead(n protocol.ByteCount) { // pretend we sent a WindowUpdate when reading the first byte // this way auto-tuning of the window size already works for the first WindowUpdate if c.bytesRead == 0 { - c.lastWindowUpdateTime = time.Now() + c.startNewAutoTuningEpoch() } c.bytesRead += n } @@ -69,7 +71,6 @@ func (c *baseFlowController) getWindowUpdate() protocol.ByteCount { c.maybeAdjustWindowSize() c.receiveWindow = c.bytesRead + c.receiveWindowSize - c.lastWindowUpdateTime = time.Now() return c.receiveWindow } @@ -85,23 +86,29 @@ func (c *baseFlowController) IsNewlyBlocked() (bool, protocol.ByteCount) { } // maybeAdjustWindowSize increases the receiveWindowSize if we're sending updates too often. -// For details about auto-tuning, see https://docs.google.com/document/d/1F2YfdDXKpy20WVKJueEf4abn_LVZHhMUMS5gX6Pgjl4/edit#heading=h.hcm2y5x4qmqt. +// For details about auto-tuning, see https://docs.google.com/document/d/1SExkMmGiz8VYzV3s9E35JQlJ73vhzCekKkDi85F1qCE/edit?usp=sharing. func (c *baseFlowController) maybeAdjustWindowSize() { - if c.lastWindowUpdateTime.IsZero() { + bytesReadInEpoch := c.bytesRead - c.epochStartOffset + // don't do anything if less than half the window has been consumed + if bytesReadInEpoch <= c.receiveWindowSize/2 { return } - rtt := c.rttStats.SmoothedRTT() if rtt == 0 { return } - timeSinceLastWindowUpdate := time.Since(c.lastWindowUpdateTime) - // interval between the updates is sufficiently large, no need to increase the window size - if timeSinceLastWindowUpdate >= 4*protocol.WindowUpdateThreshold*rtt { - return + fraction := float64(bytesReadInEpoch) / float64(c.receiveWindowSize) + if time.Since(c.epochStartTime) < time.Duration(4*fraction*float64(rtt)) { + // window is consumed too fast, try to increase the window size + c.receiveWindowSize = utils.MinByteCount(2*c.receiveWindowSize, c.maxReceiveWindowSize) } - c.receiveWindowSize = utils.MinByteCount(2*c.receiveWindowSize, c.maxReceiveWindowSize) + c.startNewAutoTuningEpoch() +} + +func (c *baseFlowController) startNewAutoTuningEpoch() { + c.epochStartTime = time.Now() + c.epochStartOffset = c.bytesRead } func (c *baseFlowController) checkFlowControlViolation() bool { diff --git a/internal/flowcontrol/base_flow_controller_test.go b/internal/flowcontrol/base_flow_controller_test.go index 31b64fc9..0668f627 100644 --- a/internal/flowcontrol/base_flow_controller_test.go +++ b/internal/flowcontrol/base_flow_controller_test.go @@ -77,10 +77,11 @@ var _ = Describe("Base Flow controller", func() { Context("receive flow control", func() { var ( receiveWindow protocol.ByteCount = 10000 - receiveWindowSize protocol.ByteCount = 600 + receiveWindowSize protocol.ByteCount = 1000 ) BeforeEach(func() { + controller.bytesRead = receiveWindow - receiveWindowSize controller.receiveWindow = receiveWindow controller.receiveWindowSize = receiveWindowSize }) @@ -92,7 +93,6 @@ var _ = Describe("Base Flow controller", func() { }) It("triggers a window update when necessary", func() { - controller.lastWindowUpdateTime = time.Now().Add(-time.Hour) bytesConsumed := float64(receiveWindowSize)*protocol.WindowUpdateThreshold + 1 // consumed 1 byte more than the threshold bytesRemaining := receiveWindowSize - protocol.ByteCount(bytesConsumed) readPosition := receiveWindow - bytesRemaining @@ -100,19 +100,15 @@ var _ = Describe("Base Flow controller", func() { offset := controller.getWindowUpdate() Expect(offset).To(Equal(readPosition + receiveWindowSize)) Expect(controller.receiveWindow).To(Equal(readPosition + receiveWindowSize)) - Expect(controller.lastWindowUpdateTime).To(BeTemporally("~", time.Now(), 20*time.Millisecond)) }) It("doesn't trigger a window update when not necessary", func() { - lastWindowUpdateTime := time.Now().Add(-time.Hour) - controller.lastWindowUpdateTime = lastWindowUpdateTime bytesConsumed := float64(receiveWindowSize)*protocol.WindowUpdateThreshold - 1 // consumed 1 byte less than the threshold bytesRemaining := receiveWindowSize - protocol.ByteCount(bytesConsumed) readPosition := receiveWindow - bytesRemaining controller.bytesRead = readPosition offset := controller.getWindowUpdate() Expect(offset).To(BeZero()) - Expect(controller.lastWindowUpdateTime).To(Equal(lastWindowUpdateTime)) }) Context("receive window size auto-tuning", func() { @@ -120,7 +116,7 @@ var _ = Describe("Base Flow controller", func() { BeforeEach(func() { oldWindowSize = controller.receiveWindowSize - controller.maxReceiveWindowSize = 3000 + controller.maxReceiveWindowSize = 5000 }) // update the congestion such that it returns a given value for the smoothed RTT @@ -136,62 +132,91 @@ var _ = Describe("Base Flow controller", func() { It("doesn't increase the window size when no RTT estimate is available", func() { setRtt(0) - controller.lastWindowUpdateTime = time.Now() - controller.maybeAdjustWindowSize() + controller.startNewAutoTuningEpoch() + controller.AddBytesRead(400) + offset := controller.getWindowUpdate() + Expect(offset).ToNot(BeZero()) // make sure a window update is sent Expect(controller.receiveWindowSize).To(Equal(oldWindowSize)) }) - It("increases the window size when the last WindowUpdate was sent less than (4 * threshold) RTTs ago", func() { + It("increases the window size if read so fast that the window would be consumed in less than 4 RTTs", func() { + bytesRead := controller.bytesRead rtt := 20 * time.Millisecond setRtt(rtt) - controller.AddBytesRead(9900) // receive window is 10000 - controller.lastWindowUpdateTime = time.Now().Add(-4*protocol.WindowUpdateThreshold*rtt + time.Millisecond) + // consume more than 2/3 of the window... + dataRead := receiveWindowSize*2/3 + 1 + // ... in 4*2/3 of the RTT + controller.epochStartOffset = controller.bytesRead + controller.epochStartTime = time.Now().Add(-rtt * 4 * 2 / 3) + controller.AddBytesRead(dataRead) offset := controller.getWindowUpdate() Expect(offset).ToNot(BeZero()) // check that the window size was increased newWindowSize := controller.receiveWindowSize Expect(newWindowSize).To(Equal(2 * oldWindowSize)) // check that the new window size was used to increase the offset - Expect(offset).To(Equal(protocol.ByteCount(9900 + newWindowSize))) + Expect(offset).To(Equal(protocol.ByteCount(bytesRead + dataRead + newWindowSize))) }) - It("doesn't increase the increase window size when the last WindowUpdate was sent more than (4 * threshold) RTTs ago", func() { + It("doesn't increase the window size if data is read so fast that the window would be consumed in less than 4 RTTs, but less than half the window has been read", func() { + // this test only makes sense if a window update is triggered before half of the window has been consumed + Expect(protocol.WindowUpdateThreshold).To(BeNumerically(">", 1/3)) + bytesRead := controller.bytesRead rtt := 20 * time.Millisecond setRtt(rtt) - controller.lastWindowUpdateTime = time.Now().Add(-4*protocol.WindowUpdateThreshold*rtt - time.Millisecond) - controller.maybeAdjustWindowSize() + // consume more than 2/3 of the window... + dataRead := receiveWindowSize*1/3 + 1 + // ... in 4*2/3 of the RTT + controller.epochStartOffset = controller.bytesRead + controller.epochStartTime = time.Now().Add(-rtt * 4 * 1 / 3) + controller.AddBytesRead(dataRead) + offset := controller.getWindowUpdate() + Expect(offset).ToNot(BeZero()) + // check that the window size was not increased + newWindowSize := controller.receiveWindowSize + Expect(newWindowSize).To(Equal(oldWindowSize)) + // check that the new window size was used to increase the offset + Expect(offset).To(Equal(protocol.ByteCount(bytesRead + dataRead + newWindowSize))) + }) + + It("doesn't increase the window size if read too slowly", func() { + bytesRead := controller.bytesRead + rtt := 20 * time.Millisecond + setRtt(rtt) + // consume less than 2/3 of the window... + dataRead := receiveWindowSize*2/3 - 1 + // ... in 4*2/3 of the RTT + controller.epochStartOffset = controller.bytesRead + controller.epochStartTime = time.Now().Add(-rtt * 4 * 2 / 3) + controller.AddBytesRead(dataRead) + offset := controller.getWindowUpdate() + Expect(offset).ToNot(BeZero()) + // check that the window size was not increased Expect(controller.receiveWindowSize).To(Equal(oldWindowSize)) + // check that the new window size was used to increase the offset + Expect(offset).To(Equal(protocol.ByteCount(bytesRead + dataRead + oldWindowSize))) }) It("doesn't increase the window size to a value higher than the maxReceiveWindowSize", func() { + resetEpoch := func() { + // make sure the next call to maybeAdjustWindowSize will increase the window + controller.epochStartTime = time.Now().Add(-time.Millisecond) + controller.epochStartOffset = controller.bytesRead + controller.AddBytesRead(controller.receiveWindowSize/2 + 1) + } setRtt(20 * time.Millisecond) - controller.lastWindowUpdateTime = time.Now().Add(-time.Millisecond) + resetEpoch() controller.maybeAdjustWindowSize() - Expect(controller.receiveWindowSize).To(Equal(2 * oldWindowSize)) // 1200 + Expect(controller.receiveWindowSize).To(Equal(2 * oldWindowSize)) // 2000 // because the lastWindowUpdateTime is updated by MaybeTriggerWindowUpdate(), we can just call maybeAdjustWindowSize() multiple times and get an increase of the window size every time + resetEpoch() controller.maybeAdjustWindowSize() - Expect(controller.receiveWindowSize).To(Equal(2 * 2 * oldWindowSize)) // 2400 + Expect(controller.receiveWindowSize).To(Equal(2 * 2 * oldWindowSize)) // 4000 + resetEpoch() controller.maybeAdjustWindowSize() - Expect(controller.receiveWindowSize).To(Equal(controller.maxReceiveWindowSize)) // 3000 + Expect(controller.receiveWindowSize).To(Equal(controller.maxReceiveWindowSize)) // 5000 controller.maybeAdjustWindowSize() - Expect(controller.receiveWindowSize).To(Equal(controller.maxReceiveWindowSize)) // 3000 - }) - - It("increases the window size sent in the first WindowUpdate, if data is read fast enough", func() { - setRtt(20 * time.Millisecond) - controller.AddBytesRead(9900) - offset := controller.getWindowUpdate() - Expect(offset).ToNot(BeZero()) - Expect(controller.receiveWindowSize).To(Equal(2 * oldWindowSize)) - }) - - It("doesn't increase the window size sent in the first WindowUpdate, if data is read slowly", func() { - setRtt(5 * time.Millisecond) - controller.AddBytesRead(9900) - time.Sleep(15 * time.Millisecond) // more than 2x RTT - offset := controller.getWindowUpdate() - Expect(offset).ToNot(BeZero()) - Expect(controller.receiveWindowSize).To(Equal(oldWindowSize)) + Expect(controller.receiveWindowSize).To(Equal(controller.maxReceiveWindowSize)) // 5000 }) }) }) diff --git a/internal/flowcontrol/connection_flow_controller.go b/internal/flowcontrol/connection_flow_controller.go index a4e1ea03..c6f01a58 100644 --- a/internal/flowcontrol/connection_flow_controller.go +++ b/internal/flowcontrol/connection_flow_controller.go @@ -2,7 +2,6 @@ package flowcontrol import ( "fmt" - "time" "github.com/lucas-clemente/quic-go/congestion" "github.com/lucas-clemente/quic-go/internal/protocol" @@ -66,7 +65,7 @@ func (c *connectionFlowController) EnsureMinimumWindowSize(inc protocol.ByteCoun c.mutex.Lock() if inc > c.receiveWindowSize { c.receiveWindowSize = utils.MinByteCount(inc, c.maxReceiveWindowSize) - c.lastWindowUpdateTime = time.Time{} // disables autotuning for the next window update + c.startNewAutoTuningEpoch() } c.mutex.Unlock() } diff --git a/internal/flowcontrol/connection_flow_controller_test.go b/internal/flowcontrol/connection_flow_controller_test.go index a130cfdc..d3941a81 100644 --- a/internal/flowcontrol/connection_flow_controller_test.go +++ b/internal/flowcontrol/connection_flow_controller_test.go @@ -48,21 +48,31 @@ var _ = Describe("Connection Flow controller", func() { controller.receiveWindow = 100 controller.receiveWindowSize = 60 controller.maxReceiveWindowSize = 1000 + controller.bytesRead = 100 - 60 }) It("gets a window update", func() { - controller.AddBytesRead(80) + windowSize := controller.receiveWindowSize + oldOffset := controller.bytesRead + dataRead := windowSize/2 - 1 // make sure not to trigger auto-tuning + controller.AddBytesRead(dataRead) offset := controller.GetWindowUpdate() - Expect(offset).To(Equal(protocol.ByteCount(80 + 60))) + Expect(offset).To(Equal(protocol.ByteCount(oldOffset + dataRead + 60))) }) It("autotunes the window", func() { - controller.AddBytesRead(80) + oldOffset := controller.bytesRead + oldWindowSize := controller.receiveWindowSize rtt := 20 * time.Millisecond setRtt(rtt) - controller.lastWindowUpdateTime = time.Now().Add(-4*protocol.WindowUpdateThreshold*rtt + time.Millisecond) + controller.epochStartTime = time.Now().Add(-time.Millisecond) + controller.epochStartOffset = oldOffset + dataRead := oldWindowSize/2 + 1 + controller.AddBytesRead(dataRead) offset := controller.GetWindowUpdate() - Expect(offset).To(Equal(protocol.ByteCount(80 + 2*60))) + newWindowSize := controller.receiveWindowSize + Expect(newWindowSize).To(Equal(2 * oldWindowSize)) + Expect(offset).To(Equal(protocol.ByteCount(oldOffset + dataRead + newWindowSize))) }) }) }) @@ -71,10 +81,11 @@ var _ = Describe("Connection Flow controller", func() { var ( oldWindowSize protocol.ByteCount receiveWindow protocol.ByteCount = 10000 - receiveWindowSize protocol.ByteCount = 600 + receiveWindowSize protocol.ByteCount = 1000 ) BeforeEach(func() { + controller.bytesRead = receiveWindowSize - receiveWindowSize controller.receiveWindow = receiveWindow controller.receiveWindowSize = receiveWindowSize oldWindowSize = controller.receiveWindowSize @@ -82,8 +93,8 @@ var _ = Describe("Connection Flow controller", func() { }) It("sets the minimum window window size", func() { - controller.EnsureMinimumWindowSize(1000) - Expect(controller.receiveWindowSize).To(Equal(protocol.ByteCount(1000))) + controller.EnsureMinimumWindowSize(1800) + Expect(controller.receiveWindowSize).To(Equal(protocol.ByteCount(1800))) }) It("doesn't reduce the window window size", func() { @@ -97,14 +108,9 @@ var _ = Describe("Connection Flow controller", func() { Expect(controller.receiveWindowSize).To(Equal(max)) }) - It("doesn't auto-tune the window after the window size was increased", func() { - setRtt(20 * time.Millisecond) - controller.bytesRead = 9900 // receive window is 10000 - controller.lastWindowUpdateTime = time.Now().Add(-20 * time.Millisecond) - controller.EnsureMinimumWindowSize(912) - offset := controller.getWindowUpdate() - Expect(controller.receiveWindowSize).To(Equal(protocol.ByteCount(912))) // no auto-tuning - Expect(offset).To(Equal(protocol.ByteCount(9900 + 912))) + It("starts a new epoch after the window size was increased", func() { + controller.EnsureMinimumWindowSize(1912) + Expect(controller.epochStartTime).To(BeTemporally("~", time.Now(), 100*time.Millisecond)) }) }) }) diff --git a/internal/flowcontrol/stream_flow_controller_test.go b/internal/flowcontrol/stream_flow_controller_test.go index 6f67b5e5..cb413f10 100644 --- a/internal/flowcontrol/stream_flow_controller_test.go +++ b/internal/flowcontrol/stream_flow_controller_test.go @@ -168,32 +168,35 @@ var _ = Describe("Stream Flow controller", func() { BeforeEach(func() { controller.receiveWindow = 100 controller.receiveWindowSize = 60 + controller.bytesRead = 100 - 60 controller.connection.(*connectionFlowController).receiveWindowSize = 120 oldWindowSize = controller.receiveWindowSize }) It("tells the connection flow controller when the window was autotuned", func() { + oldOffset := controller.bytesRead controller.contributesToConnection = true - controller.AddBytesRead(75) - rtt := 20 * time.Millisecond - setRtt(rtt) - controller.lastWindowUpdateTime = time.Now().Add(-4*protocol.WindowUpdateThreshold*rtt + time.Millisecond) + setRtt(20 * time.Millisecond) + controller.epochStartOffset = oldOffset + controller.epochStartTime = time.Now().Add(-time.Millisecond) + controller.AddBytesRead(55) offset := controller.GetWindowUpdate() - Expect(offset).To(Equal(protocol.ByteCount(75 + 2*60))) + Expect(offset).To(Equal(protocol.ByteCount(oldOffset + 55 + 2*oldWindowSize))) Expect(controller.receiveWindowSize).To(Equal(2 * oldWindowSize)) Expect(controller.connection.(*connectionFlowController).receiveWindowSize).To(Equal(protocol.ByteCount(float64(controller.receiveWindowSize) * protocol.ConnectionFlowControlMultiplier))) }) It("doesn't tell the connection flow controller if it doesn't contribute", func() { + oldOffset := controller.bytesRead controller.contributesToConnection = false - controller.AddBytesRead(75) - rtt := 20 * time.Millisecond - setRtt(rtt) - controller.lastWindowUpdateTime = time.Now().Add(-4*protocol.WindowUpdateThreshold*rtt + time.Millisecond) + setRtt(20 * time.Millisecond) + controller.epochStartOffset = oldOffset + controller.epochStartTime = time.Now().Add(-time.Millisecond) + controller.AddBytesRead(55) offset := controller.GetWindowUpdate() Expect(offset).ToNot(BeZero()) Expect(controller.receiveWindowSize).To(Equal(2 * oldWindowSize)) - Expect(controller.connection.(*connectionFlowController).receiveWindowSize).To(Equal(protocol.ByteCount(120))) // unchanged + Expect(controller.connection.(*connectionFlowController).receiveWindowSize).To(Equal(protocol.ByteCount(2 * oldWindowSize))) // unchanged }) It("doesn't increase the window after a final offset was already received", func() {