flowcontrol: fix timestamp used for receive window auto-tuning (#4735)

This commit is contained in:
Marten Seemann
2024-11-29 13:59:22 +08:00
committed by GitHub
parent a7795541c9
commit 36ed693996
4 changed files with 17 additions and 18 deletions

View File

@@ -77,19 +77,19 @@ func (c *baseFlowController) hasWindowUpdate() bool {
// getWindowUpdate updates the receive window, if necessary
// it returns the new offset
func (c *baseFlowController) getWindowUpdate() protocol.ByteCount {
func (c *baseFlowController) getWindowUpdate(now time.Time) protocol.ByteCount {
if !c.hasWindowUpdate() {
return 0
}
c.maybeAdjustWindowSize()
c.maybeAdjustWindowSize(now)
c.receiveWindow = c.bytesRead + c.receiveWindowSize
return c.receiveWindow
}
// maybeAdjustWindowSize increases the receiveWindowSize if we're sending updates too often.
// For details about auto-tuning, see https://docs.google.com/document/d/1SExkMmGiz8VYzV3s9E35JQlJ73vhzCekKkDi85F1qCE/edit?usp=sharing.
func (c *baseFlowController) maybeAdjustWindowSize() {
func (c *baseFlowController) maybeAdjustWindowSize(now time.Time) {
bytesReadInEpoch := c.bytesRead - c.epochStartOffset
// don't do anything if less than half the window has been consumed
if bytesReadInEpoch <= c.receiveWindowSize/2 {
@@ -101,7 +101,6 @@ func (c *baseFlowController) maybeAdjustWindowSize() {
}
fraction := float64(bytesReadInEpoch) / float64(c.receiveWindowSize)
now := time.Now()
if now.Sub(c.epochStartTime) < time.Duration(4*fraction*float64(rtt)) {
// window is consumed too fast, try to increase the window size
newSize := min(2*c.receiveWindowSize, c.maxReceiveWindowSize)

View File

@@ -112,7 +112,7 @@ var _ = Describe("Base Flow controller", func() {
bytesRemaining := receiveWindowSize - protocol.ByteCount(bytesConsumed)
readPosition := receiveWindow - bytesRemaining
controller.bytesRead = readPosition
offset := controller.getWindowUpdate()
offset := controller.getWindowUpdate(time.Now())
Expect(offset).To(Equal(readPosition + receiveWindowSize))
Expect(controller.receiveWindow).To(Equal(readPosition + receiveWindowSize))
})
@@ -122,7 +122,7 @@ var _ = Describe("Base Flow controller", func() {
bytesRemaining := receiveWindowSize - protocol.ByteCount(bytesConsumed)
readPosition := receiveWindow - bytesRemaining
controller.bytesRead = readPosition
offset := controller.getWindowUpdate()
offset := controller.getWindowUpdate(time.Now())
Expect(offset).To(BeZero())
})
@@ -141,7 +141,7 @@ var _ = Describe("Base Flow controller", func() {
}
It("doesn't increase the window size for a new stream", func() {
controller.maybeAdjustWindowSize()
controller.maybeAdjustWindowSize(time.Now())
Expect(controller.receiveWindowSize).To(Equal(oldWindowSize))
})
@@ -149,7 +149,7 @@ var _ = Describe("Base Flow controller", func() {
setRtt(0)
controller.startNewAutoTuningEpoch(time.Now())
controller.addBytesRead(400)
offset := controller.getWindowUpdate()
offset := controller.getWindowUpdate(time.Now())
Expect(offset).ToNot(BeZero()) // make sure a window update is sent
Expect(controller.receiveWindowSize).To(Equal(oldWindowSize))
})
@@ -164,7 +164,7 @@ var _ = Describe("Base Flow controller", func() {
controller.epochStartOffset = controller.bytesRead
controller.epochStartTime = time.Now().Add(-rtt * 4 * 2 / 3)
controller.addBytesRead(dataRead)
offset := controller.getWindowUpdate()
offset := controller.getWindowUpdate(time.Now())
Expect(offset).ToNot(BeZero())
// check that the window size was increased
newWindowSize := controller.receiveWindowSize
@@ -183,7 +183,7 @@ var _ = Describe("Base Flow controller", func() {
controller.epochStartOffset = controller.bytesRead
controller.epochStartTime = time.Now().Add(-rtt * 4 * 1 / 3)
controller.addBytesRead(dataRead)
offset := controller.getWindowUpdate()
offset := controller.getWindowUpdate(time.Now())
Expect(offset).ToNot(BeZero())
// check that the window size was not increased
newWindowSize := controller.receiveWindowSize
@@ -202,7 +202,7 @@ var _ = Describe("Base Flow controller", func() {
controller.epochStartOffset = controller.bytesRead
controller.epochStartTime = time.Now().Add(-rtt * 4 * 2 / 3)
controller.addBytesRead(dataRead)
offset := controller.getWindowUpdate()
offset := controller.getWindowUpdate(time.Now())
Expect(offset).ToNot(BeZero())
// check that the window size was not increased
Expect(controller.receiveWindowSize).To(Equal(oldWindowSize))
@@ -219,16 +219,16 @@ var _ = Describe("Base Flow controller", func() {
}
setRtt(scaleDuration(20 * time.Millisecond))
resetEpoch()
controller.maybeAdjustWindowSize()
controller.maybeAdjustWindowSize(time.Now())
Expect(controller.receiveWindowSize).To(Equal(2 * oldWindowSize)) // 2000
// because the lastWindowUpdateTime is updated by MaybeTriggerWindowUpdate(), we can just call maybeAdjustWindowSize() multiple times and get an increase of the window size every time
resetEpoch()
controller.maybeAdjustWindowSize()
controller.maybeAdjustWindowSize(time.Now())
Expect(controller.receiveWindowSize).To(Equal(2 * 2 * oldWindowSize)) // 4000
resetEpoch()
controller.maybeAdjustWindowSize()
controller.maybeAdjustWindowSize(time.Now())
Expect(controller.receiveWindowSize).To(Equal(controller.maxReceiveWindowSize)) // 5000
controller.maybeAdjustWindowSize()
controller.maybeAdjustWindowSize(time.Now())
Expect(controller.receiveWindowSize).To(Equal(controller.maxReceiveWindowSize)) // 5000
})
})

View File

@@ -67,10 +67,10 @@ func (c *connectionFlowController) AddBytesRead(n protocol.ByteCount) {
c.mutex.Unlock()
}
func (c *connectionFlowController) GetWindowUpdate(time.Time) protocol.ByteCount {
func (c *connectionFlowController) GetWindowUpdate(now time.Time) protocol.ByteCount {
c.mutex.Lock()
oldWindowSize := c.receiveWindowSize
offset := c.baseFlowController.getWindowUpdate()
offset := c.baseFlowController.getWindowUpdate(now)
if c.logger.Debug() && oldWindowSize < c.receiveWindowSize {
c.logger.Debugf("Increasing receive flow control window for the connection to %d kB", c.receiveWindowSize/(1<<10))
}

View File

@@ -144,7 +144,7 @@ func (c *streamFlowController) GetWindowUpdate(now time.Time) protocol.ByteCount
// Don't use defer for unlocking the mutex here, GetWindowUpdate() is called frequently and defer shows up in the profiler
c.mutex.Lock()
oldWindowSize := c.receiveWindowSize
offset := c.baseFlowController.getWindowUpdate()
offset := c.baseFlowController.getWindowUpdate(now)
if c.receiveWindowSize > oldWindowSize { // auto-tuning enlarged the window size
c.logger.Debugf("Increasing receive flow control window for stream %d to %d kB", c.streamID, c.receiveWindowSize/(1<<10))
c.connection.EnsureMinimumWindowSize(protocol.ByteCount(float64(c.receiveWindowSize)*protocol.ConnectionFlowControlMultiplier), now)