forked from quic-go/quic-go
Merge pull request #881 from lucas-clemente/improve-flow-controller
remove the flow control manager
This commit is contained in:
110
internal/flowcontrol/base_flow_controller.go
Normal file
110
internal/flowcontrol/base_flow_controller.go
Normal file
@@ -0,0 +1,110 @@
|
||||
package flowcontrol
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/lucas-clemente/quic-go/congestion"
|
||||
"github.com/lucas-clemente/quic-go/internal/protocol"
|
||||
"github.com/lucas-clemente/quic-go/internal/utils"
|
||||
)
|
||||
|
||||
type baseFlowController struct {
|
||||
mutex sync.RWMutex
|
||||
|
||||
rttStats *congestion.RTTStats
|
||||
|
||||
bytesSent protocol.ByteCount
|
||||
sendWindow protocol.ByteCount
|
||||
|
||||
lastWindowUpdateTime time.Time
|
||||
|
||||
bytesRead protocol.ByteCount
|
||||
highestReceived protocol.ByteCount
|
||||
receiveWindow protocol.ByteCount
|
||||
receiveWindowIncrement protocol.ByteCount
|
||||
maxReceiveWindowIncrement protocol.ByteCount
|
||||
}
|
||||
|
||||
func (c *baseFlowController) AddBytesSent(n protocol.ByteCount) {
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
c.bytesSent += n
|
||||
}
|
||||
|
||||
// UpdateSendWindow should be called after receiving a WindowUpdateFrame
|
||||
// it returns true if the window was actually updated
|
||||
func (c *baseFlowController) UpdateSendWindow(offset protocol.ByteCount) {
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
if offset > c.sendWindow {
|
||||
c.sendWindow = offset
|
||||
}
|
||||
}
|
||||
|
||||
func (c *baseFlowController) sendWindowSize() protocol.ByteCount {
|
||||
// this only happens during connection establishment, when data is sent before we receive the peer's transport parameters
|
||||
if c.bytesSent > c.sendWindow {
|
||||
return 0
|
||||
}
|
||||
return c.sendWindow - c.bytesSent
|
||||
}
|
||||
|
||||
func (c *baseFlowController) AddBytesRead(n protocol.ByteCount) {
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
// pretend we sent a WindowUpdate when reading the first byte
|
||||
// this way auto-tuning of the window increment already works for the first WindowUpdate
|
||||
if c.bytesRead == 0 {
|
||||
c.lastWindowUpdateTime = time.Now()
|
||||
}
|
||||
c.bytesRead += n
|
||||
}
|
||||
|
||||
// getWindowUpdate updates the receive window, if necessary
|
||||
// it returns the new offset
|
||||
func (c *baseFlowController) getWindowUpdate() protocol.ByteCount {
|
||||
diff := c.receiveWindow - c.bytesRead
|
||||
// update the window when more than half of it was already consumed
|
||||
if diff >= (c.receiveWindowIncrement / 2) {
|
||||
return 0
|
||||
}
|
||||
|
||||
c.maybeAdjustWindowIncrement()
|
||||
c.receiveWindow = c.bytesRead + c.receiveWindowIncrement
|
||||
c.lastWindowUpdateTime = time.Now()
|
||||
return c.receiveWindow
|
||||
}
|
||||
|
||||
func (c *baseFlowController) IsBlocked() bool {
|
||||
c.mutex.RLock()
|
||||
defer c.mutex.RUnlock()
|
||||
|
||||
return c.sendWindowSize() == 0
|
||||
}
|
||||
|
||||
// maybeAdjustWindowIncrement increases the receiveWindowIncrement if we're sending WindowUpdates too often
|
||||
func (c *baseFlowController) maybeAdjustWindowIncrement() {
|
||||
if c.lastWindowUpdateTime.IsZero() {
|
||||
return
|
||||
}
|
||||
|
||||
rtt := c.rttStats.SmoothedRTT()
|
||||
if rtt == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
timeSinceLastWindowUpdate := time.Since(c.lastWindowUpdateTime)
|
||||
// interval between the window updates is sufficiently large, no need to increase the increment
|
||||
if timeSinceLastWindowUpdate >= 2*rtt {
|
||||
return
|
||||
}
|
||||
c.receiveWindowIncrement = utils.MinByteCount(2*c.receiveWindowIncrement, c.maxReceiveWindowIncrement)
|
||||
}
|
||||
|
||||
func (c *baseFlowController) checkFlowControlViolation() bool {
|
||||
return c.highestReceived > c.receiveWindow
|
||||
}
|
||||
179
internal/flowcontrol/base_flow_controller_test.go
Normal file
179
internal/flowcontrol/base_flow_controller_test.go
Normal file
@@ -0,0 +1,179 @@
|
||||
package flowcontrol
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/lucas-clemente/quic-go/congestion"
|
||||
"github.com/lucas-clemente/quic-go/internal/protocol"
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("Base Flow controller", func() {
|
||||
var controller *baseFlowController
|
||||
|
||||
BeforeEach(func() {
|
||||
controller = &baseFlowController{}
|
||||
controller.rttStats = &congestion.RTTStats{}
|
||||
})
|
||||
|
||||
Context("send flow control", func() {
|
||||
It("adds bytes sent", func() {
|
||||
controller.bytesSent = 5
|
||||
controller.AddBytesSent(6)
|
||||
Expect(controller.bytesSent).To(Equal(protocol.ByteCount(5 + 6)))
|
||||
})
|
||||
|
||||
It("gets the size of the remaining flow control window", func() {
|
||||
controller.bytesSent = 5
|
||||
controller.sendWindow = 12
|
||||
Expect(controller.sendWindowSize()).To(Equal(protocol.ByteCount(12 - 5)))
|
||||
})
|
||||
|
||||
It("updates the size of the flow control window", func() {
|
||||
controller.AddBytesSent(5)
|
||||
controller.UpdateSendWindow(15)
|
||||
Expect(controller.sendWindow).To(Equal(protocol.ByteCount(15)))
|
||||
Expect(controller.sendWindowSize()).To(Equal(protocol.ByteCount(15 - 5)))
|
||||
})
|
||||
|
||||
It("says that the window size is 0 if we sent more than we were allowed to", func() {
|
||||
controller.AddBytesSent(15)
|
||||
controller.UpdateSendWindow(10)
|
||||
Expect(controller.sendWindowSize()).To(BeZero())
|
||||
})
|
||||
|
||||
It("does not decrease the flow control window", func() {
|
||||
controller.UpdateSendWindow(20)
|
||||
Expect(controller.sendWindowSize()).To(Equal(protocol.ByteCount(20)))
|
||||
controller.UpdateSendWindow(10)
|
||||
Expect(controller.sendWindowSize()).To(Equal(protocol.ByteCount(20)))
|
||||
})
|
||||
|
||||
It("says when it's blocked", func() {
|
||||
controller.UpdateSendWindow(100)
|
||||
Expect(controller.IsBlocked()).To(BeFalse())
|
||||
controller.AddBytesSent(100)
|
||||
Expect(controller.IsBlocked()).To(BeTrue())
|
||||
})
|
||||
})
|
||||
|
||||
Context("receive flow control", func() {
|
||||
var receiveWindow protocol.ByteCount = 10000
|
||||
var receiveWindowIncrement protocol.ByteCount = 600
|
||||
|
||||
BeforeEach(func() {
|
||||
controller.receiveWindow = receiveWindow
|
||||
controller.receiveWindowIncrement = receiveWindowIncrement
|
||||
})
|
||||
|
||||
It("adds bytes read", func() {
|
||||
controller.bytesRead = 5
|
||||
controller.AddBytesRead(6)
|
||||
Expect(controller.bytesRead).To(Equal(protocol.ByteCount(5 + 6)))
|
||||
})
|
||||
|
||||
It("triggers a window update when necessary", func() {
|
||||
controller.lastWindowUpdateTime = time.Now().Add(-time.Hour)
|
||||
readPosition := receiveWindow - receiveWindowIncrement/2 + 1
|
||||
controller.bytesRead = readPosition
|
||||
offset := controller.getWindowUpdate()
|
||||
Expect(offset).To(Equal(readPosition + receiveWindowIncrement))
|
||||
Expect(controller.receiveWindow).To(Equal(readPosition + receiveWindowIncrement))
|
||||
Expect(controller.lastWindowUpdateTime).To(BeTemporally("~", time.Now(), 20*time.Millisecond))
|
||||
})
|
||||
|
||||
It("doesn't trigger a window update when not necessary", func() {
|
||||
lastWindowUpdateTime := time.Now().Add(-time.Hour)
|
||||
controller.lastWindowUpdateTime = lastWindowUpdateTime
|
||||
readPosition := receiveWindow - receiveWindow/2 - 1
|
||||
controller.bytesRead = readPosition
|
||||
offset := controller.getWindowUpdate()
|
||||
Expect(offset).To(BeZero())
|
||||
Expect(controller.lastWindowUpdateTime).To(Equal(lastWindowUpdateTime))
|
||||
})
|
||||
|
||||
Context("receive window increment auto-tuning", func() {
|
||||
var oldIncrement protocol.ByteCount
|
||||
|
||||
BeforeEach(func() {
|
||||
oldIncrement = controller.receiveWindowIncrement
|
||||
controller.maxReceiveWindowIncrement = 3000
|
||||
})
|
||||
|
||||
// update the congestion such that it returns a given value for the smoothed RTT
|
||||
setRtt := func(t time.Duration) {
|
||||
controller.rttStats.UpdateRTT(t, 0, time.Now())
|
||||
Expect(controller.rttStats.SmoothedRTT()).To(Equal(t)) // make sure it worked
|
||||
}
|
||||
|
||||
It("doesn't increase the increment for a new stream", func() {
|
||||
controller.maybeAdjustWindowIncrement()
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(oldIncrement))
|
||||
})
|
||||
|
||||
It("doesn't increase the increment when no RTT estimate is available", func() {
|
||||
setRtt(0)
|
||||
controller.lastWindowUpdateTime = time.Now()
|
||||
controller.maybeAdjustWindowIncrement()
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(oldIncrement))
|
||||
})
|
||||
|
||||
It("increases the increment when the last WindowUpdate was sent less than two RTTs ago", func() {
|
||||
setRtt(20 * time.Millisecond)
|
||||
controller.lastWindowUpdateTime = time.Now().Add(-35 * time.Millisecond)
|
||||
controller.maybeAdjustWindowIncrement()
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(2 * oldIncrement))
|
||||
})
|
||||
|
||||
It("doesn't increase the increase increment when the last WindowUpdate was sent more than two RTTs ago", func() {
|
||||
setRtt(20 * time.Millisecond)
|
||||
controller.lastWindowUpdateTime = time.Now().Add(-45 * time.Millisecond)
|
||||
controller.maybeAdjustWindowIncrement()
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(oldIncrement))
|
||||
})
|
||||
|
||||
It("doesn't increase the increment to a value higher than the maxReceiveWindowIncrement", func() {
|
||||
setRtt(20 * time.Millisecond)
|
||||
controller.lastWindowUpdateTime = time.Now().Add(-35 * time.Millisecond)
|
||||
controller.maybeAdjustWindowIncrement()
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(2 * oldIncrement)) // 1200
|
||||
// because the lastWindowUpdateTime is updated by MaybeTriggerWindowUpdate(), we can just call maybeAdjustWindowIncrement() multiple times and get an increase of the increment every time
|
||||
controller.maybeAdjustWindowIncrement()
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(2 * 2 * oldIncrement)) // 2400
|
||||
controller.maybeAdjustWindowIncrement()
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(controller.maxReceiveWindowIncrement)) // 3000
|
||||
controller.maybeAdjustWindowIncrement()
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(controller.maxReceiveWindowIncrement)) // 3000
|
||||
})
|
||||
|
||||
It("returns the new increment when updating the window", func() {
|
||||
setRtt(20 * time.Millisecond)
|
||||
controller.AddBytesRead(9900) // receive window is 10000
|
||||
controller.lastWindowUpdateTime = time.Now().Add(-35 * time.Millisecond)
|
||||
offset := controller.getWindowUpdate()
|
||||
Expect(offset).ToNot(BeZero())
|
||||
newIncrement := controller.receiveWindowIncrement
|
||||
Expect(newIncrement).To(Equal(2 * oldIncrement))
|
||||
Expect(offset).To(Equal(protocol.ByteCount(9900 + newIncrement)))
|
||||
})
|
||||
|
||||
It("increases the increment sent in the first WindowUpdate, if data is read fast enough", func() {
|
||||
setRtt(20 * time.Millisecond)
|
||||
controller.AddBytesRead(9900)
|
||||
offset := controller.getWindowUpdate()
|
||||
Expect(offset).ToNot(BeZero())
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(2 * oldIncrement))
|
||||
})
|
||||
|
||||
It("doesn't increamse the increment sent in the first WindowUpdate, if data is read slowly", func() {
|
||||
setRtt(5 * time.Millisecond)
|
||||
controller.AddBytesRead(9900)
|
||||
time.Sleep(15 * time.Millisecond) // more than 2x RTT
|
||||
offset := controller.getWindowUpdate()
|
||||
Expect(offset).ToNot(BeZero())
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(oldIncrement))
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
77
internal/flowcontrol/connection_flow_controller.go
Normal file
77
internal/flowcontrol/connection_flow_controller.go
Normal file
@@ -0,0 +1,77 @@
|
||||
package flowcontrol
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/lucas-clemente/quic-go/congestion"
|
||||
"github.com/lucas-clemente/quic-go/internal/protocol"
|
||||
"github.com/lucas-clemente/quic-go/internal/utils"
|
||||
"github.com/lucas-clemente/quic-go/qerr"
|
||||
)
|
||||
|
||||
type connectionFlowController struct {
|
||||
baseFlowController
|
||||
}
|
||||
|
||||
var _ ConnectionFlowController = &connectionFlowController{}
|
||||
|
||||
// NewConnectionFlowController gets a new flow controller for the connection
|
||||
// It is created before we receive the peer's transport paramenters, thus it starts with a sendWindow of 0.
|
||||
func NewConnectionFlowController(
|
||||
receiveWindow protocol.ByteCount,
|
||||
maxReceiveWindow protocol.ByteCount,
|
||||
rttStats *congestion.RTTStats,
|
||||
) ConnectionFlowController {
|
||||
return &connectionFlowController{
|
||||
baseFlowController: baseFlowController{
|
||||
rttStats: rttStats,
|
||||
receiveWindow: receiveWindow,
|
||||
receiveWindowIncrement: receiveWindow,
|
||||
maxReceiveWindowIncrement: maxReceiveWindow,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (c *connectionFlowController) SendWindowSize() protocol.ByteCount {
|
||||
c.mutex.RLock()
|
||||
defer c.mutex.RUnlock()
|
||||
|
||||
return c.baseFlowController.sendWindowSize()
|
||||
}
|
||||
|
||||
// IncrementHighestReceived adds an increment to the highestReceived value
|
||||
func (c *connectionFlowController) IncrementHighestReceived(increment protocol.ByteCount) error {
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
c.highestReceived += increment
|
||||
if c.checkFlowControlViolation() {
|
||||
return qerr.Error(qerr.FlowControlReceivedTooMuchData, fmt.Sprintf("Received %d bytes for the connection, allowed %d bytes", c.highestReceived, c.receiveWindow))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *connectionFlowController) GetWindowUpdate() protocol.ByteCount {
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
oldWindowIncrement := c.receiveWindowIncrement
|
||||
offset := c.baseFlowController.getWindowUpdate()
|
||||
if oldWindowIncrement < c.receiveWindowIncrement {
|
||||
utils.Debugf("Increasing receive flow control window for the connection to %d kB", c.receiveWindowIncrement/(1<<10))
|
||||
}
|
||||
return offset
|
||||
}
|
||||
|
||||
// EnsureMinimumWindowIncrement sets a minimum window increment
|
||||
// it should make sure that the connection-level window is increased when a stream-level window grows
|
||||
func (c *connectionFlowController) EnsureMinimumWindowIncrement(inc protocol.ByteCount) {
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
if inc > c.receiveWindowIncrement {
|
||||
c.receiveWindowIncrement = utils.MinByteCount(inc, c.maxReceiveWindowIncrement)
|
||||
c.lastWindowUpdateTime = time.Time{} // disables autotuning for the next window update
|
||||
}
|
||||
}
|
||||
109
internal/flowcontrol/connection_flow_controller_test.go
Normal file
109
internal/flowcontrol/connection_flow_controller_test.go
Normal file
@@ -0,0 +1,109 @@
|
||||
package flowcontrol
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/lucas-clemente/quic-go/congestion"
|
||||
"github.com/lucas-clemente/quic-go/internal/protocol"
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("Connection Flow controller", func() {
|
||||
var controller *connectionFlowController
|
||||
|
||||
// update the congestion such that it returns a given value for the smoothed RTT
|
||||
setRtt := func(t time.Duration) {
|
||||
controller.rttStats.UpdateRTT(t, 0, time.Now())
|
||||
Expect(controller.rttStats.SmoothedRTT()).To(Equal(t)) // make sure it worked
|
||||
}
|
||||
|
||||
BeforeEach(func() {
|
||||
controller = &connectionFlowController{}
|
||||
controller.rttStats = &congestion.RTTStats{}
|
||||
})
|
||||
|
||||
Context("Constructor", func() {
|
||||
rttStats := &congestion.RTTStats{}
|
||||
|
||||
It("sets the send and receive windows", func() {
|
||||
receiveWindow := protocol.ByteCount(2000)
|
||||
maxReceiveWindow := protocol.ByteCount(3000)
|
||||
|
||||
fc := NewConnectionFlowController(receiveWindow, maxReceiveWindow, rttStats).(*connectionFlowController)
|
||||
Expect(fc.receiveWindow).To(Equal(receiveWindow))
|
||||
Expect(fc.maxReceiveWindowIncrement).To(Equal(maxReceiveWindow))
|
||||
})
|
||||
})
|
||||
|
||||
Context("receive flow control", func() {
|
||||
It("increases the highestReceived by a given increment", func() {
|
||||
controller.highestReceived = 1337
|
||||
controller.IncrementHighestReceived(123)
|
||||
Expect(controller.highestReceived).To(Equal(protocol.ByteCount(1337 + 123)))
|
||||
})
|
||||
|
||||
Context("getting window updates", func() {
|
||||
BeforeEach(func() {
|
||||
controller.receiveWindow = 100
|
||||
controller.receiveWindowIncrement = 60
|
||||
controller.maxReceiveWindowIncrement = 1000
|
||||
})
|
||||
|
||||
It("gets a window update", func() {
|
||||
controller.AddBytesRead(80)
|
||||
offset := controller.GetWindowUpdate()
|
||||
Expect(offset).To(Equal(protocol.ByteCount(80 + 60)))
|
||||
})
|
||||
|
||||
It("autotunes the window", func() {
|
||||
controller.AddBytesRead(80)
|
||||
setRtt(20 * time.Millisecond)
|
||||
controller.lastWindowUpdateTime = time.Now().Add(-35 * time.Millisecond)
|
||||
offset := controller.GetWindowUpdate()
|
||||
Expect(offset).To(Equal(protocol.ByteCount(80 + 2*60)))
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
Context("setting the minimum increment", func() {
|
||||
var (
|
||||
oldIncrement protocol.ByteCount
|
||||
receiveWindow protocol.ByteCount = 10000
|
||||
receiveWindowIncrement protocol.ByteCount = 600
|
||||
)
|
||||
|
||||
BeforeEach(func() {
|
||||
controller.receiveWindow = receiveWindow
|
||||
controller.receiveWindowIncrement = receiveWindowIncrement
|
||||
oldIncrement = controller.receiveWindowIncrement
|
||||
controller.maxReceiveWindowIncrement = 3000
|
||||
})
|
||||
|
||||
It("sets the minimum window increment", func() {
|
||||
controller.EnsureMinimumWindowIncrement(1000)
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(protocol.ByteCount(1000)))
|
||||
})
|
||||
|
||||
It("doesn't reduce the window increment", func() {
|
||||
controller.EnsureMinimumWindowIncrement(1)
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(oldIncrement))
|
||||
})
|
||||
|
||||
It("doens't increase the increment beyond the maxReceiveWindowIncrement", func() {
|
||||
max := controller.maxReceiveWindowIncrement
|
||||
controller.EnsureMinimumWindowIncrement(2 * max)
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(max))
|
||||
})
|
||||
|
||||
It("doesn't auto-tune the window after the increment was increased", func() {
|
||||
setRtt(20 * time.Millisecond)
|
||||
controller.bytesRead = 9900 // receive window is 10000
|
||||
controller.lastWindowUpdateTime = time.Now().Add(-20 * time.Millisecond)
|
||||
controller.EnsureMinimumWindowIncrement(912)
|
||||
offset := controller.getWindowUpdate()
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(protocol.ByteCount(912))) // no auto-tuning
|
||||
Expect(offset).To(Equal(protocol.ByteCount(9900 + 912)))
|
||||
})
|
||||
})
|
||||
})
|
||||
@@ -1,255 +0,0 @@
|
||||
package flowcontrol
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/lucas-clemente/quic-go/congestion"
|
||||
"github.com/lucas-clemente/quic-go/internal/handshake"
|
||||
"github.com/lucas-clemente/quic-go/internal/protocol"
|
||||
"github.com/lucas-clemente/quic-go/internal/utils"
|
||||
"github.com/lucas-clemente/quic-go/qerr"
|
||||
)
|
||||
|
||||
type flowControlManager struct {
|
||||
rttStats *congestion.RTTStats
|
||||
maxReceiveStreamWindow protocol.ByteCount
|
||||
|
||||
streamFlowController map[protocol.StreamID]*flowController
|
||||
connFlowController *flowController
|
||||
mutex sync.RWMutex
|
||||
|
||||
initialStreamSendWindow protocol.ByteCount
|
||||
}
|
||||
|
||||
var _ FlowControlManager = &flowControlManager{}
|
||||
|
||||
var errMapAccess = errors.New("Error accessing the flowController map")
|
||||
|
||||
// NewFlowControlManager creates a new flow control manager
|
||||
func NewFlowControlManager(
|
||||
maxReceiveStreamWindow protocol.ByteCount,
|
||||
maxReceiveConnectionWindow protocol.ByteCount,
|
||||
rttStats *congestion.RTTStats,
|
||||
) FlowControlManager {
|
||||
return &flowControlManager{
|
||||
rttStats: rttStats,
|
||||
maxReceiveStreamWindow: maxReceiveStreamWindow,
|
||||
streamFlowController: make(map[protocol.StreamID]*flowController),
|
||||
connFlowController: newFlowController(0, false, protocol.ReceiveConnectionFlowControlWindow, maxReceiveConnectionWindow, 0, rttStats),
|
||||
}
|
||||
}
|
||||
|
||||
// NewStream creates new flow controllers for a stream
|
||||
// it does nothing if the stream already exists
|
||||
func (f *flowControlManager) NewStream(streamID protocol.StreamID, contributesToConnection bool) {
|
||||
f.mutex.Lock()
|
||||
defer f.mutex.Unlock()
|
||||
|
||||
if _, ok := f.streamFlowController[streamID]; ok {
|
||||
return
|
||||
}
|
||||
f.streamFlowController[streamID] = newFlowController(streamID, contributesToConnection, protocol.ReceiveStreamFlowControlWindow, f.maxReceiveStreamWindow, f.initialStreamSendWindow, f.rttStats)
|
||||
}
|
||||
|
||||
// RemoveStream removes a closed stream from flow control
|
||||
func (f *flowControlManager) RemoveStream(streamID protocol.StreamID) {
|
||||
f.mutex.Lock()
|
||||
delete(f.streamFlowController, streamID)
|
||||
f.mutex.Unlock()
|
||||
}
|
||||
|
||||
func (f *flowControlManager) UpdateTransportParameters(params *handshake.TransportParameters) {
|
||||
f.mutex.Lock()
|
||||
defer f.mutex.Unlock()
|
||||
|
||||
f.connFlowController.UpdateSendWindow(params.ConnectionFlowControlWindow)
|
||||
f.initialStreamSendWindow = params.StreamFlowControlWindow
|
||||
for _, fc := range f.streamFlowController {
|
||||
fc.UpdateSendWindow(params.StreamFlowControlWindow)
|
||||
}
|
||||
}
|
||||
|
||||
// ResetStream should be called when receiving a RstStreamFrame
|
||||
// it updates the byte offset to the value in the RstStreamFrame
|
||||
// streamID must not be 0 here
|
||||
func (f *flowControlManager) ResetStream(streamID protocol.StreamID, byteOffset protocol.ByteCount) error {
|
||||
f.mutex.Lock()
|
||||
defer f.mutex.Unlock()
|
||||
|
||||
streamFlowController, err := f.getFlowController(streamID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
increment, err := streamFlowController.UpdateHighestReceived(byteOffset)
|
||||
if err != nil {
|
||||
return qerr.StreamDataAfterTermination
|
||||
}
|
||||
|
||||
if streamFlowController.CheckFlowControlViolation() {
|
||||
return qerr.Error(qerr.FlowControlReceivedTooMuchData, fmt.Sprintf("Received %d bytes on stream %d, allowed %d bytes", byteOffset, streamID, streamFlowController.receiveWindow))
|
||||
}
|
||||
|
||||
if streamFlowController.ContributesToConnection() {
|
||||
f.connFlowController.IncrementHighestReceived(increment)
|
||||
if f.connFlowController.CheckFlowControlViolation() {
|
||||
return qerr.Error(qerr.FlowControlReceivedTooMuchData, fmt.Sprintf("Received %d bytes for the connection, allowed %d bytes", f.connFlowController.highestReceived, f.connFlowController.receiveWindow))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateHighestReceived updates the highest received byte offset for a stream
|
||||
// it adds the number of additional bytes to connection level flow control
|
||||
// streamID must not be 0 here
|
||||
func (f *flowControlManager) UpdateHighestReceived(streamID protocol.StreamID, byteOffset protocol.ByteCount) error {
|
||||
f.mutex.Lock()
|
||||
defer f.mutex.Unlock()
|
||||
|
||||
streamFlowController, err := f.getFlowController(streamID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// UpdateHighestReceived returns an ErrReceivedSmallerByteOffset when StreamFrames got reordered
|
||||
// this error can be ignored here
|
||||
increment, _ := streamFlowController.UpdateHighestReceived(byteOffset)
|
||||
|
||||
if streamFlowController.CheckFlowControlViolation() {
|
||||
return qerr.Error(qerr.FlowControlReceivedTooMuchData, fmt.Sprintf("Received %d bytes on stream %d, allowed %d bytes", byteOffset, streamID, streamFlowController.receiveWindow))
|
||||
}
|
||||
|
||||
if streamFlowController.ContributesToConnection() {
|
||||
f.connFlowController.IncrementHighestReceived(increment)
|
||||
if f.connFlowController.CheckFlowControlViolation() {
|
||||
return qerr.Error(qerr.FlowControlReceivedTooMuchData, fmt.Sprintf("Received %d bytes for the connection, allowed %d bytes", f.connFlowController.highestReceived, f.connFlowController.receiveWindow))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// streamID must not be 0 here
|
||||
func (f *flowControlManager) AddBytesRead(streamID protocol.StreamID, n protocol.ByteCount) error {
|
||||
f.mutex.Lock()
|
||||
defer f.mutex.Unlock()
|
||||
|
||||
fc, err := f.getFlowController(streamID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fc.AddBytesRead(n)
|
||||
if fc.ContributesToConnection() {
|
||||
f.connFlowController.AddBytesRead(n)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *flowControlManager) GetWindowUpdates() (res []WindowUpdate) {
|
||||
f.mutex.Lock()
|
||||
defer f.mutex.Unlock()
|
||||
|
||||
// get WindowUpdates for streams
|
||||
for id, fc := range f.streamFlowController {
|
||||
if necessary, newIncrement, offset := fc.MaybeUpdateWindow(); necessary {
|
||||
res = append(res, WindowUpdate{StreamID: id, Offset: offset})
|
||||
if fc.ContributesToConnection() && newIncrement != 0 {
|
||||
f.connFlowController.EnsureMinimumWindowIncrement(protocol.ByteCount(float64(newIncrement) * protocol.ConnectionFlowControlMultiplier))
|
||||
}
|
||||
}
|
||||
}
|
||||
// get a WindowUpdate for the connection
|
||||
if necessary, _, offset := f.connFlowController.MaybeUpdateWindow(); necessary {
|
||||
res = append(res, WindowUpdate{StreamID: 0, Offset: offset})
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (f *flowControlManager) GetReceiveWindow(streamID protocol.StreamID) (protocol.ByteCount, error) {
|
||||
f.mutex.RLock()
|
||||
defer f.mutex.RUnlock()
|
||||
|
||||
// StreamID can be 0 when retransmitting
|
||||
if streamID == 0 {
|
||||
return f.connFlowController.receiveWindow, nil
|
||||
}
|
||||
|
||||
flowController, err := f.getFlowController(streamID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return flowController.receiveWindow, nil
|
||||
}
|
||||
|
||||
// streamID must not be 0 here
|
||||
func (f *flowControlManager) AddBytesSent(streamID protocol.StreamID, n protocol.ByteCount) error {
|
||||
f.mutex.Lock()
|
||||
defer f.mutex.Unlock()
|
||||
|
||||
fc, err := f.getFlowController(streamID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fc.AddBytesSent(n)
|
||||
if fc.ContributesToConnection() {
|
||||
f.connFlowController.AddBytesSent(n)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// must not be called with StreamID 0
|
||||
func (f *flowControlManager) SendWindowSize(streamID protocol.StreamID) (protocol.ByteCount, error) {
|
||||
f.mutex.RLock()
|
||||
defer f.mutex.RUnlock()
|
||||
|
||||
fc, err := f.getFlowController(streamID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
res := fc.SendWindowSize()
|
||||
|
||||
if fc.ContributesToConnection() {
|
||||
res = utils.MinByteCount(res, f.connFlowController.SendWindowSize())
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (f *flowControlManager) RemainingConnectionWindowSize() protocol.ByteCount {
|
||||
f.mutex.RLock()
|
||||
defer f.mutex.RUnlock()
|
||||
|
||||
return f.connFlowController.SendWindowSize()
|
||||
}
|
||||
|
||||
// streamID may be 0 here
|
||||
func (f *flowControlManager) UpdateWindow(streamID protocol.StreamID, offset protocol.ByteCount) (bool, error) {
|
||||
f.mutex.Lock()
|
||||
defer f.mutex.Unlock()
|
||||
|
||||
var fc *flowController
|
||||
if streamID == 0 {
|
||||
fc = f.connFlowController
|
||||
} else {
|
||||
var err error
|
||||
fc, err = f.getFlowController(streamID)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
return fc.UpdateSendWindow(offset), nil
|
||||
}
|
||||
|
||||
func (f *flowControlManager) getFlowController(streamID protocol.StreamID) (*flowController, error) {
|
||||
streamFlowController, ok := f.streamFlowController[streamID]
|
||||
if !ok {
|
||||
return nil, errMapAccess
|
||||
}
|
||||
return streamFlowController, nil
|
||||
}
|
||||
@@ -1,374 +0,0 @@
|
||||
package flowcontrol
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/lucas-clemente/quic-go/internal/handshake"
|
||||
|
||||
"github.com/lucas-clemente/quic-go/congestion"
|
||||
"github.com/lucas-clemente/quic-go/internal/protocol"
|
||||
"github.com/lucas-clemente/quic-go/qerr"
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("Flow Control Manager", func() {
|
||||
var fcm *flowControlManager
|
||||
|
||||
BeforeEach(func() {
|
||||
fcm = NewFlowControlManager(
|
||||
0x2000, // maxReceiveStreamWindow
|
||||
0x4000, // maxReceiveConnectionWindow
|
||||
&congestion.RTTStats{},
|
||||
).(*flowControlManager)
|
||||
})
|
||||
|
||||
It("creates a connection level flow controller", func() {
|
||||
Expect(fcm.streamFlowController).To(BeEmpty())
|
||||
Expect(fcm.connFlowController.ContributesToConnection()).To(BeFalse())
|
||||
Expect(fcm.connFlowController.sendWindow).To(BeZero())
|
||||
Expect(fcm.connFlowController.maxReceiveWindowIncrement).To(Equal(protocol.ByteCount(0x4000)))
|
||||
})
|
||||
|
||||
Context("creating new streams", func() {
|
||||
It("creates a new stream", func() {
|
||||
fcm.NewStream(5, false)
|
||||
Expect(fcm.streamFlowController).To(HaveKey(protocol.StreamID(5)))
|
||||
fc := fcm.streamFlowController[5]
|
||||
Expect(fc.streamID).To(Equal(protocol.StreamID(5)))
|
||||
Expect(fc.ContributesToConnection()).To(BeFalse())
|
||||
// the transport parameters have not yet been received. Start with a window of size 0
|
||||
Expect(fc.sendWindow).To(BeZero())
|
||||
Expect(fc.maxReceiveWindowIncrement).To(Equal(protocol.ByteCount(0x2000)))
|
||||
})
|
||||
|
||||
It("creates a new stream after it has received transport parameters", func() {
|
||||
fcm.UpdateTransportParameters(&handshake.TransportParameters{
|
||||
StreamFlowControlWindow: 0x3000,
|
||||
})
|
||||
fcm.NewStream(5, false)
|
||||
Expect(fcm.streamFlowController).To(HaveKey(protocol.StreamID(5)))
|
||||
fc := fcm.streamFlowController[5]
|
||||
Expect(fc.sendWindow).To(Equal(protocol.ByteCount(0x3000)))
|
||||
})
|
||||
|
||||
It("doesn't create a new flow controller if called for an existing stream", func() {
|
||||
fcm.NewStream(5, true)
|
||||
Expect(fcm.streamFlowController).To(HaveKey(protocol.StreamID(5)))
|
||||
fcm.streamFlowController[5].bytesRead = 0x1337
|
||||
fcm.NewStream(5, false)
|
||||
fc := fcm.streamFlowController[5]
|
||||
Expect(fc.bytesRead).To(BeEquivalentTo(0x1337))
|
||||
Expect(fc.ContributesToConnection()).To(BeTrue())
|
||||
})
|
||||
})
|
||||
|
||||
It("removes streams", func() {
|
||||
fcm.NewStream(5, true)
|
||||
Expect(fcm.streamFlowController).To(HaveKey(protocol.StreamID(5)))
|
||||
fcm.RemoveStream(5)
|
||||
Expect(fcm.streamFlowController).ToNot(HaveKey(protocol.StreamID(5)))
|
||||
})
|
||||
|
||||
It("updates the send windows for existing streams when receiveing the transport parameters", func() {
|
||||
fcm.NewStream(5, false)
|
||||
fcm.UpdateTransportParameters(&handshake.TransportParameters{
|
||||
StreamFlowControlWindow: 0x3000,
|
||||
ConnectionFlowControlWindow: 0x6000,
|
||||
})
|
||||
Expect(fcm.connFlowController.sendWindow).To(Equal(protocol.ByteCount(0x6000)))
|
||||
Expect(fcm.streamFlowController[5].sendWindow).To(Equal(protocol.ByteCount(0x3000)))
|
||||
})
|
||||
|
||||
Context("receiving data", func() {
|
||||
BeforeEach(func() {
|
||||
fcm.NewStream(1, false)
|
||||
fcm.NewStream(4, true)
|
||||
fcm.NewStream(6, true)
|
||||
|
||||
for _, fc := range fcm.streamFlowController {
|
||||
fc.receiveWindow = 100
|
||||
fc.receiveWindowIncrement = 100
|
||||
}
|
||||
fcm.connFlowController.receiveWindow = 200
|
||||
fcm.connFlowController.receiveWindowIncrement = 200
|
||||
})
|
||||
|
||||
It("updates the connection level flow controller if the stream contributes", func() {
|
||||
err := fcm.UpdateHighestReceived(4, 100)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(fcm.connFlowController.highestReceived).To(Equal(protocol.ByteCount(100)))
|
||||
Expect(fcm.streamFlowController[4].highestReceived).To(Equal(protocol.ByteCount(100)))
|
||||
})
|
||||
|
||||
It("adds the offsets of multiple streams for the connection flow control window", func() {
|
||||
err := fcm.UpdateHighestReceived(4, 100)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = fcm.UpdateHighestReceived(6, 50)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(fcm.connFlowController.highestReceived).To(Equal(protocol.ByteCount(100 + 50)))
|
||||
})
|
||||
|
||||
It("does not update the connection level flow controller if the stream does not contribute", func() {
|
||||
err := fcm.UpdateHighestReceived(1, 100)
|
||||
// fcm.streamFlowController[4].receiveWindow = 0x1000
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(fcm.connFlowController.highestReceived).To(BeZero())
|
||||
Expect(fcm.streamFlowController[1].highestReceived).To(Equal(protocol.ByteCount(100)))
|
||||
})
|
||||
|
||||
It("returns an error when called with an unknown stream", func() {
|
||||
err := fcm.UpdateHighestReceived(1337, 0x1337)
|
||||
Expect(err).To(MatchError(errMapAccess))
|
||||
})
|
||||
|
||||
It("gets the offset of the receive window", func() {
|
||||
offset, err := fcm.GetReceiveWindow(4)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(offset).To(Equal(protocol.ByteCount(100)))
|
||||
})
|
||||
|
||||
It("errors when asked for the receive window of a stream that doesn't exist", func() {
|
||||
_, err := fcm.GetReceiveWindow(17)
|
||||
Expect(err).To(MatchError(errMapAccess))
|
||||
})
|
||||
|
||||
It("gets the offset of the connection-level receive window", func() {
|
||||
offset, err := fcm.GetReceiveWindow(0)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(offset).To(Equal(protocol.ByteCount(200)))
|
||||
})
|
||||
|
||||
Context("flow control violations", func() {
|
||||
It("errors when encountering a stream level flow control violation", func() {
|
||||
err := fcm.UpdateHighestReceived(4, 101)
|
||||
Expect(err).To(MatchError(qerr.Error(qerr.FlowControlReceivedTooMuchData, "Received 101 bytes on stream 4, allowed 100 bytes")))
|
||||
})
|
||||
|
||||
It("errors when encountering a connection-level flow control violation", func() {
|
||||
fcm.streamFlowController[4].receiveWindow = 300
|
||||
fcm.streamFlowController[6].receiveWindow = 300
|
||||
err := fcm.UpdateHighestReceived(6, 100)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = fcm.UpdateHighestReceived(4, 103)
|
||||
Expect(err).To(MatchError(qerr.Error(qerr.FlowControlReceivedTooMuchData, "Received 203 bytes for the connection, allowed 200 bytes")))
|
||||
})
|
||||
})
|
||||
|
||||
Context("window updates", func() {
|
||||
// update the congestion such that it returns a given value for the smoothed RTT
|
||||
setRtt := func(t time.Duration) {
|
||||
for _, controller := range fcm.streamFlowController {
|
||||
controller.rttStats.UpdateRTT(t, 0, time.Now())
|
||||
Expect(controller.rttStats.SmoothedRTT()).To(Equal(t)) // make sure it worked
|
||||
}
|
||||
}
|
||||
|
||||
It("gets stream level window updates", func() {
|
||||
err := fcm.UpdateHighestReceived(4, 100)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = fcm.AddBytesRead(4, 90)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
updates := fcm.GetWindowUpdates()
|
||||
Expect(updates).To(HaveLen(1))
|
||||
Expect(updates[0]).To(Equal(WindowUpdate{StreamID: 4, Offset: 190}))
|
||||
})
|
||||
|
||||
It("gets connection level window updates", func() {
|
||||
err := fcm.UpdateHighestReceived(4, 100)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = fcm.UpdateHighestReceived(6, 100)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = fcm.AddBytesRead(4, 90)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = fcm.AddBytesRead(6, 90)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
updates := fcm.GetWindowUpdates()
|
||||
Expect(updates).To(HaveLen(3))
|
||||
Expect(updates).ToNot(ContainElement(WindowUpdate{StreamID: 0, Offset: 200}))
|
||||
})
|
||||
|
||||
It("errors when AddBytesRead is called for a stream doesn't exist", func() {
|
||||
err := fcm.AddBytesRead(17, 1000)
|
||||
Expect(err).To(MatchError(errMapAccess))
|
||||
})
|
||||
|
||||
It("increases the connection-level window, when a stream window was increased by autotuning", func() {
|
||||
setRtt(10 * time.Millisecond)
|
||||
fcm.streamFlowController[4].lastWindowUpdateTime = time.Now().Add(-1 * time.Millisecond)
|
||||
err := fcm.UpdateHighestReceived(4, 100)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = fcm.AddBytesRead(4, 90)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
updates := fcm.GetWindowUpdates()
|
||||
Expect(updates).To(HaveLen(2))
|
||||
connLevelIncrement := protocol.ByteCount(protocol.ConnectionFlowControlMultiplier * 200) // 300
|
||||
Expect(updates).To(ContainElement(WindowUpdate{StreamID: 4, Offset: 290}))
|
||||
Expect(updates).To(ContainElement(WindowUpdate{StreamID: 0, Offset: 90 + connLevelIncrement}))
|
||||
})
|
||||
|
||||
It("doesn't increase the connection-level window, when a non-contributing stream window was increased by autotuning", func() {
|
||||
setRtt(10 * time.Millisecond)
|
||||
fcm.streamFlowController[1].lastWindowUpdateTime = time.Now().Add(-1 * time.Millisecond)
|
||||
err := fcm.UpdateHighestReceived(1, 100)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = fcm.AddBytesRead(1, 90)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
updates := fcm.GetWindowUpdates()
|
||||
Expect(updates).To(HaveLen(1))
|
||||
Expect(updates).To(ContainElement(WindowUpdate{StreamID: 1, Offset: 290}))
|
||||
// the only window update is for stream 1, thus there's no connection-level window update
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
Context("resetting a stream", func() {
|
||||
BeforeEach(func() {
|
||||
fcm.NewStream(1, false)
|
||||
fcm.NewStream(4, true)
|
||||
fcm.NewStream(6, true)
|
||||
fcm.streamFlowController[1].bytesSent = 41
|
||||
fcm.streamFlowController[4].bytesSent = 42
|
||||
|
||||
for _, fc := range fcm.streamFlowController {
|
||||
fc.receiveWindow = 100
|
||||
fc.receiveWindowIncrement = 100
|
||||
}
|
||||
fcm.connFlowController.receiveWindow = 200
|
||||
fcm.connFlowController.receiveWindowIncrement = 200
|
||||
})
|
||||
|
||||
It("updates the connection level flow controller if the stream contributes", func() {
|
||||
err := fcm.ResetStream(4, 100)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(fcm.connFlowController.highestReceived).To(Equal(protocol.ByteCount(100)))
|
||||
Expect(fcm.streamFlowController[4].highestReceived).To(Equal(protocol.ByteCount(100)))
|
||||
})
|
||||
|
||||
It("does not update the connection level flow controller if the stream does not contribute", func() {
|
||||
err := fcm.ResetStream(1, 100)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(fcm.connFlowController.highestReceived).To(BeZero())
|
||||
Expect(fcm.streamFlowController[1].highestReceived).To(Equal(protocol.ByteCount(100)))
|
||||
})
|
||||
|
||||
It("errors if the byteOffset is smaller than a byteOffset that set earlier", func() {
|
||||
err := fcm.UpdateHighestReceived(4, 100)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = fcm.ResetStream(4, 50)
|
||||
Expect(err).To(MatchError(qerr.StreamDataAfterTermination))
|
||||
})
|
||||
|
||||
It("returns an error when called with an unknown stream", func() {
|
||||
err := fcm.ResetStream(1337, 0x1337)
|
||||
Expect(err).To(MatchError(errMapAccess))
|
||||
})
|
||||
|
||||
Context("flow control violations", func() {
|
||||
It("errors when encountering a stream level flow control violation", func() {
|
||||
err := fcm.ResetStream(4, 101)
|
||||
Expect(err).To(MatchError(qerr.Error(qerr.FlowControlReceivedTooMuchData, "Received 101 bytes on stream 4, allowed 100 bytes")))
|
||||
})
|
||||
|
||||
It("errors when encountering a connection-level flow control violation", func() {
|
||||
fcm.streamFlowController[4].receiveWindow = 300
|
||||
fcm.streamFlowController[6].receiveWindow = 300
|
||||
err := fcm.ResetStream(4, 100)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = fcm.ResetStream(6, 101)
|
||||
Expect(err).To(MatchError(qerr.Error(qerr.FlowControlReceivedTooMuchData, "Received 201 bytes for the connection, allowed 200 bytes")))
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
Context("sending data", func() {
|
||||
It("adds bytes sent for all stream contributing to connection level flow control", func() {
|
||||
fcm.NewStream(1, false)
|
||||
fcm.NewStream(3, true)
|
||||
fcm.NewStream(5, true)
|
||||
err := fcm.AddBytesSent(1, 100)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = fcm.AddBytesSent(3, 200)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = fcm.AddBytesSent(5, 500)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(fcm.connFlowController.bytesSent).To(Equal(protocol.ByteCount(200 + 500)))
|
||||
})
|
||||
|
||||
It("errors when called for a stream doesn't exist", func() {
|
||||
err := fcm.AddBytesSent(17, 1000)
|
||||
Expect(err).To(MatchError(errMapAccess))
|
||||
})
|
||||
|
||||
Context("window updates", func() {
|
||||
It("updates the window for a normal stream", func() {
|
||||
fcm.NewStream(5, true)
|
||||
updated, err := fcm.UpdateWindow(5, 1000)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(updated).To(BeTrue())
|
||||
})
|
||||
|
||||
It("updates the connection level window", func() {
|
||||
updated, err := fcm.UpdateWindow(0, 1000)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(updated).To(BeTrue())
|
||||
})
|
||||
|
||||
It("errors when called for a stream that doesn't exist", func() {
|
||||
_, err := fcm.UpdateWindow(17, 1000)
|
||||
Expect(err).To(MatchError(errMapAccess))
|
||||
})
|
||||
})
|
||||
|
||||
Context("window sizes", func() {
|
||||
It("gets the window size of a stream", func() {
|
||||
fcm.NewStream(5, false)
|
||||
updated, err := fcm.UpdateWindow(5, 1000)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(updated).To(BeTrue())
|
||||
fcm.AddBytesSent(5, 500)
|
||||
size, err := fcm.SendWindowSize(5)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(size).To(Equal(protocol.ByteCount(1000 - 500)))
|
||||
})
|
||||
|
||||
It("gets the connection window size", func() {
|
||||
fcm.NewStream(5, true)
|
||||
updated, err := fcm.UpdateWindow(0, 1000)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(updated).To(BeTrue())
|
||||
fcm.AddBytesSent(5, 500)
|
||||
size := fcm.RemainingConnectionWindowSize()
|
||||
Expect(size).To(Equal(protocol.ByteCount(1000 - 500)))
|
||||
})
|
||||
|
||||
It("erros when asked for the send window size of a stream that doesn't exist", func() {
|
||||
_, err := fcm.SendWindowSize(17)
|
||||
Expect(err).To(MatchError(errMapAccess))
|
||||
})
|
||||
|
||||
It("limits the stream window size by the connection window size", func() {
|
||||
fcm.NewStream(5, true)
|
||||
updated, err := fcm.UpdateWindow(0, 500)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(updated).To(BeTrue())
|
||||
updated, err = fcm.UpdateWindow(5, 1000)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(updated).To(BeTrue())
|
||||
size, err := fcm.SendWindowSize(5)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(size).To(Equal(protocol.ByteCount(500)))
|
||||
})
|
||||
|
||||
It("does not reduce the size of the connection level window, if the stream does not contribute", func() {
|
||||
fcm.NewStream(3, false)
|
||||
updated, err := fcm.UpdateWindow(0, 1000)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(updated).To(BeTrue())
|
||||
fcm.AddBytesSent(3, 456) // WindowSize should return the same value no matter how much was sent
|
||||
size := fcm.RemainingConnectionWindowSize()
|
||||
Expect(size).To(Equal(protocol.ByteCount(1000)))
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
@@ -1,179 +0,0 @@
|
||||
package flowcontrol
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/lucas-clemente/quic-go/congestion"
|
||||
"github.com/lucas-clemente/quic-go/internal/protocol"
|
||||
"github.com/lucas-clemente/quic-go/internal/utils"
|
||||
)
|
||||
|
||||
type flowController struct {
|
||||
streamID protocol.StreamID
|
||||
contributesToConnection bool // does the stream contribute to connection level flow control
|
||||
|
||||
rttStats *congestion.RTTStats
|
||||
|
||||
bytesSent protocol.ByteCount
|
||||
sendWindow protocol.ByteCount
|
||||
|
||||
lastWindowUpdateTime time.Time
|
||||
|
||||
bytesRead protocol.ByteCount
|
||||
highestReceived protocol.ByteCount
|
||||
receiveWindow protocol.ByteCount
|
||||
receiveWindowIncrement protocol.ByteCount
|
||||
maxReceiveWindowIncrement protocol.ByteCount
|
||||
}
|
||||
|
||||
// ErrReceivedSmallerByteOffset occurs if the ByteOffset received is smaller than a ByteOffset that was set previously
|
||||
var ErrReceivedSmallerByteOffset = errors.New("Received a smaller byte offset")
|
||||
|
||||
// newFlowController gets a new flow controller
|
||||
func newFlowController(
|
||||
streamID protocol.StreamID,
|
||||
contributesToConnection bool,
|
||||
receiveWindow protocol.ByteCount,
|
||||
maxReceiveWindow protocol.ByteCount,
|
||||
initialSendWindow protocol.ByteCount,
|
||||
rttStats *congestion.RTTStats,
|
||||
) *flowController {
|
||||
return &flowController{
|
||||
streamID: streamID,
|
||||
contributesToConnection: contributesToConnection,
|
||||
rttStats: rttStats,
|
||||
receiveWindow: receiveWindow,
|
||||
receiveWindowIncrement: receiveWindow,
|
||||
maxReceiveWindowIncrement: maxReceiveWindow,
|
||||
sendWindow: initialSendWindow,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *flowController) ContributesToConnection() bool {
|
||||
return c.contributesToConnection
|
||||
}
|
||||
|
||||
func (c *flowController) AddBytesSent(n protocol.ByteCount) {
|
||||
c.bytesSent += n
|
||||
}
|
||||
|
||||
// UpdateSendWindow should be called after receiving a WindowUpdateFrame
|
||||
// it returns true if the window was actually updated
|
||||
func (c *flowController) UpdateSendWindow(newOffset protocol.ByteCount) bool {
|
||||
if newOffset > c.sendWindow {
|
||||
c.sendWindow = newOffset
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *flowController) SendWindowSize() protocol.ByteCount {
|
||||
// this only happens during connection establishment, when data is sent before we receive the peer's transport parameters
|
||||
if c.bytesSent > c.sendWindow {
|
||||
return 0
|
||||
}
|
||||
return c.sendWindow - c.bytesSent
|
||||
}
|
||||
|
||||
// UpdateHighestReceived updates the highestReceived value, if the byteOffset is higher
|
||||
// Should **only** be used for the stream-level FlowController
|
||||
// it returns an ErrReceivedSmallerByteOffset if the received byteOffset is smaller than any byteOffset received before
|
||||
// This error occurs every time StreamFrames get reordered and has to be ignored in that case
|
||||
// It should only be treated as an error when resetting a stream
|
||||
func (c *flowController) UpdateHighestReceived(byteOffset protocol.ByteCount) (protocol.ByteCount, error) {
|
||||
if byteOffset == c.highestReceived {
|
||||
return 0, nil
|
||||
}
|
||||
if byteOffset > c.highestReceived {
|
||||
increment := byteOffset - c.highestReceived
|
||||
c.highestReceived = byteOffset
|
||||
return increment, nil
|
||||
}
|
||||
return 0, ErrReceivedSmallerByteOffset
|
||||
}
|
||||
|
||||
// IncrementHighestReceived adds an increment to the highestReceived value
|
||||
// Should **only** be used for the connection-level FlowController
|
||||
func (c *flowController) IncrementHighestReceived(increment protocol.ByteCount) {
|
||||
c.highestReceived += increment
|
||||
}
|
||||
|
||||
func (c *flowController) AddBytesRead(n protocol.ByteCount) {
|
||||
// pretend we sent a WindowUpdate when reading the first byte
|
||||
// this way auto-tuning of the window increment already works for the first WindowUpdate
|
||||
if c.bytesRead == 0 {
|
||||
c.lastWindowUpdateTime = time.Now()
|
||||
}
|
||||
c.bytesRead += n
|
||||
}
|
||||
|
||||
// MaybeUpdateWindow updates the receive window, if necessary
|
||||
// if the receive window increment is changed, the new value is returned, otherwise a 0
|
||||
// the last return value is the new offset of the receive window
|
||||
func (c *flowController) MaybeUpdateWindow() (bool, protocol.ByteCount /* new increment */, protocol.ByteCount /* new offset */) {
|
||||
diff := c.receiveWindow - c.bytesRead
|
||||
|
||||
// Chromium implements the same threshold
|
||||
if diff < (c.receiveWindowIncrement / 2) {
|
||||
var newWindowIncrement protocol.ByteCount
|
||||
oldWindowIncrement := c.receiveWindowIncrement
|
||||
|
||||
c.maybeAdjustWindowIncrement()
|
||||
if c.receiveWindowIncrement != oldWindowIncrement {
|
||||
newWindowIncrement = c.receiveWindowIncrement
|
||||
}
|
||||
|
||||
c.lastWindowUpdateTime = time.Now()
|
||||
c.receiveWindow = c.bytesRead + c.receiveWindowIncrement
|
||||
return true, newWindowIncrement, c.receiveWindow
|
||||
}
|
||||
|
||||
return false, 0, 0
|
||||
}
|
||||
|
||||
// maybeAdjustWindowIncrement increases the receiveWindowIncrement if we're sending WindowUpdates too often
|
||||
func (c *flowController) maybeAdjustWindowIncrement() {
|
||||
if c.lastWindowUpdateTime.IsZero() {
|
||||
return
|
||||
}
|
||||
|
||||
rtt := c.rttStats.SmoothedRTT()
|
||||
if rtt == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
timeSinceLastWindowUpdate := time.Since(c.lastWindowUpdateTime)
|
||||
|
||||
// interval between the window updates is sufficiently large, no need to increase the increment
|
||||
if timeSinceLastWindowUpdate >= 2*rtt {
|
||||
return
|
||||
}
|
||||
|
||||
oldWindowSize := c.receiveWindowIncrement
|
||||
c.receiveWindowIncrement = utils.MinByteCount(2*c.receiveWindowIncrement, c.maxReceiveWindowIncrement)
|
||||
|
||||
// debug log, if the window size was actually increased
|
||||
if oldWindowSize < c.receiveWindowIncrement {
|
||||
newWindowSize := c.receiveWindowIncrement / (1 << 10)
|
||||
if c.streamID == 0 {
|
||||
utils.Debugf("Increasing receive flow control window for the connection to %d kB", newWindowSize)
|
||||
} else {
|
||||
utils.Debugf("Increasing receive flow control window increment for stream %d to %d kB", c.streamID, newWindowSize)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// EnsureMinimumWindowIncrement sets a minimum window increment
|
||||
// it is intended be used for the connection-level flow controller
|
||||
// it should make sure that the connection-level window is increased when a stream-level window grows
|
||||
func (c *flowController) EnsureMinimumWindowIncrement(inc protocol.ByteCount) {
|
||||
if inc > c.receiveWindowIncrement {
|
||||
c.receiveWindowIncrement = utils.MinByteCount(inc, c.maxReceiveWindowIncrement)
|
||||
c.lastWindowUpdateTime = time.Time{} // disables autotuning for the next window update
|
||||
}
|
||||
}
|
||||
|
||||
func (c *flowController) CheckFlowControlViolation() bool {
|
||||
return c.highestReceived > c.receiveWindow
|
||||
}
|
||||
@@ -1,277 +0,0 @@
|
||||
package flowcontrol
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/lucas-clemente/quic-go/congestion"
|
||||
"github.com/lucas-clemente/quic-go/internal/protocol"
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("Flow controller", func() {
|
||||
var controller *flowController
|
||||
|
||||
BeforeEach(func() {
|
||||
controller = &flowController{}
|
||||
controller.rttStats = &congestion.RTTStats{}
|
||||
})
|
||||
|
||||
Context("Constructor", func() {
|
||||
rttStats := &congestion.RTTStats{}
|
||||
|
||||
It("sets the send and receive windows", func() {
|
||||
receiveWindow := protocol.ByteCount(2000)
|
||||
maxReceiveWindow := protocol.ByteCount(3000)
|
||||
sendWindow := protocol.ByteCount(4000)
|
||||
fc := newFlowController(5, true, receiveWindow, maxReceiveWindow, sendWindow, rttStats)
|
||||
Expect(fc.streamID).To(Equal(protocol.StreamID(5)))
|
||||
Expect(fc.receiveWindow).To(Equal(receiveWindow))
|
||||
Expect(fc.maxReceiveWindowIncrement).To(Equal(maxReceiveWindow))
|
||||
Expect(fc.sendWindow).To(Equal(sendWindow))
|
||||
})
|
||||
|
||||
It("says if it contributes to connection-level flow control", func() {
|
||||
fc := newFlowController(1, false, protocol.MaxByteCount, protocol.MaxByteCount, protocol.MaxByteCount, rttStats)
|
||||
Expect(fc.ContributesToConnection()).To(BeFalse())
|
||||
fc = newFlowController(5, true, protocol.MaxByteCount, protocol.MaxByteCount, protocol.MaxByteCount, rttStats)
|
||||
Expect(fc.ContributesToConnection()).To(BeTrue())
|
||||
})
|
||||
})
|
||||
|
||||
Context("send flow control", func() {
|
||||
It("adds bytes sent", func() {
|
||||
controller.bytesSent = 5
|
||||
controller.AddBytesSent(6)
|
||||
Expect(controller.bytesSent).To(Equal(protocol.ByteCount(5 + 6)))
|
||||
})
|
||||
|
||||
It("gets the size of the remaining flow control window", func() {
|
||||
controller.bytesSent = 5
|
||||
controller.sendWindow = 12
|
||||
Expect(controller.SendWindowSize()).To(Equal(protocol.ByteCount(12 - 5)))
|
||||
})
|
||||
|
||||
It("gets the offset of the flow control window", func() {
|
||||
controller.bytesSent = 5
|
||||
controller.sendWindow = 12
|
||||
Expect(controller.sendWindow).To(Equal(protocol.ByteCount(12)))
|
||||
})
|
||||
|
||||
It("updates the size of the flow control window", func() {
|
||||
controller.bytesSent = 5
|
||||
updateSuccessful := controller.UpdateSendWindow(15)
|
||||
Expect(updateSuccessful).To(BeTrue())
|
||||
Expect(controller.sendWindow).To(Equal(protocol.ByteCount(15)))
|
||||
Expect(controller.SendWindowSize()).To(Equal(protocol.ByteCount(15 - 5)))
|
||||
})
|
||||
|
||||
It("does not decrease the flow control window", func() {
|
||||
updateSuccessful := controller.UpdateSendWindow(20)
|
||||
Expect(updateSuccessful).To(BeTrue())
|
||||
Expect(controller.SendWindowSize()).To(Equal(protocol.ByteCount(20)))
|
||||
updateSuccessful = controller.UpdateSendWindow(10)
|
||||
Expect(updateSuccessful).To(BeFalse())
|
||||
Expect(controller.SendWindowSize()).To(Equal(protocol.ByteCount(20)))
|
||||
})
|
||||
})
|
||||
|
||||
Context("receive flow control", func() {
|
||||
var receiveWindow protocol.ByteCount = 10000
|
||||
var receiveWindowIncrement protocol.ByteCount = 600
|
||||
|
||||
BeforeEach(func() {
|
||||
controller.receiveWindow = receiveWindow
|
||||
controller.receiveWindowIncrement = receiveWindowIncrement
|
||||
})
|
||||
|
||||
It("adds bytes read", func() {
|
||||
controller.bytesRead = 5
|
||||
controller.AddBytesRead(6)
|
||||
Expect(controller.bytesRead).To(Equal(protocol.ByteCount(5 + 6)))
|
||||
})
|
||||
|
||||
It("triggers a window update when necessary", func() {
|
||||
controller.lastWindowUpdateTime = time.Now().Add(-time.Hour)
|
||||
readPosition := receiveWindow - receiveWindowIncrement/2 + 1
|
||||
controller.bytesRead = readPosition
|
||||
updateNecessary, _, offset := controller.MaybeUpdateWindow()
|
||||
Expect(updateNecessary).To(BeTrue())
|
||||
Expect(offset).To(Equal(readPosition + receiveWindowIncrement))
|
||||
Expect(controller.receiveWindow).To(Equal(readPosition + receiveWindowIncrement))
|
||||
Expect(controller.lastWindowUpdateTime).To(BeTemporally("~", time.Now(), 20*time.Millisecond))
|
||||
})
|
||||
|
||||
It("doesn't trigger a window update when not necessary", func() {
|
||||
lastWindowUpdateTime := time.Now().Add(-time.Hour)
|
||||
controller.lastWindowUpdateTime = lastWindowUpdateTime
|
||||
readPosition := receiveWindow - receiveWindow/2 - 1
|
||||
controller.bytesRead = readPosition
|
||||
updateNecessary, _, _ := controller.MaybeUpdateWindow()
|
||||
Expect(updateNecessary).To(BeFalse())
|
||||
Expect(controller.lastWindowUpdateTime).To(Equal(lastWindowUpdateTime))
|
||||
})
|
||||
|
||||
It("updates the highestReceived", func() {
|
||||
controller.highestReceived = 1337
|
||||
increment, err := controller.UpdateHighestReceived(1338)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(increment).To(Equal(protocol.ByteCount(1338 - 1337)))
|
||||
Expect(controller.highestReceived).To(Equal(protocol.ByteCount(1338)))
|
||||
})
|
||||
|
||||
It("does not decrease the highestReceived", func() {
|
||||
controller.highestReceived = 1337
|
||||
increment, err := controller.UpdateHighestReceived(1000)
|
||||
Expect(err).To(MatchError(ErrReceivedSmallerByteOffset))
|
||||
Expect(increment).To(BeZero())
|
||||
Expect(controller.highestReceived).To(Equal(protocol.ByteCount(1337)))
|
||||
})
|
||||
|
||||
It("does not error when setting the same byte offset", func() {
|
||||
controller.highestReceived = 1337
|
||||
increment, err := controller.UpdateHighestReceived(1337)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(increment).To(BeZero())
|
||||
})
|
||||
|
||||
It("increases the highestReceived by a given increment", func() {
|
||||
controller.highestReceived = 1337
|
||||
controller.IncrementHighestReceived(123)
|
||||
Expect(controller.highestReceived).To(Equal(protocol.ByteCount(1337 + 123)))
|
||||
})
|
||||
|
||||
It("detects a flow control violation", func() {
|
||||
controller.UpdateHighestReceived(receiveWindow + 1)
|
||||
Expect(controller.CheckFlowControlViolation()).To(BeTrue())
|
||||
})
|
||||
|
||||
It("does not give a flow control violation when using the window completely", func() {
|
||||
controller.UpdateHighestReceived(receiveWindow)
|
||||
Expect(controller.CheckFlowControlViolation()).To(BeFalse())
|
||||
})
|
||||
|
||||
Context("receive window increment auto-tuning", func() {
|
||||
var oldIncrement protocol.ByteCount
|
||||
|
||||
BeforeEach(func() {
|
||||
oldIncrement = controller.receiveWindowIncrement
|
||||
controller.maxReceiveWindowIncrement = 3000
|
||||
})
|
||||
|
||||
// update the congestion such that it returns a given value for the smoothed RTT
|
||||
setRtt := func(t time.Duration) {
|
||||
controller.rttStats.UpdateRTT(t, 0, time.Now())
|
||||
Expect(controller.rttStats.SmoothedRTT()).To(Equal(t)) // make sure it worked
|
||||
}
|
||||
|
||||
It("doesn't increase the increment for a new stream", func() {
|
||||
controller.maybeAdjustWindowIncrement()
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(oldIncrement))
|
||||
})
|
||||
|
||||
It("doesn't increase the increment when no RTT estimate is available", func() {
|
||||
setRtt(0)
|
||||
controller.lastWindowUpdateTime = time.Now()
|
||||
controller.maybeAdjustWindowIncrement()
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(oldIncrement))
|
||||
})
|
||||
|
||||
It("increases the increment when the last WindowUpdate was sent less than two RTTs ago", func() {
|
||||
setRtt(20 * time.Millisecond)
|
||||
controller.lastWindowUpdateTime = time.Now().Add(-35 * time.Millisecond)
|
||||
controller.maybeAdjustWindowIncrement()
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(2 * oldIncrement))
|
||||
})
|
||||
|
||||
It("doesn't increase the increase increment when the last WindowUpdate was sent more than two RTTs ago", func() {
|
||||
setRtt(20 * time.Millisecond)
|
||||
controller.lastWindowUpdateTime = time.Now().Add(-45 * time.Millisecond)
|
||||
controller.maybeAdjustWindowIncrement()
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(oldIncrement))
|
||||
})
|
||||
|
||||
It("doesn't increase the increment to a value higher than the maxReceiveWindowIncrement", func() {
|
||||
setRtt(20 * time.Millisecond)
|
||||
controller.lastWindowUpdateTime = time.Now().Add(-35 * time.Millisecond)
|
||||
controller.maybeAdjustWindowIncrement()
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(2 * oldIncrement)) // 1200
|
||||
// because the lastWindowUpdateTime is updated by MaybeTriggerWindowUpdate(), we can just call maybeAdjustWindowIncrement() multiple times and get an increase of the increment every time
|
||||
controller.maybeAdjustWindowIncrement()
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(2 * 2 * oldIncrement)) // 2400
|
||||
controller.maybeAdjustWindowIncrement()
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(controller.maxReceiveWindowIncrement)) // 3000
|
||||
controller.maybeAdjustWindowIncrement()
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(controller.maxReceiveWindowIncrement)) // 3000
|
||||
})
|
||||
|
||||
It("returns the new increment when updating the window", func() {
|
||||
setRtt(20 * time.Millisecond)
|
||||
controller.AddBytesRead(9900) // receive window is 10000
|
||||
controller.lastWindowUpdateTime = time.Now().Add(-35 * time.Millisecond)
|
||||
necessary, newIncrement, offset := controller.MaybeUpdateWindow()
|
||||
Expect(necessary).To(BeTrue())
|
||||
Expect(newIncrement).To(Equal(2 * oldIncrement))
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(newIncrement))
|
||||
Expect(offset).To(Equal(protocol.ByteCount(9900 + newIncrement)))
|
||||
})
|
||||
|
||||
It("increases the increment sent in the first WindowUpdate, if data is read fast enough", func() {
|
||||
setRtt(20 * time.Millisecond)
|
||||
controller.AddBytesRead(9900)
|
||||
necessary, newIncrement, _ := controller.MaybeUpdateWindow()
|
||||
Expect(necessary).To(BeTrue())
|
||||
Expect(newIncrement).To(Equal(2 * oldIncrement))
|
||||
})
|
||||
|
||||
It("doesn't increamse the increment sent in the first WindowUpdate, if data is read slowly", func() {
|
||||
setRtt(5 * time.Millisecond)
|
||||
controller.AddBytesRead(9900)
|
||||
time.Sleep(15 * time.Millisecond) // more than 2x RTT
|
||||
necessary, newIncrement, _ := controller.MaybeUpdateWindow()
|
||||
Expect(necessary).To(BeTrue())
|
||||
Expect(newIncrement).To(BeZero())
|
||||
})
|
||||
|
||||
It("only returns the increment if it was increased", func() {
|
||||
setRtt(20 * time.Millisecond)
|
||||
controller.AddBytesRead(9900) // receive window is 10000
|
||||
controller.lastWindowUpdateTime = time.Now().Add(-45 * time.Millisecond)
|
||||
necessary, newIncrement, offset := controller.MaybeUpdateWindow()
|
||||
Expect(necessary).To(BeTrue())
|
||||
Expect(newIncrement).To(BeZero())
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(oldIncrement))
|
||||
Expect(offset).To(Equal(protocol.ByteCount(9900 + oldIncrement)))
|
||||
})
|
||||
|
||||
Context("setting the minimum increment", func() {
|
||||
It("sets the minimum window increment", func() {
|
||||
controller.EnsureMinimumWindowIncrement(1000)
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(protocol.ByteCount(1000)))
|
||||
})
|
||||
|
||||
It("doesn't reduce the window increment", func() {
|
||||
controller.EnsureMinimumWindowIncrement(1)
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(oldIncrement))
|
||||
})
|
||||
|
||||
It("doens't increase the increment beyong the maxReceiveWindowIncrement", func() {
|
||||
max := controller.maxReceiveWindowIncrement
|
||||
controller.EnsureMinimumWindowIncrement(2 * max)
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(max))
|
||||
})
|
||||
|
||||
It("doesn't auto-tune the window after the increment was increased", func() {
|
||||
setRtt(20 * time.Millisecond)
|
||||
controller.bytesRead = 9900 // receive window is 10000
|
||||
controller.lastWindowUpdateTime = time.Now().Add(-20 * time.Millisecond)
|
||||
controller.EnsureMinimumWindowIncrement(912)
|
||||
necessary, newIncrement, offset := controller.MaybeUpdateWindow()
|
||||
Expect(necessary).To(BeTrue())
|
||||
Expect(newIncrement).To(BeZero()) // no auto-tuning
|
||||
Expect(offset).To(Equal(protocol.ByteCount(9900 + 912)))
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
@@ -1,28 +1,37 @@
|
||||
package flowcontrol
|
||||
|
||||
import "github.com/lucas-clemente/quic-go/internal/protocol"
|
||||
import "github.com/lucas-clemente/quic-go/internal/handshake"
|
||||
|
||||
// WindowUpdate provides the data for WindowUpdateFrames.
|
||||
type WindowUpdate struct {
|
||||
StreamID protocol.StreamID
|
||||
Offset protocol.ByteCount
|
||||
type flowController interface {
|
||||
// for sending
|
||||
SendWindowSize() protocol.ByteCount
|
||||
IsBlocked() bool
|
||||
UpdateSendWindow(protocol.ByteCount)
|
||||
AddBytesSent(protocol.ByteCount)
|
||||
// for receiving
|
||||
AddBytesRead(protocol.ByteCount)
|
||||
GetWindowUpdate() protocol.ByteCount // returns 0 if no update is necessary
|
||||
}
|
||||
|
||||
// A FlowControlManager manages the flow control
|
||||
type FlowControlManager interface {
|
||||
NewStream(streamID protocol.StreamID, contributesToConnectionFlow bool)
|
||||
RemoveStream(streamID protocol.StreamID)
|
||||
UpdateTransportParameters(*handshake.TransportParameters)
|
||||
// methods needed for receiving data
|
||||
ResetStream(streamID protocol.StreamID, byteOffset protocol.ByteCount) error
|
||||
UpdateHighestReceived(streamID protocol.StreamID, byteOffset protocol.ByteCount) error
|
||||
AddBytesRead(streamID protocol.StreamID, n protocol.ByteCount) error
|
||||
GetWindowUpdates() []WindowUpdate
|
||||
GetReceiveWindow(streamID protocol.StreamID) (protocol.ByteCount, error)
|
||||
// methods needed for sending data
|
||||
AddBytesSent(streamID protocol.StreamID, n protocol.ByteCount) error
|
||||
SendWindowSize(streamID protocol.StreamID) (protocol.ByteCount, error)
|
||||
RemainingConnectionWindowSize() protocol.ByteCount
|
||||
UpdateWindow(streamID protocol.StreamID, offset protocol.ByteCount) (bool, error)
|
||||
// A StreamFlowController is a flow controller for a QUIC stream.
|
||||
type StreamFlowController interface {
|
||||
flowController
|
||||
// for receiving
|
||||
// UpdateHighestReceived should be called when a new highest offset is received
|
||||
// final has to be to true if this is the final offset of the stream, as contained in a STREAM frame with FIN bit, and the RST_STREAM frame
|
||||
UpdateHighestReceived(offset protocol.ByteCount, final bool) error
|
||||
}
|
||||
|
||||
// The ConnectionFlowController is the flow controller for the connection.
|
||||
type ConnectionFlowController interface {
|
||||
flowController
|
||||
}
|
||||
|
||||
type connectionFlowControllerI interface {
|
||||
ConnectionFlowController
|
||||
// The following two methods are not supposed to be called from outside this packet, but are needed internally
|
||||
// for sending
|
||||
EnsureMinimumWindowIncrement(protocol.ByteCount)
|
||||
// for receiving
|
||||
IncrementHighestReceived(protocol.ByteCount) error
|
||||
}
|
||||
|
||||
116
internal/flowcontrol/stream_flow_controller.go
Normal file
116
internal/flowcontrol/stream_flow_controller.go
Normal file
@@ -0,0 +1,116 @@
|
||||
package flowcontrol
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/lucas-clemente/quic-go/congestion"
|
||||
"github.com/lucas-clemente/quic-go/internal/protocol"
|
||||
"github.com/lucas-clemente/quic-go/internal/utils"
|
||||
"github.com/lucas-clemente/quic-go/qerr"
|
||||
)
|
||||
|
||||
type streamFlowController struct {
|
||||
baseFlowController
|
||||
|
||||
connection connectionFlowControllerI
|
||||
|
||||
streamID protocol.StreamID
|
||||
contributesToConnection bool // does the stream contribute to connection level flow control
|
||||
}
|
||||
|
||||
var _ StreamFlowController = &streamFlowController{}
|
||||
|
||||
// NewStreamFlowController gets a new flow controller for a stream
|
||||
func NewStreamFlowController(
|
||||
streamID protocol.StreamID,
|
||||
contributesToConnection bool,
|
||||
cfc ConnectionFlowController,
|
||||
receiveWindow protocol.ByteCount,
|
||||
maxReceiveWindow protocol.ByteCount,
|
||||
initialSendWindow protocol.ByteCount,
|
||||
rttStats *congestion.RTTStats,
|
||||
) StreamFlowController {
|
||||
return &streamFlowController{
|
||||
streamID: streamID,
|
||||
contributesToConnection: contributesToConnection,
|
||||
connection: cfc.(connectionFlowControllerI),
|
||||
baseFlowController: baseFlowController{
|
||||
rttStats: rttStats,
|
||||
receiveWindow: receiveWindow,
|
||||
receiveWindowIncrement: receiveWindow,
|
||||
maxReceiveWindowIncrement: maxReceiveWindow,
|
||||
sendWindow: initialSendWindow,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateHighestReceived updates the highestReceived value, if the byteOffset is higher
|
||||
// it returns an ErrReceivedSmallerByteOffset if the received byteOffset is smaller than any byteOffset received before
|
||||
func (c *streamFlowController) UpdateHighestReceived(byteOffset protocol.ByteCount, final bool) error {
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
// TODO(#382): check for StreamDataAfterTermination errors, when receiving an offset after we already received a final offset
|
||||
if byteOffset == c.highestReceived {
|
||||
return nil
|
||||
}
|
||||
if byteOffset <= c.highestReceived {
|
||||
// a STREAM_FRAME with a higher offset was received before.
|
||||
if final {
|
||||
// If the current byteOffset is smaller than the offset in that STREAM_FRAME, this STREAM_FRAME contained data after the end of the stream
|
||||
return qerr.StreamDataAfterTermination
|
||||
}
|
||||
// this is a reordered STREAM_FRAME
|
||||
return nil
|
||||
}
|
||||
|
||||
increment := byteOffset - c.highestReceived
|
||||
c.highestReceived = byteOffset
|
||||
if c.checkFlowControlViolation() {
|
||||
return qerr.Error(qerr.FlowControlReceivedTooMuchData, fmt.Sprintf("Received %d bytes on stream %d, allowed %d bytes", byteOffset, c.streamID, c.receiveWindow))
|
||||
}
|
||||
if c.contributesToConnection {
|
||||
return c.connection.IncrementHighestReceived(increment)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *streamFlowController) AddBytesRead(n protocol.ByteCount) {
|
||||
c.baseFlowController.AddBytesRead(n)
|
||||
if c.contributesToConnection {
|
||||
c.connection.AddBytesRead(n)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *streamFlowController) AddBytesSent(n protocol.ByteCount) {
|
||||
c.baseFlowController.AddBytesSent(n)
|
||||
if c.contributesToConnection {
|
||||
c.connection.AddBytesSent(n)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *streamFlowController) SendWindowSize() protocol.ByteCount {
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
window := c.baseFlowController.sendWindowSize()
|
||||
if c.contributesToConnection {
|
||||
window = utils.MinByteCount(window, c.connection.SendWindowSize())
|
||||
}
|
||||
return window
|
||||
}
|
||||
|
||||
func (c *streamFlowController) GetWindowUpdate() protocol.ByteCount {
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
oldWindowIncrement := c.receiveWindowIncrement
|
||||
offset := c.baseFlowController.getWindowUpdate()
|
||||
if c.receiveWindowIncrement > oldWindowIncrement { // auto-tuning enlarged the window increment
|
||||
utils.Debugf("Increasing receive flow control window for the connection to %d kB", c.receiveWindowIncrement/(1<<10))
|
||||
if c.contributesToConnection {
|
||||
c.connection.EnsureMinimumWindowIncrement(protocol.ByteCount(float64(c.receiveWindowIncrement) * protocol.ConnectionFlowControlMultiplier))
|
||||
}
|
||||
}
|
||||
return offset
|
||||
}
|
||||
201
internal/flowcontrol/stream_flow_controller_test.go
Normal file
201
internal/flowcontrol/stream_flow_controller_test.go
Normal file
@@ -0,0 +1,201 @@
|
||||
package flowcontrol
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/lucas-clemente/quic-go/congestion"
|
||||
"github.com/lucas-clemente/quic-go/internal/protocol"
|
||||
"github.com/lucas-clemente/quic-go/qerr"
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("Stream Flow controller", func() {
|
||||
var controller *streamFlowController
|
||||
|
||||
BeforeEach(func() {
|
||||
rttStats := &congestion.RTTStats{}
|
||||
controller = &streamFlowController{
|
||||
streamID: 10,
|
||||
connection: NewConnectionFlowController(1000, 1000, rttStats).(*connectionFlowController),
|
||||
}
|
||||
controller.maxReceiveWindowIncrement = 10000
|
||||
controller.rttStats = rttStats
|
||||
})
|
||||
|
||||
Context("Constructor", func() {
|
||||
rttStats := &congestion.RTTStats{}
|
||||
|
||||
It("sets the send and receive windows", func() {
|
||||
receiveWindow := protocol.ByteCount(2000)
|
||||
maxReceiveWindow := protocol.ByteCount(3000)
|
||||
sendWindow := protocol.ByteCount(4000)
|
||||
|
||||
cc := NewConnectionFlowController(0, 0, nil)
|
||||
fc := NewStreamFlowController(5, true, cc, receiveWindow, maxReceiveWindow, sendWindow, rttStats).(*streamFlowController)
|
||||
Expect(fc.streamID).To(Equal(protocol.StreamID(5)))
|
||||
Expect(fc.receiveWindow).To(Equal(receiveWindow))
|
||||
Expect(fc.maxReceiveWindowIncrement).To(Equal(maxReceiveWindow))
|
||||
Expect(fc.sendWindow).To(Equal(sendWindow))
|
||||
Expect(fc.contributesToConnection).To(BeTrue())
|
||||
})
|
||||
})
|
||||
|
||||
Context("receiving data", func() {
|
||||
Context("registering received offsets", func() {
|
||||
var receiveWindow protocol.ByteCount = 10000
|
||||
var receiveWindowIncrement protocol.ByteCount = 600
|
||||
|
||||
BeforeEach(func() {
|
||||
controller.receiveWindow = receiveWindow
|
||||
controller.receiveWindowIncrement = receiveWindowIncrement
|
||||
})
|
||||
|
||||
It("updates the highestReceived", func() {
|
||||
controller.highestReceived = 1337
|
||||
err := controller.UpdateHighestReceived(1338, false)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(controller.highestReceived).To(Equal(protocol.ByteCount(1338)))
|
||||
})
|
||||
|
||||
It("informs the connection flow controller about received data", func() {
|
||||
controller.highestReceived = 10
|
||||
controller.contributesToConnection = true
|
||||
controller.connection.(*connectionFlowController).highestReceived = 100
|
||||
err := controller.UpdateHighestReceived(20, false)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(controller.connection.(*connectionFlowController).highestReceived).To(Equal(protocol.ByteCount(100 + 10)))
|
||||
})
|
||||
|
||||
It("doesn't informs the connection flow controller about received data if it doesn't contribute", func() {
|
||||
controller.highestReceived = 10
|
||||
controller.connection.(*connectionFlowController).highestReceived = 100
|
||||
err := controller.UpdateHighestReceived(20, false)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(controller.connection.(*connectionFlowController).highestReceived).To(Equal(protocol.ByteCount(100)))
|
||||
})
|
||||
|
||||
It("does not decrease the highestReceived", func() {
|
||||
controller.highestReceived = 1337
|
||||
err := controller.UpdateHighestReceived(1000, false)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(controller.highestReceived).To(Equal(protocol.ByteCount(1337)))
|
||||
})
|
||||
|
||||
It("does nothing when setting the same byte offset", func() {
|
||||
controller.highestReceived = 1337
|
||||
err := controller.UpdateHighestReceived(1337, false)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
})
|
||||
|
||||
It("does not give a flow control violation when using the window completely", func() {
|
||||
err := controller.UpdateHighestReceived(receiveWindow, false)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
})
|
||||
|
||||
It("detects a flow control violation", func() {
|
||||
err := controller.UpdateHighestReceived(receiveWindow+1, false)
|
||||
Expect(err).To(MatchError("FlowControlReceivedTooMuchData: Received 10001 bytes on stream 10, allowed 10000 bytes"))
|
||||
})
|
||||
|
||||
It("accepts a final offset higher than the highest received", func() {
|
||||
controller.highestReceived = 100
|
||||
err := controller.UpdateHighestReceived(101, true)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(controller.highestReceived).To(Equal(protocol.ByteCount(101)))
|
||||
})
|
||||
|
||||
It("errors when receiving a final offset smaller than the highest offset received so far", func() {
|
||||
controller.highestReceived = 100
|
||||
err := controller.UpdateHighestReceived(99, true)
|
||||
Expect(err).To(MatchError(qerr.StreamDataAfterTermination))
|
||||
})
|
||||
})
|
||||
|
||||
Context("registering data read", func() {
|
||||
It("saves when data is read, on a stream not contributing to the connection", func() {
|
||||
controller.AddBytesRead(100)
|
||||
Expect(controller.bytesRead).To(Equal(protocol.ByteCount(100)))
|
||||
Expect(controller.connection.(*connectionFlowController).bytesRead).To(BeZero())
|
||||
})
|
||||
|
||||
It("saves when data is read, on a stream not contributing to the connection", func() {
|
||||
controller.contributesToConnection = true
|
||||
controller.AddBytesRead(200)
|
||||
Expect(controller.bytesRead).To(Equal(protocol.ByteCount(200)))
|
||||
Expect(controller.connection.(*connectionFlowController).bytesRead).To(Equal(protocol.ByteCount(200)))
|
||||
})
|
||||
})
|
||||
|
||||
Context("generating window updates", func() {
|
||||
var oldIncrement protocol.ByteCount
|
||||
|
||||
// update the congestion such that it returns a given value for the smoothed RTT
|
||||
setRtt := func(t time.Duration) {
|
||||
controller.rttStats.UpdateRTT(t, 0, time.Now())
|
||||
Expect(controller.rttStats.SmoothedRTT()).To(Equal(t)) // make sure it worked
|
||||
}
|
||||
|
||||
BeforeEach(func() {
|
||||
controller.receiveWindow = 100
|
||||
controller.receiveWindowIncrement = 60
|
||||
controller.connection.(*connectionFlowController).receiveWindowIncrement = 120
|
||||
oldIncrement = controller.receiveWindowIncrement
|
||||
})
|
||||
|
||||
It("tells the connection flow controller when the window was autotuned", func() {
|
||||
controller.contributesToConnection = true
|
||||
controller.AddBytesRead(75)
|
||||
setRtt(20 * time.Millisecond)
|
||||
controller.lastWindowUpdateTime = time.Now().Add(-35 * time.Millisecond)
|
||||
offset := controller.GetWindowUpdate()
|
||||
Expect(offset).To(Equal(protocol.ByteCount(75 + 2*60)))
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(2 * oldIncrement))
|
||||
Expect(controller.connection.(*connectionFlowController).receiveWindowIncrement).To(Equal(protocol.ByteCount(float64(controller.receiveWindowIncrement) * protocol.ConnectionFlowControlMultiplier)))
|
||||
})
|
||||
|
||||
It("doesn't tell the connection flow controller if it doesn't contribute", func() {
|
||||
controller.contributesToConnection = false
|
||||
controller.AddBytesRead(75)
|
||||
setRtt(20 * time.Millisecond)
|
||||
controller.lastWindowUpdateTime = time.Now().Add(-35 * time.Millisecond)
|
||||
offset := controller.GetWindowUpdate()
|
||||
Expect(offset).ToNot(BeZero())
|
||||
Expect(controller.receiveWindowIncrement).To(Equal(2 * oldIncrement))
|
||||
Expect(controller.connection.(*connectionFlowController).receiveWindowIncrement).To(Equal(protocol.ByteCount(120))) // unchanged
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
Context("sending data", func() {
|
||||
It("gets the size of the send window", func() {
|
||||
controller.UpdateSendWindow(15)
|
||||
controller.AddBytesSent(5)
|
||||
Expect(controller.SendWindowSize()).To(Equal(protocol.ByteCount(10)))
|
||||
})
|
||||
|
||||
It("doesn't care about the connection-level window, if it doesn't contribute", func() {
|
||||
controller.UpdateSendWindow(15)
|
||||
controller.connection.UpdateSendWindow(1)
|
||||
controller.AddBytesSent(5)
|
||||
Expect(controller.SendWindowSize()).To(Equal(protocol.ByteCount(10)))
|
||||
})
|
||||
|
||||
It("makes sure that it doesn't overflow the connection-level window", func() {
|
||||
controller.contributesToConnection = true
|
||||
controller.connection.UpdateSendWindow(12)
|
||||
controller.UpdateSendWindow(20)
|
||||
controller.AddBytesSent(10)
|
||||
Expect(controller.SendWindowSize()).To(Equal(protocol.ByteCount(2)))
|
||||
})
|
||||
|
||||
It("doesn't say that it's blocked, if only the connection is blocked", func() {
|
||||
controller.contributesToConnection = true
|
||||
controller.connection.UpdateSendWindow(50)
|
||||
controller.UpdateSendWindow(100)
|
||||
controller.AddBytesSent(50)
|
||||
Expect(controller.connection.IsBlocked()).To(BeTrue())
|
||||
Expect(controller.IsBlocked()).To(BeFalse())
|
||||
})
|
||||
})
|
||||
})
|
||||
100
internal/mocks/connection_flow_controller.go
Normal file
100
internal/mocks/connection_flow_controller.go
Normal file
@@ -0,0 +1,100 @@
|
||||
// Code generated by MockGen. DO NOT EDIT.
|
||||
// Source: github.com/lucas-clemente/quic-go/internal/flowcontrol (interfaces: ConnectionFlowController)
|
||||
|
||||
package mocks
|
||||
|
||||
import (
|
||||
reflect "reflect"
|
||||
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
protocol "github.com/lucas-clemente/quic-go/internal/protocol"
|
||||
)
|
||||
|
||||
// MockConnectionFlowController is a mock of ConnectionFlowController interface
|
||||
type MockConnectionFlowController struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockConnectionFlowControllerMockRecorder
|
||||
}
|
||||
|
||||
// MockConnectionFlowControllerMockRecorder is the mock recorder for MockConnectionFlowController
|
||||
type MockConnectionFlowControllerMockRecorder struct {
|
||||
mock *MockConnectionFlowController
|
||||
}
|
||||
|
||||
// NewMockConnectionFlowController creates a new mock instance
|
||||
func NewMockConnectionFlowController(ctrl *gomock.Controller) *MockConnectionFlowController {
|
||||
mock := &MockConnectionFlowController{ctrl: ctrl}
|
||||
mock.recorder = &MockConnectionFlowControllerMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use
|
||||
func (_m *MockConnectionFlowController) EXPECT() *MockConnectionFlowControllerMockRecorder {
|
||||
return _m.recorder
|
||||
}
|
||||
|
||||
// AddBytesRead mocks base method
|
||||
func (_m *MockConnectionFlowController) AddBytesRead(_param0 protocol.ByteCount) {
|
||||
_m.ctrl.Call(_m, "AddBytesRead", _param0)
|
||||
}
|
||||
|
||||
// AddBytesRead indicates an expected call of AddBytesRead
|
||||
func (_mr *MockConnectionFlowControllerMockRecorder) AddBytesRead(arg0 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "AddBytesRead", reflect.TypeOf((*MockConnectionFlowController)(nil).AddBytesRead), arg0)
|
||||
}
|
||||
|
||||
// AddBytesSent mocks base method
|
||||
func (_m *MockConnectionFlowController) AddBytesSent(_param0 protocol.ByteCount) {
|
||||
_m.ctrl.Call(_m, "AddBytesSent", _param0)
|
||||
}
|
||||
|
||||
// AddBytesSent indicates an expected call of AddBytesSent
|
||||
func (_mr *MockConnectionFlowControllerMockRecorder) AddBytesSent(arg0 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "AddBytesSent", reflect.TypeOf((*MockConnectionFlowController)(nil).AddBytesSent), arg0)
|
||||
}
|
||||
|
||||
// GetWindowUpdate mocks base method
|
||||
func (_m *MockConnectionFlowController) GetWindowUpdate() protocol.ByteCount {
|
||||
ret := _m.ctrl.Call(_m, "GetWindowUpdate")
|
||||
ret0, _ := ret[0].(protocol.ByteCount)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// GetWindowUpdate indicates an expected call of GetWindowUpdate
|
||||
func (_mr *MockConnectionFlowControllerMockRecorder) GetWindowUpdate() *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "GetWindowUpdate", reflect.TypeOf((*MockConnectionFlowController)(nil).GetWindowUpdate))
|
||||
}
|
||||
|
||||
// IsBlocked mocks base method
|
||||
func (_m *MockConnectionFlowController) IsBlocked() bool {
|
||||
ret := _m.ctrl.Call(_m, "IsBlocked")
|
||||
ret0, _ := ret[0].(bool)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// IsBlocked indicates an expected call of IsBlocked
|
||||
func (_mr *MockConnectionFlowControllerMockRecorder) IsBlocked() *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "IsBlocked", reflect.TypeOf((*MockConnectionFlowController)(nil).IsBlocked))
|
||||
}
|
||||
|
||||
// SendWindowSize mocks base method
|
||||
func (_m *MockConnectionFlowController) SendWindowSize() protocol.ByteCount {
|
||||
ret := _m.ctrl.Call(_m, "SendWindowSize")
|
||||
ret0, _ := ret[0].(protocol.ByteCount)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// SendWindowSize indicates an expected call of SendWindowSize
|
||||
func (_mr *MockConnectionFlowControllerMockRecorder) SendWindowSize() *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "SendWindowSize", reflect.TypeOf((*MockConnectionFlowController)(nil).SendWindowSize))
|
||||
}
|
||||
|
||||
// UpdateSendWindow mocks base method
|
||||
func (_m *MockConnectionFlowController) UpdateSendWindow(_param0 protocol.ByteCount) {
|
||||
_m.ctrl.Call(_m, "UpdateSendWindow", _param0)
|
||||
}
|
||||
|
||||
// UpdateSendWindow indicates an expected call of UpdateSendWindow
|
||||
func (_mr *MockConnectionFlowControllerMockRecorder) UpdateSendWindow(arg0 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "UpdateSendWindow", reflect.TypeOf((*MockConnectionFlowController)(nil).UpdateSendWindow), arg0)
|
||||
}
|
||||
@@ -1,177 +0,0 @@
|
||||
// Code generated by MockGen. DO NOT EDIT.
|
||||
// Source: ../flowcontrol/interface.go
|
||||
|
||||
package mocks
|
||||
|
||||
import (
|
||||
reflect "reflect"
|
||||
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
"github.com/lucas-clemente/quic-go/internal/flowcontrol"
|
||||
handshake "github.com/lucas-clemente/quic-go/internal/handshake"
|
||||
protocol "github.com/lucas-clemente/quic-go/internal/protocol"
|
||||
)
|
||||
|
||||
// MockFlowControlManager is a mock of FlowControlManager interface
|
||||
type MockFlowControlManager struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockFlowControlManagerMockRecorder
|
||||
}
|
||||
|
||||
// MockFlowControlManagerMockRecorder is the mock recorder for MockFlowControlManager
|
||||
type MockFlowControlManagerMockRecorder struct {
|
||||
mock *MockFlowControlManager
|
||||
}
|
||||
|
||||
// NewMockFlowControlManager creates a new mock instance
|
||||
func NewMockFlowControlManager(ctrl *gomock.Controller) *MockFlowControlManager {
|
||||
mock := &MockFlowControlManager{ctrl: ctrl}
|
||||
mock.recorder = &MockFlowControlManagerMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use
|
||||
func (_m *MockFlowControlManager) EXPECT() *MockFlowControlManagerMockRecorder {
|
||||
return _m.recorder
|
||||
}
|
||||
|
||||
// NewStream mocks base method
|
||||
func (_m *MockFlowControlManager) NewStream(streamID protocol.StreamID, contributesToConnectionFlow bool) {
|
||||
_m.ctrl.Call(_m, "NewStream", streamID, contributesToConnectionFlow)
|
||||
}
|
||||
|
||||
// NewStream indicates an expected call of NewStream
|
||||
func (_mr *MockFlowControlManagerMockRecorder) NewStream(arg0, arg1 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "NewStream", reflect.TypeOf((*MockFlowControlManager)(nil).NewStream), arg0, arg1)
|
||||
}
|
||||
|
||||
// RemoveStream mocks base method
|
||||
func (_m *MockFlowControlManager) RemoveStream(streamID protocol.StreamID) {
|
||||
_m.ctrl.Call(_m, "RemoveStream", streamID)
|
||||
}
|
||||
|
||||
// RemoveStream indicates an expected call of RemoveStream
|
||||
func (_mr *MockFlowControlManagerMockRecorder) RemoveStream(arg0 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "RemoveStream", reflect.TypeOf((*MockFlowControlManager)(nil).RemoveStream), arg0)
|
||||
}
|
||||
|
||||
// UpdateTransportParameters mocks base method
|
||||
func (_m *MockFlowControlManager) UpdateTransportParameters(_param0 *handshake.TransportParameters) {
|
||||
_m.ctrl.Call(_m, "UpdateTransportParameters", _param0)
|
||||
}
|
||||
|
||||
// UpdateTransportParameters indicates an expected call of UpdateTransportParameters
|
||||
func (_mr *MockFlowControlManagerMockRecorder) UpdateTransportParameters(arg0 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "UpdateTransportParameters", reflect.TypeOf((*MockFlowControlManager)(nil).UpdateTransportParameters), arg0)
|
||||
}
|
||||
|
||||
// ResetStream mocks base method
|
||||
func (_m *MockFlowControlManager) ResetStream(streamID protocol.StreamID, byteOffset protocol.ByteCount) error {
|
||||
ret := _m.ctrl.Call(_m, "ResetStream", streamID, byteOffset)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// ResetStream indicates an expected call of ResetStream
|
||||
func (_mr *MockFlowControlManagerMockRecorder) ResetStream(arg0, arg1 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "ResetStream", reflect.TypeOf((*MockFlowControlManager)(nil).ResetStream), arg0, arg1)
|
||||
}
|
||||
|
||||
// UpdateHighestReceived mocks base method
|
||||
func (_m *MockFlowControlManager) UpdateHighestReceived(streamID protocol.StreamID, byteOffset protocol.ByteCount) error {
|
||||
ret := _m.ctrl.Call(_m, "UpdateHighestReceived", streamID, byteOffset)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// UpdateHighestReceived indicates an expected call of UpdateHighestReceived
|
||||
func (_mr *MockFlowControlManagerMockRecorder) UpdateHighestReceived(arg0, arg1 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "UpdateHighestReceived", reflect.TypeOf((*MockFlowControlManager)(nil).UpdateHighestReceived), arg0, arg1)
|
||||
}
|
||||
|
||||
// AddBytesRead mocks base method
|
||||
func (_m *MockFlowControlManager) AddBytesRead(streamID protocol.StreamID, n protocol.ByteCount) error {
|
||||
ret := _m.ctrl.Call(_m, "AddBytesRead", streamID, n)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// AddBytesRead indicates an expected call of AddBytesRead
|
||||
func (_mr *MockFlowControlManagerMockRecorder) AddBytesRead(arg0, arg1 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "AddBytesRead", reflect.TypeOf((*MockFlowControlManager)(nil).AddBytesRead), arg0, arg1)
|
||||
}
|
||||
|
||||
// GetWindowUpdates mocks base method
|
||||
func (_m *MockFlowControlManager) GetWindowUpdates() []flowcontrol.WindowUpdate {
|
||||
ret := _m.ctrl.Call(_m, "GetWindowUpdates")
|
||||
ret0, _ := ret[0].([]flowcontrol.WindowUpdate)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// GetWindowUpdates indicates an expected call of GetWindowUpdates
|
||||
func (_mr *MockFlowControlManagerMockRecorder) GetWindowUpdates() *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "GetWindowUpdates", reflect.TypeOf((*MockFlowControlManager)(nil).GetWindowUpdates))
|
||||
}
|
||||
|
||||
// GetReceiveWindow mocks base method
|
||||
func (_m *MockFlowControlManager) GetReceiveWindow(streamID protocol.StreamID) (protocol.ByteCount, error) {
|
||||
ret := _m.ctrl.Call(_m, "GetReceiveWindow", streamID)
|
||||
ret0, _ := ret[0].(protocol.ByteCount)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// GetReceiveWindow indicates an expected call of GetReceiveWindow
|
||||
func (_mr *MockFlowControlManagerMockRecorder) GetReceiveWindow(arg0 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "GetReceiveWindow", reflect.TypeOf((*MockFlowControlManager)(nil).GetReceiveWindow), arg0)
|
||||
}
|
||||
|
||||
// AddBytesSent mocks base method
|
||||
func (_m *MockFlowControlManager) AddBytesSent(streamID protocol.StreamID, n protocol.ByteCount) error {
|
||||
ret := _m.ctrl.Call(_m, "AddBytesSent", streamID, n)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// AddBytesSent indicates an expected call of AddBytesSent
|
||||
func (_mr *MockFlowControlManagerMockRecorder) AddBytesSent(arg0, arg1 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "AddBytesSent", reflect.TypeOf((*MockFlowControlManager)(nil).AddBytesSent), arg0, arg1)
|
||||
}
|
||||
|
||||
// SendWindowSize mocks base method
|
||||
func (_m *MockFlowControlManager) SendWindowSize(streamID protocol.StreamID) (protocol.ByteCount, error) {
|
||||
ret := _m.ctrl.Call(_m, "SendWindowSize", streamID)
|
||||
ret0, _ := ret[0].(protocol.ByteCount)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// SendWindowSize indicates an expected call of SendWindowSize
|
||||
func (_mr *MockFlowControlManagerMockRecorder) SendWindowSize(arg0 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "SendWindowSize", reflect.TypeOf((*MockFlowControlManager)(nil).SendWindowSize), arg0)
|
||||
}
|
||||
|
||||
// RemainingConnectionWindowSize mocks base method
|
||||
func (_m *MockFlowControlManager) RemainingConnectionWindowSize() protocol.ByteCount {
|
||||
ret := _m.ctrl.Call(_m, "RemainingConnectionWindowSize")
|
||||
ret0, _ := ret[0].(protocol.ByteCount)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// RemainingConnectionWindowSize indicates an expected call of RemainingConnectionWindowSize
|
||||
func (_mr *MockFlowControlManagerMockRecorder) RemainingConnectionWindowSize() *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "RemainingConnectionWindowSize", reflect.TypeOf((*MockFlowControlManager)(nil).RemainingConnectionWindowSize))
|
||||
}
|
||||
|
||||
// UpdateWindow mocks base method
|
||||
func (_m *MockFlowControlManager) UpdateWindow(streamID protocol.StreamID, offset protocol.ByteCount) (bool, error) {
|
||||
ret := _m.ctrl.Call(_m, "UpdateWindow", streamID, offset)
|
||||
ret0, _ := ret[0].(bool)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// UpdateWindow indicates an expected call of UpdateWindow
|
||||
func (_mr *MockFlowControlManagerMockRecorder) UpdateWindow(arg0, arg1 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "UpdateWindow", reflect.TypeOf((*MockFlowControlManager)(nil).UpdateWindow), arg0, arg1)
|
||||
}
|
||||
@@ -1,7 +1,6 @@
|
||||
package mocks
|
||||
|
||||
// mockgen source mode doesn't properly recognize structs defined in the same package
|
||||
// so we have to use sed to correct for that
|
||||
|
||||
//go:generate sh -c "mockgen -package mocks -source ../flowcontrol/interface.go | sed \"s/\\[\\]WindowUpdate/[]flowcontrol.WindowUpdate/g\" > flow_control_manager.go"
|
||||
//go:generate sh -c "./mockgen_internal.sh mocks stream_flow_controller.go github.com/lucas-clemente/quic-go/internal/flowcontrol StreamFlowController"
|
||||
//go:generate sh -c "./mockgen_internal.sh mocks connection_flow_controller.go github.com/lucas-clemente/quic-go/internal/flowcontrol ConnectionFlowController"
|
||||
//go:generate sh -c "./mockgen_stream.sh mocks stream.go github.com/lucas-clemente/quic-go StreamI"
|
||||
//go:generate sh -c "goimports -w ."
|
||||
|
||||
21
internal/mocks/mockgen_internal.sh
Executable file
21
internal/mocks/mockgen_internal.sh
Executable file
@@ -0,0 +1,21 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Mockgen refuses to generate mocks for internal packages.
|
||||
# This script copies the internal directory and renames it to internalpackage.
|
||||
# That way, mockgen can generate the mock.
|
||||
# Afterwards, it corrects the import paths (replaces internalpackage back to internal).
|
||||
|
||||
TEMP_DIR=$(mktemp -d)
|
||||
mkdir -p $TEMP_DIR/src/github.com/lucas-clemente/quic-go/internalpackage
|
||||
|
||||
cp -r $GOPATH/src/github.com/lucas-clemente/quic-go/internal/* $TEMP_DIR/src/github.com/lucas-clemente/quic-go/internalpackage
|
||||
find $TEMP_DIR -type f -name "*.go" -exec sed -i '' 's/internal/internalpackage/g' {} \;
|
||||
|
||||
export GOPATH="$TEMP_DIR:$GOPATH"
|
||||
PACKAGE_PATH=${3/internal/internalpackage}
|
||||
|
||||
|
||||
mockgen -package $1 -self_package $1 -destination $2 $PACKAGE_PATH $4
|
||||
sed -i '' 's/internalpackage/internal/g' $2
|
||||
|
||||
rm -r "$TEMP_DIR"
|
||||
18
internal/mocks/mockgen_stream.sh
Executable file
18
internal/mocks/mockgen_stream.sh
Executable file
@@ -0,0 +1,18 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Mockgen refuses to generate mocks for internal packages.
|
||||
# This script copies the internal directory and renames it to internalpackage.
|
||||
# That way, mockgen can generate the mock.
|
||||
# Afterwards, it corrects the import paths (replaces internalpackage back to internal).
|
||||
|
||||
TEMP_DIR=$(mktemp -d)
|
||||
mkdir -p $TEMP_DIR/src/github.com/lucas-clemente/quic-go/
|
||||
|
||||
cp -r $GOPATH/src/github.com/lucas-clemente/quic-go/ $TEMP_DIR/src/github.com/lucas-clemente/quic-go/
|
||||
echo "type StreamI = streamI" >> $TEMP_DIR/src/github.com/lucas-clemente/quic-go/stream.go
|
||||
|
||||
export GOPATH="$TEMP_DIR:$GOPATH"
|
||||
|
||||
mockgen -package $1 -self_package $1 -destination $2 $3 $4
|
||||
|
||||
rm -r "$TEMP_DIR"
|
||||
283
internal/mocks/stream.go
Normal file
283
internal/mocks/stream.go
Normal file
@@ -0,0 +1,283 @@
|
||||
// Code generated by MockGen. DO NOT EDIT.
|
||||
// Source: github.com/lucas-clemente/quic-go (interfaces: StreamI)
|
||||
|
||||
package mocks
|
||||
|
||||
import (
|
||||
context "context"
|
||||
reflect "reflect"
|
||||
time "time"
|
||||
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
protocol "github.com/lucas-clemente/quic-go/internal/protocol"
|
||||
wire "github.com/lucas-clemente/quic-go/internal/wire"
|
||||
)
|
||||
|
||||
// MockStreamI is a mock of StreamI interface
|
||||
type MockStreamI struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockStreamIMockRecorder
|
||||
}
|
||||
|
||||
// MockStreamIMockRecorder is the mock recorder for MockStreamI
|
||||
type MockStreamIMockRecorder struct {
|
||||
mock *MockStreamI
|
||||
}
|
||||
|
||||
// NewMockStreamI creates a new mock instance
|
||||
func NewMockStreamI(ctrl *gomock.Controller) *MockStreamI {
|
||||
mock := &MockStreamI{ctrl: ctrl}
|
||||
mock.recorder = &MockStreamIMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use
|
||||
func (_m *MockStreamI) EXPECT() *MockStreamIMockRecorder {
|
||||
return _m.recorder
|
||||
}
|
||||
|
||||
// AddStreamFrame mocks base method
|
||||
func (_m *MockStreamI) AddStreamFrame(_param0 *wire.StreamFrame) error {
|
||||
ret := _m.ctrl.Call(_m, "AddStreamFrame", _param0)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// AddStreamFrame indicates an expected call of AddStreamFrame
|
||||
func (_mr *MockStreamIMockRecorder) AddStreamFrame(arg0 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "AddStreamFrame", reflect.TypeOf((*MockStreamI)(nil).AddStreamFrame), arg0)
|
||||
}
|
||||
|
||||
// Cancel mocks base method
|
||||
func (_m *MockStreamI) Cancel(_param0 error) {
|
||||
_m.ctrl.Call(_m, "Cancel", _param0)
|
||||
}
|
||||
|
||||
// Cancel indicates an expected call of Cancel
|
||||
func (_mr *MockStreamIMockRecorder) Cancel(arg0 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "Cancel", reflect.TypeOf((*MockStreamI)(nil).Cancel), arg0)
|
||||
}
|
||||
|
||||
// Close mocks base method
|
||||
func (_m *MockStreamI) Close() error {
|
||||
ret := _m.ctrl.Call(_m, "Close")
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Close indicates an expected call of Close
|
||||
func (_mr *MockStreamIMockRecorder) Close() *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "Close", reflect.TypeOf((*MockStreamI)(nil).Close))
|
||||
}
|
||||
|
||||
// Context mocks base method
|
||||
func (_m *MockStreamI) Context() context.Context {
|
||||
ret := _m.ctrl.Call(_m, "Context")
|
||||
ret0, _ := ret[0].(context.Context)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Context indicates an expected call of Context
|
||||
func (_mr *MockStreamIMockRecorder) Context() *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "Context", reflect.TypeOf((*MockStreamI)(nil).Context))
|
||||
}
|
||||
|
||||
// Finished mocks base method
|
||||
func (_m *MockStreamI) Finished() bool {
|
||||
ret := _m.ctrl.Call(_m, "Finished")
|
||||
ret0, _ := ret[0].(bool)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Finished indicates an expected call of Finished
|
||||
func (_mr *MockStreamIMockRecorder) Finished() *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "Finished", reflect.TypeOf((*MockStreamI)(nil).Finished))
|
||||
}
|
||||
|
||||
// GetDataForWriting mocks base method
|
||||
func (_m *MockStreamI) GetDataForWriting(_param0 protocol.ByteCount) []byte {
|
||||
ret := _m.ctrl.Call(_m, "GetDataForWriting", _param0)
|
||||
ret0, _ := ret[0].([]byte)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// GetDataForWriting indicates an expected call of GetDataForWriting
|
||||
func (_mr *MockStreamIMockRecorder) GetDataForWriting(arg0 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "GetDataForWriting", reflect.TypeOf((*MockStreamI)(nil).GetDataForWriting), arg0)
|
||||
}
|
||||
|
||||
// GetWindowUpdate mocks base method
|
||||
func (_m *MockStreamI) GetWindowUpdate() protocol.ByteCount {
|
||||
ret := _m.ctrl.Call(_m, "GetWindowUpdate")
|
||||
ret0, _ := ret[0].(protocol.ByteCount)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// GetWindowUpdate indicates an expected call of GetWindowUpdate
|
||||
func (_mr *MockStreamIMockRecorder) GetWindowUpdate() *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "GetWindowUpdate", reflect.TypeOf((*MockStreamI)(nil).GetWindowUpdate))
|
||||
}
|
||||
|
||||
// GetWriteOffset mocks base method
|
||||
func (_m *MockStreamI) GetWriteOffset() protocol.ByteCount {
|
||||
ret := _m.ctrl.Call(_m, "GetWriteOffset")
|
||||
ret0, _ := ret[0].(protocol.ByteCount)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// GetWriteOffset indicates an expected call of GetWriteOffset
|
||||
func (_mr *MockStreamIMockRecorder) GetWriteOffset() *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "GetWriteOffset", reflect.TypeOf((*MockStreamI)(nil).GetWriteOffset))
|
||||
}
|
||||
|
||||
// IsFlowControlBlocked mocks base method
|
||||
func (_m *MockStreamI) IsFlowControlBlocked() bool {
|
||||
ret := _m.ctrl.Call(_m, "IsFlowControlBlocked")
|
||||
ret0, _ := ret[0].(bool)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// IsFlowControlBlocked indicates an expected call of IsFlowControlBlocked
|
||||
func (_mr *MockStreamIMockRecorder) IsFlowControlBlocked() *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "IsFlowControlBlocked", reflect.TypeOf((*MockStreamI)(nil).IsFlowControlBlocked))
|
||||
}
|
||||
|
||||
// LenOfDataForWriting mocks base method
|
||||
func (_m *MockStreamI) LenOfDataForWriting() protocol.ByteCount {
|
||||
ret := _m.ctrl.Call(_m, "LenOfDataForWriting")
|
||||
ret0, _ := ret[0].(protocol.ByteCount)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// LenOfDataForWriting indicates an expected call of LenOfDataForWriting
|
||||
func (_mr *MockStreamIMockRecorder) LenOfDataForWriting() *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "LenOfDataForWriting", reflect.TypeOf((*MockStreamI)(nil).LenOfDataForWriting))
|
||||
}
|
||||
|
||||
// Read mocks base method
|
||||
func (_m *MockStreamI) Read(_param0 []byte) (int, error) {
|
||||
ret := _m.ctrl.Call(_m, "Read", _param0)
|
||||
ret0, _ := ret[0].(int)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// Read indicates an expected call of Read
|
||||
func (_mr *MockStreamIMockRecorder) Read(arg0 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "Read", reflect.TypeOf((*MockStreamI)(nil).Read), arg0)
|
||||
}
|
||||
|
||||
// RegisterRemoteError mocks base method
|
||||
func (_m *MockStreamI) RegisterRemoteError(_param0 error, _param1 protocol.ByteCount) error {
|
||||
ret := _m.ctrl.Call(_m, "RegisterRemoteError", _param0, _param1)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// RegisterRemoteError indicates an expected call of RegisterRemoteError
|
||||
func (_mr *MockStreamIMockRecorder) RegisterRemoteError(arg0, arg1 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "RegisterRemoteError", reflect.TypeOf((*MockStreamI)(nil).RegisterRemoteError), arg0, arg1)
|
||||
}
|
||||
|
||||
// Reset mocks base method
|
||||
func (_m *MockStreamI) Reset(_param0 error) {
|
||||
_m.ctrl.Call(_m, "Reset", _param0)
|
||||
}
|
||||
|
||||
// Reset indicates an expected call of Reset
|
||||
func (_mr *MockStreamIMockRecorder) Reset(arg0 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "Reset", reflect.TypeOf((*MockStreamI)(nil).Reset), arg0)
|
||||
}
|
||||
|
||||
// SentFin mocks base method
|
||||
func (_m *MockStreamI) SentFin() {
|
||||
_m.ctrl.Call(_m, "SentFin")
|
||||
}
|
||||
|
||||
// SentFin indicates an expected call of SentFin
|
||||
func (_mr *MockStreamIMockRecorder) SentFin() *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "SentFin", reflect.TypeOf((*MockStreamI)(nil).SentFin))
|
||||
}
|
||||
|
||||
// SetDeadline mocks base method
|
||||
func (_m *MockStreamI) SetDeadline(_param0 time.Time) error {
|
||||
ret := _m.ctrl.Call(_m, "SetDeadline", _param0)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// SetDeadline indicates an expected call of SetDeadline
|
||||
func (_mr *MockStreamIMockRecorder) SetDeadline(arg0 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "SetDeadline", reflect.TypeOf((*MockStreamI)(nil).SetDeadline), arg0)
|
||||
}
|
||||
|
||||
// SetReadDeadline mocks base method
|
||||
func (_m *MockStreamI) SetReadDeadline(_param0 time.Time) error {
|
||||
ret := _m.ctrl.Call(_m, "SetReadDeadline", _param0)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// SetReadDeadline indicates an expected call of SetReadDeadline
|
||||
func (_mr *MockStreamIMockRecorder) SetReadDeadline(arg0 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "SetReadDeadline", reflect.TypeOf((*MockStreamI)(nil).SetReadDeadline), arg0)
|
||||
}
|
||||
|
||||
// SetWriteDeadline mocks base method
|
||||
func (_m *MockStreamI) SetWriteDeadline(_param0 time.Time) error {
|
||||
ret := _m.ctrl.Call(_m, "SetWriteDeadline", _param0)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// SetWriteDeadline indicates an expected call of SetWriteDeadline
|
||||
func (_mr *MockStreamIMockRecorder) SetWriteDeadline(arg0 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "SetWriteDeadline", reflect.TypeOf((*MockStreamI)(nil).SetWriteDeadline), arg0)
|
||||
}
|
||||
|
||||
// ShouldSendFin mocks base method
|
||||
func (_m *MockStreamI) ShouldSendFin() bool {
|
||||
ret := _m.ctrl.Call(_m, "ShouldSendFin")
|
||||
ret0, _ := ret[0].(bool)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// ShouldSendFin indicates an expected call of ShouldSendFin
|
||||
func (_mr *MockStreamIMockRecorder) ShouldSendFin() *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "ShouldSendFin", reflect.TypeOf((*MockStreamI)(nil).ShouldSendFin))
|
||||
}
|
||||
|
||||
// StreamID mocks base method
|
||||
func (_m *MockStreamI) StreamID() protocol.StreamID {
|
||||
ret := _m.ctrl.Call(_m, "StreamID")
|
||||
ret0, _ := ret[0].(protocol.StreamID)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// StreamID indicates an expected call of StreamID
|
||||
func (_mr *MockStreamIMockRecorder) StreamID() *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "StreamID", reflect.TypeOf((*MockStreamI)(nil).StreamID))
|
||||
}
|
||||
|
||||
// UpdateSendWindow mocks base method
|
||||
func (_m *MockStreamI) UpdateSendWindow(_param0 protocol.ByteCount) {
|
||||
_m.ctrl.Call(_m, "UpdateSendWindow", _param0)
|
||||
}
|
||||
|
||||
// UpdateSendWindow indicates an expected call of UpdateSendWindow
|
||||
func (_mr *MockStreamIMockRecorder) UpdateSendWindow(arg0 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "UpdateSendWindow", reflect.TypeOf((*MockStreamI)(nil).UpdateSendWindow), arg0)
|
||||
}
|
||||
|
||||
// Write mocks base method
|
||||
func (_m *MockStreamI) Write(_param0 []byte) (int, error) {
|
||||
ret := _m.ctrl.Call(_m, "Write", _param0)
|
||||
ret0, _ := ret[0].(int)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// Write indicates an expected call of Write
|
||||
func (_mr *MockStreamIMockRecorder) Write(arg0 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "Write", reflect.TypeOf((*MockStreamI)(nil).Write), arg0)
|
||||
}
|
||||
112
internal/mocks/stream_flow_controller.go
Normal file
112
internal/mocks/stream_flow_controller.go
Normal file
@@ -0,0 +1,112 @@
|
||||
// Code generated by MockGen. DO NOT EDIT.
|
||||
// Source: github.com/lucas-clemente/quic-go/internal/flowcontrol (interfaces: StreamFlowController)
|
||||
|
||||
package mocks
|
||||
|
||||
import (
|
||||
reflect "reflect"
|
||||
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
protocol "github.com/lucas-clemente/quic-go/internal/protocol"
|
||||
)
|
||||
|
||||
// MockStreamFlowController is a mock of StreamFlowController interface
|
||||
type MockStreamFlowController struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockStreamFlowControllerMockRecorder
|
||||
}
|
||||
|
||||
// MockStreamFlowControllerMockRecorder is the mock recorder for MockStreamFlowController
|
||||
type MockStreamFlowControllerMockRecorder struct {
|
||||
mock *MockStreamFlowController
|
||||
}
|
||||
|
||||
// NewMockStreamFlowController creates a new mock instance
|
||||
func NewMockStreamFlowController(ctrl *gomock.Controller) *MockStreamFlowController {
|
||||
mock := &MockStreamFlowController{ctrl: ctrl}
|
||||
mock.recorder = &MockStreamFlowControllerMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use
|
||||
func (_m *MockStreamFlowController) EXPECT() *MockStreamFlowControllerMockRecorder {
|
||||
return _m.recorder
|
||||
}
|
||||
|
||||
// AddBytesRead mocks base method
|
||||
func (_m *MockStreamFlowController) AddBytesRead(_param0 protocol.ByteCount) {
|
||||
_m.ctrl.Call(_m, "AddBytesRead", _param0)
|
||||
}
|
||||
|
||||
// AddBytesRead indicates an expected call of AddBytesRead
|
||||
func (_mr *MockStreamFlowControllerMockRecorder) AddBytesRead(arg0 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "AddBytesRead", reflect.TypeOf((*MockStreamFlowController)(nil).AddBytesRead), arg0)
|
||||
}
|
||||
|
||||
// AddBytesSent mocks base method
|
||||
func (_m *MockStreamFlowController) AddBytesSent(_param0 protocol.ByteCount) {
|
||||
_m.ctrl.Call(_m, "AddBytesSent", _param0)
|
||||
}
|
||||
|
||||
// AddBytesSent indicates an expected call of AddBytesSent
|
||||
func (_mr *MockStreamFlowControllerMockRecorder) AddBytesSent(arg0 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "AddBytesSent", reflect.TypeOf((*MockStreamFlowController)(nil).AddBytesSent), arg0)
|
||||
}
|
||||
|
||||
// GetWindowUpdate mocks base method
|
||||
func (_m *MockStreamFlowController) GetWindowUpdate() protocol.ByteCount {
|
||||
ret := _m.ctrl.Call(_m, "GetWindowUpdate")
|
||||
ret0, _ := ret[0].(protocol.ByteCount)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// GetWindowUpdate indicates an expected call of GetWindowUpdate
|
||||
func (_mr *MockStreamFlowControllerMockRecorder) GetWindowUpdate() *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "GetWindowUpdate", reflect.TypeOf((*MockStreamFlowController)(nil).GetWindowUpdate))
|
||||
}
|
||||
|
||||
// IsBlocked mocks base method
|
||||
func (_m *MockStreamFlowController) IsBlocked() bool {
|
||||
ret := _m.ctrl.Call(_m, "IsBlocked")
|
||||
ret0, _ := ret[0].(bool)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// IsBlocked indicates an expected call of IsBlocked
|
||||
func (_mr *MockStreamFlowControllerMockRecorder) IsBlocked() *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "IsBlocked", reflect.TypeOf((*MockStreamFlowController)(nil).IsBlocked))
|
||||
}
|
||||
|
||||
// SendWindowSize mocks base method
|
||||
func (_m *MockStreamFlowController) SendWindowSize() protocol.ByteCount {
|
||||
ret := _m.ctrl.Call(_m, "SendWindowSize")
|
||||
ret0, _ := ret[0].(protocol.ByteCount)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// SendWindowSize indicates an expected call of SendWindowSize
|
||||
func (_mr *MockStreamFlowControllerMockRecorder) SendWindowSize() *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "SendWindowSize", reflect.TypeOf((*MockStreamFlowController)(nil).SendWindowSize))
|
||||
}
|
||||
|
||||
// UpdateHighestReceived mocks base method
|
||||
func (_m *MockStreamFlowController) UpdateHighestReceived(_param0 protocol.ByteCount, _param1 bool) error {
|
||||
ret := _m.ctrl.Call(_m, "UpdateHighestReceived", _param0, _param1)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// UpdateHighestReceived indicates an expected call of UpdateHighestReceived
|
||||
func (_mr *MockStreamFlowControllerMockRecorder) UpdateHighestReceived(arg0, arg1 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "UpdateHighestReceived", reflect.TypeOf((*MockStreamFlowController)(nil).UpdateHighestReceived), arg0, arg1)
|
||||
}
|
||||
|
||||
// UpdateSendWindow mocks base method
|
||||
func (_m *MockStreamFlowController) UpdateSendWindow(_param0 protocol.ByteCount) {
|
||||
_m.ctrl.Call(_m, "UpdateSendWindow", _param0)
|
||||
}
|
||||
|
||||
// UpdateSendWindow indicates an expected call of UpdateSendWindow
|
||||
func (_mr *MockStreamFlowControllerMockRecorder) UpdateSendWindow(arg0 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "UpdateSendWindow", reflect.TypeOf((*MockStreamFlowController)(nil).UpdateSendWindow), arg0)
|
||||
}
|
||||
Reference in New Issue
Block a user