Merge pull request #688 from lucas-clemente/fix-505

Fix non-retransmittable packet handling in SPH
This commit is contained in:
Lucas Clemente
2017-06-19 20:53:55 +02:00
committed by GitHub
9 changed files with 147 additions and 112 deletions

View File

@@ -9,6 +9,7 @@ import (
// SentPacketHandler handles ACKs received for outgoing packets
type SentPacketHandler interface {
// SentPacket may modify the packet
SentPacket(packet *Packet) error
ReceivedAck(ackFrame *frames.AckFrame, withPacketNumber protocol.PacketNumber, recvTime time.Time) error

View File

@@ -0,0 +1,38 @@
package ackhandler
import (
"github.com/lucas-clemente/quic-go/frames"
)
// Returns a new slice with all non-retransmittable frames deleted.
func stripNonRetransmittableFrames(fs []frames.Frame) []frames.Frame {
res := make([]frames.Frame, 0, len(fs))
for _, f := range fs {
if IsFrameRetransmittable(f) {
res = append(res, f)
}
}
return res
}
// IsFrameRetransmittable returns true if the frame should be retransmitted.
func IsFrameRetransmittable(f frames.Frame) bool {
switch f.(type) {
case *frames.StopWaitingFrame:
return false
case *frames.AckFrame:
return false
default:
return true
}
}
// HasRetransmittableFrames returns true if at least one frame is retransmittable.
func HasRetransmittableFrames(fs []frames.Frame) bool {
for _, f := range fs {
if IsFrameRetransmittable(f) {
return true
}
}
return false
}

View File

@@ -0,0 +1,44 @@
package ackhandler
import (
"reflect"
"github.com/lucas-clemente/quic-go/frames"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = Describe("retransmittable frames", func() {
for fl, el := range map[frames.Frame]bool{
&frames.AckFrame{}: false,
&frames.StopWaitingFrame{}: false,
&frames.BlockedFrame{}: true,
&frames.ConnectionCloseFrame{}: true,
&frames.GoawayFrame{}: true,
&frames.PingFrame{}: true,
&frames.RstStreamFrame{}: true,
&frames.StreamFrame{}: true,
&frames.WindowUpdateFrame{}: true,
} {
f := fl
e := el
fName := reflect.ValueOf(f).Elem().Type().Name()
It("works for "+fName, func() {
Expect(IsFrameRetransmittable(f)).To(Equal(e))
})
It("stripping non-retransmittable frames works for "+fName, func() {
s := []frames.Frame{f}
if e {
Expect(stripNonRetransmittableFrames(s)).To(Equal([]frames.Frame{f}))
} else {
Expect(stripNonRetransmittableFrames(s)).To(BeEmpty())
}
})
It("HasRetransmittableFrames works for "+fName, func() {
Expect(HasRetransmittableFrames([]frames.Frame{f})).To(Equal(e))
})
}
})

View File

@@ -106,26 +106,27 @@ func (h *sentPacketHandler) SentPacket(packet *Packet) error {
}
}
now := time.Now()
packet.SendTime = now
if packet.Length == 0 {
return errors.New("SentPacketHandler: packet cannot be empty")
}
h.bytesInFlight += packet.Length
h.lastSentPacketNumber = packet.PacketNumber
h.packetHistory.PushBack(*packet)
now := time.Now()
packet.Frames = stripNonRetransmittableFrames(packet.Frames)
isRetransmittable := len(packet.Frames) != 0
if isRetransmittable {
packet.SendTime = now
h.bytesInFlight += packet.Length
h.packetHistory.PushBack(*packet)
}
h.congestion.OnPacketSent(
now,
h.bytesInFlight,
packet.PacketNumber,
packet.Length,
true, /* TODO: is retransmittable */
isRetransmittable,
)
h.updateLossDetectionAlarm()
return nil
}
@@ -310,10 +311,11 @@ func (h *sentPacketHandler) DequeuePacketForRetransmission() *Packet {
if len(h.retransmissionQueue) == 0 {
return nil
}
queueLen := len(h.retransmissionQueue)
// packets are usually NACKed in descending order. So use the slice as a stack
packet := h.retransmissionQueue[queueLen-1]
h.retransmissionQueue = h.retransmissionQueue[:queueLen-1]
packet := h.retransmissionQueue[0]
// Shift the slice and don't retain anything that isn't needed.
copy(h.retransmissionQueue, h.retransmissionQueue[1:])
h.retransmissionQueue[len(h.retransmissionQueue)-1] = nil
h.retransmissionQueue = h.retransmissionQueue[:len(h.retransmissionQueue)-1]
return packet
}

View File

@@ -57,6 +57,10 @@ func (m *mockCongestion) OnPacketLost(n protocol.PacketNumber, l protocol.ByteCo
m.packetsLost = append(m.packetsLost, []interface{}{n, l, bif})
}
func retransmittablePacket(num protocol.PacketNumber) *Packet {
return &Packet{PacketNumber: num, Length: 1, Frames: []frames.Frame{&frames.PingFrame{}}}
}
var _ = Describe("SentPacketHandler", func() {
var (
handler *sentPacketHandler
@@ -133,6 +137,12 @@ var _ = Describe("SentPacketHandler", func() {
Expect(handler.packetHistory.Front().Value.SendTime.Unix()).To(BeNumerically("~", time.Now().Unix(), 1))
})
It("does not store non-retransmittable packets", func() {
err := handler.SentPacket(&Packet{PacketNumber: 1, Length: 1})
Expect(err).ToNot(HaveOccurred())
Expect(handler.packetHistory.Len()).To(BeZero())
})
Context("skipped packet numbers", func() {
It("works with non-consecutive packet numbers", func() {
packet1 := Packet{PacketNumber: 1, Frames: []frames.Frame{&streamFrame}, Length: 1}
@@ -216,12 +226,10 @@ var _ = Describe("SentPacketHandler", func() {
It("checks the size of the packet history, for unacked packets", func() {
i := protocol.PacketNumber(1)
for ; i <= protocol.MaxTrackedSentPackets; i++ {
packet := Packet{PacketNumber: protocol.PacketNumber(i), Length: 1}
err := handler.SentPacket(&packet)
err := handler.SentPacket(retransmittablePacket(i))
Expect(err).ToNot(HaveOccurred())
}
packet := Packet{PacketNumber: protocol.PacketNumber(i), Length: 1}
err := handler.SentPacket(&packet)
err := handler.SentPacket(retransmittablePacket(i))
Expect(err).To(MatchError(ErrTooManyTrackedSentPackets))
})
@@ -631,6 +639,7 @@ var _ = Describe("SentPacketHandler", func() {
p := &Packet{
PacketNumber: 1,
Length: 42,
Frames: []frames.Frame{&frames.PingFrame{}},
}
err := handler.SentPacket(p)
Expect(err).NotTo(HaveOccurred())
@@ -641,8 +650,8 @@ var _ = Describe("SentPacketHandler", func() {
})
It("should call MaybeExitSlowStart and OnPacketAcked", func() {
handler.SentPacket(&Packet{PacketNumber: 1, Frames: []frames.Frame{}, Length: 1})
handler.SentPacket(&Packet{PacketNumber: 2, Frames: []frames.Frame{}, Length: 1})
handler.SentPacket(retransmittablePacket(1))
handler.SentPacket(retransmittablePacket(2))
err := handler.ReceivedAck(&frames.AckFrame{LargestAcked: 1, LowestAcked: 1}, 1, time.Now())
Expect(err).NotTo(HaveOccurred())
Expect(cong.maybeExitSlowStart).To(BeTrue())
@@ -653,9 +662,9 @@ var _ = Describe("SentPacketHandler", func() {
})
It("should call MaybeExitSlowStart and OnPacketLost", func() {
handler.SentPacket(&Packet{PacketNumber: 1, Frames: []frames.Frame{}, Length: 1})
handler.SentPacket(&Packet{PacketNumber: 2, Frames: []frames.Frame{}, Length: 1})
handler.SentPacket(&Packet{PacketNumber: 3, Frames: []frames.Frame{}, Length: 1})
handler.SentPacket(retransmittablePacket(1))
handler.SentPacket(retransmittablePacket(2))
handler.SentPacket(retransmittablePacket(3))
handler.OnAlarm() // RTO, meaning 2 lost packets
Expect(cong.maybeExitSlowStart).To(BeFalse())
Expect(cong.onRetransmissionTimeout).To(BeTrue())
@@ -668,7 +677,11 @@ var _ = Describe("SentPacketHandler", func() {
It("allows or denies sending based on congestion", func() {
Expect(handler.SendingAllowed()).To(BeTrue())
err := handler.SentPacket(&Packet{PacketNumber: 1, Frames: []frames.Frame{}, Length: protocol.DefaultTCPMSS + 1})
err := handler.SentPacket(&Packet{
PacketNumber: 1,
Frames: []frames.Frame{&frames.PingFrame{}},
Length: protocol.DefaultTCPMSS + 1,
})
Expect(err).NotTo(HaveOccurred())
Expect(handler.SendingAllowed()).To(BeFalse())
})
@@ -680,7 +693,11 @@ var _ = Describe("SentPacketHandler", func() {
})
It("allows sending if there are retransmisisons outstanding", func() {
err := handler.SentPacket(&Packet{PacketNumber: 1, Frames: []frames.Frame{}, Length: protocol.DefaultTCPMSS + 1})
err := handler.SentPacket(&Packet{
PacketNumber: 1,
Frames: []frames.Frame{&frames.PingFrame{}},
Length: protocol.DefaultTCPMSS + 1,
})
Expect(err).NotTo(HaveOccurred())
Expect(handler.SendingAllowed()).To(BeFalse())
handler.retransmissionQueue = []*Packet{nil}
@@ -724,9 +741,9 @@ var _ = Describe("SentPacketHandler", func() {
Context("Delay-based loss detection", func() {
It("detects a packet as lost", func() {
err := handler.SentPacket(&Packet{PacketNumber: 1, Length: 1})
err := handler.SentPacket(retransmittablePacket(1))
Expect(err).NotTo(HaveOccurred())
err = handler.SentPacket(&Packet{PacketNumber: 2, Length: 1})
err = handler.SentPacket(retransmittablePacket(2))
Expect(err).NotTo(HaveOccurred())
Expect(handler.lossTime.IsZero()).To(BeTrue())
@@ -747,9 +764,9 @@ var _ = Describe("SentPacketHandler", func() {
It("does not detect packets as lost without ACKs", func() {
err := handler.SentPacket(&Packet{PacketNumber: 1, Length: 1})
Expect(err).NotTo(HaveOccurred())
err = handler.SentPacket(&Packet{PacketNumber: 2, Length: 1})
err = handler.SentPacket(retransmittablePacket(2))
Expect(err).NotTo(HaveOccurred())
err = handler.SentPacket(&Packet{PacketNumber: 3, Length: 1})
err = handler.SentPacket(retransmittablePacket(3))
Expect(err).NotTo(HaveOccurred())
Expect(handler.lossTime.IsZero()).To(BeTrue())
@@ -767,9 +784,9 @@ var _ = Describe("SentPacketHandler", func() {
Context("RTO retransmission", func() {
It("queues two packets if RTO expires", func() {
err := handler.SentPacket(&Packet{PacketNumber: 1, Length: 1})
err := handler.SentPacket(retransmittablePacket(1))
Expect(err).NotTo(HaveOccurred())
err = handler.SentPacket(&Packet{PacketNumber: 2, Length: 1})
err = handler.SentPacket(retransmittablePacket(2))
Expect(err).NotTo(HaveOccurred())
handler.rttStats.UpdateRTT(time.Hour, 0, time.Now())
@@ -777,8 +794,12 @@ var _ = Describe("SentPacketHandler", func() {
Expect(handler.GetAlarmTimeout().Sub(time.Now())).To(BeNumerically("~", handler.computeRTOTimeout(), time.Minute))
handler.OnAlarm()
Expect(handler.DequeuePacketForRetransmission()).ToNot(BeNil())
Expect(handler.DequeuePacketForRetransmission()).ToNot(BeNil())
p := handler.DequeuePacketForRetransmission()
Expect(p).ToNot(BeNil())
Expect(p.PacketNumber).To(Equal(protocol.PacketNumber(1)))
p = handler.DequeuePacketForRetransmission()
Expect(p).ToNot(BeNil())
Expect(p.PacketNumber).To(Equal(protocol.PacketNumber(2)))
Expect(handler.rtoCount).To(BeEquivalentTo(1))
})

View File

@@ -10,6 +10,11 @@ import (
"github.com/lucas-clemente/quic-go/qerr"
)
type unpackedPacket struct {
encryptionLevel protocol.EncryptionLevel
frames []frames.Frame
}
type quicAEAD interface {
Open(dst, src []byte, packetNumber protocol.PacketNumber, associatedData []byte) ([]byte, protocol.EncryptionLevel, error)
}

View File

@@ -293,7 +293,7 @@ runLoop:
}
now := time.Now()
if s.sentPacketHandler.GetAlarmTimeout().Before(now) {
if timeout := s.sentPacketHandler.GetAlarmTimeout(); !timeout.IsZero() && timeout.Before(now) {
// This could cause packets to be retransmitted, so check it before trying
// to send packets.
s.sentPacketHandler.OnAlarm()
@@ -405,7 +405,8 @@ func (s *session) handlePacketImpl(p *receivedPacket) error {
// Only do this after decrypting, so we are sure the packet is not attacker-controlled
s.largestRcvdPacketNumber = utils.MaxPacketNumber(s.largestRcvdPacketNumber, hdr.PacketNumber)
if err = s.receivedPacketHandler.ReceivedPacket(hdr.PacketNumber, packet.IsRetransmittable()); err != nil {
isRetransmittable := ackhandler.HasRetransmittableFrames(packet.frames)
if err = s.receivedPacketHandler.ReceivedPacket(hdr.PacketNumber, isRetransmittable); err != nil {
return err
}

View File

@@ -1,31 +0,0 @@
package quic
import (
"github.com/lucas-clemente/quic-go/frames"
"github.com/lucas-clemente/quic-go/protocol"
)
type unpackedPacket struct {
encryptionLevel protocol.EncryptionLevel
frames []frames.Frame
}
func (u *unpackedPacket) IsRetransmittable() bool {
for _, f := range u.frames {
switch f.(type) {
case *frames.StreamFrame:
return true
case *frames.RstStreamFrame:
return true
case *frames.WindowUpdateFrame:
return true
case *frames.BlockedFrame:
return true
case *frames.PingFrame:
return true
case *frames.GoawayFrame:
return true
}
}
return false
}

View File

@@ -1,46 +0,0 @@
package quic
import (
"github.com/lucas-clemente/quic-go/frames"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = Describe("Unpacked packet", func() {
var packet *unpackedPacket
BeforeEach(func() {
packet = &unpackedPacket{}
})
It("says that an empty packet is not retransmittable", func() {
Expect(packet.IsRetransmittable()).To(BeFalse())
})
It("detects the frame types", func() {
packet.frames = []frames.Frame{&frames.AckFrame{}}
Expect(packet.IsRetransmittable()).To(BeFalse())
packet.frames = []frames.Frame{&frames.BlockedFrame{}}
Expect(packet.IsRetransmittable()).To(BeTrue())
packet.frames = []frames.Frame{&frames.GoawayFrame{}}
Expect(packet.IsRetransmittable()).To(BeTrue())
packet.frames = []frames.Frame{&frames.PingFrame{}}
Expect(packet.IsRetransmittable()).To(BeTrue())
packet.frames = []frames.Frame{&frames.StreamFrame{}}
Expect(packet.IsRetransmittable()).To(BeTrue())
packet.frames = []frames.Frame{&frames.RstStreamFrame{}}
Expect(packet.IsRetransmittable()).To(BeTrue())
packet.frames = []frames.Frame{&frames.StopWaitingFrame{}}
Expect(packet.IsRetransmittable()).To(BeFalse())
packet.frames = []frames.Frame{&frames.WindowUpdateFrame{}}
Expect(packet.IsRetransmittable()).To(BeTrue())
})
It("says that a packet is retransmittable if it contains one retransmittable frame", func() {
packet.frames = []frames.Frame{
&frames.AckFrame{},
&frames.WindowUpdateFrame{},
&frames.StopWaitingFrame{},
}
Expect(packet.IsRetransmittable()).To(BeTrue())
})
})