forked from quic-go/quic-go
Merge pull request #282 from lucas-clemente/fix-261
Fix flaky drop tests
This commit is contained in:
@@ -15,7 +15,7 @@ type SentPacketHandler interface {
|
||||
|
||||
GetStopWaitingFrame() *frames.StopWaitingFrame
|
||||
|
||||
ProbablyHasPacketForRetransmission() bool
|
||||
MaybeQueueRTOs()
|
||||
DequeuePacketForRetransmission() (packet *ackhandlerlegacy.Packet)
|
||||
|
||||
BytesInFlight() protocol.ByteCount
|
||||
|
||||
@@ -2,6 +2,7 @@ package ackhandler
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/lucas-clemente/quic-go/ackhandlerlegacy"
|
||||
@@ -227,6 +228,9 @@ func (h *sentPacketHandler) ReceivedAck(ackFrame *frames.AckFrame, withPacketNum
|
||||
}
|
||||
|
||||
if packetNumber >= ackRange.FirstPacketNumber { // packet i contained in ACK range
|
||||
if packetNumber > ackRange.LastPacketNumber {
|
||||
return fmt.Errorf("BUG: ackhandler would have acked wrong packet 0x%x, while evaluating range 0x%x -> 0x%x", packetNumber, ackRange.FirstPacketNumber, ackRange.LastPacketNumber)
|
||||
}
|
||||
p := h.ackPacket(el)
|
||||
if p != nil {
|
||||
ackedPackets = append(ackedPackets, congestion.PacketInfo{Number: p.PacketNumber, Length: p.Length})
|
||||
@@ -262,16 +266,8 @@ func (h *sentPacketHandler) ReceivedAck(ackFrame *frames.AckFrame, withPacketNum
|
||||
return nil
|
||||
}
|
||||
|
||||
// ProbablyHasPacketForRetransmission returns if there is a packet queued for retransmission
|
||||
// There is one case where it gets the answer wrong:
|
||||
// if a packet has already been queued for retransmission, but a belated ACK is received for this packet, this function will return true, although the packet will not be returend for retransmission by DequeuePacketForRetransmission()
|
||||
func (h *sentPacketHandler) ProbablyHasPacketForRetransmission() bool {
|
||||
h.maybeQueuePacketsRTO()
|
||||
return len(h.retransmissionQueue) > 0
|
||||
}
|
||||
|
||||
func (h *sentPacketHandler) DequeuePacketForRetransmission() *ackhandlerlegacy.Packet {
|
||||
if !h.ProbablyHasPacketForRetransmission() {
|
||||
if len(h.retransmissionQueue) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -310,7 +306,7 @@ func (h *sentPacketHandler) CheckForError() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *sentPacketHandler) maybeQueuePacketsRTO() {
|
||||
func (h *sentPacketHandler) MaybeQueueRTOs() {
|
||||
if time.Now().Before(h.TimeOfFirstRTO()) {
|
||||
return
|
||||
}
|
||||
@@ -329,6 +325,8 @@ func (h *sentPacketHandler) maybeQueuePacketsRTO() {
|
||||
h.congestion.OnRetransmissionTimeout(true)
|
||||
utils.Debugf("\tQueueing packet 0x%x for retransmission (RTO)", packet.PacketNumber)
|
||||
h.queuePacketForRetransmission(el)
|
||||
// Reset the RTO timer here, since it's not clear that this packet contained any retransmittable frames
|
||||
h.lastSentPacketTime = time.Now()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -568,7 +568,7 @@ var _ = Describe("SentPacketHandler", func() {
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
}
|
||||
Expect(getPacketElement(2)).ToNot(BeNil())
|
||||
Expect(handler.ProbablyHasPacketForRetransmission()).To(BeFalse())
|
||||
handler.MaybeQueueRTOs()
|
||||
Expect(handler.DequeuePacketForRetransmission()).To(BeNil())
|
||||
})
|
||||
|
||||
@@ -578,7 +578,7 @@ var _ = Describe("SentPacketHandler", func() {
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
}
|
||||
Expect(getPacketElement(2)).To(BeNil())
|
||||
Expect(handler.ProbablyHasPacketForRetransmission()).To(BeTrue())
|
||||
handler.MaybeQueueRTOs()
|
||||
Expect(handler.retransmissionQueue).To(HaveLen(1))
|
||||
Expect(handler.retransmissionQueue[0].PacketNumber).To(Equal(protocol.PacketNumber(2)))
|
||||
})
|
||||
@@ -819,7 +819,7 @@ var _ = Describe("SentPacketHandler", func() {
|
||||
err := handler.SentPacket(&ackhandlerlegacy.Packet{PacketNumber: 1, Frames: []frames.Frame{}, Length: 1})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
handler.lastSentPacketTime = time.Now().Add(-time.Second)
|
||||
handler.maybeQueuePacketsRTO()
|
||||
handler.MaybeQueueRTOs()
|
||||
Expect(cong.nCalls).To(Equal(3))
|
||||
// rttUpdated, bytesInFlight, ackedPackets, lostPackets
|
||||
Expect(cong.argsOnCongestionEvent[0]).To(BeFalse())
|
||||
@@ -866,7 +866,7 @@ var _ = Describe("SentPacketHandler", func() {
|
||||
It("does nothing if not required", func() {
|
||||
err := handler.SentPacket(&ackhandlerlegacy.Packet{PacketNumber: 1, Frames: []frames.Frame{}, Length: 1})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
handler.maybeQueuePacketsRTO()
|
||||
handler.MaybeQueueRTOs()
|
||||
Expect(handler.retransmissionQueue).To(BeEmpty())
|
||||
})
|
||||
|
||||
@@ -875,25 +875,19 @@ var _ = Describe("SentPacketHandler", func() {
|
||||
err := handler.SentPacket(p)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
handler.lastSentPacketTime = time.Now().Add(-time.Second)
|
||||
handler.maybeQueuePacketsRTO()
|
||||
handler.MaybeQueueRTOs()
|
||||
Expect(handler.retransmissionQueue).To(HaveLen(1))
|
||||
Expect(handler.retransmissionQueue[0].PacketNumber).To(Equal(p.PacketNumber))
|
||||
Expect(time.Now().Sub(handler.lastSentPacketTime)).To(BeNumerically("<", time.Second/2))
|
||||
})
|
||||
})
|
||||
|
||||
It("works with HasPacketForRetransmission", func() {
|
||||
p := &ackhandlerlegacy.Packet{PacketNumber: 1, Frames: []frames.Frame{}, Length: 1}
|
||||
err := handler.SentPacket(p)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
handler.lastSentPacketTime = time.Now().Add(-time.Second)
|
||||
Expect(handler.DequeuePacketForRetransmission()).ToNot(BeNil())
|
||||
})
|
||||
|
||||
It("works with DequeuePacketForRetransmission", func() {
|
||||
p := &ackhandlerlegacy.Packet{PacketNumber: 1, Frames: []frames.Frame{}, Length: 1}
|
||||
err := handler.SentPacket(p)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
handler.lastSentPacketTime = time.Now().Add(-time.Second)
|
||||
handler.MaybeQueueRTOs()
|
||||
Expect(handler.DequeuePacketForRetransmission().PacketNumber).To(Equal(p.PacketNumber))
|
||||
})
|
||||
})
|
||||
|
||||
@@ -14,7 +14,7 @@ type SentPacketHandler interface {
|
||||
|
||||
GetStopWaitingFrame() *frames.StopWaitingFrame
|
||||
|
||||
ProbablyHasPacketForRetransmission() bool
|
||||
MaybeQueueRTOs()
|
||||
DequeuePacketForRetransmission() (packet *Packet)
|
||||
|
||||
BytesInFlight() protocol.ByteCount
|
||||
|
||||
@@ -273,17 +273,8 @@ func (h *sentPacketHandler) ReceivedAck(ackFrameOrig *frames.AckFrame, withPacke
|
||||
return nil
|
||||
}
|
||||
|
||||
// ProbablyHasPacketForRetransmission returns if there is a packet queued for retransmission
|
||||
// There is one case where it gets the answer wrong:
|
||||
// if a packet has already been queued for retransmission, but a belated ACK is received for this packet, this function will return true, although the packet will not be returend for retransmission by DequeuePacketForRetransmission()
|
||||
func (h *sentPacketHandler) ProbablyHasPacketForRetransmission() bool {
|
||||
h.maybeQueuePacketsRTO()
|
||||
|
||||
return len(h.retransmissionQueue) > 0
|
||||
}
|
||||
|
||||
func (h *sentPacketHandler) DequeuePacketForRetransmission() (packet *Packet) {
|
||||
if !h.ProbablyHasPacketForRetransmission() {
|
||||
if len(h.retransmissionQueue) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -329,7 +320,7 @@ func (h *sentPacketHandler) CheckForError() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *sentPacketHandler) maybeQueuePacketsRTO() {
|
||||
func (h *sentPacketHandler) MaybeQueueRTOs() {
|
||||
if time.Now().Before(h.TimeOfFirstRTO()) {
|
||||
return
|
||||
}
|
||||
@@ -344,6 +335,8 @@ func (h *sentPacketHandler) maybeQueuePacketsRTO() {
|
||||
h.congestion.OnRetransmissionTimeout(true)
|
||||
utils.Debugf("\tQueueing packet 0x%x for retransmission (RTO)", packet.PacketNumber)
|
||||
h.queuePacketForRetransmission(packet)
|
||||
// Reset the RTO timer here, since it's not clear that this packet contained any retransmittable frames
|
||||
h.lastSentPacketTime = time.Now()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -513,7 +513,7 @@ var _ = Describe("SentPacketHandler", func() {
|
||||
_, err := handler.nackPacket(2)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
}
|
||||
Expect(handler.ProbablyHasPacketForRetransmission()).To(BeFalse())
|
||||
handler.MaybeQueueRTOs()
|
||||
Expect(handler.DequeuePacketForRetransmission()).To(BeNil())
|
||||
})
|
||||
|
||||
@@ -522,7 +522,7 @@ var _ = Describe("SentPacketHandler", func() {
|
||||
_, err := handler.nackPacket(2)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
}
|
||||
Expect(handler.ProbablyHasPacketForRetransmission()).To(BeTrue())
|
||||
handler.MaybeQueueRTOs()
|
||||
Expect(handler.retransmissionQueue).To(HaveLen(1))
|
||||
Expect(handler.retransmissionQueue[0].PacketNumber).To(Equal(protocol.PacketNumber(2)))
|
||||
})
|
||||
@@ -594,8 +594,7 @@ var _ = Describe("SentPacketHandler", func() {
|
||||
}
|
||||
// this is the belated ACK
|
||||
handler.ackPacket(2)
|
||||
// this is the edge case where ProbablyHasPacketForRetransmission() get's it wrong: it says there's probably a packet for retransmission, but actually there isn't
|
||||
Expect(handler.ProbablyHasPacketForRetransmission()).To(BeTrue())
|
||||
handler.MaybeQueueRTOs()
|
||||
Expect(handler.DequeuePacketForRetransmission()).To(BeNil())
|
||||
})
|
||||
})
|
||||
@@ -712,7 +711,7 @@ var _ = Describe("SentPacketHandler", func() {
|
||||
err := handler.SentPacket(&Packet{PacketNumber: 1, Frames: []frames.Frame{}, Length: 1})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
handler.lastSentPacketTime = time.Now().Add(-time.Second)
|
||||
handler.maybeQueuePacketsRTO()
|
||||
handler.MaybeQueueRTOs()
|
||||
Expect(cong.nCalls).To(Equal(3))
|
||||
// rttUpdated, bytesInFlight, ackedPackets, lostPackets
|
||||
Expect(cong.argsOnCongestionEvent[0]).To(BeFalse())
|
||||
@@ -757,7 +756,7 @@ var _ = Describe("SentPacketHandler", func() {
|
||||
|
||||
It("ignores nil packets", func() {
|
||||
handler.packetHistory[1] = nil
|
||||
handler.maybeQueuePacketsRTO()
|
||||
handler.MaybeQueueRTOs()
|
||||
Expect(handler.TimeOfFirstRTO().IsZero()).To(BeTrue())
|
||||
})
|
||||
})
|
||||
@@ -766,7 +765,7 @@ var _ = Describe("SentPacketHandler", func() {
|
||||
It("does nothing if not required", func() {
|
||||
err := handler.SentPacket(&Packet{PacketNumber: 1, Frames: []frames.Frame{}, Length: 1})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
handler.maybeQueuePacketsRTO()
|
||||
handler.MaybeQueueRTOs()
|
||||
Expect(handler.retransmissionQueue).To(BeEmpty())
|
||||
})
|
||||
|
||||
@@ -775,9 +774,10 @@ var _ = Describe("SentPacketHandler", func() {
|
||||
err := handler.SentPacket(p)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
handler.lastSentPacketTime = time.Now().Add(-time.Second)
|
||||
handler.maybeQueuePacketsRTO()
|
||||
handler.MaybeQueueRTOs()
|
||||
Expect(handler.retransmissionQueue).To(HaveLen(1))
|
||||
Expect(handler.retransmissionQueue[0]).To(Equal(p))
|
||||
Expect(time.Now().Sub(handler.lastSentPacketTime)).To(BeNumerically("<", time.Second/2))
|
||||
})
|
||||
|
||||
It("does not queue retransmittedpackets", func() {
|
||||
@@ -785,30 +785,23 @@ var _ = Describe("SentPacketHandler", func() {
|
||||
err := handler.SentPacket(p)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
handler.lastSentPacketTime = time.Now().Add(-time.Second)
|
||||
handler.maybeQueuePacketsRTO()
|
||||
handler.MaybeQueueRTOs()
|
||||
Expect(handler.retransmissionQueue).To(BeEmpty())
|
||||
})
|
||||
|
||||
It("ignores nil packets", func() {
|
||||
handler.packetHistory[1] = nil
|
||||
handler.maybeQueuePacketsRTO()
|
||||
handler.MaybeQueueRTOs()
|
||||
Expect(handler.retransmissionQueue).To(BeEmpty())
|
||||
})
|
||||
})
|
||||
|
||||
It("works with HasPacketForRetransmission", func() {
|
||||
p := &Packet{PacketNumber: 1, Frames: []frames.Frame{}, Length: 1}
|
||||
err := handler.SentPacket(p)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
handler.lastSentPacketTime = time.Now().Add(-time.Second)
|
||||
Expect(handler.DequeuePacketForRetransmission()).ToNot(BeNil())
|
||||
})
|
||||
|
||||
It("works with DequeuePacketForRetransmission", func() {
|
||||
p := &Packet{PacketNumber: 1, Frames: []frames.Frame{}, Length: 1}
|
||||
err := handler.SentPacket(p)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
handler.lastSentPacketTime = time.Now().Add(-time.Second)
|
||||
handler.MaybeQueueRTOs()
|
||||
Expect(handler.DequeuePacketForRetransmission()).To(Equal(p))
|
||||
})
|
||||
})
|
||||
|
||||
@@ -19,7 +19,7 @@ import (
|
||||
. "github.com/onsi/gomega/gexec"
|
||||
)
|
||||
|
||||
var _ = PDescribe("Drop Proxy", func() {
|
||||
var _ = Describe("Drop Proxy", func() {
|
||||
BeforeEach(func() {
|
||||
dataMan.GenerateData(dataLen)
|
||||
})
|
||||
|
||||
@@ -440,6 +440,9 @@ func (s *Session) sendPacket() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Do this before checking the congestion, since we might de-congestionize here :)
|
||||
s.sentPacketHandler.MaybeQueueRTOs()
|
||||
|
||||
if !s.sentPacketHandler.CongestionAllowsSending() {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -47,7 +47,9 @@ func (m *mockUnpacker) Unpack(publicHeaderBinary []byte, hdr *PublicHeader, data
|
||||
}
|
||||
|
||||
type mockSentPacketHandler struct {
|
||||
retransmissionQueue []*ackhandlerlegacy.Packet
|
||||
retransmissionQueue []*ackhandlerlegacy.Packet
|
||||
congestionLimited bool
|
||||
maybeQueueRTOsCalled bool
|
||||
}
|
||||
|
||||
func (h *mockSentPacketHandler) SentPacket(packet *ackhandlerlegacy.Packet) error { return nil }
|
||||
@@ -59,12 +61,12 @@ func (h *mockSentPacketHandler) GetLeastUnacked() protocol.PacketNumber { return
|
||||
func (h *mockSentPacketHandler) GetStopWaitingFrame() *frames.StopWaitingFrame {
|
||||
panic("not implemented")
|
||||
}
|
||||
func (h *mockSentPacketHandler) CongestionAllowsSending() bool { return true }
|
||||
func (h *mockSentPacketHandler) CongestionAllowsSending() bool { return !h.congestionLimited }
|
||||
func (h *mockSentPacketHandler) CheckForError() error { return nil }
|
||||
func (h *mockSentPacketHandler) TimeOfFirstRTO() time.Time { panic("not implemented") }
|
||||
|
||||
func (h *mockSentPacketHandler) ProbablyHasPacketForRetransmission() bool {
|
||||
return len(h.retransmissionQueue) > 0
|
||||
func (h *mockSentPacketHandler) MaybeQueueRTOs() {
|
||||
h.maybeQueueRTOsCalled = true
|
||||
}
|
||||
|
||||
func (h *mockSentPacketHandler) DequeuePacketForRetransmission() *ackhandlerlegacy.Packet {
|
||||
@@ -570,6 +572,15 @@ var _ = Describe("Session", func() {
|
||||
Expect(conn.written[0]).To(ContainSubstring("foobar"))
|
||||
Expect(conn.written[0]).To(ContainSubstring("loremipsum"))
|
||||
})
|
||||
|
||||
It("calls MaybeQueueRTOs even if congestion blocked, so that bytesInFlight is updated", func() {
|
||||
sph := newMockSentPacketHandler()
|
||||
sph.(*mockSentPacketHandler).congestionLimited = true
|
||||
session.sentPacketHandler = sph
|
||||
err := session.sendPacket()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(sph.(*mockSentPacketHandler).maybeQueueRTOsCalled).To(BeTrue())
|
||||
})
|
||||
})
|
||||
|
||||
Context("scheduling sending", func() {
|
||||
@@ -729,24 +740,33 @@ var _ = Describe("Session", func() {
|
||||
err := session.sentPacketHandler.SentPacket(&ackhandlerlegacy.Packet{PacketNumber: p, Length: 1})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
time.Sleep(time.Microsecond)
|
||||
ack := frames.AckFrameLegacy{LargestObserved: p}
|
||||
err = session.sentPacketHandler.ReceivedAck(&frames.AckFrame{AckFrameLegacy: &ack}, p)
|
||||
ack := &frames.AckFrame{}
|
||||
if version == protocol.Version33 {
|
||||
ack.AckFrameLegacy = &frames.AckFrameLegacy{LargestObserved: p}
|
||||
} else {
|
||||
ack.LargestAcked = p
|
||||
}
|
||||
err = session.sentPacketHandler.ReceivedAck(ack, p)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}
|
||||
if version == protocol.Version33 {
|
||||
session.packer.lastPacketNumber = n
|
||||
} else {
|
||||
session.packer.packetNumberGenerator.last = n
|
||||
}
|
||||
// Now, we send a single packet, and expect that it was retransmitted later
|
||||
go session.run()
|
||||
Expect(conn.written).To(BeEmpty())
|
||||
err := session.sentPacketHandler.SentPacket(&ackhandlerlegacy.Packet{
|
||||
PacketNumber: n,
|
||||
Length: 1,
|
||||
Frames: []frames.Frame{&frames.StreamFrame{
|
||||
Data: bytes.Repeat([]byte{'a'}, 1000),
|
||||
Data: []byte("foobar"),
|
||||
}},
|
||||
})
|
||||
session.packer.lastPacketNumber = n
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
go session.run()
|
||||
session.scheduleSending()
|
||||
Eventually(func() bool { return len(conn.written) > 0 }).Should(BeTrue())
|
||||
Eventually(func() [][]byte { return conn.written }).ShouldNot(BeEmpty())
|
||||
Expect(conn.written[0]).To(ContainSubstring("foobar"))
|
||||
})
|
||||
|
||||
Context("counting streams", func() {
|
||||
|
||||
Reference in New Issue
Block a user