ensure minimum size of connection-level flow control window

fixes #409
This commit is contained in:
Marten Seemann
2017-02-08 11:23:37 +07:00
parent 84bda1a9f4
commit db11b25790
5 changed files with 127 additions and 13 deletions

View File

@@ -144,12 +144,27 @@ func (f *flowControlManager) AddBytesRead(streamID protocol.StreamID, n protocol
func (f *flowControlManager) GetWindowUpdates() (res []WindowUpdate) {
f.mutex.Lock()
defer f.mutex.Unlock()
connFlowController := f.streamFlowController[0]
// get WindowUpdates for streams
for id, fc := range f.streamFlowController {
if necessary, offset := fc.MaybeUpdateWindow(); necessary {
if id == 0 { // connection-level updates are dealt with later
continue
}
if necessary, newIncrement, offset := fc.MaybeUpdateWindow(); necessary {
res = append(res, WindowUpdate{StreamID: id, Offset: offset})
contributes, _ := f.contributesToConnectionFlowControl[id]
if contributes && newIncrement != 0 {
connFlowController.EnsureMinimumWindowIncrement(protocol.ByteCount(float64(newIncrement) * protocol.ConnectionFlowControlMultiplier))
}
}
}
return res
// get a WindowUpdate for the connection
if necessary, _, offset := connFlowController.MaybeUpdateWindow(); necessary {
res = append(res, WindowUpdate{StreamID: 0, Offset: offset})
}
return
}
func (f *flowControlManager) GetReceiveWindow(streamID protocol.StreamID) (protocol.ByteCount, error) {

View File

@@ -1,6 +1,8 @@
package flowcontrol
import (
"time"
"github.com/lucas-clemente/quic-go/congestion"
"github.com/lucas-clemente/quic-go/handshake"
"github.com/lucas-clemente/quic-go/protocol"
@@ -15,8 +17,10 @@ var _ = Describe("Flow Control Manager", func() {
BeforeEach(func() {
cpm = &mockConnectionParametersManager{
receiveStreamFlowControlWindow: 100,
receiveConnectionFlowControlWindow: 200,
receiveStreamFlowControlWindow: 100,
receiveConnectionFlowControlWindow: 200,
maxReceiveStreamFlowControlWindow: 9999999,
maxReceiveConnectionFlowControlWindow: 9999999,
}
fcm = NewFlowControlManager(cpm, &congestion.RTTStats{}).(*flowControlManager)
})
@@ -112,6 +116,14 @@ var _ = Describe("Flow Control Manager", func() {
})
Context("window updates", func() {
// update the congestion such that it returns a given value for the smoothed RTT
setRtt := func(t time.Duration) {
for _, controller := range fcm.streamFlowController {
controller.rttStats.UpdateRTT(t, 0, time.Now())
Expect(controller.rttStats.SmoothedRTT()).To(Equal(t)) // make sure it worked
}
}
It("gets stream level window updates", func() {
err := fcm.UpdateHighestReceived(4, 100)
Expect(err).ToNot(HaveOccurred())
@@ -119,8 +131,7 @@ var _ = Describe("Flow Control Manager", func() {
Expect(err).ToNot(HaveOccurred())
updates := fcm.GetWindowUpdates()
Expect(updates).To(HaveLen(1))
Expect(updates[0].StreamID).To(Equal(protocol.StreamID(4)))
Expect(updates[0].Offset).To(Equal(protocol.ByteCount(190)))
Expect(updates[0]).To(Equal(WindowUpdate{StreamID: 4, Offset: 190}))
})
It("gets connection level window updates", func() {
@@ -141,6 +152,33 @@ var _ = Describe("Flow Control Manager", func() {
err := fcm.AddBytesRead(17, 1000)
Expect(err).To(MatchError(errMapAccess))
})
It("increases the connection-level window, when a stream window was increased by autotuning", func() {
setRtt(10 * time.Millisecond)
fcm.streamFlowController[4].lastWindowUpdateTime = time.Now().Add(-1 * time.Millisecond)
err := fcm.UpdateHighestReceived(4, 100)
Expect(err).ToNot(HaveOccurred())
err = fcm.AddBytesRead(4, 90)
Expect(err).ToNot(HaveOccurred())
updates := fcm.GetWindowUpdates()
Expect(updates).To(HaveLen(2))
connLevelIncrement := protocol.ByteCount(protocol.ConnectionFlowControlMultiplier * 200) // 300
Expect(updates).To(ContainElement(WindowUpdate{StreamID: 4, Offset: 290}))
Expect(updates).To(ContainElement(WindowUpdate{StreamID: 0, Offset: 90 + connLevelIncrement}))
})
It("doesn't increase the connection-level window, when a non-contributing stream window was increased by autotuning", func() {
setRtt(10 * time.Millisecond)
fcm.streamFlowController[1].lastWindowUpdateTime = time.Now().Add(-1 * time.Millisecond)
err := fcm.UpdateHighestReceived(1, 100)
Expect(err).ToNot(HaveOccurred())
err = fcm.AddBytesRead(1, 90)
Expect(err).ToNot(HaveOccurred())
updates := fcm.GetWindowUpdates()
Expect(updates).To(HaveLen(1))
Expect(updates).To(ContainElement(WindowUpdate{StreamID: 1, Offset: 290}))
// the only window update is for stream 1, thus there's no connection-level window update
})
})
})

View File

@@ -116,20 +116,28 @@ func (c *flowController) AddBytesRead(n protocol.ByteCount) {
c.bytesRead += n
}
// MaybeUpdateWindow determines if it is necessary to send a WindowUpdate
// if so, it returns true and the offset of the window
func (c *flowController) MaybeUpdateWindow() (bool, protocol.ByteCount) {
// MaybeUpdateWindow updates the receive window, if necessary
// if the receive window increment is changed, the new value is returned, otherwise a 0
// the last return value is the new offset of the receive window
func (c *flowController) MaybeUpdateWindow() (bool, protocol.ByteCount /* new increment */, protocol.ByteCount /* new offset */) {
diff := c.receiveWindow - c.bytesRead
// Chromium implements the same threshold
if diff < (c.receiveWindowIncrement / 2) {
var newWindowIncrement protocol.ByteCount
oldWindowIncrement := c.receiveWindowIncrement
c.maybeAdjustWindowIncrement()
if c.receiveWindowIncrement != oldWindowIncrement {
newWindowIncrement = c.receiveWindowIncrement
}
c.lastWindowUpdateTime = time.Now()
c.receiveWindow = c.bytesRead + c.receiveWindowIncrement
return true, c.receiveWindow
return true, newWindowIncrement, c.receiveWindow
}
return false, 0
return false, 0, 0
}
// maybeAdjustWindowIncrement increases the receiveWindowIncrement if we're sending WindowUpdates too often
@@ -164,6 +172,15 @@ func (c *flowController) maybeAdjustWindowIncrement() {
}
}
// EnsureMinimumWindowIncrement sets a minimum window increment
// it is intended be used for the connection-level flow controller
// it should make sure that the connection-level window is increased when a stream-level window grows
func (c *flowController) EnsureMinimumWindowIncrement(inc protocol.ByteCount) {
if inc > c.receiveWindowIncrement {
c.receiveWindowIncrement = utils.MinByteCount(inc, c.maxReceiveWindowIncrement)
}
}
func (c *flowController) CheckFlowControlViolation() bool {
if c.highestReceived > c.receiveWindow {
return true

View File

@@ -197,7 +197,7 @@ var _ = Describe("Flow controller", func() {
controller.lastWindowUpdateTime = time.Now().Add(-time.Hour)
readPosition := receiveWindow - receiveWindowIncrement/2 + 1
controller.bytesRead = readPosition
updateNecessary, offset := controller.MaybeUpdateWindow()
updateNecessary, _, offset := controller.MaybeUpdateWindow()
Expect(updateNecessary).To(BeTrue())
Expect(offset).To(Equal(readPosition + receiveWindowIncrement))
Expect(controller.receiveWindow).To(Equal(readPosition + receiveWindowIncrement))
@@ -209,7 +209,7 @@ var _ = Describe("Flow controller", func() {
controller.lastWindowUpdateTime = lastWindowUpdateTime
readPosition := receiveWindow - receiveWindow/2 - 1
controller.bytesRead = readPosition
updateNecessary, _ := controller.MaybeUpdateWindow()
updateNecessary, _, _ := controller.MaybeUpdateWindow()
Expect(updateNecessary).To(BeFalse())
Expect(controller.lastWindowUpdateTime).To(Equal(lastWindowUpdateTime))
})
@@ -306,6 +306,46 @@ var _ = Describe("Flow controller", func() {
controller.maybeAdjustWindowIncrement()
Expect(controller.receiveWindowIncrement).To(Equal(controller.maxReceiveWindowIncrement)) // 3000
})
It("returns the new increment when updating the window", func() {
setRtt(10 * time.Millisecond)
controller.bytesRead = 9900 // receive window is 10000
controller.lastWindowUpdateTime = time.Now().Add(-19 * time.Millisecond)
necessary, newIncrement, offset := controller.MaybeUpdateWindow()
Expect(necessary).To(BeTrue())
Expect(newIncrement).To(Equal(2 * oldIncrement))
Expect(controller.receiveWindowIncrement).To(Equal(newIncrement))
Expect(offset).To(Equal(protocol.ByteCount(9900 + newIncrement)))
})
It("only returns the increment if it was increased", func() {
setRtt(10 * time.Millisecond)
controller.bytesRead = 9900 // receive window is 10000
controller.lastWindowUpdateTime = time.Now().Add(-21 * time.Millisecond)
necessary, newIncrement, offset := controller.MaybeUpdateWindow()
Expect(necessary).To(BeTrue())
Expect(newIncrement).To(BeZero())
Expect(controller.receiveWindowIncrement).To(Equal(oldIncrement))
Expect(offset).To(Equal(protocol.ByteCount(9900 + oldIncrement)))
})
Context("setting the minimum increment", func() {
It("sets the minimum window increment", func() {
controller.EnsureMinimumWindowIncrement(1000)
Expect(controller.receiveWindowIncrement).To(Equal(protocol.ByteCount(1000)))
})
It("doesn't reduce the window increment", func() {
controller.EnsureMinimumWindowIncrement(1)
Expect(controller.receiveWindowIncrement).To(Equal(oldIncrement))
})
It("doens't increase the increment beyong the maxReceiveWindowIncrement", func() {
max := controller.maxReceiveWindowIncrement
controller.EnsureMinimumWindowIncrement(2 * max)
Expect(controller.receiveWindowIncrement).To(Equal(max))
})
})
})
})
})

View File

@@ -40,6 +40,10 @@ const MaxReceiveStreamFlowControlWindowClient ByteCount = 6 * (1 << 20) // 6 MB
// This is the value that Google servers are using
const MaxReceiveConnectionFlowControlWindowClient ByteCount = 15 * (1 << 20) // 15 MB
// ConnectionFlowControlMultiplier determines how much larger the connection flow control windows needs to be relative to any stream's flow control window
// This is the value that Chromium is using
const ConnectionFlowControlMultiplier = 1.5
// MaxStreamsPerConnection is the maximum value accepted for the number of streams per connection
const MaxStreamsPerConnection = 100