keep track of which packets were sent as retransmissions

When an ACK for a packet that was retransmitted arrives, we don't need
to retransmit the retransmission, even if it was lost.
This commit is contained in:
Marten Seemann
2018-03-14 12:18:54 +01:00
parent 56720edc34
commit fed3bf503e
8 changed files with 415 additions and 71 deletions

View File

@@ -84,7 +84,7 @@ func NewSentPacketHandler(rttStats *congestion.RTTStats) SentPacketHandler {
}
func (h *sentPacketHandler) lowestUnacked() protocol.PacketNumber {
if p := h.packetHistory.Front(); p != nil {
if p := h.packetHistory.FirstOutstanding(); p != nil {
return p.PacketNumber
}
return h.largestAcked + 1
@@ -112,6 +112,27 @@ func (h *sentPacketHandler) SetHandshakeComplete() {
}
func (h *sentPacketHandler) SentPacket(packet *Packet) {
packet.sendTime = time.Now()
if isRetransmittable := h.sentPacketImpl(packet); isRetransmittable {
h.packetHistory.SentPacket(packet)
}
h.updateLossDetectionAlarm(packet.sendTime)
}
func (h *sentPacketHandler) SentPacketsAsRetransmission(packets []*Packet, retransmissionOf protocol.PacketNumber) {
now := time.Now()
var p []*Packet
for _, packet := range packets {
packet.sendTime = now
if isRetransmittable := h.sentPacketImpl(packet); isRetransmittable {
p = append(p, packet)
}
}
h.packetHistory.SentPacketsAsRetransmission(p, retransmissionOf)
h.updateLossDetectionAlarm(now)
}
func (h *sentPacketHandler) sentPacketImpl(packet *Packet) bool /* isRetransmittable */ {
for p := h.lastSentPacketNumber + 1; p < packet.PacketNumber; p++ {
h.skippedPackets = append(h.skippedPackets, p)
if len(h.skippedPackets) > protocol.MaxTrackedSkippedPackets {
@@ -119,7 +140,6 @@ func (h *sentPacketHandler) SentPacket(packet *Packet) {
}
}
now := time.Now()
h.lastSentPacketNumber = packet.PacketNumber
var largestAcked protocol.PacketNumber
@@ -133,27 +153,13 @@ func (h *sentPacketHandler) SentPacket(packet *Packet) {
isRetransmittable := len(packet.Frames) != 0
if isRetransmittable {
packet.sendTime = now
packet.largestAcked = largestAcked
h.bytesInFlight += packet.Length
h.packetHistory.SentPacket(packet)
}
h.congestion.OnPacketSent(
now,
h.bytesInFlight,
packet.PacketNumber,
packet.Length,
isRetransmittable,
)
h.congestion.OnPacketSent(packet.sendTime, h.bytesInFlight, packet.PacketNumber, packet.Length, isRetransmittable)
h.nextPacketSendTime = utils.MaxTime(h.nextPacketSendTime, now).Add(h.congestion.TimeUntilSend(h.bytesInFlight))
h.updateLossDetectionAlarm(now)
}
func (h *sentPacketHandler) SentPacketsAsRetransmission(packets []*Packet, retransmissionOf protocol.PacketNumber) {
for _, packet := range packets {
h.SentPacket(packet)
}
h.nextPacketSendTime = utils.MaxTime(h.nextPacketSendTime, packet.sendTime).Add(h.congestion.TimeUntilSend(h.bytesInFlight))
return isRetransmittable
}
func (h *sentPacketHandler) ReceivedAck(ackFrame *wire.AckFrame, withPacketNumber protocol.PacketNumber, encLevel protocol.EncryptionLevel, rcvTime time.Time) error {
@@ -192,11 +198,17 @@ func (h *sentPacketHandler) ReceivedAck(ackFrame *wire.AckFrame, withPacketNumbe
if p.largestAcked != 0 {
h.lowestPacketNotConfirmedAcked = utils.MaxPacketNumber(h.lowestPacketNotConfirmedAcked, p.largestAcked+1)
}
h.onPacketAcked(p)
h.congestion.OnPacketAcked(p.PacketNumber, p.Length, h.bytesInFlight)
if err := h.onPacketAcked(p); err != nil {
return err
}
if len(p.retransmittedAs) == 0 {
h.congestion.OnPacketAcked(p.PacketNumber, p.Length, h.bytesInFlight)
}
}
h.detectLostPackets(rcvTime)
if err := h.detectLostPackets(rcvTime); err != nil {
return err
}
h.updateLossDetectionAlarm(rcvTime)
h.garbageCollectSkippedPackets()
@@ -271,7 +283,7 @@ func (h *sentPacketHandler) updateLossDetectionAlarm(now time.Time) {
}
}
func (h *sentPacketHandler) detectLostPackets(now time.Time) {
func (h *sentPacketHandler) detectLostPackets(now time.Time) error {
h.lossTime = time.Time{}
maxRTT := float64(utils.MaxDuration(h.rttStats.LatestRTT(), h.rttStats.SmoothedRTT()))
@@ -282,6 +294,9 @@ func (h *sentPacketHandler) detectLostPackets(now time.Time) {
if packet.PacketNumber > h.largestAcked {
return false, nil
}
if packet.queuedForRetransmission { // don't retransmit packets twice
return true, nil
}
timeSinceSent := now.Sub(packet.sendTime)
if timeSinceSent > delayUntilLost {
@@ -294,40 +309,99 @@ func (h *sentPacketHandler) detectLostPackets(now time.Time) {
})
for _, p := range lostPackets {
h.queuePacketForRetransmission(p)
if err := h.queuePacketForRetransmission(p); err != nil {
return err
}
h.congestion.OnPacketLost(p.PacketNumber, p.Length, h.bytesInFlight)
}
return nil
}
func (h *sentPacketHandler) OnAlarm() {
func (h *sentPacketHandler) OnAlarm() error {
now := time.Now()
// TODO(#497): TLP
var err error
if !h.handshakeComplete {
h.queueHandshakePacketsForRetransmission()
h.handshakeCount++
err = h.queueHandshakePacketsForRetransmission()
} else if !h.lossTime.IsZero() {
// Early retransmit or time loss detection
h.detectLostPackets(now)
err = h.detectLostPackets(now)
} else {
// RTO
h.retransmitOldestTwoPackets()
h.rtoCount++
err = h.queueRTOs()
}
if err != nil {
return err
}
h.updateLossDetectionAlarm(now)
return nil
}
func (h *sentPacketHandler) GetAlarmTimeout() time.Time {
return h.alarm
}
func (h *sentPacketHandler) onPacketAcked(p *Packet) {
h.bytesInFlight -= p.Length
func (h *sentPacketHandler) onPacketAcked(p *Packet) error {
// This happens if a packet and its retransmissions is acked in the same ACK.
// As soon as we process the first one, this will remove all the retransmissions,
// so we won't find the retransmitted packet number later.
if packet := h.packetHistory.GetPacket(p.PacketNumber); packet == nil {
return nil
}
h.rtoCount = 0
h.handshakeCount = 0
// TODO(#497): h.tlpCount = 0
h.packetHistory.Remove(p.PacketNumber)
// find the first packet, from which on we can delete all retransmissions
// example: packet 10 was retransmitted as packet 11 and 12, and
// packet 12 was then retransmitted as 13.
// When receiving an ACK for packet 13, we can remove packets 12 and 13,
// but still need to keep 10 and 11.
first := p
for first.isRetransmission {
previous := h.packetHistory.GetPacket(first.retransmissionOf)
if previous == nil {
return fmt.Errorf("sent packet handler BUG: retransmitted packet for %d not found (should have been %d)", first.PacketNumber, first.retransmissionOf)
}
// if the retransmission of a packet was split, we can't remove it yet
if len(previous.retransmittedAs) > 1 {
break
}
first = previous
}
if first.isRetransmission {
root := h.packetHistory.GetPacket(first.retransmissionOf)
retransmittedAs := make([]protocol.PacketNumber, 0, len(root.retransmittedAs)-1)
for _, pn := range root.retransmittedAs {
if pn != first.PacketNumber {
retransmittedAs = append(retransmittedAs, pn)
}
}
root.retransmittedAs = retransmittedAs
}
return h.removeAllRetransmissions(first)
}
func (h *sentPacketHandler) removeAllRetransmissions(p *Packet) error {
if !p.queuedForRetransmission {
// The bytes in flight are reduced when a packet is queued for retransmission.
// When a packet is acked, we only need to reduce it for packets that were not retransmitted.
h.bytesInFlight -= p.Length
} else {
for _, r := range p.retransmittedAs {
packet := h.packetHistory.GetPacket(r)
if packet == nil {
return fmt.Errorf("sent packet handler BUG: removing packet %d (retransmission of %d) not found in history", r, p.PacketNumber)
}
if err := h.removeAllRetransmissions(packet); err != nil {
return err
}
}
}
return h.packetHistory.Remove(p.PacketNumber)
}
func (h *sentPacketHandler) DequeuePacketForRetransmission() *Packet {
@@ -389,40 +463,45 @@ func (h *sentPacketHandler) ShouldSendNumPackets() int {
return int(math.Ceil(float64(protocol.MinPacingDelay) / float64(delay)))
}
func (h *sentPacketHandler) retransmitOldestTwoPackets() {
if p := h.packetHistory.Front(); p != nil {
h.queueRTO(p)
}
if p := h.packetHistory.Front(); p != nil {
h.queueRTO(p)
// retransmit the oldest two packets
func (h *sentPacketHandler) queueRTOs() error {
for i := 0; i < 2; i++ {
if p := h.packetHistory.FirstOutstanding(); p != nil {
utils.Debugf("\tQueueing packet %#x for retransmission (RTO), %d outstanding", p.PacketNumber, h.packetHistory.Len())
if err := h.queuePacketForRetransmission(p); err != nil {
return err
}
h.congestion.OnPacketLost(p.PacketNumber, p.Length, h.bytesInFlight)
h.congestion.OnRetransmissionTimeout(true)
}
}
return nil
}
func (h *sentPacketHandler) queueRTO(p *Packet) {
utils.Debugf("\tQueueing packet 0x%x for retransmission (RTO), %d outstanding", p.PacketNumber, h.packetHistory.Len())
h.queuePacketForRetransmission(p)
h.congestion.OnPacketLost(p.PacketNumber, p.Length, h.bytesInFlight)
h.congestion.OnRetransmissionTimeout(true)
}
func (h *sentPacketHandler) queueHandshakePacketsForRetransmission() {
func (h *sentPacketHandler) queueHandshakePacketsForRetransmission() error {
var handshakePackets []*Packet
h.packetHistory.Iterate(func(p *Packet) (bool, error) {
if p.EncryptionLevel < protocol.EncryptionForwardSecure {
if !p.queuedForRetransmission && p.EncryptionLevel < protocol.EncryptionForwardSecure {
handshakePackets = append(handshakePackets, p)
}
return true, nil
})
for _, p := range handshakePackets {
h.queuePacketForRetransmission(p)
if err := h.queuePacketForRetransmission(p); err != nil {
return err
}
}
return nil
}
func (h *sentPacketHandler) queuePacketForRetransmission(p *Packet) {
func (h *sentPacketHandler) queuePacketForRetransmission(p *Packet) error {
if _, err := h.packetHistory.QueuePacketForRetransmission(p.PacketNumber); err != nil {
return err
}
h.bytesInFlight -= p.Length
h.retransmissionQueue = append(h.retransmissionQueue, p)
h.packetHistory.Remove(p.PacketNumber)
h.stopWaitingManager.QueuedRetransmissionForPacketNumber(p.PacketNumber)
return nil
}
func (h *sentPacketHandler) computeHandshakeTimeout() time.Duration {