forked from quic-go/quic-go
implement prr sender
This commit is contained in:
63
congestion/prr_sender.go
Normal file
63
congestion/prr_sender.go
Normal file
@@ -0,0 +1,63 @@
|
||||
package congestion
|
||||
|
||||
import (
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/lucas-clemente/quic-go/protocol"
|
||||
)
|
||||
|
||||
// PrrSender implements the Proportional Rate Reduction (PRR) per RFC 6937
|
||||
type PrrSender struct {
|
||||
bytesSentSinceLoss uint64
|
||||
bytesDeliveredSinceLoss uint64
|
||||
ackCountSinceLoss uint64
|
||||
bytesInFlightBeforeLoss uint64
|
||||
}
|
||||
|
||||
// OnPacketSent should be called after a packet was sent
|
||||
func (p *PrrSender) OnPacketSent(sentBytes uint64) {
|
||||
p.bytesSentSinceLoss += sentBytes
|
||||
}
|
||||
|
||||
// OnPacketLost should be called on the first loss that triggers a recovery
|
||||
// period and all other methods in this class should only be called when in
|
||||
// recovery.
|
||||
func (p *PrrSender) OnPacketLost(bytesInFlight uint64) {
|
||||
p.bytesSentSinceLoss = 0
|
||||
p.bytesInFlightBeforeLoss = bytesInFlight
|
||||
p.bytesDeliveredSinceLoss = 0
|
||||
p.ackCountSinceLoss = 0
|
||||
}
|
||||
|
||||
// OnPacketAcked should be called after a packet was acked
|
||||
func (p *PrrSender) OnPacketAcked(ackedBytes uint64) {
|
||||
p.bytesDeliveredSinceLoss += ackedBytes
|
||||
p.ackCountSinceLoss++
|
||||
}
|
||||
|
||||
// TimeUntilSend calculates the time until a packet can be sent
|
||||
func (p *PrrSender) TimeUntilSend(congestionWindow, bytesInFlight, slowstartThreshold uint64) time.Duration {
|
||||
// Return QuicTime::Zero In order to ensure limited transmit always works.
|
||||
if p.bytesSentSinceLoss == 0 || bytesInFlight < protocol.DefaultTCPMSS {
|
||||
return 0
|
||||
}
|
||||
if congestionWindow > bytesInFlight {
|
||||
// During PRR-SSRB, limit outgoing packets to 1 extra MSS per ack, instead
|
||||
// of sending the entire available window. This prevents burst retransmits
|
||||
// when more packets are lost than the CWND reduction.
|
||||
// limit = MAX(prr_delivered - prr_out, DeliveredData) + MSS
|
||||
if p.bytesDeliveredSinceLoss+p.ackCountSinceLoss*protocol.DefaultTCPMSS <= p.bytesSentSinceLoss {
|
||||
return math.MaxInt64
|
||||
}
|
||||
return 0
|
||||
}
|
||||
// Implement Proportional Rate Reduction (RFC6937).
|
||||
// Checks a simplified version of the PRR formula that doesn't use division:
|
||||
// AvailableSendWindow =
|
||||
// CEIL(prr_delivered * ssthresh / BytesInFlightAtLoss) - prr_sent
|
||||
if p.bytesDeliveredSinceLoss*slowstartThreshold > p.bytesSentSinceLoss*p.bytesInFlightBeforeLoss {
|
||||
return 0
|
||||
}
|
||||
return math.MaxInt64
|
||||
}
|
||||
111
congestion/prr_sender_test.go
Normal file
111
congestion/prr_sender_test.go
Normal file
@@ -0,0 +1,111 @@
|
||||
package congestion_test
|
||||
|
||||
import (
|
||||
"math"
|
||||
"time"
|
||||
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
"github.com/lucas-clemente/quic-go/congestion"
|
||||
"github.com/lucas-clemente/quic-go/protocol"
|
||||
)
|
||||
|
||||
var _ = Describe("PRR sender", func() {
|
||||
var (
|
||||
prr congestion.PrrSender
|
||||
)
|
||||
|
||||
BeforeEach(func() {
|
||||
prr = congestion.PrrSender{}
|
||||
})
|
||||
|
||||
It("single loss results in send on every other ack", func() {
|
||||
num_packets_in_flight := uint64(50)
|
||||
bytes_in_flight := num_packets_in_flight * protocol.DefaultTCPMSS
|
||||
ssthresh_after_loss := num_packets_in_flight / 2
|
||||
congestion_window := ssthresh_after_loss * protocol.DefaultTCPMSS
|
||||
|
||||
prr.OnPacketLost(bytes_in_flight)
|
||||
// Ack a packet. PRR allows one packet to leave immediately.
|
||||
prr.OnPacketAcked(protocol.DefaultTCPMSS)
|
||||
bytes_in_flight -= protocol.DefaultTCPMSS
|
||||
Expect(prr.TimeUntilSend(congestion_window, bytes_in_flight, ssthresh_after_loss*protocol.DefaultTCPMSS)).To(BeZero())
|
||||
// Send retransmission.
|
||||
prr.OnPacketSent(protocol.DefaultTCPMSS)
|
||||
// PRR shouldn't allow sending any more packets.
|
||||
Expect(prr.TimeUntilSend(congestion_window, bytes_in_flight, ssthresh_after_loss*protocol.DefaultTCPMSS)).To(Equal(time.Duration(math.MaxInt64)))
|
||||
|
||||
// One packet is lost, and one ack was consumed above. PRR now paces
|
||||
// transmissions through the remaining 48 acks. PRR will alternatively
|
||||
// disallow and allow a packet to be sent in response to an ack.
|
||||
for i := uint64(0); i < ssthresh_after_loss-1; i++ {
|
||||
// Ack a packet. PRR shouldn't allow sending a packet in response.
|
||||
prr.OnPacketAcked(protocol.DefaultTCPMSS)
|
||||
bytes_in_flight -= protocol.DefaultTCPMSS
|
||||
Expect(prr.TimeUntilSend(congestion_window, bytes_in_flight, ssthresh_after_loss*protocol.DefaultTCPMSS)).To(Equal(time.Duration(math.MaxInt64)))
|
||||
// Ack another packet. PRR should now allow sending a packet in response.
|
||||
prr.OnPacketAcked(protocol.DefaultTCPMSS)
|
||||
bytes_in_flight -= protocol.DefaultTCPMSS
|
||||
Expect(prr.TimeUntilSend(congestion_window, bytes_in_flight, ssthresh_after_loss*protocol.DefaultTCPMSS)).To(BeZero())
|
||||
// Send a packet in response.
|
||||
prr.OnPacketSent(protocol.DefaultTCPMSS)
|
||||
bytes_in_flight += protocol.DefaultTCPMSS
|
||||
}
|
||||
|
||||
// Since bytes_in_flight is now equal to congestion_window, PRR now maintains
|
||||
// packet conservation, allowing one packet to be sent in response to an ack.
|
||||
Expect(bytes_in_flight).To(Equal(congestion_window))
|
||||
for i := 0; i < 10; i++ {
|
||||
// Ack a packet.
|
||||
prr.OnPacketAcked(protocol.DefaultTCPMSS)
|
||||
bytes_in_flight -= protocol.DefaultTCPMSS
|
||||
Expect(prr.TimeUntilSend(congestion_window, bytes_in_flight, ssthresh_after_loss*protocol.DefaultTCPMSS)).To(BeZero())
|
||||
// Send a packet in response, since PRR allows it.
|
||||
prr.OnPacketSent(protocol.DefaultTCPMSS)
|
||||
bytes_in_flight += protocol.DefaultTCPMSS
|
||||
|
||||
// Since bytes_in_flight is equal to the congestion_window,
|
||||
// PRR disallows sending.
|
||||
Expect(bytes_in_flight).To(Equal(congestion_window))
|
||||
Expect(prr.TimeUntilSend(congestion_window, bytes_in_flight, ssthresh_after_loss*protocol.DefaultTCPMSS)).To(Equal(time.Duration(math.MaxInt64)))
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
It("bust loss results in slow start", func() {
|
||||
bytes_in_flight := uint64(20 * protocol.DefaultTCPMSS)
|
||||
const num_packets_lost = 13
|
||||
const ssthresh_after_loss = 10
|
||||
const congestion_window = ssthresh_after_loss * protocol.DefaultTCPMSS
|
||||
|
||||
// Lose 13 packets.
|
||||
bytes_in_flight -= num_packets_lost * protocol.DefaultTCPMSS
|
||||
prr.OnPacketLost(bytes_in_flight)
|
||||
|
||||
// PRR-SSRB will allow the following 3 acks to send up to 2 packets.
|
||||
for i := 0; i < 3; i++ {
|
||||
prr.OnPacketAcked(protocol.DefaultTCPMSS)
|
||||
bytes_in_flight -= protocol.DefaultTCPMSS
|
||||
// PRR-SSRB should allow two packets to be sent.
|
||||
for j := 0; j < 2; j++ {
|
||||
Expect(prr.TimeUntilSend(congestion_window, bytes_in_flight, ssthresh_after_loss*protocol.DefaultTCPMSS)).To(BeZero())
|
||||
// Send a packet in response.
|
||||
prr.OnPacketSent(protocol.DefaultTCPMSS)
|
||||
bytes_in_flight += protocol.DefaultTCPMSS
|
||||
}
|
||||
// PRR should allow no more than 2 packets in response to an ack.
|
||||
Expect(prr.TimeUntilSend(congestion_window, bytes_in_flight, ssthresh_after_loss*protocol.DefaultTCPMSS)).To(Equal(time.Duration(math.MaxInt64)))
|
||||
}
|
||||
|
||||
// Out of SSRB mode, PRR allows one send in response to each ack.
|
||||
for i := 0; i < 10; i++ {
|
||||
prr.OnPacketAcked(protocol.DefaultTCPMSS)
|
||||
bytes_in_flight -= protocol.DefaultTCPMSS
|
||||
Expect(prr.TimeUntilSend(congestion_window, bytes_in_flight, ssthresh_after_loss*protocol.DefaultTCPMSS)).To(BeZero())
|
||||
// Send a packet in response.
|
||||
prr.OnPacketSent(protocol.DefaultTCPMSS)
|
||||
bytes_in_flight += protocol.DefaultTCPMSS
|
||||
}
|
||||
})
|
||||
})
|
||||
@@ -17,3 +17,7 @@ const MaxPacketSize = 1452
|
||||
|
||||
// MaxFrameSize is the maximum size of a QUIC frame
|
||||
const MaxFrameSize = MaxPacketSize - (1 + 8 + 6) /*public header*/ - 1 /*private header*/ - 12 /*crypto signature*/
|
||||
|
||||
// DefaultTCPMSS is the default maximum packet size used in the Linux TCP implementation.
|
||||
// Used in QUIC for congestion window computations in bytes.
|
||||
const DefaultTCPMSS = 1460
|
||||
|
||||
Reference in New Issue
Block a user