diff --git a/ackhandler/interfaces.go b/ackhandler/interfaces.go index d478838ee..3dc2631b0 100644 --- a/ackhandler/interfaces.go +++ b/ackhandler/interfaces.go @@ -15,7 +15,7 @@ type SentPacketHandler interface { GetStopWaitingFrame() *frames.StopWaitingFrame - ProbablyHasPacketForRetransmission() bool + MaybeQueueRTOs() DequeuePacketForRetransmission() (packet *ackhandlerlegacy.Packet) BytesInFlight() protocol.ByteCount diff --git a/ackhandler/sent_packet_handler.go b/ackhandler/sent_packet_handler.go index a864c9edb..4a9abec49 100644 --- a/ackhandler/sent_packet_handler.go +++ b/ackhandler/sent_packet_handler.go @@ -2,6 +2,7 @@ package ackhandler import ( "errors" + "fmt" "time" "github.com/lucas-clemente/quic-go/ackhandlerlegacy" @@ -227,6 +228,9 @@ func (h *sentPacketHandler) ReceivedAck(ackFrame *frames.AckFrame, withPacketNum } if packetNumber >= ackRange.FirstPacketNumber { // packet i contained in ACK range + if packetNumber > ackRange.LastPacketNumber { + return fmt.Errorf("BUG: ackhandler would have acked wrong packet 0x%x, while evaluating range 0x%x -> 0x%x", packetNumber, ackRange.FirstPacketNumber, ackRange.LastPacketNumber) + } p := h.ackPacket(el) if p != nil { ackedPackets = append(ackedPackets, congestion.PacketInfo{Number: p.PacketNumber, Length: p.Length}) @@ -262,16 +266,8 @@ func (h *sentPacketHandler) ReceivedAck(ackFrame *frames.AckFrame, withPacketNum return nil } -// ProbablyHasPacketForRetransmission returns if there is a packet queued for retransmission -// There is one case where it gets the answer wrong: -// if a packet has already been queued for retransmission, but a belated ACK is received for this packet, this function will return true, although the packet will not be returend for retransmission by DequeuePacketForRetransmission() -func (h *sentPacketHandler) ProbablyHasPacketForRetransmission() bool { - h.maybeQueuePacketsRTO() - return len(h.retransmissionQueue) > 0 -} - func (h *sentPacketHandler) DequeuePacketForRetransmission() *ackhandlerlegacy.Packet { - if !h.ProbablyHasPacketForRetransmission() { + if len(h.retransmissionQueue) == 0 { return nil } @@ -310,7 +306,7 @@ func (h *sentPacketHandler) CheckForError() error { return nil } -func (h *sentPacketHandler) maybeQueuePacketsRTO() { +func (h *sentPacketHandler) MaybeQueueRTOs() { if time.Now().Before(h.TimeOfFirstRTO()) { return } @@ -329,6 +325,8 @@ func (h *sentPacketHandler) maybeQueuePacketsRTO() { h.congestion.OnRetransmissionTimeout(true) utils.Debugf("\tQueueing packet 0x%x for retransmission (RTO)", packet.PacketNumber) h.queuePacketForRetransmission(el) + // Reset the RTO timer here, since it's not clear that this packet contained any retransmittable frames + h.lastSentPacketTime = time.Now() return } } diff --git a/ackhandler/sent_packet_handler_test.go b/ackhandler/sent_packet_handler_test.go index 33918897e..5c75555a3 100644 --- a/ackhandler/sent_packet_handler_test.go +++ b/ackhandler/sent_packet_handler_test.go @@ -568,7 +568,7 @@ var _ = Describe("SentPacketHandler", func() { Expect(err).ToNot(HaveOccurred()) } Expect(getPacketElement(2)).ToNot(BeNil()) - Expect(handler.ProbablyHasPacketForRetransmission()).To(BeFalse()) + handler.MaybeQueueRTOs() Expect(handler.DequeuePacketForRetransmission()).To(BeNil()) }) @@ -578,7 +578,7 @@ var _ = Describe("SentPacketHandler", func() { Expect(err).ToNot(HaveOccurred()) } Expect(getPacketElement(2)).To(BeNil()) - Expect(handler.ProbablyHasPacketForRetransmission()).To(BeTrue()) + handler.MaybeQueueRTOs() Expect(handler.retransmissionQueue).To(HaveLen(1)) Expect(handler.retransmissionQueue[0].PacketNumber).To(Equal(protocol.PacketNumber(2))) }) @@ -819,7 +819,7 @@ var _ = Describe("SentPacketHandler", func() { err := handler.SentPacket(&ackhandlerlegacy.Packet{PacketNumber: 1, Frames: []frames.Frame{}, Length: 1}) Expect(err).NotTo(HaveOccurred()) handler.lastSentPacketTime = time.Now().Add(-time.Second) - handler.maybeQueuePacketsRTO() + handler.MaybeQueueRTOs() Expect(cong.nCalls).To(Equal(3)) // rttUpdated, bytesInFlight, ackedPackets, lostPackets Expect(cong.argsOnCongestionEvent[0]).To(BeFalse()) @@ -866,7 +866,7 @@ var _ = Describe("SentPacketHandler", func() { It("does nothing if not required", func() { err := handler.SentPacket(&ackhandlerlegacy.Packet{PacketNumber: 1, Frames: []frames.Frame{}, Length: 1}) Expect(err).NotTo(HaveOccurred()) - handler.maybeQueuePacketsRTO() + handler.MaybeQueueRTOs() Expect(handler.retransmissionQueue).To(BeEmpty()) }) @@ -875,25 +875,19 @@ var _ = Describe("SentPacketHandler", func() { err := handler.SentPacket(p) Expect(err).NotTo(HaveOccurred()) handler.lastSentPacketTime = time.Now().Add(-time.Second) - handler.maybeQueuePacketsRTO() + handler.MaybeQueueRTOs() Expect(handler.retransmissionQueue).To(HaveLen(1)) Expect(handler.retransmissionQueue[0].PacketNumber).To(Equal(p.PacketNumber)) + Expect(time.Now().Sub(handler.lastSentPacketTime)).To(BeNumerically("<", time.Second/2)) }) }) - It("works with HasPacketForRetransmission", func() { - p := &ackhandlerlegacy.Packet{PacketNumber: 1, Frames: []frames.Frame{}, Length: 1} - err := handler.SentPacket(p) - Expect(err).NotTo(HaveOccurred()) - handler.lastSentPacketTime = time.Now().Add(-time.Second) - Expect(handler.DequeuePacketForRetransmission()).ToNot(BeNil()) - }) - It("works with DequeuePacketForRetransmission", func() { p := &ackhandlerlegacy.Packet{PacketNumber: 1, Frames: []frames.Frame{}, Length: 1} err := handler.SentPacket(p) Expect(err).NotTo(HaveOccurred()) handler.lastSentPacketTime = time.Now().Add(-time.Second) + handler.MaybeQueueRTOs() Expect(handler.DequeuePacketForRetransmission().PacketNumber).To(Equal(p.PacketNumber)) }) }) diff --git a/ackhandlerlegacy/interfaces.go b/ackhandlerlegacy/interfaces.go index 51299b7d4..ceb5d78ca 100644 --- a/ackhandlerlegacy/interfaces.go +++ b/ackhandlerlegacy/interfaces.go @@ -14,7 +14,7 @@ type SentPacketHandler interface { GetStopWaitingFrame() *frames.StopWaitingFrame - ProbablyHasPacketForRetransmission() bool + MaybeQueueRTOs() DequeuePacketForRetransmission() (packet *Packet) BytesInFlight() protocol.ByteCount diff --git a/ackhandlerlegacy/sent_packet_handler.go b/ackhandlerlegacy/sent_packet_handler.go index 42e21d645..33012fbed 100644 --- a/ackhandlerlegacy/sent_packet_handler.go +++ b/ackhandlerlegacy/sent_packet_handler.go @@ -273,17 +273,8 @@ func (h *sentPacketHandler) ReceivedAck(ackFrameOrig *frames.AckFrame, withPacke return nil } -// ProbablyHasPacketForRetransmission returns if there is a packet queued for retransmission -// There is one case where it gets the answer wrong: -// if a packet has already been queued for retransmission, but a belated ACK is received for this packet, this function will return true, although the packet will not be returend for retransmission by DequeuePacketForRetransmission() -func (h *sentPacketHandler) ProbablyHasPacketForRetransmission() bool { - h.maybeQueuePacketsRTO() - - return len(h.retransmissionQueue) > 0 -} - func (h *sentPacketHandler) DequeuePacketForRetransmission() (packet *Packet) { - if !h.ProbablyHasPacketForRetransmission() { + if len(h.retransmissionQueue) == 0 { return nil } @@ -329,7 +320,7 @@ func (h *sentPacketHandler) CheckForError() error { return nil } -func (h *sentPacketHandler) maybeQueuePacketsRTO() { +func (h *sentPacketHandler) MaybeQueueRTOs() { if time.Now().Before(h.TimeOfFirstRTO()) { return } @@ -344,6 +335,8 @@ func (h *sentPacketHandler) maybeQueuePacketsRTO() { h.congestion.OnRetransmissionTimeout(true) utils.Debugf("\tQueueing packet 0x%x for retransmission (RTO)", packet.PacketNumber) h.queuePacketForRetransmission(packet) + // Reset the RTO timer here, since it's not clear that this packet contained any retransmittable frames + h.lastSentPacketTime = time.Now() return } } diff --git a/ackhandlerlegacy/sent_packet_handler_test.go b/ackhandlerlegacy/sent_packet_handler_test.go index a7284bbdd..aa48a1171 100644 --- a/ackhandlerlegacy/sent_packet_handler_test.go +++ b/ackhandlerlegacy/sent_packet_handler_test.go @@ -513,7 +513,7 @@ var _ = Describe("SentPacketHandler", func() { _, err := handler.nackPacket(2) Expect(err).ToNot(HaveOccurred()) } - Expect(handler.ProbablyHasPacketForRetransmission()).To(BeFalse()) + handler.MaybeQueueRTOs() Expect(handler.DequeuePacketForRetransmission()).To(BeNil()) }) @@ -522,7 +522,7 @@ var _ = Describe("SentPacketHandler", func() { _, err := handler.nackPacket(2) Expect(err).ToNot(HaveOccurred()) } - Expect(handler.ProbablyHasPacketForRetransmission()).To(BeTrue()) + handler.MaybeQueueRTOs() Expect(handler.retransmissionQueue).To(HaveLen(1)) Expect(handler.retransmissionQueue[0].PacketNumber).To(Equal(protocol.PacketNumber(2))) }) @@ -594,8 +594,7 @@ var _ = Describe("SentPacketHandler", func() { } // this is the belated ACK handler.ackPacket(2) - // this is the edge case where ProbablyHasPacketForRetransmission() get's it wrong: it says there's probably a packet for retransmission, but actually there isn't - Expect(handler.ProbablyHasPacketForRetransmission()).To(BeTrue()) + handler.MaybeQueueRTOs() Expect(handler.DequeuePacketForRetransmission()).To(BeNil()) }) }) @@ -712,7 +711,7 @@ var _ = Describe("SentPacketHandler", func() { err := handler.SentPacket(&Packet{PacketNumber: 1, Frames: []frames.Frame{}, Length: 1}) Expect(err).NotTo(HaveOccurred()) handler.lastSentPacketTime = time.Now().Add(-time.Second) - handler.maybeQueuePacketsRTO() + handler.MaybeQueueRTOs() Expect(cong.nCalls).To(Equal(3)) // rttUpdated, bytesInFlight, ackedPackets, lostPackets Expect(cong.argsOnCongestionEvent[0]).To(BeFalse()) @@ -757,7 +756,7 @@ var _ = Describe("SentPacketHandler", func() { It("ignores nil packets", func() { handler.packetHistory[1] = nil - handler.maybeQueuePacketsRTO() + handler.MaybeQueueRTOs() Expect(handler.TimeOfFirstRTO().IsZero()).To(BeTrue()) }) }) @@ -766,7 +765,7 @@ var _ = Describe("SentPacketHandler", func() { It("does nothing if not required", func() { err := handler.SentPacket(&Packet{PacketNumber: 1, Frames: []frames.Frame{}, Length: 1}) Expect(err).NotTo(HaveOccurred()) - handler.maybeQueuePacketsRTO() + handler.MaybeQueueRTOs() Expect(handler.retransmissionQueue).To(BeEmpty()) }) @@ -775,9 +774,10 @@ var _ = Describe("SentPacketHandler", func() { err := handler.SentPacket(p) Expect(err).NotTo(HaveOccurred()) handler.lastSentPacketTime = time.Now().Add(-time.Second) - handler.maybeQueuePacketsRTO() + handler.MaybeQueueRTOs() Expect(handler.retransmissionQueue).To(HaveLen(1)) Expect(handler.retransmissionQueue[0]).To(Equal(p)) + Expect(time.Now().Sub(handler.lastSentPacketTime)).To(BeNumerically("<", time.Second/2)) }) It("does not queue retransmittedpackets", func() { @@ -785,30 +785,23 @@ var _ = Describe("SentPacketHandler", func() { err := handler.SentPacket(p) Expect(err).NotTo(HaveOccurred()) handler.lastSentPacketTime = time.Now().Add(-time.Second) - handler.maybeQueuePacketsRTO() + handler.MaybeQueueRTOs() Expect(handler.retransmissionQueue).To(BeEmpty()) }) It("ignores nil packets", func() { handler.packetHistory[1] = nil - handler.maybeQueuePacketsRTO() + handler.MaybeQueueRTOs() Expect(handler.retransmissionQueue).To(BeEmpty()) }) }) - It("works with HasPacketForRetransmission", func() { - p := &Packet{PacketNumber: 1, Frames: []frames.Frame{}, Length: 1} - err := handler.SentPacket(p) - Expect(err).NotTo(HaveOccurred()) - handler.lastSentPacketTime = time.Now().Add(-time.Second) - Expect(handler.DequeuePacketForRetransmission()).ToNot(BeNil()) - }) - It("works with DequeuePacketForRetransmission", func() { p := &Packet{PacketNumber: 1, Frames: []frames.Frame{}, Length: 1} err := handler.SentPacket(p) Expect(err).NotTo(HaveOccurred()) handler.lastSentPacketTime = time.Now().Add(-time.Second) + handler.MaybeQueueRTOs() Expect(handler.DequeuePacketForRetransmission()).To(Equal(p)) }) }) diff --git a/integrationtests/drop_test.go b/integrationtests/drop_test.go index 49558c4e8..da4f74032 100644 --- a/integrationtests/drop_test.go +++ b/integrationtests/drop_test.go @@ -19,7 +19,7 @@ import ( . "github.com/onsi/gomega/gexec" ) -var _ = PDescribe("Drop Proxy", func() { +var _ = Describe("Drop Proxy", func() { BeforeEach(func() { dataMan.GenerateData(dataLen) }) diff --git a/session.go b/session.go index 5692e6a7d..667fac3b5 100644 --- a/session.go +++ b/session.go @@ -440,6 +440,9 @@ func (s *Session) sendPacket() error { return err } + // Do this before checking the congestion, since we might de-congestionize here :) + s.sentPacketHandler.MaybeQueueRTOs() + if !s.sentPacketHandler.CongestionAllowsSending() { return nil } diff --git a/session_test.go b/session_test.go index 4fdacac90..c613f61ed 100644 --- a/session_test.go +++ b/session_test.go @@ -47,7 +47,9 @@ func (m *mockUnpacker) Unpack(publicHeaderBinary []byte, hdr *PublicHeader, data } type mockSentPacketHandler struct { - retransmissionQueue []*ackhandlerlegacy.Packet + retransmissionQueue []*ackhandlerlegacy.Packet + congestionLimited bool + maybeQueueRTOsCalled bool } func (h *mockSentPacketHandler) SentPacket(packet *ackhandlerlegacy.Packet) error { return nil } @@ -59,12 +61,12 @@ func (h *mockSentPacketHandler) GetLeastUnacked() protocol.PacketNumber { return func (h *mockSentPacketHandler) GetStopWaitingFrame() *frames.StopWaitingFrame { panic("not implemented") } -func (h *mockSentPacketHandler) CongestionAllowsSending() bool { return true } +func (h *mockSentPacketHandler) CongestionAllowsSending() bool { return !h.congestionLimited } func (h *mockSentPacketHandler) CheckForError() error { return nil } func (h *mockSentPacketHandler) TimeOfFirstRTO() time.Time { panic("not implemented") } -func (h *mockSentPacketHandler) ProbablyHasPacketForRetransmission() bool { - return len(h.retransmissionQueue) > 0 +func (h *mockSentPacketHandler) MaybeQueueRTOs() { + h.maybeQueueRTOsCalled = true } func (h *mockSentPacketHandler) DequeuePacketForRetransmission() *ackhandlerlegacy.Packet { @@ -570,6 +572,15 @@ var _ = Describe("Session", func() { Expect(conn.written[0]).To(ContainSubstring("foobar")) Expect(conn.written[0]).To(ContainSubstring("loremipsum")) }) + + It("calls MaybeQueueRTOs even if congestion blocked, so that bytesInFlight is updated", func() { + sph := newMockSentPacketHandler() + sph.(*mockSentPacketHandler).congestionLimited = true + session.sentPacketHandler = sph + err := session.sendPacket() + Expect(err).NotTo(HaveOccurred()) + Expect(sph.(*mockSentPacketHandler).maybeQueueRTOsCalled).To(BeTrue()) + }) }) Context("scheduling sending", func() { @@ -729,24 +740,33 @@ var _ = Describe("Session", func() { err := session.sentPacketHandler.SentPacket(&ackhandlerlegacy.Packet{PacketNumber: p, Length: 1}) Expect(err).NotTo(HaveOccurred()) time.Sleep(time.Microsecond) - ack := frames.AckFrameLegacy{LargestObserved: p} - err = session.sentPacketHandler.ReceivedAck(&frames.AckFrame{AckFrameLegacy: &ack}, p) + ack := &frames.AckFrame{} + if version == protocol.Version33 { + ack.AckFrameLegacy = &frames.AckFrameLegacy{LargestObserved: p} + } else { + ack.LargestAcked = p + } + err = session.sentPacketHandler.ReceivedAck(ack, p) Expect(err).NotTo(HaveOccurred()) } + if version == protocol.Version33 { + session.packer.lastPacketNumber = n + } else { + session.packer.packetNumberGenerator.last = n + } // Now, we send a single packet, and expect that it was retransmitted later - go session.run() - Expect(conn.written).To(BeEmpty()) err := session.sentPacketHandler.SentPacket(&ackhandlerlegacy.Packet{ PacketNumber: n, Length: 1, Frames: []frames.Frame{&frames.StreamFrame{ - Data: bytes.Repeat([]byte{'a'}, 1000), + Data: []byte("foobar"), }}, }) - session.packer.lastPacketNumber = n Expect(err).NotTo(HaveOccurred()) + go session.run() session.scheduleSending() - Eventually(func() bool { return len(conn.written) > 0 }).Should(BeTrue()) + Eventually(func() [][]byte { return conn.written }).ShouldNot(BeEmpty()) + Expect(conn.written[0]).To(ContainSubstring("foobar")) }) Context("counting streams", func() {