package flowcontrol import ( "sync" "time" "github.com/lucas-clemente/quic-go/congestion" "github.com/lucas-clemente/quic-go/internal/protocol" "github.com/lucas-clemente/quic-go/internal/utils" ) type baseFlowController struct { mutex sync.RWMutex rttStats *congestion.RTTStats bytesSent protocol.ByteCount sendWindow protocol.ByteCount lastWindowUpdateTime time.Time bytesRead protocol.ByteCount highestReceived protocol.ByteCount receiveWindow protocol.ByteCount receiveWindowIncrement protocol.ByteCount maxReceiveWindowIncrement protocol.ByteCount } func (c *baseFlowController) AddBytesSent(n protocol.ByteCount) { c.mutex.Lock() defer c.mutex.Unlock() c.bytesSent += n } // UpdateSendWindow should be called after receiving a WindowUpdateFrame // it returns true if the window was actually updated func (c *baseFlowController) UpdateSendWindow(offset protocol.ByteCount) { c.mutex.Lock() defer c.mutex.Unlock() if offset > c.sendWindow { c.sendWindow = offset } } func (c *baseFlowController) sendWindowSize() protocol.ByteCount { // this only happens during connection establishment, when data is sent before we receive the peer's transport parameters if c.bytesSent > c.sendWindow { return 0 } return c.sendWindow - c.bytesSent } func (c *baseFlowController) AddBytesRead(n protocol.ByteCount) { c.mutex.Lock() defer c.mutex.Unlock() // pretend we sent a WindowUpdate when reading the first byte // this way auto-tuning of the window increment already works for the first WindowUpdate if c.bytesRead == 0 { c.lastWindowUpdateTime = time.Now() } c.bytesRead += n } // getWindowUpdate updates the receive window, if necessary // it returns the new offset func (c *baseFlowController) getWindowUpdate() protocol.ByteCount { diff := c.receiveWindow - c.bytesRead // update the window when more than half of it was already consumed if diff >= (c.receiveWindowIncrement / 2) { return 0 } c.maybeAdjustWindowIncrement() c.receiveWindow = c.bytesRead + c.receiveWindowIncrement c.lastWindowUpdateTime = time.Now() return c.receiveWindow } func (c *baseFlowController) IsBlocked() bool { c.mutex.RLock() defer c.mutex.RUnlock() return c.sendWindowSize() == 0 } // maybeAdjustWindowIncrement increases the receiveWindowIncrement if we're sending WindowUpdates too often func (c *baseFlowController) maybeAdjustWindowIncrement() { if c.lastWindowUpdateTime.IsZero() { return } rtt := c.rttStats.SmoothedRTT() if rtt == 0 { return } timeSinceLastWindowUpdate := time.Since(c.lastWindowUpdateTime) // interval between the window updates is sufficiently large, no need to increase the increment if timeSinceLastWindowUpdate >= 2*rtt { return } c.receiveWindowIncrement = utils.MinByteCount(2*c.receiveWindowIncrement, c.maxReceiveWindowIncrement) } func (c *baseFlowController) checkFlowControlViolation() bool { return c.highestReceived > c.receiveWindow }