use linked list to store sent packets in new AckHandler

This commit is contained in:
Marten Seemann
2016-08-10 00:05:06 +07:00
parent fe531dd65e
commit cba965cc0c
6 changed files with 434 additions and 255 deletions

View File

@@ -34,8 +34,7 @@ type sentPacketHandler struct {
largestReceivedPacketWithAck protocol.PacketNumber
// TODO: Move into separate class as in chromium
packetHistory map[protocol.PacketNumber]*ackhandlerlegacy.Packet
packetHistory *ackhandlerlegacy.PacketList
stopWaitingManager stopWaitingManager
retransmissionQueue []*ackhandlerlegacy.Packet
@@ -59,76 +58,70 @@ func NewSentPacketHandler() SentPacketHandler {
)
return &sentPacketHandler{
packetHistory: make(map[protocol.PacketNumber]*ackhandlerlegacy.Packet),
packetHistory: ackhandlerlegacy.NewPacketList(),
stopWaitingManager: stopWaitingManager{},
rttStats: rttStats,
congestion: congestion,
}
}
func (h *sentPacketHandler) ackPacket(packetNumber protocol.PacketNumber) *ackhandlerlegacy.Packet {
packet, ok := h.packetHistory[packetNumber]
if ok && !packet.Retransmitted {
h.bytesInFlight -= packet.Length
}
func (h *sentPacketHandler) ackPacket(packetElement *ackhandlerlegacy.PacketElement) *ackhandlerlegacy.Packet {
packet := &packetElement.Value
if h.LargestInOrderAcked == packetNumber-1 {
h.bytesInFlight -= packet.Length
if h.LargestInOrderAcked == packet.PacketNumber-1 {
h.LargestInOrderAcked++
if next := packetElement.Next(); next != nil {
h.LargestInOrderAcked = next.Value.PacketNumber - 1
}
}
delete(h.packetHistory, packetNumber)
h.packetHistory.Remove(packetElement)
return packet
}
func (h *sentPacketHandler) nackPacket(packetNumber protocol.PacketNumber) (*ackhandlerlegacy.Packet, error) {
packet, ok := h.packetHistory[packetNumber]
// This means that the packet has already been retransmitted, do nothing.
// We're probably only receiving another NACK for this packet because the
// retransmission has not yet arrived at the client.
if !ok {
return nil, nil
}
if packet.Retransmitted {
return nil, nil
}
func (h *sentPacketHandler) nackPacket(packetElement *ackhandlerlegacy.PacketElement) (*ackhandlerlegacy.Packet, error) {
packet := &packetElement.Value
packet.MissingReports++
if packet.MissingReports > protocol.RetransmissionThreshold {
h.queuePacketForRetransmission(packet)
h.queuePacketForRetransmission(packetElement)
return packet, nil
}
return nil, nil
}
func (h *sentPacketHandler) queuePacketForRetransmission(packet *ackhandlerlegacy.Packet) {
// does NOT set packet.Retransmitted. This variable is not needed anymore
func (h *sentPacketHandler) queuePacketForRetransmission(packetElement *ackhandlerlegacy.PacketElement) {
packet := &packetElement.Value
utils.Debugf("\tQueueing packet 0x%x for retransmission", packet.PacketNumber)
h.bytesInFlight -= packet.Length
h.retransmissionQueue = append(h.retransmissionQueue, packet)
packet.Retransmitted = true
// If this is the lowest packet that hasn't been acked or retransmitted yet ...
if packet.PacketNumber == h.LargestInOrderAcked+1 {
// ... increase the LargestInOrderAcked until it's one before the next packet that was not acked and not retransmitted
for h.LargestInOrderAcked < h.LargestAcked {
if p, notAcked := h.packetHistory[h.LargestInOrderAcked+1]; notAcked && !p.Retransmitted {
for el := packetElement; el != nil; el = el.Next() {
if h.LargestInOrderAcked == h.LargestAcked {
break
}
h.LargestInOrderAcked++
h.LargestInOrderAcked = el.Value.PacketNumber - 1
}
}
h.packetHistory.Remove(packetElement)
// strictly speaking, this is only necessary for RTO retransmissions
// this is because FastRetransmissions are triggered by missing ranges in ACKs, and then the LargestAcked will already be higher than the packet number of the retransmitted packet
h.stopWaitingManager.QueuedRetransmissionForPacketNumber(packet.PacketNumber)
}
func (h *sentPacketHandler) SentPacket(packet *ackhandlerlegacy.Packet) error {
_, ok := h.packetHistory[packet.PacketNumber]
if ok {
return errDuplicatePacketNumber
}
// TODO: check for decreasing or duplicate packet numbers
now := time.Now()
h.lastSentPacketTime = now
@@ -139,7 +132,7 @@ func (h *sentPacketHandler) SentPacket(packet *ackhandlerlegacy.Packet) error {
h.bytesInFlight += packet.Length
h.lastSentPacketNumber = packet.PacketNumber
h.packetHistory[packet.PacketNumber] = packet
h.packetHistory.PushBack(*packet)
h.congestion.OnPacketSent(
time.Now(),
@@ -171,48 +164,58 @@ func (h *sentPacketHandler) ReceivedAck(ackFrame *frames.AckFrame, withPacketNum
h.LargestAcked = ackFrame.LargestAcked
packet, ok := h.packetHistory[h.LargestAcked]
if ok {
// Update the RTT
timeDelta := time.Now().Sub(packet.SendTime)
// TODO: Don't always update RTT
h.rttStats.UpdateRTT(timeDelta, ackFrame.DelayTime, time.Now())
if utils.Debug() {
utils.Debugf("\tEstimated RTT: %dms", h.rttStats.SmoothedRTT()/time.Millisecond)
}
}
var ackedPackets congestion.PacketVector
var lostPackets congestion.PacketVector
// NACK packets below the LowestAcked
for i := h.LargestInOrderAcked + 1; i < ackFrame.LowestAcked; i++ {
p, err := h.nackPacket(i)
if err != nil {
return err
}
if p != nil {
lostPackets = append(lostPackets, congestion.PacketInfo{Number: p.PacketNumber, Length: p.Length})
}
}
ackRangeIndex := 0
for i := ackFrame.LowestAcked; i <= ackFrame.LargestAcked; i++ {
var el, elNext *ackhandlerlegacy.PacketElement
for el = h.packetHistory.Front(); el != nil; el = elNext {
// determine the next list element right at the beginning, because el.Next() is not avaible anymore, when the list element is deleted (i.e. when the packet is ACKed)
elNext = el.Next()
packet := el.Value
packetNumber := packet.PacketNumber
// NACK packets below the LowestAcked
if packetNumber < ackFrame.LowestAcked {
p, err := h.nackPacket(el)
if err != nil {
return err
}
if p != nil {
lostPackets = append(lostPackets, congestion.PacketInfo{Number: p.PacketNumber, Length: p.Length})
}
continue
}
// Update the RTT
if packetNumber == h.LargestAcked {
timeDelta := time.Now().Sub(packet.SendTime)
// TODO: Don't always update RTT
h.rttStats.UpdateRTT(timeDelta, ackFrame.DelayTime, time.Now())
if utils.Debug() {
utils.Debugf("\tEstimated RTT: %dms", h.rttStats.SmoothedRTT()/time.Millisecond)
}
}
if packetNumber > ackFrame.LargestAcked {
break
}
if ackFrame.HasMissingRanges() {
ackRange := ackFrame.AckRanges[len(ackFrame.AckRanges)-1-ackRangeIndex]
if i > ackRange.LastPacketNumber && ackRangeIndex < len(ackFrame.AckRanges)-1 {
if packetNumber > ackRange.LastPacketNumber && ackRangeIndex < len(ackFrame.AckRanges)-1 {
ackRangeIndex++
ackRange = ackFrame.AckRanges[len(ackFrame.AckRanges)-1-ackRangeIndex]
}
if i >= ackRange.FirstPacketNumber { // packet i contained in ACK range
p := h.ackPacket(i)
if packetNumber >= ackRange.FirstPacketNumber { // packet i contained in ACK range
p := h.ackPacket(el)
if p != nil {
ackedPackets = append(ackedPackets, congestion.PacketInfo{Number: p.PacketNumber, Length: p.Length})
}
} else {
p, err := h.nackPacket(i)
p, err := h.nackPacket(el)
if err != nil {
return err
}
@@ -221,7 +224,7 @@ func (h *sentPacketHandler) ReceivedAck(ackFrame *frames.AckFrame, withPacketNum
}
}
} else {
p := h.ackPacket(i)
p := h.ackPacket(el)
if p != nil {
ackedPackets = append(ackedPackets, congestion.PacketInfo{Number: p.PacketNumber, Length: p.Length})
}
@@ -245,29 +248,19 @@ func (h *sentPacketHandler) ReceivedAck(ackFrame *frames.AckFrame, withPacketNum
// if a packet has already been queued for retransmission, but a belated ACK is received for this packet, this function will return true, although the packet will not be returend for retransmission by DequeuePacketForRetransmission()
func (h *sentPacketHandler) ProbablyHasPacketForRetransmission() bool {
h.maybeQueuePacketsRTO()
return len(h.retransmissionQueue) > 0
}
func (h *sentPacketHandler) DequeuePacketForRetransmission() (packet *ackhandlerlegacy.Packet) {
func (h *sentPacketHandler) DequeuePacketForRetransmission() *ackhandlerlegacy.Packet {
if !h.ProbablyHasPacketForRetransmission() {
return nil
}
for len(h.retransmissionQueue) > 0 {
if len(h.retransmissionQueue) > 0 {
queueLen := len(h.retransmissionQueue)
// packets are usually NACKed in descending order. So use the slice as a stack
packet = h.retransmissionQueue[queueLen-1]
packet := h.retransmissionQueue[queueLen-1]
h.retransmissionQueue = h.retransmissionQueue[:queueLen-1]
// this happens if a belated ACK arrives for this packet
// no need to retransmit it
_, ok := h.packetHistory[packet.PacketNumber]
if !ok {
continue
}
delete(h.packetHistory, packet.PacketNumber)
return packet
}
@@ -291,7 +284,7 @@ func (h *sentPacketHandler) CongestionAllowsSending() bool {
}
func (h *sentPacketHandler) CheckForError() error {
length := len(h.retransmissionQueue) + len(h.packetHistory)
length := len(h.retransmissionQueue) + h.packetHistory.Len()
if uint32(length) > protocol.MaxTrackedSentPackets {
return ErrTooManyTrackedSentPackets
}
@@ -303,18 +296,21 @@ func (h *sentPacketHandler) maybeQueuePacketsRTO() {
return
}
for p := h.LargestInOrderAcked + 1; p <= h.lastSentPacketNumber; p++ {
packet := h.packetHistory[p]
if packet != nil && !packet.Retransmitted {
packetsLost := congestion.PacketVector{congestion.PacketInfo{
Number: packet.PacketNumber,
Length: packet.Length,
}}
h.congestion.OnCongestionEvent(false, h.BytesInFlight(), nil, packetsLost)
h.congestion.OnRetransmissionTimeout(true)
h.queuePacketForRetransmission(packet)
return
for el := h.packetHistory.Front(); el != nil; el = el.Next() {
packet := &el.Value
if packet.PacketNumber < h.LargestInOrderAcked {
continue
}
packetsLost := congestion.PacketVector{congestion.PacketInfo{
Number: packet.PacketNumber,
Length: packet.Length,
}}
h.congestion.OnCongestionEvent(false, h.BytesInFlight(), nil, packetsLost)
h.congestion.OnRetransmissionTimeout(true)
// utils.Debugf("\tqueueing RTO retransmission for packet 0x%x", packet.PacketNumber)
h.queuePacketForRetransmission(el)
return
}
}