diff --git a/internal/ackhandler/sent_packet_handler.go b/internal/ackhandler/sent_packet_handler.go index 72911e13..1aead119 100644 --- a/internal/ackhandler/sent_packet_handler.go +++ b/internal/ackhandler/sent_packet_handler.go @@ -220,11 +220,11 @@ func (h *sentPacketHandler) ReceivedAck(ackFrame *wire.AckFrame, withPacketNumbe if p.largestAcked != 0 { h.lowestPacketNotConfirmedAcked = utils.MaxPacketNumber(h.lowestPacketNotConfirmedAcked, p.largestAcked+1) } - if err := h.onPacketAcked(p); err != nil { + if err := h.onPacketAcked(p, rcvTime); err != nil { return err } if p.includedInBytesInFlight { - h.congestion.OnPacketAcked(p.PacketNumber, p.Length, priorInFlight) + h.congestion.OnPacketAcked(p.PacketNumber, p.Length, priorInFlight, rcvTime) } } @@ -382,7 +382,7 @@ func (h *sentPacketHandler) GetAlarmTimeout() time.Time { return h.alarm } -func (h *sentPacketHandler) onPacketAcked(p *Packet) error { +func (h *sentPacketHandler) onPacketAcked(p *Packet, rcvTime time.Time) error { // This happens if a packet and its retransmissions is acked in the same ACK. // As soon as we process the first one, this will remove all the retransmissions, // so we won't find the retransmitted packet number later. diff --git a/internal/ackhandler/sent_packet_handler_test.go b/internal/ackhandler/sent_packet_handler_test.go index 37df5965..ecd608be 100644 --- a/internal/ackhandler/sent_packet_handler_test.go +++ b/internal/ackhandler/sent_packet_handler_test.go @@ -568,18 +568,19 @@ var _ = Describe("SentPacketHandler", func() { }) It("should call MaybeExitSlowStart and OnPacketAcked", func() { + rcvTime := time.Now().Add(-5 * time.Second) cong.EXPECT().OnPacketSent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(3) cong.EXPECT().TimeUntilSend(gomock.Any()).Times(3) gomock.InOrder( cong.EXPECT().MaybeExitSlowStart(), // must be called before packets are acked - cong.EXPECT().OnPacketAcked(protocol.PacketNumber(1), protocol.ByteCount(1), protocol.ByteCount(3)), - cong.EXPECT().OnPacketAcked(protocol.PacketNumber(2), protocol.ByteCount(1), protocol.ByteCount(3)), + cong.EXPECT().OnPacketAcked(protocol.PacketNumber(1), protocol.ByteCount(1), protocol.ByteCount(3), rcvTime), + cong.EXPECT().OnPacketAcked(protocol.PacketNumber(2), protocol.ByteCount(1), protocol.ByteCount(3), rcvTime), ) handler.SentPacket(retransmittablePacket(&Packet{PacketNumber: 1})) handler.SentPacket(retransmittablePacket(&Packet{PacketNumber: 2})) handler.SentPacket(retransmittablePacket(&Packet{PacketNumber: 3})) ack := &wire.AckFrame{AckRanges: []wire.AckRange{{Smallest: 1, Largest: 2}}} - err := handler.ReceivedAck(ack, 1, protocol.EncryptionForwardSecure, time.Now()) + err := handler.ReceivedAck(ack, 1, protocol.EncryptionForwardSecure, rcvTime) Expect(err).NotTo(HaveOccurred()) }) @@ -611,10 +612,11 @@ var _ = Describe("SentPacketHandler", func() { Expect(handler.DequeuePacketForRetransmission()).ToNot(BeNil()) Expect(handler.DequeuePacketForRetransmission()).ToNot(BeNil()) // send one probe packet and receive an ACK for it + rcvTime := time.Now() gomock.InOrder( cong.EXPECT().MaybeExitSlowStart(), cong.EXPECT().OnRetransmissionTimeout(true), - cong.EXPECT().OnPacketAcked(protocol.PacketNumber(5), protocol.ByteCount(1), protocol.ByteCount(5)), + cong.EXPECT().OnPacketAcked(protocol.PacketNumber(5), protocol.ByteCount(1), protocol.ByteCount(5), rcvTime), cong.EXPECT().OnPacketLost(protocol.PacketNumber(1), protocol.ByteCount(1), protocol.ByteCount(5)), cong.EXPECT().OnPacketLost(protocol.PacketNumber(2), protocol.ByteCount(1), protocol.ByteCount(5)), cong.EXPECT().OnPacketLost(protocol.PacketNumber(3), protocol.ByteCount(1), protocol.ByteCount(5)), @@ -622,7 +624,7 @@ var _ = Describe("SentPacketHandler", func() { ) handler.SentPacket(retransmittablePacket(&Packet{PacketNumber: 5})) ack := &wire.AckFrame{AckRanges: []wire.AckRange{{Smallest: 5, Largest: 5}}} - err := handler.ReceivedAck(ack, 1, protocol.EncryptionForwardSecure, time.Now()) + err := handler.ReceivedAck(ack, 1, protocol.EncryptionForwardSecure, rcvTime) Expect(err).ToNot(HaveOccurred()) }) @@ -643,7 +645,7 @@ var _ = Describe("SentPacketHandler", func() { // don't EXPECT any call to OnRetransmissionTimeout gomock.InOrder( cong.EXPECT().MaybeExitSlowStart(), - cong.EXPECT().OnPacketAcked(protocol.PacketNumber(2), protocol.ByteCount(1), protocol.ByteCount(3)), + cong.EXPECT().OnPacketAcked(protocol.PacketNumber(2), protocol.ByteCount(1), protocol.ByteCount(3), gomock.Any()), cong.EXPECT().OnPacketLost(protocol.PacketNumber(1), protocol.ByteCount(1), protocol.ByteCount(3)), ) ack := &wire.AckFrame{AckRanges: []wire.AckRange{{Smallest: 2, Largest: 2}}} @@ -659,7 +661,7 @@ var _ = Describe("SentPacketHandler", func() { // lose packet 1 gomock.InOrder( cong.EXPECT().MaybeExitSlowStart(), - cong.EXPECT().OnPacketAcked(protocol.PacketNumber(2), protocol.ByteCount(1), protocol.ByteCount(2)), + cong.EXPECT().OnPacketAcked(protocol.PacketNumber(2), protocol.ByteCount(1), protocol.ByteCount(2), gomock.Any()), cong.EXPECT().OnPacketLost(protocol.PacketNumber(1), protocol.ByteCount(1), protocol.ByteCount(2)), ) ack := &wire.AckFrame{AckRanges: []wire.AckRange{{Smallest: 2, Largest: 2}}} @@ -681,7 +683,7 @@ var _ = Describe("SentPacketHandler", func() { // receive the first ACK gomock.InOrder( cong.EXPECT().MaybeExitSlowStart(), - cong.EXPECT().OnPacketAcked(protocol.PacketNumber(2), protocol.ByteCount(1), protocol.ByteCount(4)), + cong.EXPECT().OnPacketAcked(protocol.PacketNumber(2), protocol.ByteCount(1), protocol.ByteCount(4), gomock.Any()), cong.EXPECT().OnPacketLost(protocol.PacketNumber(1), protocol.ByteCount(1), protocol.ByteCount(4)), ) ack := &wire.AckFrame{AckRanges: []wire.AckRange{{Smallest: 2, Largest: 2}}} @@ -690,7 +692,7 @@ var _ = Describe("SentPacketHandler", func() { // receive the second ACK gomock.InOrder( cong.EXPECT().MaybeExitSlowStart(), - cong.EXPECT().OnPacketAcked(protocol.PacketNumber(4), protocol.ByteCount(1), protocol.ByteCount(2)), + cong.EXPECT().OnPacketAcked(protocol.PacketNumber(4), protocol.ByteCount(1), protocol.ByteCount(2), gomock.Any()), cong.EXPECT().OnPacketLost(protocol.PacketNumber(3), protocol.ByteCount(1), protocol.ByteCount(2)), ) ack = &wire.AckFrame{AckRanges: []wire.AckRange{{Smallest: 4, Largest: 4}}} diff --git a/internal/congestion/cubic.go b/internal/congestion/cubic.go index 3922f476..dcf91fc6 100644 --- a/internal/congestion/cubic.go +++ b/internal/congestion/cubic.go @@ -16,11 +16,10 @@ import ( // allow a 10 shift right to divide. // 1024*1024^3 (first 1024 is from 0.100^3) -// where 0.100 is 100 ms which is the scaling -// round trip time. +// where 0.100 is 100 ms which is the scaling round trip time. const cubeScale = 40 const cubeCongestionWindowScale = 410 -const cubeFactor protocol.PacketNumber = 1 << cubeScale / cubeCongestionWindowScale +const cubeFactor protocol.ByteCount = 1 << cubeScale / cubeCongestionWindowScale / protocol.DefaultTCPMSS const defaultNumConnections = 2 @@ -32,39 +31,35 @@ const beta float32 = 0.7 // new concurrent flows and speed up convergence. const betaLastMax float32 = 0.85 -// If true, Cubic's epoch is shifted when the sender is application-limited. -const shiftQuicCubicEpochWhenAppLimited = true - -const maxCubicTimeInterval = 30 * time.Millisecond - // Cubic implements the cubic algorithm from TCP type Cubic struct { clock Clock + // Number of connections to simulate. numConnections int + // Time when this cycle started, after last loss event. epoch time.Time - // Time when sender went into application-limited period. Zero if not in - // application-limited period. - appLimitedStartTime time.Time - // Time when we updated last_congestion_window. - lastUpdateTime time.Time - // Last congestion window (in packets) used. - lastCongestionWindow protocol.PacketNumber - // Max congestion window (in packets) used just before last loss event. + + // Max congestion window used just before last loss event. // Note: to improve fairness to other streams an additional back off is // applied to this value if the new value is below our latest value. - lastMaxCongestionWindow protocol.PacketNumber - // Number of acked packets since the cycle started (epoch). - ackedPacketsCount protocol.PacketNumber + lastMaxCongestionWindow protocol.ByteCount + + // Number of acked bytes since the cycle started (epoch). + ackedBytesCount protocol.ByteCount + // TCP Reno equivalent congestion window in packets. - estimatedTCPcongestionWindow protocol.PacketNumber + estimatedTCPcongestionWindow protocol.ByteCount + // Origin point of cubic function. - originPointCongestionWindow protocol.PacketNumber + originPointCongestionWindow protocol.ByteCount + // Time to origin point of cubic function in 2^10 fractions of a second. timeToOriginPoint uint32 + // Last congestion window in packets computed by cubic function. - lastTargetCongestionWindow protocol.PacketNumber + lastTargetCongestionWindow protocol.ByteCount } // NewCubic returns a new Cubic instance @@ -80,11 +75,8 @@ func NewCubic(clock Clock) *Cubic { // Reset is called after a timeout to reset the cubic state func (c *Cubic) Reset() { c.epoch = time.Time{} - c.appLimitedStartTime = time.Time{} - c.lastUpdateTime = time.Time{} - c.lastCongestionWindow = 0 c.lastMaxCongestionWindow = 0 - c.ackedPacketsCount = 0 + c.ackedBytesCount = 0 c.estimatedTCPcongestionWindow = 0 c.originPointCongestionWindow = 0 c.timeToOriginPoint = 0 @@ -107,57 +99,59 @@ func (c *Cubic) beta() float32 { return (float32(c.numConnections) - 1 + beta) / float32(c.numConnections) } +func (c *Cubic) betaLastMax() float32 { + // betaLastMax is the additional backoff factor after loss for our + // N-connection emulation, which emulates the additional backoff of + // an ensemble of N TCP-Reno connections on a single loss event. The + // effective multiplier is computed as: + return (float32(c.numConnections) - 1 + betaLastMax) / float32(c.numConnections) +} + // OnApplicationLimited is called on ack arrival when sender is unable to use // the available congestion window. Resets Cubic state during quiescence. func (c *Cubic) OnApplicationLimited() { - if shiftQuicCubicEpochWhenAppLimited { - // When sender is not using the available congestion window, Cubic's epoch - // should not continue growing. Record the time when sender goes into an - // app-limited period here, to compensate later when cwnd growth happens. - if c.appLimitedStartTime.IsZero() { - c.appLimitedStartTime = c.clock.Now() - } - } else { - // When sender is not using the available congestion window, Cubic's epoch - // should not continue growing. Reset the epoch when in such a period. - c.epoch = time.Time{} - } + // When sender is not using the available congestion window, the window does + // not grow. But to be RTT-independent, Cubic assumes that the sender has been + // using the entire window during the time since the beginning of the current + // "epoch" (the end of the last loss recovery period). Since + // application-limited periods break this assumption, we reset the epoch when + // in such a period. This reset effectively freezes congestion window growth + // through application-limited periods and allows Cubic growth to continue + // when the entire window is being used. + c.epoch = time.Time{} } // CongestionWindowAfterPacketLoss computes a new congestion window to use after // a loss event. Returns the new congestion window in packets. The new // congestion window is a multiplicative decrease of our current window. -func (c *Cubic) CongestionWindowAfterPacketLoss(currentCongestionWindow protocol.PacketNumber) protocol.PacketNumber { - if currentCongestionWindow < c.lastMaxCongestionWindow { +func (c *Cubic) CongestionWindowAfterPacketLoss(currentCongestionWindow protocol.ByteCount) protocol.ByteCount { + if currentCongestionWindow+protocol.DefaultTCPMSS < c.lastMaxCongestionWindow { // We never reached the old max, so assume we are competing with another // flow. Use our extra back off factor to allow the other flow to go up. - c.lastMaxCongestionWindow = protocol.PacketNumber(betaLastMax * float32(currentCongestionWindow)) + c.lastMaxCongestionWindow = protocol.ByteCount(c.betaLastMax() * float32(currentCongestionWindow)) } else { c.lastMaxCongestionWindow = currentCongestionWindow } c.epoch = time.Time{} // Reset time. - return protocol.PacketNumber(float32(currentCongestionWindow) * c.beta()) + return protocol.ByteCount(float32(currentCongestionWindow) * c.beta()) } // CongestionWindowAfterAck computes a new congestion window to use after a received ACK. // Returns the new congestion window in packets. The new congestion window // follows a cubic function that depends on the time passed since last // packet loss. -func (c *Cubic) CongestionWindowAfterAck(currentCongestionWindow protocol.PacketNumber, delayMin time.Duration) protocol.PacketNumber { - c.ackedPacketsCount++ // Packets acked. - currentTime := c.clock.Now() - - // Cubic is "independent" of RTT, the update is limited by the time elapsed. - if c.lastCongestionWindow == currentCongestionWindow && (currentTime.Sub(c.lastUpdateTime) <= maxCubicTimeInterval) { - return utils.MaxPacketNumber(c.lastTargetCongestionWindow, c.estimatedTCPcongestionWindow) - } - c.lastCongestionWindow = currentCongestionWindow - c.lastUpdateTime = currentTime +func (c *Cubic) CongestionWindowAfterAck( + ackedBytes protocol.ByteCount, + currentCongestionWindow protocol.ByteCount, + delayMin time.Duration, + eventTime time.Time, +) protocol.ByteCount { + c.ackedBytesCount += ackedBytes if c.epoch.IsZero() { // First ACK after a loss event. - c.epoch = currentTime // Start of epoch. - c.ackedPacketsCount = 1 // Reset count. + c.epoch = eventTime // Start of epoch. + c.ackedBytesCount = ackedBytes // Reset count. // Reset estimated_tcp_congestion_window_ to be in sync with cubic. c.estimatedTCPcongestionWindow = currentCongestionWindow if c.lastMaxCongestionWindow <= currentCongestionWindow { @@ -167,48 +161,37 @@ func (c *Cubic) CongestionWindowAfterAck(currentCongestionWindow protocol.Packet c.timeToOriginPoint = uint32(math.Cbrt(float64(cubeFactor * (c.lastMaxCongestionWindow - currentCongestionWindow)))) c.originPointCongestionWindow = c.lastMaxCongestionWindow } - } else { - // If sender was app-limited, then freeze congestion window growth during - // app-limited period. Continue growth now by shifting the epoch-start - // through the app-limited period. - if shiftQuicCubicEpochWhenAppLimited && !c.appLimitedStartTime.IsZero() { - shift := currentTime.Sub(c.appLimitedStartTime) - c.epoch = c.epoch.Add(shift) - c.appLimitedStartTime = time.Time{} - } } // Change the time unit from microseconds to 2^10 fractions per second. Take // the round trip time in account. This is done to allow us to use shift as a // divide operator. - elapsedTime := int64((currentTime.Add(delayMin).Sub(c.epoch)/time.Microsecond)<<10) / 1000000 + elapsedTime := int64(eventTime.Add(delayMin).Sub(c.epoch)/time.Microsecond) << 10 / (1000 * 1000) + // Right-shifts of negative, signed numbers have implementation-dependent + // behavior, so force the offset to be positive, as is done in the kernel. offset := int64(c.timeToOriginPoint) - elapsedTime - // Right-shifts of negative, signed numbers have - // implementation-dependent behavior. Force the offset to be - // positive, similar to the kernel implementation. if offset < 0 { offset = -offset } - deltaCongestionWindow := protocol.PacketNumber((cubeCongestionWindowScale * offset * offset * offset) >> cubeScale) - var targetCongestionWindow protocol.PacketNumber + + deltaCongestionWindow := protocol.ByteCount(cubeCongestionWindowScale*offset*offset*offset) * protocol.DefaultTCPMSS >> cubeScale + var targetCongestionWindow protocol.ByteCount if elapsedTime > int64(c.timeToOriginPoint) { targetCongestionWindow = c.originPointCongestionWindow + deltaCongestionWindow } else { targetCongestionWindow = c.originPointCongestionWindow - deltaCongestionWindow } - // With dynamic beta/alpha based on number of active streams, it is possible - // for the required_ack_count to become much lower than acked_packets_count_ - // suddenly, leading to more than one iteration through the following loop. - for { - // Update estimated TCP congestion_window. - requiredAckCount := protocol.PacketNumber(float32(c.estimatedTCPcongestionWindow) / c.alpha()) - if c.ackedPacketsCount < requiredAckCount { - break - } - c.ackedPacketsCount -= requiredAckCount - c.estimatedTCPcongestionWindow++ - } + // Limit the CWND increase to half the acked bytes. + targetCongestionWindow = utils.MinByteCount(targetCongestionWindow, currentCongestionWindow+c.ackedBytesCount/2) + + // Increase the window by approximately Alpha * 1 MSS of bytes every + // time we ack an estimated tcp window of bytes. For small + // congestion windows (less than 25), the formula below will + // increase slightly slower than linearly per estimated tcp window + // of bytes. + c.estimatedTCPcongestionWindow += protocol.ByteCount(float32(c.ackedBytesCount) * c.alpha() * float32(protocol.DefaultTCPMSS) / float32(c.estimatedTCPcongestionWindow)) + c.ackedBytesCount = 0 // We have a new cubic congestion window. c.lastTargetCongestionWindow = targetCongestionWindow @@ -218,7 +201,6 @@ func (c *Cubic) CongestionWindowAfterAck(currentCongestionWindow protocol.Packet if targetCongestionWindow < c.estimatedTCPcongestionWindow { targetCongestionWindow = c.estimatedTCPcongestionWindow } - return targetCongestionWindow } diff --git a/internal/congestion/cubic_sender.go b/internal/congestion/cubic_sender.go index 21f01942..b9f67e6c 100644 --- a/internal/congestion/cubic_sender.go +++ b/internal/congestion/cubic_sender.go @@ -8,9 +8,9 @@ import ( ) const ( - maxBurstBytes = 3 * protocol.DefaultTCPMSS - defaultMinimumCongestionWindow protocol.PacketNumber = 2 - renoBeta float32 = 0.7 // Reno backoff factor. + maxBurstBytes = 3 * protocol.DefaultTCPMSS + renoBeta float32 = 0.7 // Reno backoff factor. + defaultMinimumCongestionWindow protocol.ByteCount = 2 * protocol.DefaultTCPMSS ) type cubicSender struct { @@ -31,12 +31,6 @@ type cubicSender struct { // Track the largest packet number outstanding when a CWND cutback occurs. largestSentAtLastCutback protocol.PacketNumber - // Congestion window in packets. - congestionWindow protocol.PacketNumber - - // Slow start congestion window in packets, aka ssthresh. - slowstartThreshold protocol.PacketNumber - // Whether the last loss event caused us to exit slowstart. // Used for stats collection of slowstartPacketsLost lastCutbackExitedSlowstart bool @@ -44,24 +38,35 @@ type cubicSender struct { // When true, exit slow start with large cutback of congestion window. slowStartLargeReduction bool - // Minimum congestion window in packets. - minCongestionWindow protocol.PacketNumber + // Congestion window in packets. + congestionWindow protocol.ByteCount - // Maximum number of outstanding packets for tcp. - maxTCPCongestionWindow protocol.PacketNumber + // Minimum congestion window in packets. + minCongestionWindow protocol.ByteCount + + // Maximum congestion window. + maxCongestionWindow protocol.ByteCount + + // Slow start congestion window in bytes, aka ssthresh. + slowstartThreshold protocol.ByteCount // Number of connections to simulate. numConnections int // ACK counter for the Reno implementation. - congestionWindowCount protocol.ByteCount + numAckedPackets uint64 - initialCongestionWindow protocol.PacketNumber - initialMaxCongestionWindow protocol.PacketNumber + initialCongestionWindow protocol.ByteCount + initialMaxCongestionWindow protocol.ByteCount + + minSlowStartExitWindow protocol.ByteCount } +var _ SendAlgorithm = &cubicSender{} +var _ SendAlgorithmWithDebugInfo = &cubicSender{} + // NewCubicSender makes a new cubic sender -func NewCubicSender(clock Clock, rttStats *RTTStats, reno bool, initialCongestionWindow, initialMaxCongestionWindow protocol.PacketNumber) SendAlgorithmWithDebugInfo { +func NewCubicSender(clock Clock, rttStats *RTTStats, reno bool, initialCongestionWindow, initialMaxCongestionWindow protocol.ByteCount) SendAlgorithmWithDebugInfo { return &cubicSender{ rttStats: rttStats, initialCongestionWindow: initialCongestionWindow, @@ -69,7 +74,7 @@ func NewCubicSender(clock Clock, rttStats *RTTStats, reno bool, initialCongestio congestionWindow: initialCongestionWindow, minCongestionWindow: defaultMinimumCongestionWindow, slowstartThreshold: initialMaxCongestionWindow, - maxTCPCongestionWindow: initialMaxCongestionWindow, + maxCongestionWindow: initialMaxCongestionWindow, numConnections: defaultNumConnections, cubic: NewCubic(clock), reno: reno, @@ -80,21 +85,26 @@ func NewCubicSender(clock Clock, rttStats *RTTStats, reno bool, initialCongestio func (c *cubicSender) TimeUntilSend(bytesInFlight protocol.ByteCount) time.Duration { if c.InRecovery() { // PRR is used when in recovery. - if c.prr.TimeUntilSend(c.GetCongestionWindow(), bytesInFlight, c.GetSlowStartThreshold()) == 0 { + if c.prr.CanSend(c.GetCongestionWindow(), bytesInFlight, c.GetSlowStartThreshold()) { return 0 } } - delay := c.rttStats.SmoothedRTT() / time.Duration(2*c.GetCongestionWindow()/protocol.DefaultTCPMSS) + delay := c.rttStats.SmoothedRTT() / time.Duration(2*c.GetCongestionWindow()) if !c.InSlowStart() { // adjust delay, such that it's 1.25*cwd/rtt delay = delay * 8 / 5 } return delay } -func (c *cubicSender) OnPacketSent(sentTime time.Time, bytesInFlight protocol.ByteCount, packetNumber protocol.PacketNumber, bytes protocol.ByteCount, isRetransmittable bool) bool { - // Only update bytesInFlight for data packets. +func (c *cubicSender) OnPacketSent( + sentTime time.Time, + bytesInFlight protocol.ByteCount, + packetNumber protocol.PacketNumber, + bytes protocol.ByteCount, + isRetransmittable bool, +) { if !isRetransmittable { - return false + return } if c.InRecovery() { // PRR is used when in recovery. @@ -102,7 +112,6 @@ func (c *cubicSender) OnPacketSent(sentTime time.Time, bytesInFlight protocol.By } c.largestSentPacketNumber = packetNumber c.hybridSlowStart.OnPacketSent(packetNumber) - return true } func (c *cubicSender) InRecovery() bool { @@ -114,18 +123,18 @@ func (c *cubicSender) InSlowStart() bool { } func (c *cubicSender) GetCongestionWindow() protocol.ByteCount { - return protocol.ByteCount(c.congestionWindow) * protocol.DefaultTCPMSS + return c.congestionWindow } func (c *cubicSender) GetSlowStartThreshold() protocol.ByteCount { - return protocol.ByteCount(c.slowstartThreshold) * protocol.DefaultTCPMSS + return c.slowstartThreshold } func (c *cubicSender) ExitSlowstart() { c.slowstartThreshold = c.congestionWindow } -func (c *cubicSender) SlowstartThreshold() protocol.PacketNumber { +func (c *cubicSender) SlowstartThreshold() protocol.ByteCount { return c.slowstartThreshold } @@ -135,20 +144,29 @@ func (c *cubicSender) MaybeExitSlowStart() { } } -func (c *cubicSender) OnPacketAcked(ackedPacketNumber protocol.PacketNumber, ackedBytes protocol.ByteCount, bytesInFlight protocol.ByteCount) { +func (c *cubicSender) OnPacketAcked( + ackedPacketNumber protocol.PacketNumber, + ackedBytes protocol.ByteCount, + priorInFlight protocol.ByteCount, + eventTime time.Time, +) { c.largestAckedPacketNumber = utils.MaxPacketNumber(ackedPacketNumber, c.largestAckedPacketNumber) if c.InRecovery() { // PRR is used when in recovery. c.prr.OnPacketAcked(ackedBytes) return } - c.maybeIncreaseCwnd(ackedPacketNumber, ackedBytes, bytesInFlight) + c.maybeIncreaseCwnd(ackedPacketNumber, ackedBytes, priorInFlight, eventTime) if c.InSlowStart() { c.hybridSlowStart.OnPacketAcked(ackedPacketNumber) } } -func (c *cubicSender) OnPacketLost(packetNumber protocol.PacketNumber, lostBytes protocol.ByteCount, bytesInFlight protocol.ByteCount) { +func (c *cubicSender) OnPacketLost( + packetNumber protocol.PacketNumber, + lostBytes protocol.ByteCount, + priorInFlight protocol.ByteCount, +) { // TCP NewReno (RFC6582) says that once a loss occurs, any losses in packets // already sent should be treated as a single loss event, since it's expected. if packetNumber <= c.largestSentAtLastCutback { @@ -156,10 +174,8 @@ func (c *cubicSender) OnPacketLost(packetNumber protocol.PacketNumber, lostBytes c.stats.slowstartPacketsLost++ c.stats.slowstartBytesLost += lostBytes if c.slowStartLargeReduction { - if c.stats.slowstartPacketsLost == 1 || (c.stats.slowstartBytesLost/protocol.DefaultTCPMSS) > (c.stats.slowstartBytesLost-lostBytes)/protocol.DefaultTCPMSS { - // Reduce congestion window by 1 for every mss of bytes lost. - c.congestionWindow = utils.MaxPacketNumber(c.congestionWindow-1, c.minCongestionWindow) - } + // Reduce congestion window by lost_bytes for every loss. + c.congestionWindow = utils.MaxByteCount(c.congestionWindow-lostBytes, c.minSlowStartExitWindow) c.slowstartThreshold = c.congestionWindow } } @@ -170,17 +186,19 @@ func (c *cubicSender) OnPacketLost(packetNumber protocol.PacketNumber, lostBytes c.stats.slowstartPacketsLost++ } - c.prr.OnPacketLost(bytesInFlight) + c.prr.OnPacketLost(priorInFlight) // TODO(chromium): Separate out all of slow start into a separate class. if c.slowStartLargeReduction && c.InSlowStart() { - c.congestionWindow = c.congestionWindow - 1 + if c.congestionWindow >= 2*c.initialCongestionWindow { + c.minSlowStartExitWindow = c.congestionWindow / 2 + } + c.congestionWindow = c.congestionWindow - protocol.DefaultTCPMSS } else if c.reno { - c.congestionWindow = protocol.PacketNumber(float32(c.congestionWindow) * c.RenoBeta()) + c.congestionWindow = protocol.ByteCount(float32(c.congestionWindow) * c.RenoBeta()) } else { c.congestionWindow = c.cubic.CongestionWindowAfterPacketLoss(c.congestionWindow) } - // Enforce a minimum congestion window. if c.congestionWindow < c.minCongestionWindow { c.congestionWindow = c.minCongestionWindow } @@ -188,7 +206,7 @@ func (c *cubicSender) OnPacketLost(packetNumber protocol.PacketNumber, lostBytes c.largestSentAtLastCutback = c.largestSentPacketNumber // reset packet count from congestion avoidance mode. We start // counting again when we're out of recovery. - c.congestionWindowCount = 0 + c.numAckedPackets = 0 } func (c *cubicSender) RenoBeta() float32 { @@ -201,32 +219,38 @@ func (c *cubicSender) RenoBeta() float32 { // Called when we receive an ack. Normal TCP tracks how many packets one ack // represents, but quic has a separate ack for each packet. -func (c *cubicSender) maybeIncreaseCwnd(ackedPacketNumber protocol.PacketNumber, ackedBytes protocol.ByteCount, bytesInFlight protocol.ByteCount) { +func (c *cubicSender) maybeIncreaseCwnd( + ackedPacketNumber protocol.PacketNumber, + ackedBytes protocol.ByteCount, + priorInFlight protocol.ByteCount, + eventTime time.Time, +) { // Do not increase the congestion window unless the sender is close to using // the current window. - if !c.isCwndLimited(bytesInFlight) { + if !c.isCwndLimited(priorInFlight) { c.cubic.OnApplicationLimited() return } - if c.congestionWindow >= c.maxTCPCongestionWindow { + if c.congestionWindow >= c.maxCongestionWindow { return } if c.InSlowStart() { // TCP slow start, exponential growth, increase by one for each ACK. - c.congestionWindow++ + c.congestionWindow += protocol.DefaultTCPMSS return } + // Congestion avoidance if c.reno { // Classic Reno congestion avoidance. - c.congestionWindowCount++ + c.numAckedPackets++ // Divide by num_connections to smoothly increase the CWND at a faster // rate than conventional Reno. - if protocol.PacketNumber(c.congestionWindowCount*protocol.ByteCount(c.numConnections)) >= c.congestionWindow { - c.congestionWindow++ - c.congestionWindowCount = 0 + if c.numAckedPackets*uint64(c.numConnections) >= uint64(c.congestionWindow)/uint64(protocol.DefaultTCPMSS) { + c.congestionWindow += protocol.DefaultTCPMSS + c.numAckedPackets = 0 } } else { - c.congestionWindow = utils.MinPacketNumber(c.maxTCPCongestionWindow, c.cubic.CongestionWindowAfterAck(c.congestionWindow, c.rttStats.MinRTT())) + c.congestionWindow = utils.MinByteCount(c.maxCongestionWindow, c.cubic.CongestionWindowAfterAck(ackedBytes, c.congestionWindow, c.rttStats.MinRTT(), eventTime)) } } @@ -282,10 +306,10 @@ func (c *cubicSender) OnConnectionMigration() { c.largestSentAtLastCutback = 0 c.lastCutbackExitedSlowstart = false c.cubic.Reset() - c.congestionWindowCount = 0 + c.numAckedPackets = 0 c.congestionWindow = c.initialCongestionWindow c.slowstartThreshold = c.initialMaxCongestionWindow - c.maxTCPCongestionWindow = c.initialMaxCongestionWindow + c.maxCongestionWindow = c.initialMaxCongestionWindow } // SetSlowStartLargeReduction allows enabling the SSLR experiment diff --git a/internal/congestion/cubic_sender_test.go b/internal/congestion/cubic_sender_test.go index 4a07a405..de3b74fc 100644 --- a/internal/congestion/cubic_sender_test.go +++ b/internal/congestion/cubic_sender_test.go @@ -9,7 +9,7 @@ import ( . "github.com/onsi/gomega" ) -const initialCongestionWindowPackets protocol.PacketNumber = 10 +const initialCongestionWindowPackets = 10 const defaultWindowTCP = protocol.ByteCount(initialCongestionWindowPackets) * protocol.DefaultTCPMSS type mockClock time.Time @@ -22,7 +22,7 @@ func (c *mockClock) Advance(d time.Duration) { *c = mockClock(time.Time(*c).Add(d)) } -const MaxCongestionWindow = protocol.PacketNumber(200) +const MaxCongestionWindow protocol.ByteCount = 200 * protocol.DefaultTCPMSS var _ = Describe("Cubic Sender", func() { var ( @@ -40,13 +40,16 @@ var _ = Describe("Cubic Sender", func() { ackedPacketNumber = 0 clock = mockClock{} rttStats = NewRTTStats() - sender = NewCubicSender(&clock, rttStats, true /*reno*/, initialCongestionWindowPackets, MaxCongestionWindow) + sender = NewCubicSender(&clock, rttStats, true /*reno*/, initialCongestionWindowPackets*protocol.DefaultTCPMSS, MaxCongestionWindow) }) + canSend := func() bool { + return bytesInFlight < sender.GetCongestionWindow() + } + SendAvailableSendWindowLen := func(packetLength protocol.ByteCount) int { - // Send as long as TimeUntilSend returns InfDuration. packetsSent := 0 - for bytesInFlight < sender.GetCongestionWindow() { + for canSend() { sender.OnPacketSent(clock.Now(), bytesInFlight, packetNumber, packetLength, true) packetNumber++ packetsSent++ @@ -56,14 +59,14 @@ var _ = Describe("Cubic Sender", func() { } // Normal is that TCP acks every other segment. - AckNPacketsLen := func(n int, packetLength protocol.ByteCount) { + AckNPackets := func(n int) { rttStats.UpdateRTT(60*time.Millisecond, 0, clock.Now()) sender.MaybeExitSlowStart() for i := 0; i < n; i++ { ackedPacketNumber++ - sender.OnPacketAcked(ackedPacketNumber, packetLength, bytesInFlight) + sender.OnPacketAcked(ackedPacketNumber, protocol.DefaultTCPMSS, bytesInFlight, clock.Now()) } - bytesInFlight -= protocol.ByteCount(n) * packetLength + bytesInFlight -= protocol.ByteCount(n) * protocol.DefaultTCPMSS clock.Advance(time.Millisecond) } @@ -82,18 +85,20 @@ var _ = Describe("Cubic Sender", func() { } SendAvailableSendWindow := func() int { return SendAvailableSendWindowLen(protocol.DefaultTCPMSS) } - AckNPackets := func(n int) { AckNPacketsLen(n, protocol.DefaultTCPMSS) } LoseNPackets := func(n int) { LoseNPacketsLen(n, protocol.DefaultTCPMSS) } It("has the right values at startup", func() { // At startup make sure we are at the default. Expect(sender.GetCongestionWindow()).To(Equal(defaultWindowTCP)) - // At startup make sure we can send. - Expect(sender.TimeUntilSend(0)).To(BeZero()) // Make sure we can send. Expect(sender.TimeUntilSend(0)).To(BeZero()) + Expect(canSend()).To(BeTrue()) // And that window is un-affected. Expect(sender.GetCongestionWindow()).To(Equal(defaultWindowTCP)) + + // Fill the send window with data, then verify that we can't send. + SendAvailableSendWindow() + Expect(canSend()).To(BeFalse()) }) It("paces", func() { @@ -384,8 +389,8 @@ var _ = Describe("Cubic Sender", func() { // Expect the window to decrease to the minimum once the RTO fires // and slow start threshold to be set to 1/2 of the CWND. sender.OnRetransmissionTimeout(true) - Expect(sender.GetCongestionWindow()).To(Equal(protocol.ByteCount(2 * protocol.DefaultTCPMSS))) - Expect(sender.SlowstartThreshold()).To(Equal(protocol.PacketNumber(5))) + Expect(sender.GetCongestionWindow()).To(Equal(2 * protocol.DefaultTCPMSS)) + Expect(sender.SlowstartThreshold()).To(Equal(5 * protocol.DefaultTCPMSS)) }) It("RTO congestion window no retransmission", func() { @@ -397,73 +402,17 @@ var _ = Describe("Cubic Sender", func() { Expect(sender.GetCongestionWindow()).To(Equal(defaultWindowTCP)) }) - It("slow start max send window", func() { - const maxCongestionWindowTCP = 50 - const numberOfAcks = 100 - sender = NewCubicSender(&clock, rttStats, false, initialCongestionWindowPackets, maxCongestionWindowTCP) - - for i := 0; i < numberOfAcks; i++ { - // Send our full send window. - SendAvailableSendWindow() - AckNPackets(2) - } - expectedSendWindow := maxCongestionWindowTCP * protocol.DefaultTCPMSS - Expect(sender.GetCongestionWindow()).To(Equal(protocol.ByteCount(expectedSendWindow))) - }) - - It("tcp reno max congestion window", func() { - const maxCongestionWindowTCP = 50 - const numberOfAcks = 1000 - sender = NewCubicSender(&clock, rttStats, false, initialCongestionWindowPackets, maxCongestionWindowTCP) - - SendAvailableSendWindow() - AckNPackets(2) - // Make sure we fall out of slow start. - LoseNPackets(1) - - for i := 0; i < numberOfAcks; i++ { - // Send our full send window. - SendAvailableSendWindow() - AckNPackets(2) - } - - expectedSendWindow := maxCongestionWindowTCP * protocol.DefaultTCPMSS - Expect(sender.GetCongestionWindow()).To(Equal(protocol.ByteCount(expectedSendWindow))) - }) - - It("tcp cubic max congestion window", func() { - const maxCongestionWindowTCP = 50 - // Set to 10000 to compensate for small cubic alpha. - const numberOfAcks = 10000 - - sender = NewCubicSender(&clock, rttStats, false, initialCongestionWindowPackets, maxCongestionWindowTCP) - - SendAvailableSendWindow() - AckNPackets(2) - // Make sure we fall out of slow start. - LoseNPackets(1) - - for i := 0; i < numberOfAcks; i++ { - // Send our full send window. - SendAvailableSendWindow() - AckNPackets(2) - } - - expectedSendWindow := maxCongestionWindowTCP * protocol.DefaultTCPMSS - Expect(sender.GetCongestionWindow()).To(Equal(protocol.ByteCount(expectedSendWindow))) - }) - It("tcp cubic reset epoch on quiescence", func() { const maxCongestionWindow = 50 const maxCongestionWindowBytes = maxCongestionWindow * protocol.DefaultTCPMSS - sender = NewCubicSender(&clock, rttStats, false, initialCongestionWindowPackets, maxCongestionWindow) + sender = NewCubicSender(&clock, rttStats, false, initialCongestionWindowPackets*protocol.DefaultTCPMSS, maxCongestionWindowBytes) numSent := SendAvailableSendWindow() // Make sure we fall out of slow start. - saveCwnd := sender.GetCongestionWindow() + savedCwnd := sender.GetCongestionWindow() LoseNPackets(1) - Expect(saveCwnd).To(BeNumerically(">", sender.GetCongestionWindow())) + Expect(savedCwnd).To(BeNumerically(">", sender.GetCongestionWindow())) // Ack the rest of the outstanding packets to get out of recovery. for i := 1; i < numSent; i++ { @@ -472,12 +421,12 @@ var _ = Describe("Cubic Sender", func() { Expect(bytesInFlight).To(BeZero()) // Send a new window of data and ack all; cubic growth should occur. - saveCwnd = sender.GetCongestionWindow() + savedCwnd = sender.GetCongestionWindow() numSent = SendAvailableSendWindow() for i := 0; i < numSent; i++ { AckNPackets(1) } - Expect(saveCwnd).To(BeNumerically("<", sender.GetCongestionWindow())) + Expect(savedCwnd).To(BeNumerically("<", sender.GetCongestionWindow())) Expect(maxCongestionWindowBytes).To(BeNumerically(">", sender.GetCongestionWindow())) Expect(bytesInFlight).To(BeZero()) @@ -486,50 +435,10 @@ var _ = Describe("Cubic Sender", func() { // Send new window of data and ack one packet. Cubic epoch should have // been reset; ensure cwnd increase is not dramatic. - saveCwnd = sender.GetCongestionWindow() + savedCwnd = sender.GetCongestionWindow() SendAvailableSendWindow() AckNPackets(1) - Expect(saveCwnd).To(BeNumerically("~", sender.GetCongestionWindow(), protocol.DefaultTCPMSS)) - Expect(maxCongestionWindowBytes).To(BeNumerically(">", sender.GetCongestionWindow())) - }) - - It("tcp cubic shifted epoch on quiescence", func() { - const maxCongestionWindow = 50 - const maxCongestionWindowBytes = maxCongestionWindow * protocol.DefaultTCPMSS - sender = NewCubicSender(&clock, rttStats, false, initialCongestionWindowPackets, maxCongestionWindow) - - numSent := SendAvailableSendWindow() - - // Make sure we fall out of slow start. - saveCwnd := sender.GetCongestionWindow() - LoseNPackets(1) - Expect(saveCwnd).To(BeNumerically(">", sender.GetCongestionWindow())) - - // Ack the rest of the outstanding packets to get out of recovery. - for i := 1; i < numSent; i++ { - AckNPackets(1) - } - Expect(bytesInFlight).To(BeZero()) - - // Send a new window of data and ack all; cubic growth should occur. - saveCwnd = sender.GetCongestionWindow() - numSent = SendAvailableSendWindow() - for i := 0; i < numSent; i++ { - AckNPackets(1) - } - Expect(saveCwnd).To(BeNumerically("<", sender.GetCongestionWindow())) - Expect(maxCongestionWindowBytes).To(BeNumerically(">", sender.GetCongestionWindow())) - Expect(bytesInFlight).To(BeZero()) - - // Quiescent time of 100 seconds - clock.Advance(100 * time.Second) - - // Send new window of data and ack one packet. Cubic epoch should have - // been reset; ensure cwnd increase is not dramatic. - saveCwnd = sender.GetCongestionWindow() - SendAvailableSendWindow() - AckNPackets(1) - Expect(saveCwnd).To(BeNumerically("~", sender.GetCongestionWindow(), protocol.DefaultTCPMSS)) + Expect(savedCwnd).To(BeNumerically("~", sender.GetCongestionWindow(), protocol.DefaultTCPMSS)) Expect(maxCongestionWindowBytes).To(BeNumerically(">", sender.GetCongestionWindow())) }) @@ -549,55 +458,6 @@ var _ = Describe("Cubic Sender", func() { Expect(postLossWindow).To(BeNumerically(">", sender.GetCongestionWindow())) }) - It("don't track ack packets", func() { - // Send a packet with no retransmittable data, and ensure it's not tracked. - Expect(sender.OnPacketSent(clock.Now(), bytesInFlight, packetNumber, protocol.DefaultTCPMSS, false)).To(BeFalse()) - packetNumber++ - - // Send a data packet with retransmittable data, and ensure it is tracked. - Expect(sender.OnPacketSent(clock.Now(), bytesInFlight, packetNumber, protocol.DefaultTCPMSS, true)).To(BeTrue()) - }) - - // TEST_F(TcpCubicSenderPacketsTest, ConfigureInitialWindow) { - // QuicConfig config; - // - // QuicTagVector options; - // options.push_back(kIW03); - // QuicConfigPeer::SetReceivedConnectionOptions(&config, options); - // sender.SetFromConfig(config, Perspective::IS_SERVER); - // Expect( sender.congestion_window()).To(Equal(3u)) - // - // options.clear(); - // options.push_back(kIW10); - // QuicConfigPeer::SetReceivedConnectionOptions(&config, options); - // sender.SetFromConfig(config, Perspective::IS_SERVER); - // Expect( sender.congestion_window()).To(Equal(10u)) - // - // options.clear(); - // options.push_back(kIW20); - // QuicConfigPeer::SetReceivedConnectionOptions(&config, options); - // sender.SetFromConfig(config, Perspective::IS_SERVER); - // Expect( sender.congestion_window()).To(Equal(20u)) - // - // options.clear(); - // options.push_back(kIW50); - // QuicConfigPeer::SetReceivedConnectionOptions(&config, options); - // sender.SetFromConfig(config, Perspective::IS_SERVER); - // Expect( sender.congestion_window()).To(Equal(50u)) - // } - // - // TEST_F(TcpCubicSenderPacketsTest, ConfigureMinimumWindow) { - // QuicConfig config; - // - // // Verify that kCOPT: kMIN1 forces the min CWND to 1 packet. - // QuicTagVector options; - // options.push_back(kMIN1); - // QuicConfigPeer::SetReceivedConnectionOptions(&config, options); - // sender.SetFromConfig(config, Perspective::IS_SERVER); - // sender.OnRetransmissionTimeout(true); - // Expect( sender.congestion_window()).To(Equal(1u)) - // } - It("2 connection congestion avoidance at end of recovery", func() { sender.SetNumEmulatedConnections(2) // Ack 10 packets in 5 acks to raise the CWND to 20. @@ -698,67 +558,6 @@ var _ = Describe("Cubic Sender", func() { Expect(sender.GetCongestionWindow()).To(Equal(expectedSendWindow)) }) - // TEST_F(TcpCubicSenderPacketsTest, BandwidthResumption) { - // // Test that when provided with CachedNetworkParameters and opted in to the - // // bandwidth resumption experiment, that the TcpCubicSenderPackets sets - // // initial CWND appropriately. - // - // // Set some common values. - // CachedNetworkParameters cached_network_params; - // const QuicPacketCount kNumberOfPackets = 123; - // const int kBandwidthEstimateBytesPerSecond = - // kNumberOfPackets * protocol.DefaultTCPMSS; - // cached_network_params.set_bandwidth_estimate_bytes_per_second( - // kBandwidthEstimateBytesPerSecond); - // cached_network_params.set_min_rtt_ms(1000); - // - // // Make sure that a bandwidth estimate results in a changed CWND. - // cached_network_params.set_timestamp(clock.WallNow().ToUNIXSeconds() - - // (kNumSecondsPerHour - 1)); - // sender.ResumeConnectionState(cached_network_params, false); - // Expect( sender.congestion_window()).To(Equal(kNumberOfPackets)) - // - // // Resumed CWND is limited to be in a sensible range. - // cached_network_params.set_bandwidth_estimate_bytes_per_second( - // (maxCongestionWindow + 1) * protocol.DefaultTCPMSS); - // sender.ResumeConnectionState(cached_network_params, false); - // Expect( sender.congestion_window()).To(Equal(maxCongestionWindow)) - // - // cached_network_params.set_bandwidth_estimate_bytes_per_second( - // (kMinCongestionWindowForBandwidthResumption - 1) * protocol.DefaultTCPMSS); - // sender.ResumeConnectionState(cached_network_params, false); - // EXPECT_EQ(kMinCongestionWindowForBandwidthResumption, - // sender.congestion_window()); - // - // // Resume to the max value. - // cached_network_params.set_max_bandwidth_estimate_bytes_per_second( - // (kMinCongestionWindowForBandwidthResumption + 10) * protocol.DefaultTCPMSS); - // sender.ResumeConnectionState(cached_network_params, true); - // EXPECT_EQ((kMinCongestionWindowForBandwidthResumption + 10) * protocol.DefaultTCPMSS, - // sender.GetCongestionWindow()); - // } - // - // TEST_F(TcpCubicSenderPacketsTest, PaceBelowCWND) { - // QuicConfig config; - // - // // Verify that kCOPT: kMIN4 forces the min CWND to 1 packet, but allows up - // // to 4 to be sent. - // QuicTagVector options; - // options.push_back(kMIN4); - // QuicConfigPeer::SetReceivedConnectionOptions(&config, options); - // sender.SetFromConfig(config, Perspective::IS_SERVER); - // sender.OnRetransmissionTimeout(true); - // Expect( sender.congestion_window()).To(Equal(1u)) - // EXPECT_TRUE( - // sender.TimeUntilSend(QuicTime::Zero(), protocol.DefaultTCPMSS).IsZero()); - // EXPECT_TRUE( - // sender.TimeUntilSend(QuicTime::Zero(), 2 * protocol.DefaultTCPMSS).IsZero()); - // EXPECT_TRUE( - // sender.TimeUntilSend(QuicTime::Zero(), 3 * protocol.DefaultTCPMSS).IsZero()); - // EXPECT_FALSE( - // sender.TimeUntilSend(QuicTime::Zero(), 4 * protocol.DefaultTCPMSS).IsZero()); - // } - It("reset after connection migration", func() { Expect(sender.GetCongestionWindow()).To(Equal(defaultWindowTCP)) Expect(sender.SlowstartThreshold()).To(Equal(MaxCongestionWindow)) @@ -782,7 +581,7 @@ var _ = Describe("Cubic Sender", func() { // start threshold is also updated. expectedSendWindow = protocol.ByteCount(float32(expectedSendWindow) * renoBeta) Expect(sender.GetCongestionWindow()).To(Equal(expectedSendWindow)) - Expect(sender.SlowstartThreshold()).To(Equal(protocol.PacketNumber(expectedSendWindow / protocol.DefaultTCPMSS))) + Expect(sender.SlowstartThreshold()).To(Equal(expectedSendWindow)) // Resets cwnd and slow start threshold on connection migrations. sender.OnConnectionMigration() @@ -790,4 +589,52 @@ var _ = Describe("Cubic Sender", func() { Expect(sender.SlowstartThreshold()).To(Equal(MaxCongestionWindow)) Expect(sender.HybridSlowStart().Started()).To(BeFalse()) }) + + It("default max cwnd", func() { + sender = NewCubicSender(&clock, rttStats, true /*reno*/, initialCongestionWindowPackets*protocol.DefaultTCPMSS, protocol.DefaultMaxCongestionWindow) + + defaultMaxCongestionWindowPackets := protocol.DefaultMaxCongestionWindow / protocol.DefaultTCPMSS + for i := 1; i < int(defaultMaxCongestionWindowPackets); i++ { + sender.MaybeExitSlowStart() + sender.OnPacketAcked(protocol.PacketNumber(i), 1350, sender.GetCongestionWindow(), clock.Now()) + } + Expect(sender.GetCongestionWindow()).To(Equal(protocol.DefaultMaxCongestionWindow)) + }) + + It("limit cwnd increase in congestion avoidance", func() { + // Enable Cubic. + sender = NewCubicSender(&clock, rttStats, false, initialCongestionWindowPackets*protocol.DefaultTCPMSS, MaxCongestionWindow) + numSent := SendAvailableSendWindow() + + // Make sure we fall out of slow start. + savedCwnd := sender.GetCongestionWindow() + LoseNPackets(1) + Expect(savedCwnd).To(BeNumerically(">", sender.GetCongestionWindow())) + + // Ack the rest of the outstanding packets to get out of recovery. + for i := 1; i < numSent; i++ { + AckNPackets(1) + } + Expect(bytesInFlight).To(BeZero()) + + savedCwnd = sender.GetCongestionWindow() + SendAvailableSendWindow() + + // Ack packets until the CWND increases. + for sender.GetCongestionWindow() == savedCwnd { + AckNPackets(1) + SendAvailableSendWindow() + } + // Bytes in flight may be larger than the CWND if the CWND isn't an exact + // multiple of the packet sizes being sent. + Expect(bytesInFlight).To(BeNumerically(">=", sender.GetCongestionWindow())) + savedCwnd = sender.GetCongestionWindow() + + // Advance time 2 seconds waiting for an ack. + clock.Advance(2 * time.Second) + + // Ack two packets. The CWND should increase by only one packet. + AckNPackets(2) + Expect(sender.GetCongestionWindow()).To(Equal(savedCwnd + protocol.DefaultTCPMSS)) + }) }) diff --git a/internal/congestion/cubic_test.go b/internal/congestion/cubic_test.go index 8372df0e..52cae145 100644 --- a/internal/congestion/cubic_test.go +++ b/internal/congestion/cubic_test.go @@ -11,7 +11,9 @@ import ( const numConnections uint32 = 2 const nConnectionBeta float32 = (float32(numConnections) - 1 + beta) / float32(numConnections) +const nConnectionBetaLastMax float32 = (float32(numConnections) - 1 + betaLastMax) / float32(numConnections) const nConnectionAlpha float32 = 3 * float32(numConnections) * float32(numConnections) * (1 - nConnectionBeta) / (1 + nConnectionBeta) +const maxCubicTimeInterval = 30 * time.Millisecond var _ = Describe("Cubic", func() { var ( @@ -24,88 +26,211 @@ var _ = Describe("Cubic", func() { cubic = NewCubic(&clock) }) - It("works above origin", func() { + renoCwnd := func(currentCwnd protocol.ByteCount) protocol.ByteCount { + return currentCwnd + protocol.ByteCount(float32(protocol.DefaultTCPMSS)*nConnectionAlpha*float32(protocol.DefaultTCPMSS)/float32(currentCwnd)) + } + + cubicConvexCwnd := func(initialCwnd protocol.ByteCount, rtt, elapsedTime time.Duration) protocol.ByteCount { + offset := protocol.ByteCount((elapsedTime+rtt)/time.Microsecond) << 10 / 1000000 + deltaCongestionWindow := 410 * offset * offset * offset * protocol.DefaultTCPMSS >> 40 + return initialCwnd + deltaCongestionWindow + } + + It("works above origin (with tighter bounds)", func() { // Convex growth. const rttMin = 100 * time.Millisecond const rttMinS = float32(rttMin/time.Millisecond) / 1000.0 - currentCwnd := protocol.PacketNumber(10) - // Without the signed-integer, cubic-convex fix, we mistakenly - // increment cwnd after only one_ms_ and a single ack. - expectedCwnd := currentCwnd - // Initialize the state. + currentCwnd := 10 * protocol.DefaultTCPMSS + initialCwnd := currentCwnd + clock.Advance(time.Millisecond) initialTime := clock.Now() - currentCwnd = cubic.CongestionWindowAfterAck(currentCwnd, rttMin) - Expect(currentCwnd).To(Equal(expectedCwnd)) - currentCwnd = expectedCwnd - initialCwnd := currentCwnd + expectedFirstCwnd := renoCwnd(currentCwnd) + currentCwnd = cubic.CongestionWindowAfterAck(protocol.DefaultTCPMSS, currentCwnd, rttMin, initialTime) + Expect(expectedFirstCwnd).To(Equal(currentCwnd)) + // Normal TCP phase. // The maximum number of expected reno RTTs can be calculated by // finding the point where the cubic curve and the reno curve meet. - maxRenoRtts := int(math.Sqrt(float64(nConnectionAlpha/(0.4*rttMinS*rttMinS*rttMinS))) - 1) + maxRenoRtts := int(math.Sqrt(float64(nConnectionAlpha/(0.4*rttMinS*rttMinS*rttMinS))) - 2) for i := 0; i < maxRenoRtts; i++ { - maxPerAckCwnd := currentCwnd - for n := uint64(1); n < uint64(float32(maxPerAckCwnd)/nConnectionAlpha); n++ { + // Alternatively, we expect it to increase by one, every time we + // receive current_cwnd/Alpha acks back. (This is another way of + // saying we expect cwnd to increase by approximately Alpha once + // we receive current_cwnd number ofacks back). + numAcksThisEpoch := int(float32(currentCwnd/protocol.DefaultTCPMSS) / nConnectionAlpha) + + initialCwndThisEpoch := currentCwnd + for n := 0; n < numAcksThisEpoch; n++ { // Call once per ACK. - nextCwnd := cubic.CongestionWindowAfterAck(currentCwnd, rttMin) - Expect(nextCwnd).To(Equal(currentCwnd)) + expectedNextCwnd := renoCwnd(currentCwnd) + currentCwnd = cubic.CongestionWindowAfterAck(protocol.DefaultTCPMSS, currentCwnd, rttMin, clock.Now()) + Expect(currentCwnd).To(Equal(expectedNextCwnd)) } + // Our byte-wise Reno implementation is an estimate. We expect + // the cwnd to increase by approximately one MSS every + // cwnd/kDefaultTCPMSS/Alpha acks, but it may be off by as much as + // half a packet for smaller values of current_cwnd. + cwndChangeThisEpoch := currentCwnd - initialCwndThisEpoch + Expect(cwndChangeThisEpoch).To(BeNumerically("~", protocol.DefaultTCPMSS, protocol.DefaultTCPMSS/2)) clock.Advance(100 * time.Millisecond) - currentCwnd = cubic.CongestionWindowAfterAck(currentCwnd, rttMin) - // When we fix convex mode and the uint64 arithmetic, we - // increase the expected_cwnd only after after the first 100ms, - // rather than after the initial 1ms. - expectedCwnd++ - Expect(currentCwnd).To(Equal(expectedCwnd)) } - // Cubic phase. - for i := 0; i < 52; i++ { - for n := protocol.PacketNumber(1); n < currentCwnd; n++ { - // Call once per ACK. - Expect(cubic.CongestionWindowAfterAck(currentCwnd, rttMin)).To(Equal(currentCwnd)) + + for i := 0; i < 54; i++ { + maxAcksThisEpoch := currentCwnd / protocol.DefaultTCPMSS + interval := time.Duration(100*1000/maxAcksThisEpoch) * time.Microsecond + for n := 0; n < int(maxAcksThisEpoch); n++ { + clock.Advance(interval) + currentCwnd = cubic.CongestionWindowAfterAck(protocol.DefaultTCPMSS, currentCwnd, rttMin, clock.Now()) + expectedCwnd := cubicConvexCwnd(initialCwnd, rttMin, clock.Now().Sub(initialTime)) + // If we allow per-ack updates, every update is a small cubic update. + Expect(currentCwnd).To(Equal(expectedCwnd)) } - clock.Advance(100 * time.Millisecond) - currentCwnd = cubic.CongestionWindowAfterAck(currentCwnd, rttMin) } - // Total time elapsed so far; add min_rtt (0.1s) here as well. - elapsedTimeS := float32(clock.Now().Sub(initialTime)+rttMin) / float32(time.Second) - // |expected_cwnd| is initial value of cwnd + K * t^3, where K = 0.4. - expectedCwnd = initialCwnd + protocol.PacketNumber((elapsedTimeS*elapsedTimeS*elapsedTimeS*410)/1024) + expectedCwnd := cubicConvexCwnd(initialCwnd, rttMin, clock.Now().Sub(initialTime)) + currentCwnd = cubic.CongestionWindowAfterAck(protocol.DefaultTCPMSS, currentCwnd, rttMin, clock.Now()) Expect(currentCwnd).To(Equal(expectedCwnd)) }) - It("manages loss events", func() { + It("works above the origin with fine grained cubing", func() { + // Start the test with an artificially large cwnd to prevent Reno + // from over-taking cubic. + currentCwnd := 1000 * protocol.DefaultTCPMSS + initialCwnd := currentCwnd rttMin := 100 * time.Millisecond - currentCwnd := protocol.PacketNumber(422) - expectedCwnd := currentCwnd + clock.Advance(time.Millisecond) + initialTime := clock.Now() + + currentCwnd = cubic.CongestionWindowAfterAck(protocol.DefaultTCPMSS, currentCwnd, rttMin, clock.Now()) + clock.Advance(600 * time.Millisecond) + currentCwnd = cubic.CongestionWindowAfterAck(protocol.DefaultTCPMSS, currentCwnd, rttMin, clock.Now()) + + // We expect the algorithm to perform only non-zero, fine-grained cubic + // increases on every ack in this case. + for i := 0; i < 100; i++ { + clock.Advance(10 * time.Millisecond) + expectedCwnd := cubicConvexCwnd(initialCwnd, rttMin, clock.Now().Sub(initialTime)) + nextCwnd := cubic.CongestionWindowAfterAck(protocol.DefaultTCPMSS, currentCwnd, rttMin, clock.Now()) + // Make sure we are performing cubic increases. + Expect(nextCwnd).To(Equal(expectedCwnd)) + // Make sure that these are non-zero, less-than-packet sized increases. + Expect(nextCwnd).To(BeNumerically(">", currentCwnd)) + cwndDelta := nextCwnd - currentCwnd + Expect(protocol.DefaultTCPMSS / 10).To(BeNumerically(">", cwndDelta)) + currentCwnd = nextCwnd + } + }) + + It("handles per ack updates", func() { + // Start the test with a large cwnd and RTT, to force the first + // increase to be a cubic increase. + initialCwndPackets := 150 + currentCwnd := protocol.ByteCount(initialCwndPackets) * protocol.DefaultTCPMSS + rttMin := 350 * time.Millisecond + + // Initialize the epoch + clock.Advance(time.Millisecond) + // Keep track of the growth of the reno-equivalent cwnd. + rCwnd := renoCwnd(currentCwnd) + currentCwnd = cubic.CongestionWindowAfterAck(protocol.DefaultTCPMSS, currentCwnd, rttMin, clock.Now()) + initialCwnd := currentCwnd + + // Simulate the return of cwnd packets in less than + // MaxCubicInterval() time. + maxAcks := int(float32(initialCwndPackets) / nConnectionAlpha) + interval := maxCubicTimeInterval / time.Duration(maxAcks+1) + + // In this scenario, the first increase is dictated by the cubic + // equation, but it is less than one byte, so the cwnd doesn't + // change. Normally, without per-ack increases, any cwnd plateau + // will cause the cwnd to be pinned for MaxCubicTimeInterval(). If + // we enable per-ack updates, the cwnd will continue to grow, + // regardless of the temporary plateau. + clock.Advance(interval) + rCwnd = renoCwnd(rCwnd) + Expect(cubic.CongestionWindowAfterAck(protocol.DefaultTCPMSS, currentCwnd, rttMin, clock.Now())).To(Equal(currentCwnd)) + for i := 1; i < maxAcks; i++ { + clock.Advance(interval) + nextCwnd := cubic.CongestionWindowAfterAck(protocol.DefaultTCPMSS, currentCwnd, rttMin, clock.Now()) + rCwnd = renoCwnd(rCwnd) + // The window shoud increase on every ack. + Expect(nextCwnd).To(BeNumerically(">", currentCwnd)) + Expect(nextCwnd).To(Equal(rCwnd)) + currentCwnd = nextCwnd + } + + // After all the acks are returned from the epoch, we expect the + // cwnd to have increased by nearly one packet. (Not exactly one + // packet, because our byte-wise Reno algorithm is always a slight + // under-estimation). Without per-ack updates, the current_cwnd + // would otherwise be unchanged. + minimumExpectedIncrease := protocol.DefaultTCPMSS * 9 / 10 + Expect(currentCwnd).To(BeNumerically(">", initialCwnd+minimumExpectedIncrease)) + }) + + It("handles loss events", func() { + rttMin := 100 * time.Millisecond + currentCwnd := 422 * protocol.DefaultTCPMSS + expectedCwnd := renoCwnd(currentCwnd) // Initialize the state. clock.Advance(time.Millisecond) - Expect(cubic.CongestionWindowAfterAck(currentCwnd, rttMin)).To(Equal(expectedCwnd)) - expectedCwnd = protocol.PacketNumber(float32(currentCwnd) * nConnectionBeta) + Expect(cubic.CongestionWindowAfterAck(protocol.DefaultTCPMSS, currentCwnd, rttMin, clock.Now())).To(Equal(expectedCwnd)) + + // On the first loss, the last max congestion window is set to the + // congestion window before the loss. + preLossCwnd := currentCwnd + Expect(cubic.lastMaxCongestionWindow).To(BeZero()) + expectedCwnd = protocol.ByteCount(float32(currentCwnd) * nConnectionBeta) Expect(cubic.CongestionWindowAfterPacketLoss(currentCwnd)).To(Equal(expectedCwnd)) - expectedCwnd = protocol.PacketNumber(float32(currentCwnd) * nConnectionBeta) + Expect(cubic.lastMaxCongestionWindow).To(Equal(preLossCwnd)) + currentCwnd = expectedCwnd + + // On the second loss, the current congestion window has not yet + // reached the last max congestion window. The last max congestion + // window will be reduced by an additional backoff factor to allow + // for competition. + preLossCwnd = currentCwnd + expectedCwnd = protocol.ByteCount(float32(currentCwnd) * nConnectionBeta) Expect(cubic.CongestionWindowAfterPacketLoss(currentCwnd)).To(Equal(expectedCwnd)) + currentCwnd = expectedCwnd + Expect(preLossCwnd).To(BeNumerically(">", cubic.lastMaxCongestionWindow)) + expectedLastMax := protocol.ByteCount(float32(preLossCwnd) * nConnectionBetaLastMax) + Expect(cubic.lastMaxCongestionWindow).To(Equal(expectedLastMax)) + Expect(expectedCwnd).To(BeNumerically("<", cubic.lastMaxCongestionWindow)) + // Simulate an increase, and check that we are below the origin. + currentCwnd = cubic.CongestionWindowAfterAck(protocol.DefaultTCPMSS, currentCwnd, rttMin, clock.Now()) + Expect(cubic.lastMaxCongestionWindow).To(BeNumerically(">", currentCwnd)) + + // On the final loss, simulate the condition where the congestion + // window had a chance to grow nearly to the last congestion window. + currentCwnd = cubic.lastMaxCongestionWindow - 1 + preLossCwnd = currentCwnd + expectedCwnd = protocol.ByteCount(float32(currentCwnd) * nConnectionBeta) + Expect(cubic.CongestionWindowAfterPacketLoss(currentCwnd)).To(Equal(expectedCwnd)) + expectedLastMax = preLossCwnd + Expect(cubic.lastMaxCongestionWindow).To(Equal(expectedLastMax)) }) It("works below origin", func() { // Concave growth. rttMin := 100 * time.Millisecond - currentCwnd := protocol.PacketNumber(422) - expectedCwnd := currentCwnd + currentCwnd := 422 * protocol.DefaultTCPMSS + expectedCwnd := renoCwnd(currentCwnd) // Initialize the state. clock.Advance(time.Millisecond) - Expect(cubic.CongestionWindowAfterAck(currentCwnd, rttMin)).To(Equal(expectedCwnd)) - expectedCwnd = protocol.PacketNumber(float32(currentCwnd) * nConnectionBeta) + Expect(cubic.CongestionWindowAfterAck(protocol.DefaultTCPMSS, currentCwnd, rttMin, clock.Now())).To(Equal(expectedCwnd)) + + expectedCwnd = protocol.ByteCount(float32(currentCwnd) * nConnectionBeta) Expect(cubic.CongestionWindowAfterPacketLoss(currentCwnd)).To(Equal(expectedCwnd)) currentCwnd = expectedCwnd // First update after loss to initialize the epoch. - currentCwnd = cubic.CongestionWindowAfterAck(currentCwnd, rttMin) + currentCwnd = cubic.CongestionWindowAfterAck(protocol.DefaultTCPMSS, currentCwnd, rttMin, clock.Now()) // Cubic phase. for i := 0; i < 40; i++ { clock.Advance(100 * time.Millisecond) - currentCwnd = cubic.CongestionWindowAfterAck(currentCwnd, rttMin) + currentCwnd = cubic.CongestionWindowAfterAck(protocol.DefaultTCPMSS, currentCwnd, rttMin, clock.Now()) } - expectedCwnd = 422 + expectedCwnd = 553632 Expect(currentCwnd).To(Equal(expectedCwnd)) }) }) diff --git a/internal/congestion/interface.go b/internal/congestion/interface.go index 28950dd0..7c27da64 100644 --- a/internal/congestion/interface.go +++ b/internal/congestion/interface.go @@ -9,11 +9,11 @@ import ( // A SendAlgorithm performs congestion control and calculates the congestion window type SendAlgorithm interface { TimeUntilSend(bytesInFlight protocol.ByteCount) time.Duration - OnPacketSent(sentTime time.Time, bytesInFlight protocol.ByteCount, packetNumber protocol.PacketNumber, bytes protocol.ByteCount, isRetransmittable bool) bool + OnPacketSent(sentTime time.Time, bytesInFlight protocol.ByteCount, packetNumber protocol.PacketNumber, bytes protocol.ByteCount, isRetransmittable bool) GetCongestionWindow() protocol.ByteCount MaybeExitSlowStart() - OnPacketAcked(number protocol.PacketNumber, ackedBytes protocol.ByteCount, bytesInFlight protocol.ByteCount) - OnPacketLost(number protocol.PacketNumber, lostBytes protocol.ByteCount, bytesInFlight protocol.ByteCount) + OnPacketAcked(number protocol.PacketNumber, ackedBytes protocol.ByteCount, priorInFlight protocol.ByteCount, eventTime time.Time) + OnPacketLost(number protocol.PacketNumber, lostBytes protocol.ByteCount, priorInFlight protocol.ByteCount) SetNumEmulatedConnections(n int) OnRetransmissionTimeout(packetsRetransmitted bool) OnConnectionMigration() @@ -30,7 +30,7 @@ type SendAlgorithmWithDebugInfo interface { // Stuff only used in testing HybridSlowStart() *HybridSlowStart - SlowstartThreshold() protocol.PacketNumber + SlowstartThreshold() protocol.ByteCount RenoBeta() float32 InRecovery() bool } diff --git a/internal/congestion/prr_sender.go b/internal/congestion/prr_sender.go index 18a3736a..5c807d19 100644 --- a/internal/congestion/prr_sender.go +++ b/internal/congestion/prr_sender.go @@ -1,10 +1,7 @@ package congestion import ( - "time" - "github.com/lucas-clemente/quic-go/internal/protocol" - "github.com/lucas-clemente/quic-go/internal/utils" ) // PrrSender implements the Proportional Rate Reduction (PRR) per RFC 6937 @@ -23,9 +20,9 @@ func (p *PrrSender) OnPacketSent(sentBytes protocol.ByteCount) { // OnPacketLost should be called on the first loss that triggers a recovery // period and all other methods in this class should only be called when in // recovery. -func (p *PrrSender) OnPacketLost(bytesInFlight protocol.ByteCount) { +func (p *PrrSender) OnPacketLost(priorInFlight protocol.ByteCount) { p.bytesSentSinceLoss = 0 - p.bytesInFlightBeforeLoss = bytesInFlight + p.bytesInFlightBeforeLoss = priorInFlight p.bytesDeliveredSinceLoss = 0 p.ackCountSinceLoss = 0 } @@ -36,28 +33,22 @@ func (p *PrrSender) OnPacketAcked(ackedBytes protocol.ByteCount) { p.ackCountSinceLoss++ } -// TimeUntilSend calculates the time until a packet can be sent -func (p *PrrSender) TimeUntilSend(congestionWindow, bytesInFlight, slowstartThreshold protocol.ByteCount) time.Duration { +// CanSend returns if packets can be sent +func (p *PrrSender) CanSend(congestionWindow, bytesInFlight, slowstartThreshold protocol.ByteCount) bool { // Return QuicTime::Zero In order to ensure limited transmit always works. if p.bytesSentSinceLoss == 0 || bytesInFlight < protocol.DefaultTCPMSS { - return 0 + return true } if congestionWindow > bytesInFlight { // During PRR-SSRB, limit outgoing packets to 1 extra MSS per ack, instead // of sending the entire available window. This prevents burst retransmits // when more packets are lost than the CWND reduction. // limit = MAX(prr_delivered - prr_out, DeliveredData) + MSS - if p.bytesDeliveredSinceLoss+p.ackCountSinceLoss*protocol.DefaultTCPMSS <= p.bytesSentSinceLoss { - return utils.InfDuration - } - return 0 + return p.bytesDeliveredSinceLoss+p.ackCountSinceLoss*protocol.DefaultTCPMSS > p.bytesSentSinceLoss } // Implement Proportional Rate Reduction (RFC6937). // Checks a simplified version of the PRR formula that doesn't use division: // AvailableSendWindow = // CEIL(prr_delivered * ssthresh / BytesInFlightAtLoss) - prr_sent - if p.bytesDeliveredSinceLoss*slowstartThreshold > p.bytesSentSinceLoss*p.bytesInFlightBeforeLoss { - return 0 - } - return utils.InfDuration + return p.bytesDeliveredSinceLoss*slowstartThreshold > p.bytesSentSinceLoss*p.bytesInFlightBeforeLoss } diff --git a/internal/congestion/prr_sender_test.go b/internal/congestion/prr_sender_test.go index 52f41714..0786b9bd 100644 --- a/internal/congestion/prr_sender_test.go +++ b/internal/congestion/prr_sender_test.go @@ -5,7 +5,6 @@ import ( . "github.com/onsi/gomega" "github.com/lucas-clemente/quic-go/internal/protocol" - "github.com/lucas-clemente/quic-go/internal/utils" ) var _ = Describe("PRR sender", func() { @@ -27,11 +26,11 @@ var _ = Describe("PRR sender", func() { // Ack a packet. PRR allows one packet to leave immediately. prr.OnPacketAcked(protocol.DefaultTCPMSS) bytesInFlight -= protocol.DefaultTCPMSS - Expect(prr.TimeUntilSend(congestionWindow, bytesInFlight, sshthreshAfterLoss*protocol.DefaultTCPMSS)).To(BeZero()) + Expect(prr.CanSend(congestionWindow, bytesInFlight, sshthreshAfterLoss*protocol.DefaultTCPMSS)).To(BeTrue()) // Send retransmission. prr.OnPacketSent(protocol.DefaultTCPMSS) // PRR shouldn't allow sending any more packets. - Expect(prr.TimeUntilSend(congestionWindow, bytesInFlight, sshthreshAfterLoss*protocol.DefaultTCPMSS)).To(Equal(utils.InfDuration)) + Expect(prr.CanSend(congestionWindow, bytesInFlight, sshthreshAfterLoss*protocol.DefaultTCPMSS)).To(BeFalse()) // One packet is lost, and one ack was consumed above. PRR now paces // transmissions through the remaining 48 acks. PRR will alternatively @@ -40,11 +39,11 @@ var _ = Describe("PRR sender", func() { // Ack a packet. PRR shouldn't allow sending a packet in response. prr.OnPacketAcked(protocol.DefaultTCPMSS) bytesInFlight -= protocol.DefaultTCPMSS - Expect(prr.TimeUntilSend(congestionWindow, bytesInFlight, sshthreshAfterLoss*protocol.DefaultTCPMSS)).To(Equal(utils.InfDuration)) + Expect(prr.CanSend(congestionWindow, bytesInFlight, sshthreshAfterLoss*protocol.DefaultTCPMSS)).To(BeFalse()) // Ack another packet. PRR should now allow sending a packet in response. prr.OnPacketAcked(protocol.DefaultTCPMSS) bytesInFlight -= protocol.DefaultTCPMSS - Expect(prr.TimeUntilSend(congestionWindow, bytesInFlight, sshthreshAfterLoss*protocol.DefaultTCPMSS)).To(BeZero()) + Expect(prr.CanSend(congestionWindow, bytesInFlight, sshthreshAfterLoss*protocol.DefaultTCPMSS)).To(BeTrue()) // Send a packet in response. prr.OnPacketSent(protocol.DefaultTCPMSS) bytesInFlight += protocol.DefaultTCPMSS @@ -57,7 +56,7 @@ var _ = Describe("PRR sender", func() { // Ack a packet. prr.OnPacketAcked(protocol.DefaultTCPMSS) bytesInFlight -= protocol.DefaultTCPMSS - Expect(prr.TimeUntilSend(congestionWindow, bytesInFlight, sshthreshAfterLoss*protocol.DefaultTCPMSS)).To(BeZero()) + Expect(prr.CanSend(congestionWindow, bytesInFlight, sshthreshAfterLoss*protocol.DefaultTCPMSS)).To(BeTrue()) // Send a packet in response, since PRR allows it. prr.OnPacketSent(protocol.DefaultTCPMSS) bytesInFlight += protocol.DefaultTCPMSS @@ -65,7 +64,7 @@ var _ = Describe("PRR sender", func() { // Since bytes_in_flight is equal to the congestion_window, // PRR disallows sending. Expect(bytesInFlight).To(Equal(congestionWindow)) - Expect(prr.TimeUntilSend(congestionWindow, bytesInFlight, sshthreshAfterLoss*protocol.DefaultTCPMSS)).To(Equal(utils.InfDuration)) + Expect(prr.CanSend(congestionWindow, bytesInFlight, sshthreshAfterLoss*protocol.DefaultTCPMSS)).To(BeFalse()) } }) @@ -86,20 +85,20 @@ var _ = Describe("PRR sender", func() { bytesInFlight -= protocol.DefaultTCPMSS // PRR-SSRB should allow two packets to be sent. for j := 0; j < 2; j++ { - Expect(prr.TimeUntilSend(congestionWindow, bytesInFlight, ssthreshAfterLoss*protocol.DefaultTCPMSS)).To(BeZero()) + Expect(prr.CanSend(congestionWindow, bytesInFlight, ssthreshAfterLoss*protocol.DefaultTCPMSS)).To(BeTrue()) // Send a packet in response. prr.OnPacketSent(protocol.DefaultTCPMSS) bytesInFlight += protocol.DefaultTCPMSS } // PRR should allow no more than 2 packets in response to an ack. - Expect(prr.TimeUntilSend(congestionWindow, bytesInFlight, ssthreshAfterLoss*protocol.DefaultTCPMSS)).To(Equal(utils.InfDuration)) + Expect(prr.CanSend(congestionWindow, bytesInFlight, ssthreshAfterLoss*protocol.DefaultTCPMSS)).To(BeFalse()) } // Out of SSRB mode, PRR allows one send in response to each ack. for i := 0; i < 10; i++ { prr.OnPacketAcked(protocol.DefaultTCPMSS) bytesInFlight -= protocol.DefaultTCPMSS - Expect(prr.TimeUntilSend(congestionWindow, bytesInFlight, ssthreshAfterLoss*protocol.DefaultTCPMSS)).To(BeZero()) + Expect(prr.CanSend(congestionWindow, bytesInFlight, ssthreshAfterLoss*protocol.DefaultTCPMSS)).To(BeTrue()) // Send a packet in response. prr.OnPacketSent(protocol.DefaultTCPMSS) bytesInFlight += protocol.DefaultTCPMSS diff --git a/internal/mocks/congestion.go b/internal/mocks/congestion.go index b86ccfea..d0749252 100644 --- a/internal/mocks/congestion.go +++ b/internal/mocks/congestion.go @@ -68,13 +68,13 @@ func (mr *MockSendAlgorithmMockRecorder) OnConnectionMigration() *gomock.Call { } // OnPacketAcked mocks base method -func (m *MockSendAlgorithm) OnPacketAcked(arg0 protocol.PacketNumber, arg1, arg2 protocol.ByteCount) { - m.ctrl.Call(m, "OnPacketAcked", arg0, arg1, arg2) +func (m *MockSendAlgorithm) OnPacketAcked(arg0 protocol.PacketNumber, arg1, arg2 protocol.ByteCount, arg3 time.Time) { + m.ctrl.Call(m, "OnPacketAcked", arg0, arg1, arg2, arg3) } // OnPacketAcked indicates an expected call of OnPacketAcked -func (mr *MockSendAlgorithmMockRecorder) OnPacketAcked(arg0, arg1, arg2 interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnPacketAcked", reflect.TypeOf((*MockSendAlgorithm)(nil).OnPacketAcked), arg0, arg1, arg2) +func (mr *MockSendAlgorithmMockRecorder) OnPacketAcked(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnPacketAcked", reflect.TypeOf((*MockSendAlgorithm)(nil).OnPacketAcked), arg0, arg1, arg2, arg3) } // OnPacketLost mocks base method @@ -88,10 +88,8 @@ func (mr *MockSendAlgorithmMockRecorder) OnPacketLost(arg0, arg1, arg2 interface } // OnPacketSent mocks base method -func (m *MockSendAlgorithm) OnPacketSent(arg0 time.Time, arg1 protocol.ByteCount, arg2 protocol.PacketNumber, arg3 protocol.ByteCount, arg4 bool) bool { - ret := m.ctrl.Call(m, "OnPacketSent", arg0, arg1, arg2, arg3, arg4) - ret0, _ := ret[0].(bool) - return ret0 +func (m *MockSendAlgorithm) OnPacketSent(arg0 time.Time, arg1 protocol.ByteCount, arg2 protocol.PacketNumber, arg3 protocol.ByteCount, arg4 bool) { + m.ctrl.Call(m, "OnPacketSent", arg0, arg1, arg2, arg3, arg4) } // OnPacketSent indicates an expected call of OnPacketSent diff --git a/internal/protocol/server_parameters.go b/internal/protocol/server_parameters.go index 46b69ead..961986a2 100644 --- a/internal/protocol/server_parameters.go +++ b/internal/protocol/server_parameters.go @@ -12,11 +12,13 @@ const MaxPacketSizeIPv6 = 1232 // This makes sure that those packets can always be retransmitted without splitting the contained StreamFrames const NonForwardSecurePacketSizeReduction = 50 +const defaultMaxCongestionWindowPackets = 1000 + // DefaultMaxCongestionWindow is the default for the max congestion window -const DefaultMaxCongestionWindow = 1000 +const DefaultMaxCongestionWindow ByteCount = defaultMaxCongestionWindowPackets * DefaultTCPMSS // InitialCongestionWindow is the initial congestion window in QUIC packets -const InitialCongestionWindow = 32 +const InitialCongestionWindow ByteCount = 32 * DefaultTCPMSS // MaxUndecryptablePackets limits the number of undecryptable packets that a // session queues for later until it sends a public reset. @@ -70,7 +72,7 @@ const MaxStreamsMultiplier = 1.1 const MaxStreamsMinimumIncrement = 10 // MaxSessionUnprocessedPackets is the max number of packets stored in each session that are not yet processed. -const MaxSessionUnprocessedPackets = DefaultMaxCongestionWindow +const MaxSessionUnprocessedPackets = defaultMaxCongestionWindowPackets // SkipPacketAveragePeriodLength is the average period length in which one packet number is skipped to prevent an Optimistic ACK attack const SkipPacketAveragePeriodLength PacketNumber = 500 @@ -84,7 +86,7 @@ const CookieExpiryTime = 24 * time.Hour // MaxOutstandingSentPackets is maximum number of packets saved for retransmission. // When reached, it imposes a soft limit on sending new packets: // Sending ACKs and retransmission is still allowed, but now new regular packets can be sent. -const MaxOutstandingSentPackets = 2 * DefaultMaxCongestionWindow +const MaxOutstandingSentPackets = 2 * defaultMaxCongestionWindowPackets // MaxTrackedSentPackets is maximum number of sent packets saved for retransmission. // When reached, no more packets will be sent. @@ -92,7 +94,7 @@ const MaxOutstandingSentPackets = 2 * DefaultMaxCongestionWindow const MaxTrackedSentPackets = MaxOutstandingSentPackets * 5 / 4 // MaxTrackedReceivedAckRanges is the maximum number of ACK ranges tracked -const MaxTrackedReceivedAckRanges = DefaultMaxCongestionWindow +const MaxTrackedReceivedAckRanges = defaultMaxCongestionWindowPackets // MaxNonRetransmittableAcks is the maximum number of packets containing an ACK, but no retransmittable frames, that we send in a row const MaxNonRetransmittableAcks = 19 diff --git a/internal/utils/minmax.go b/internal/utils/minmax.go index ef71c7fa..4394ab04 100644 --- a/internal/utils/minmax.go +++ b/internal/utils/minmax.go @@ -82,6 +82,14 @@ func MinByteCount(a, b protocol.ByteCount) protocol.ByteCount { return b } +// MaxByteCount returns the maximum of two ByteCounts +func MaxByteCount(a, b protocol.ByteCount) protocol.ByteCount { + if a < b { + return b + } + return a +} + // MaxDuration returns the max duration func MaxDuration(a, b time.Duration) time.Duration { if a > b { diff --git a/internal/utils/minmax_test.go b/internal/utils/minmax_test.go index 0a5dbb08..95816372 100644 --- a/internal/utils/minmax_test.go +++ b/internal/utils/minmax_test.go @@ -35,6 +35,11 @@ var _ = Describe("Min / Max", func() { Expect(MaxInt64(7, 5)).To(Equal(int64(7))) }) + It("returns the maximum ByteCount", func() { + Expect(MaxByteCount(7, 5)).To(Equal(protocol.ByteCount(7))) + Expect(MaxByteCount(5, 7)).To(Equal(protocol.ByteCount(7))) + }) + It("returns the maximum duration", func() { Expect(MaxDuration(time.Microsecond, time.Nanosecond)).To(Equal(time.Microsecond)) Expect(MaxDuration(time.Nanosecond, time.Microsecond)).To(Equal(time.Microsecond))