From 76d4b7a931acd7ef8fe1af9d7df1593a1c537bc0 Mon Sep 17 00:00:00 2001 From: Lucas Clemente Date: Sun, 24 Apr 2016 13:27:01 +0200 Subject: [PATCH] implement prr sender --- congestion/prr_sender.go | 63 +++++++++++++++++++ congestion/prr_sender_test.go | 111 ++++++++++++++++++++++++++++++++++ protocol/protocol.go | 4 ++ 3 files changed, 178 insertions(+) create mode 100644 congestion/prr_sender.go create mode 100644 congestion/prr_sender_test.go diff --git a/congestion/prr_sender.go b/congestion/prr_sender.go new file mode 100644 index 000000000..d1122d4c8 --- /dev/null +++ b/congestion/prr_sender.go @@ -0,0 +1,63 @@ +package congestion + +import ( + "math" + "time" + + "github.com/lucas-clemente/quic-go/protocol" +) + +// PrrSender implements the Proportional Rate Reduction (PRR) per RFC 6937 +type PrrSender struct { + bytesSentSinceLoss uint64 + bytesDeliveredSinceLoss uint64 + ackCountSinceLoss uint64 + bytesInFlightBeforeLoss uint64 +} + +// OnPacketSent should be called after a packet was sent +func (p *PrrSender) OnPacketSent(sentBytes uint64) { + p.bytesSentSinceLoss += sentBytes +} + +// OnPacketLost should be called on the first loss that triggers a recovery +// period and all other methods in this class should only be called when in +// recovery. +func (p *PrrSender) OnPacketLost(bytesInFlight uint64) { + p.bytesSentSinceLoss = 0 + p.bytesInFlightBeforeLoss = bytesInFlight + p.bytesDeliveredSinceLoss = 0 + p.ackCountSinceLoss = 0 +} + +// OnPacketAcked should be called after a packet was acked +func (p *PrrSender) OnPacketAcked(ackedBytes uint64) { + p.bytesDeliveredSinceLoss += ackedBytes + p.ackCountSinceLoss++ +} + +// TimeUntilSend calculates the time until a packet can be sent +func (p *PrrSender) TimeUntilSend(congestionWindow, bytesInFlight, slowstartThreshold uint64) time.Duration { + // Return QuicTime::Zero In order to ensure limited transmit always works. + if p.bytesSentSinceLoss == 0 || bytesInFlight < protocol.DefaultTCPMSS { + return 0 + } + if congestionWindow > bytesInFlight { + // During PRR-SSRB, limit outgoing packets to 1 extra MSS per ack, instead + // of sending the entire available window. This prevents burst retransmits + // when more packets are lost than the CWND reduction. + // limit = MAX(prr_delivered - prr_out, DeliveredData) + MSS + if p.bytesDeliveredSinceLoss+p.ackCountSinceLoss*protocol.DefaultTCPMSS <= p.bytesSentSinceLoss { + return math.MaxInt64 + } + return 0 + } + // Implement Proportional Rate Reduction (RFC6937). + // Checks a simplified version of the PRR formula that doesn't use division: + // AvailableSendWindow = + // CEIL(prr_delivered * ssthresh / BytesInFlightAtLoss) - prr_sent + if p.bytesDeliveredSinceLoss*slowstartThreshold > p.bytesSentSinceLoss*p.bytesInFlightBeforeLoss { + return 0 + } + return math.MaxInt64 +} diff --git a/congestion/prr_sender_test.go b/congestion/prr_sender_test.go new file mode 100644 index 000000000..5bc74086f --- /dev/null +++ b/congestion/prr_sender_test.go @@ -0,0 +1,111 @@ +package congestion_test + +import ( + "math" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/lucas-clemente/quic-go/congestion" + "github.com/lucas-clemente/quic-go/protocol" +) + +var _ = Describe("PRR sender", func() { + var ( + prr congestion.PrrSender + ) + + BeforeEach(func() { + prr = congestion.PrrSender{} + }) + + It("single loss results in send on every other ack", func() { + num_packets_in_flight := uint64(50) + bytes_in_flight := num_packets_in_flight * protocol.DefaultTCPMSS + ssthresh_after_loss := num_packets_in_flight / 2 + congestion_window := ssthresh_after_loss * protocol.DefaultTCPMSS + + prr.OnPacketLost(bytes_in_flight) + // Ack a packet. PRR allows one packet to leave immediately. + prr.OnPacketAcked(protocol.DefaultTCPMSS) + bytes_in_flight -= protocol.DefaultTCPMSS + Expect(prr.TimeUntilSend(congestion_window, bytes_in_flight, ssthresh_after_loss*protocol.DefaultTCPMSS)).To(BeZero()) + // Send retransmission. + prr.OnPacketSent(protocol.DefaultTCPMSS) + // PRR shouldn't allow sending any more packets. + Expect(prr.TimeUntilSend(congestion_window, bytes_in_flight, ssthresh_after_loss*protocol.DefaultTCPMSS)).To(Equal(time.Duration(math.MaxInt64))) + + // One packet is lost, and one ack was consumed above. PRR now paces + // transmissions through the remaining 48 acks. PRR will alternatively + // disallow and allow a packet to be sent in response to an ack. + for i := uint64(0); i < ssthresh_after_loss-1; i++ { + // Ack a packet. PRR shouldn't allow sending a packet in response. + prr.OnPacketAcked(protocol.DefaultTCPMSS) + bytes_in_flight -= protocol.DefaultTCPMSS + Expect(prr.TimeUntilSend(congestion_window, bytes_in_flight, ssthresh_after_loss*protocol.DefaultTCPMSS)).To(Equal(time.Duration(math.MaxInt64))) + // Ack another packet. PRR should now allow sending a packet in response. + prr.OnPacketAcked(protocol.DefaultTCPMSS) + bytes_in_flight -= protocol.DefaultTCPMSS + Expect(prr.TimeUntilSend(congestion_window, bytes_in_flight, ssthresh_after_loss*protocol.DefaultTCPMSS)).To(BeZero()) + // Send a packet in response. + prr.OnPacketSent(protocol.DefaultTCPMSS) + bytes_in_flight += protocol.DefaultTCPMSS + } + + // Since bytes_in_flight is now equal to congestion_window, PRR now maintains + // packet conservation, allowing one packet to be sent in response to an ack. + Expect(bytes_in_flight).To(Equal(congestion_window)) + for i := 0; i < 10; i++ { + // Ack a packet. + prr.OnPacketAcked(protocol.DefaultTCPMSS) + bytes_in_flight -= protocol.DefaultTCPMSS + Expect(prr.TimeUntilSend(congestion_window, bytes_in_flight, ssthresh_after_loss*protocol.DefaultTCPMSS)).To(BeZero()) + // Send a packet in response, since PRR allows it. + prr.OnPacketSent(protocol.DefaultTCPMSS) + bytes_in_flight += protocol.DefaultTCPMSS + + // Since bytes_in_flight is equal to the congestion_window, + // PRR disallows sending. + Expect(bytes_in_flight).To(Equal(congestion_window)) + Expect(prr.TimeUntilSend(congestion_window, bytes_in_flight, ssthresh_after_loss*protocol.DefaultTCPMSS)).To(Equal(time.Duration(math.MaxInt64))) + } + + }) + + It("bust loss results in slow start", func() { + bytes_in_flight := uint64(20 * protocol.DefaultTCPMSS) + const num_packets_lost = 13 + const ssthresh_after_loss = 10 + const congestion_window = ssthresh_after_loss * protocol.DefaultTCPMSS + + // Lose 13 packets. + bytes_in_flight -= num_packets_lost * protocol.DefaultTCPMSS + prr.OnPacketLost(bytes_in_flight) + + // PRR-SSRB will allow the following 3 acks to send up to 2 packets. + for i := 0; i < 3; i++ { + prr.OnPacketAcked(protocol.DefaultTCPMSS) + bytes_in_flight -= protocol.DefaultTCPMSS + // PRR-SSRB should allow two packets to be sent. + for j := 0; j < 2; j++ { + Expect(prr.TimeUntilSend(congestion_window, bytes_in_flight, ssthresh_after_loss*protocol.DefaultTCPMSS)).To(BeZero()) + // Send a packet in response. + prr.OnPacketSent(protocol.DefaultTCPMSS) + bytes_in_flight += protocol.DefaultTCPMSS + } + // PRR should allow no more than 2 packets in response to an ack. + Expect(prr.TimeUntilSend(congestion_window, bytes_in_flight, ssthresh_after_loss*protocol.DefaultTCPMSS)).To(Equal(time.Duration(math.MaxInt64))) + } + + // Out of SSRB mode, PRR allows one send in response to each ack. + for i := 0; i < 10; i++ { + prr.OnPacketAcked(protocol.DefaultTCPMSS) + bytes_in_flight -= protocol.DefaultTCPMSS + Expect(prr.TimeUntilSend(congestion_window, bytes_in_flight, ssthresh_after_loss*protocol.DefaultTCPMSS)).To(BeZero()) + // Send a packet in response. + prr.OnPacketSent(protocol.DefaultTCPMSS) + bytes_in_flight += protocol.DefaultTCPMSS + } + }) +}) diff --git a/protocol/protocol.go b/protocol/protocol.go index 4bb0dddc3..1ecbce13e 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -17,3 +17,7 @@ const MaxPacketSize = 1452 // MaxFrameSize is the maximum size of a QUIC frame const MaxFrameSize = MaxPacketSize - (1 + 8 + 6) /*public header*/ - 1 /*private header*/ - 12 /*crypto signature*/ + +// DefaultTCPMSS is the default maximum packet size used in the Linux TCP implementation. +// Used in QUIC for congestion window computations in bytes. +const DefaultTCPMSS = 1460