From f16d30a20c0b345f1ec690b6c4874216690fe60c Mon Sep 17 00:00:00 2001 From: Lucas Clemente Date: Wed, 27 Apr 2016 18:17:55 +0200 Subject: [PATCH] implement another CubicSender test --- congestion/congestion_vector.go | 10 +++ congestion/cubic_sender.go | 133 +++++++++++++++++++++++++++++++- congestion/cubic_sender_test.go | 49 ++++++++++-- congestion/hybrid_slow_start.go | 9 +++ congestion/interface.go | 1 + congestion/stats.go | 8 ++ protocol/protocol.go | 3 + 7 files changed, 204 insertions(+), 9 deletions(-) create mode 100644 congestion/congestion_vector.go create mode 100644 congestion/stats.go diff --git a/congestion/congestion_vector.go b/congestion/congestion_vector.go new file mode 100644 index 00000000..7b21c1a4 --- /dev/null +++ b/congestion/congestion_vector.go @@ -0,0 +1,10 @@ +package congestion + +import "github.com/lucas-clemente/quic-go/protocol" + +type PacketInfo struct { + Number protocol.PacketNumber + Length uint64 +} + +type PacketVector []PacketInfo diff --git a/congestion/cubic_sender.go b/congestion/cubic_sender.go index dd8582d7..b9b2608a 100644 --- a/congestion/cubic_sender.go +++ b/congestion/cubic_sender.go @@ -7,9 +7,17 @@ import ( "github.com/lucas-clemente/quic-go/protocol" ) +const ( + maxBurstBytes = 3 * protocol.DefaultTCPMSS + defaultMinimumCongestionWindow protocol.PacketNumber = 2 +) + type cubicSender struct { hybridSlowStart HybridSlowStart prr PrrSender + rttStats *RTTStats + stats connectionStats + cubic *Cubic // Track the largest packet that has been sent. largestSentPacketNumber protocol.PacketNumber @@ -25,12 +33,30 @@ type cubicSender struct { // Slow start congestion window in packets, aka ssthresh. slowstartThreshold protocol.PacketNumber + + // Whether the last loss event caused us to exit slowstart. + // Used for stats collection of slowstartPacketsLost + lastCutbackExitedSlowstart bool + + // When true, exit slow start with large cutback of congestion window. + slowStartLargeReduction bool + + // Minimum congestion window in packets. + minCongestionWindow protocol.PacketNumber + + // Maximum number of outstanding packets for tcp. + maxTCPCongestionWindow protocol.PacketNumber } // NewCubicSender makes a new cubic sender -func NewCubicSender(initialCongestionWindow protocol.PacketNumber) SendAlgorithm { +func NewCubicSender(clock Clock, rttStats *RTTStats, initialCongestionWindow protocol.PacketNumber) SendAlgorithm { return &cubicSender{ - congestionWindow: initialCongestionWindow, + rttStats: rttStats, + minCongestionWindow: defaultMinimumCongestionWindow, + congestionWindow: initialCongestionWindow, + maxTCPCongestionWindow: protocol.MaxCongestionWindow, + slowstartThreshold: protocol.MaxCongestionWindow, + cubic: NewCubic(clock), } } @@ -63,6 +89,10 @@ func (c *cubicSender) InRecovery() bool { return c.largestAckedPacketNumber <= c.largestSentAtLastCutback && c.largestAckedPacketNumber != 0 } +func (c *cubicSender) InSlowStart() bool { + return c.GetCongestionWindow() < c.GetSlowStartThreshold() +} + func (c *cubicSender) GetCongestionWindow() uint64 { return uint64(c.congestionWindow) * protocol.DefaultTCPMSS } @@ -70,3 +100,102 @@ func (c *cubicSender) GetCongestionWindow() uint64 { func (c *cubicSender) GetSlowStartThreshold() uint64 { return uint64(c.slowstartThreshold) * protocol.DefaultTCPMSS } + +func (c *cubicSender) ExitSlowstart() { + c.slowstartThreshold = c.congestionWindow +} + +// OnCongestionEvent indicates an update to the congestion state, caused either by an incoming +// ack or loss event timeout. |rttUpdated| indicates whether a new +// latest_rtt sample has been taken, |byte_in_flight| the bytes in flight +// prior to the congestion event. |ackedPackets| and |lostPackets| are +// any packets considered acked or lost as a result of the congestion event. +func (c *cubicSender) OnCongestionEvent(rttUpdated bool, bytesInFlight uint64, ackedPackets PacketVector, lostPackets PacketVector) { + if rttUpdated && c.InSlowStart() && c.hybridSlowStart.ShouldExitSlowStart(c.rttStats.LatestRTT(), c.rttStats.MinRTT(), c.GetCongestionWindow()/protocol.DefaultTCPMSS) { + c.ExitSlowstart() + } + for _, i := range lostPackets { + c.onPacketLost(i.Number, i.Length, bytesInFlight) + } + for _, i := range ackedPackets { + c.onPacketAcked(i.Number, i.Length, bytesInFlight) + } +} + +func (c *cubicSender) onPacketAcked(ackedPacketNumber protocol.PacketNumber, ackedBytes uint64, bytesInFlight uint64) { + c.largestAckedPacketNumber = protocol.MaxPacketNumber(ackedPacketNumber, c.largestAckedPacketNumber) + if c.InRecovery() { + // PRR is used when in recovery. + c.prr.OnPacketAcked(ackedBytes) + return + } + c.maybeIncreaseCwnd(ackedPacketNumber, ackedBytes, bytesInFlight) + if c.InSlowStart() { + c.hybridSlowStart.OnPacketAcked(ackedPacketNumber) + } +} + +func (c *cubicSender) onPacketLost(packetNumber protocol.PacketNumber, lostBytes uint64, bytesInFlight uint64) { + // TCP NewReno (RFC6582) says that once a loss occurs, any losses in packets + // already sent should be treated as a single loss event, since it's expected. + if packetNumber <= c.largestSentAtLastCutback { + if c.lastCutbackExitedSlowstart { + c.stats.slowstartPacketsLost++ + c.stats.slowstartBytesLost += lostBytes + if c.slowStartLargeReduction { + if c.stats.slowstartPacketsLost == 1 || (c.stats.slowstartBytesLost/protocol.DefaultTCPMSS) > (c.stats.slowstartBytesLost-lostBytes)/protocol.DefaultTCPMSS { + // Reduce congestion window by 1 for every mss of bytes lost. + c.congestionWindow = protocol.MaxPacketNumber(c.congestionWindow-1, c.minCongestionWindow) + } + c.slowstartThreshold = c.congestionWindow + } + } + return + } + c.lastCutbackExitedSlowstart = c.InSlowStart() + + c.prr.OnPacketLost(bytesInFlight) + + // TODO(chromium): Separate out all of slow start into a separate class. + if c.slowStartLargeReduction && c.InSlowStart() { + c.congestionWindow = c.congestionWindow - 1 + } else { + c.congestionWindow = c.cubic.CongestionWindowAfterPacketLoss(c.congestionWindow) + } + // Enforce a minimum congestion window. + if c.congestionWindow < c.minCongestionWindow { + c.congestionWindow = c.minCongestionWindow + } + c.slowstartThreshold = c.congestionWindow + c.largestSentAtLastCutback = c.largestSentPacketNumber +} + +// Called when we receive an ack. Normal TCP tracks how many packets one ack +// represents, but quic has a separate ack for each packet. +func (c *cubicSender) maybeIncreaseCwnd(ackedPacketNumber protocol.PacketNumber, ackedBytes uint64, bytesInFlight uint64) { + // Do not increase the congestion window unless the sender is close to using + // the current window. + if !c.isCwndLimited(bytesInFlight) { + c.cubic.OnApplicationLimited() + return + } + if c.congestionWindow >= c.maxTCPCongestionWindow { + return + } + if c.InSlowStart() { + // TCP slow start, exponential growth, increase by one for each ACK. + c.congestionWindow++ + return + } + c.congestionWindow = protocol.MinPacketNumber(c.maxTCPCongestionWindow, c.cubic.CongestionWindowAfterAck(c.congestionWindow, c.rttStats.MinRTT())) +} + +func (c *cubicSender) isCwndLimited(bytesInFlight uint64) bool { + congestionWindow := c.GetCongestionWindow() + if bytesInFlight >= congestionWindow { + return true + } + availableBytes := congestionWindow - bytesInFlight + slowStartLimited := c.InSlowStart() && bytesInFlight > congestionWindow/2 + return slowStartLimited || availableBytes <= maxBurstBytes +} diff --git a/congestion/cubic_sender_test.go b/congestion/cubic_sender_test.go index 86c0a344..da1c9f61 100644 --- a/congestion/cubic_sender_test.go +++ b/congestion/cubic_sender_test.go @@ -24,17 +24,20 @@ func (c *mockClock) Advance(d time.Duration) { var _ = Describe("Cubic Sender", func() { var ( - sender congestion.SendAlgorithm - clock mockClock - bytesInFlight uint64 - packetNumber protocol.PacketNumber + sender congestion.SendAlgorithm + clock mockClock + bytesInFlight uint64 + packetNumber protocol.PacketNumber + ackedPacketNumber protocol.PacketNumber + rttStats *congestion.RTTStats ) BeforeEach(func() { bytesInFlight = 0 - clock = mockClock{} - sender = congestion.NewCubicSender(initialCongestionWindowPackets) packetNumber = 1 + clock = mockClock{} + rttStats = congestion.NewRTTStats() + sender = congestion.NewCubicSender(&clock, rttStats, initialCongestionWindowPackets) }) SendAvailableSendWindow := func(packetLength uint64) int { @@ -51,7 +54,21 @@ var _ = Describe("Cubic Sender", func() { return packets_sent } - It("works with default values", func() { + // Normal is that TCP acks every other segment. + AckNPackets := func(n int) { + rttStats.UpdateRTT(60*time.Millisecond, 0, clock.Now()) + var ackedPackets congestion.PacketVector + var lostPackets congestion.PacketVector + for i := 0; i < n; i++ { + ackedPacketNumber++ + ackedPackets = append(ackedPackets, congestion.PacketInfo{Number: ackedPacketNumber, Length: protocol.DefaultTCPMSS}) + } + sender.OnCongestionEvent(true, bytesInFlight, ackedPackets, lostPackets) + bytesInFlight -= uint64(n) * protocol.DefaultTCPMSS + clock.Advance(time.Millisecond) + } + + It("simpler sender", func() { // At startup make sure we are at the default. Expect(sender.GetCongestionWindow()).To(Equal(defaultWindowTCP)) // At startup make sure we can send. @@ -65,4 +82,22 @@ var _ = Describe("Cubic Sender", func() { SendAvailableSendWindow(protocol.DefaultTCPMSS) Expect(sender.TimeUntilSend(clock.Now(), sender.GetCongestionWindow())).ToNot(BeZero()) }) + + It("application limited slow start", func() { + // Send exactly 10 packets and ensure the CWND ends at 14 packets. + const kNumberOfAcks = 5 + // At startup make sure we can send. + Expect(sender.TimeUntilSend(clock.Now(), 0)).To(BeZero()) + // Make sure we can send. + Expect(sender.TimeUntilSend(clock.Now(), 0)).To(BeZero()) + + SendAvailableSendWindow(protocol.DefaultTCPMSS) + for i := 0; i < kNumberOfAcks; i++ { + AckNPackets(2) + } + bytesToSend := sender.GetCongestionWindow() + // It's expected 2 acks will arrive when the bytes_in_flight are greater than + // half the CWND. + Expect(bytesToSend).To(Equal(defaultWindowTCP + protocol.DefaultTCPMSS*2*2)) + }) }) diff --git a/congestion/hybrid_slow_start.go b/congestion/hybrid_slow_start.go index 04185064..a0e2306e 100644 --- a/congestion/hybrid_slow_start.go +++ b/congestion/hybrid_slow_start.go @@ -89,3 +89,12 @@ func (s *HybridSlowStart) ShouldExitSlowStart(latestRTT time.Duration, minRTT ti func (s *HybridSlowStart) OnPacketSent(packetNumber protocol.PacketNumber) { s.lastSentPacketNumber = packetNumber } + +// OnPacketAcked gets invoked after ShouldExitSlowStart, so it's best to end +// the round when the final packet of the burst is received and start it on +// the next incoming ack. +func (s *HybridSlowStart) OnPacketAcked(ackedPacketNumber protocol.PacketNumber) { + if s.IsEndOfRound(ackedPacketNumber) { + s.started = false + } +} diff --git a/congestion/interface.go b/congestion/interface.go index ff0ab889..92c34be7 100644 --- a/congestion/interface.go +++ b/congestion/interface.go @@ -10,4 +10,5 @@ type SendAlgorithm interface { TimeUntilSend(now time.Time, bytesInFlight uint64) time.Duration OnPacketSent(sentTime time.Time, bytesInFlight uint64, packetNumber protocol.PacketNumber, bytes uint64, isRetransmittable bool) bool GetCongestionWindow() uint64 + OnCongestionEvent(rttUpdated bool, bytesInFlight uint64, ackedPackets PacketVector, lostPackets PacketVector) } diff --git a/congestion/stats.go b/congestion/stats.go new file mode 100644 index 00000000..3c40180d --- /dev/null +++ b/congestion/stats.go @@ -0,0 +1,8 @@ +package congestion + +import "github.com/lucas-clemente/quic-go/protocol" + +type connectionStats struct { + slowstartPacketsLost protocol.PacketNumber + slowstartBytesLost uint64 +} diff --git a/protocol/protocol.go b/protocol/protocol.go index bbfc4d79..9128e3ed 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -24,3 +24,6 @@ const DefaultTCPMSS = 1460 // InitialCongestionWindow is the initial congestion window in QUIC packets const InitialCongestionWindow = 32 + +// MaxCongestionWindow is the maximum size of the CWND, in packets. +const MaxCongestionWindow PacketNumber = 200