Implement loss recovery from the current WG draft

Fixes #498 and will hopefully go a long way towards fixing the many
flaky tests.
This commit is contained in:
Lucas Clemente
2017-03-24 18:28:59 +01:00
parent 959a8d0e4f
commit 1c5380c49b
9 changed files with 376 additions and 434 deletions

View File

@@ -12,6 +12,18 @@ import (
"github.com/lucas-clemente/quic-go/utils"
)
const (
// Maximum reordering in time space before time based loss detection considers a packet lost.
// In fraction of an RTT.
timeReorderingFraction = 1.0 / 8
// defaultRTOTimeout is the RTO time on new connections
defaultRTOTimeout = 500 * time.Millisecond
// Minimum time in the future an RTO alarm may be set for.
minRTOTimeout = 200 * time.Millisecond
// maxRTOTimeout is the maximum RTO time
maxRTOTimeout = 60 * time.Second
)
var (
// ErrDuplicateOrOutOfOrderAck occurs when a duplicate or an out-of-order ACK is received
ErrDuplicateOrOutOfOrderAck = errors.New("SentPacketHandler: Duplicate or out-of-order ACK")
@@ -22,11 +34,10 @@ var (
errAckForUnsentPacket = qerr.Error(qerr.InvalidAckData, "Received ACK for an unsent package")
)
var errPacketNumberNotIncreasing = errors.New("Already sent a packet with a higher packet number.")
var errPacketNumberNotIncreasing = errors.New("Already sent a packet with a higher packet number")
type sentPacketHandler struct {
lastSentPacketNumber protocol.PacketNumber
lastSentPacketTime time.Time
skippedPackets []protocol.PacketNumber
LargestAcked protocol.PacketNumber
@@ -40,10 +51,17 @@ type sentPacketHandler struct {
bytesInFlight protocol.ByteCount
rttStats *congestion.RTTStats
congestion congestion.SendAlgorithm
rttStats *congestion.RTTStats
consecutiveRTOCount uint32
// The number of times an RTO has been sent without receiving an ack.
rtoCount uint32
// The time at which the next packet will be considered lost based on early transmit or exceeding the reordering window in time.
lossTime time.Time
// The alarm timeout
alarm time.Time
}
// NewSentPacketHandler creates a new sentPacketHandler
@@ -64,40 +82,6 @@ func NewSentPacketHandler(rttStats *congestion.RTTStats) SentPacketHandler {
}
}
func (h *sentPacketHandler) ackPacket(packetElement *PacketElement) {
packet := &packetElement.Value
h.bytesInFlight -= packet.Length
h.packetHistory.Remove(packetElement)
}
// nackPacket NACKs a packet
// it returns true if a FastRetransmissions was triggered
func (h *sentPacketHandler) nackPacket(packetElement *PacketElement) bool {
packet := &packetElement.Value
packet.MissingReports++
if packet.MissingReports > protocol.RetransmissionThreshold {
utils.Debugf("\tQueueing packet 0x%x for retransmission (fast)", packet.PacketNumber)
h.queuePacketForRetransmission(packetElement)
return true
}
return false
}
// does NOT set packet.Retransmitted. This variable is not needed anymore
func (h *sentPacketHandler) queuePacketForRetransmission(packetElement *PacketElement) {
packet := &packetElement.Value
h.bytesInFlight -= packet.Length
h.retransmissionQueue = append(h.retransmissionQueue, packet)
h.packetHistory.Remove(packetElement)
// strictly speaking, this is only necessary for RTO retransmissions
// this is because FastRetransmissions are triggered by missing ranges in ACKs, and then the LargestAcked will already be higher than the packet number of the retransmitted packet
h.stopWaitingManager.QueuedRetransmissionForPacketNumber(packet.PacketNumber)
}
func (h *sentPacketHandler) largestInOrderAcked() protocol.PacketNumber {
if f := h.packetHistory.Front(); f != nil {
return f.Value.PacketNumber - 1
@@ -119,7 +103,6 @@ func (h *sentPacketHandler) SentPacket(packet *Packet) error {
}
now := time.Now()
h.lastSentPacketTime = now
packet.SendTime = now
if packet.Length == 0 {
return errors.New("SentPacketHandler: packet cannot be empty")
@@ -131,12 +114,14 @@ func (h *sentPacketHandler) SentPacket(packet *Packet) error {
h.congestion.OnPacketSent(
now,
h.BytesInFlight(),
h.bytesInFlight,
packet.PacketNumber,
packet.Length,
true, /* TODO: is retransmittable */
)
h.updateLossDetectionAlarm()
return nil
}
@@ -149,54 +134,58 @@ func (h *sentPacketHandler) ReceivedAck(ackFrame *frames.AckFrame, withPacketNum
if withPacketNumber <= h.largestReceivedPacketWithAck {
return ErrDuplicateOrOutOfOrderAck
}
h.largestReceivedPacketWithAck = withPacketNumber
// ignore repeated ACK (ACKs that don't have a higher LargestAcked than the last ACK)
if ackFrame.LargestAcked <= h.largestInOrderAcked() {
return nil
}
// check if it acks any packets that were skipped
for _, p := range h.skippedPackets {
if ackFrame.AcksPacket(p) {
return ErrAckForSkippedPacket
}
}
h.LargestAcked = ackFrame.LargestAcked
var ackedPackets congestion.PacketVector
var lostPackets congestion.PacketVector
ackRangeIndex := 0
rttUpdated := false
if h.skippedPacketsAcked(ackFrame) {
return ErrAckForSkippedPacket
}
var el, elNext *PacketElement
for el = h.packetHistory.Front(); el != nil; el = elNext {
// determine the next list element right at the beginning, because el.Next() is not avaible anymore, when the list element is deleted (i.e. when the packet is ACKed)
elNext = el.Next()
rttUpdated := h.maybeUpdateRTT(ackFrame.LargestAcked, ackFrame.DelayTime, rcvTime)
ackedPackets, err := h.determineNewlyAckedPackets(ackFrame)
if err != nil {
return err
}
if len(ackedPackets) > 0 {
var ackedPacketsCongestion congestion.PacketVector
for _, p := range ackedPackets {
h.onPacketAcked(p)
ackedPacketsCongestion = append(ackedPacketsCongestion, congestion.PacketInfo{
Number: p.Value.PacketNumber,
Length: p.Value.Length,
})
}
h.congestion.OnCongestionEvent(rttUpdated, h.bytesInFlight, ackedPacketsCongestion, nil)
}
h.detectLostPackets(rttUpdated)
h.updateLossDetectionAlarm()
h.garbageCollectSkippedPackets()
h.stopWaitingManager.ReceivedAck(ackFrame)
return nil
}
func (h *sentPacketHandler) determineNewlyAckedPackets(ackFrame *frames.AckFrame) ([]*PacketElement, error) {
var ackedPackets []*PacketElement
ackRangeIndex := 0
for el := h.packetHistory.Front(); el != nil; el = el.Next() {
packet := el.Value
packetNumber := packet.PacketNumber
// NACK packets below the LowestAcked
// Ignore packets below the LowestAcked
if packetNumber < ackFrame.LowestAcked {
retransmitted := h.nackPacket(el)
if retransmitted {
lostPackets = append(lostPackets, congestion.PacketInfo{Number: packetNumber, Length: packet.Length})
}
continue
}
// Update the RTT
if packetNumber == h.LargestAcked {
rttUpdated = true
timeDelta := rcvTime.Sub(packet.SendTime)
h.rttStats.UpdateRTT(timeDelta, ackFrame.DelayTime, rcvTime)
if utils.Debug() {
utils.Debugf("\tEstimated RTT: %dms", h.rttStats.SmoothedRTT()/time.Millisecond)
}
}
// Break after LargestAcked is reached
if packetNumber > ackFrame.LargestAcked {
break
}
@@ -211,59 +200,124 @@ func (h *sentPacketHandler) ReceivedAck(ackFrame *frames.AckFrame, withPacketNum
if packetNumber >= ackRange.FirstPacketNumber { // packet i contained in ACK range
if packetNumber > ackRange.LastPacketNumber {
return fmt.Errorf("BUG: ackhandler would have acked wrong packet 0x%x, while evaluating range 0x%x -> 0x%x", packetNumber, ackRange.FirstPacketNumber, ackRange.LastPacketNumber)
}
h.ackPacket(el)
ackedPackets = append(ackedPackets, congestion.PacketInfo{Number: packetNumber, Length: packet.Length})
} else {
retransmitted := h.nackPacket(el)
if retransmitted {
lostPackets = append(lostPackets, congestion.PacketInfo{Number: packetNumber, Length: packet.Length})
return nil, fmt.Errorf("BUG: ackhandler would have acked wrong packet 0x%x, while evaluating range 0x%x -> 0x%x", packetNumber, ackRange.FirstPacketNumber, ackRange.LastPacketNumber)
}
ackedPackets = append(ackedPackets, el)
}
} else {
h.ackPacket(el)
ackedPackets = append(ackedPackets, congestion.PacketInfo{Number: packetNumber, Length: packet.Length})
ackedPackets = append(ackedPackets, el)
}
}
if rttUpdated {
// Reset counter if a new packet was acked
h.consecutiveRTOCount = 0
return ackedPackets, nil
}
func (h *sentPacketHandler) maybeUpdateRTT(largestAcked protocol.PacketNumber, ackDelay time.Duration, rcvTime time.Time) bool {
for el := h.packetHistory.Front(); el != nil; el = el.Next() {
packet := el.Value
if packet.PacketNumber == largestAcked {
h.rttStats.UpdateRTT(rcvTime.Sub(packet.SendTime), ackDelay, time.Now())
return true
}
// Packets are sorted by number, so we can stop searching
if packet.PacketNumber > largestAcked {
break
}
}
return false
}
func (h *sentPacketHandler) updateLossDetectionAlarm() {
// Cancel the alarm if no packets are outstanding
if h.packetHistory.Len() == 0 {
h.alarm = time.Time{}
return
}
h.garbageCollectSkippedPackets()
// TODO(#496): Handle handshake packets separately
// TODO(#497): TLP
if !h.lossTime.IsZero() {
// Early retransmit timer or time loss detection.
h.alarm = h.lossTime
} else {
// RTO
h.alarm = time.Now().Add(h.computeRTOTimeout())
}
}
h.stopWaitingManager.ReceivedAck(ackFrame)
// TODO(lucas-clemente): Introducing congestion.MaybeExitSlowStart() would allow us to call through for each packet and eliminate both the rttUpdated param and the packet slices passed to the congestion
func (h *sentPacketHandler) detectLostPackets(rttUpdated bool) {
h.lossTime = time.Time{}
now := time.Now()
h.congestion.OnCongestionEvent(
rttUpdated,
h.BytesInFlight(),
ackedPackets,
lostPackets,
)
maxRTT := float64(utils.MaxDuration(h.rttStats.LatestRTT(), h.rttStats.SmoothedRTT()))
delayUntilLost := time.Duration((1.0 + timeReorderingFraction) * maxRTT)
return nil
var lostPackets []*PacketElement
for el := h.packetHistory.Front(); el != nil; el = el.Next() {
packet := el.Value
if packet.PacketNumber > h.LargestAcked {
break
}
timeSinceSent := now.Sub(packet.SendTime)
if timeSinceSent > delayUntilLost {
lostPackets = append(lostPackets, el)
} else if h.lossTime.IsZero() {
// Note: This conditional is only entered once per call
h.lossTime = now.Add(delayUntilLost - timeSinceSent)
}
}
if len(lostPackets) > 0 {
var lostPacketsCongestion congestion.PacketVector
for _, p := range lostPackets {
h.queuePacketForRetransmission(p)
lostPacketsCongestion = append(lostPacketsCongestion, congestion.PacketInfo{
Number: p.Value.PacketNumber,
Length: p.Value.Length,
})
}
h.congestion.OnCongestionEvent(rttUpdated, h.bytesInFlight, nil, lostPacketsCongestion)
}
}
func (h *sentPacketHandler) OnAlarm() {
// TODO(#496): Handle handshake packets separately
// TODO(#497): TLP
if !h.lossTime.IsZero() {
// Early retransmit or time loss detection
h.detectLostPackets(false /* rttUpdated */)
} else {
// RTO
h.retransmitOldestTwoPackets()
h.rtoCount++
}
h.updateLossDetectionAlarm()
}
func (h *sentPacketHandler) GetAlarmTimeout() time.Time {
return h.alarm
}
func (h *sentPacketHandler) onPacketAcked(packetElement *PacketElement) {
h.bytesInFlight -= packetElement.Value.Length
h.rtoCount = 0
// TODO(#497): h.tlpCount = 0
h.packetHistory.Remove(packetElement)
}
func (h *sentPacketHandler) DequeuePacketForRetransmission() *Packet {
if len(h.retransmissionQueue) == 0 {
return nil
}
if len(h.retransmissionQueue) > 0 {
queueLen := len(h.retransmissionQueue)
// packets are usually NACKed in descending order. So use the slice as a stack
packet := h.retransmissionQueue[queueLen-1]
h.retransmissionQueue = h.retransmissionQueue[:queueLen-1]
return packet
}
return nil
}
func (h *sentPacketHandler) BytesInFlight() protocol.ByteCount {
return h.bytesInFlight
queueLen := len(h.retransmissionQueue)
// packets are usually NACKed in descending order. So use the slice as a stack
packet := h.retransmissionQueue[queueLen-1]
h.retransmissionQueue = h.retransmissionQueue[:queueLen-1]
return packet
}
func (h *sentPacketHandler) GetLeastUnacked() protocol.PacketNumber {
@@ -275,7 +329,7 @@ func (h *sentPacketHandler) GetStopWaitingFrame(force bool) *frames.StopWaitingF
}
func (h *sentPacketHandler) SendingAllowed() bool {
congestionLimited := h.BytesInFlight() > h.congestion.GetCongestionWindow()
congestionLimited := h.bytesInFlight > h.congestion.GetCongestionWindow()
maxTrackedLimited := protocol.PacketNumber(len(h.retransmissionQueue)+h.packetHistory.Len()) >= protocol.MaxTrackedSentPackets
return !(congestionLimited || maxTrackedLimited)
}
@@ -288,22 +342,13 @@ func (h *sentPacketHandler) CheckForError() error {
return nil
}
func (h *sentPacketHandler) MaybeQueueRTOs() {
if time.Now().Before(h.TimeOfFirstRTO()) {
return
func (h *sentPacketHandler) retransmitOldestTwoPackets() {
if p := h.packetHistory.Front(); p != nil {
h.queueRTO(p)
}
// Always queue the two oldest packets
if h.packetHistory.Front() != nil {
h.queueRTO(h.packetHistory.Front())
if p := h.packetHistory.Front(); p != nil {
h.queueRTO(p)
}
if h.packetHistory.Front() != nil {
h.queueRTO(h.packetHistory.Front())
}
// Reset the RTO timer here, since it's not clear that this packet contained any retransmittable frames
h.lastSentPacketTime = time.Now()
h.consecutiveRTOCount++
}
func (h *sentPacketHandler) queueRTO(el *PacketElement) {
@@ -312,28 +357,42 @@ func (h *sentPacketHandler) queueRTO(el *PacketElement) {
Number: packet.PacketNumber,
Length: packet.Length,
}}
h.congestion.OnCongestionEvent(false, h.BytesInFlight(), nil, packetsLost)
h.congestion.OnRetransmissionTimeout(true)
utils.Debugf("\tQueueing packet 0x%x for retransmission (RTO)", packet.PacketNumber)
h.queuePacketForRetransmission(el)
h.congestion.OnCongestionEvent(false, h.bytesInFlight, nil, packetsLost)
h.congestion.OnRetransmissionTimeout(true)
}
func (h *sentPacketHandler) getRTO() time.Duration {
func (h *sentPacketHandler) queuePacketForRetransmission(packetElement *PacketElement) {
packet := &packetElement.Value
h.bytesInFlight -= packet.Length
h.retransmissionQueue = append(h.retransmissionQueue, packet)
h.packetHistory.Remove(packetElement)
// strictly speaking, this is only necessary for RTO retransmissions
// this is because FastRetransmissions are triggered by missing ranges in ACKs, and then the LargestAcked will already be higher than the packet number of the retransmitted packet
h.stopWaitingManager.QueuedRetransmissionForPacketNumber(packet.PacketNumber)
}
func (h *sentPacketHandler) computeRTOTimeout() time.Duration {
rto := h.congestion.RetransmissionDelay()
if rto == 0 {
rto = protocol.DefaultRetransmissionTime
rto = defaultRTOTimeout
}
rto = utils.MaxDuration(rto, protocol.MinRetransmissionTime)
rto = utils.MaxDuration(rto, minRTOTimeout)
// Exponential backoff
rto *= 1 << h.consecutiveRTOCount
return utils.MinDuration(rto, protocol.MaxRetransmissionTime)
rto = rto << h.rtoCount
return utils.MinDuration(rto, maxRTOTimeout)
}
func (h *sentPacketHandler) TimeOfFirstRTO() time.Time {
if h.lastSentPacketTime.IsZero() {
return time.Time{}
func (h *sentPacketHandler) skippedPacketsAcked(ackFrame *frames.AckFrame) bool {
for _, p := range h.skippedPackets {
if ackFrame.AcksPacket(p) {
return true
}
}
return h.lastSentPacketTime.Add(h.getRTO())
return false
}
func (h *sentPacketHandler) garbageCollectSkippedPackets() {