From 1535663fc55f5cf428a4c2a303efe209c08e5ee4 Mon Sep 17 00:00:00 2001 From: Lucas Clemente Date: Thu, 28 Apr 2016 22:28:42 +0200 Subject: [PATCH] add bytes-in-flight calculation to SentPacketHandler --- ackhandler/interfaces.go | 2 + ackhandler/packet.go | 1 + ackhandler/sent_packet_handler.go | 15 ++++ ackhandler/sent_packet_handler_test.go | 111 +++++++++++++++++++------ session.go | 1 + 5 files changed, 103 insertions(+), 27 deletions(-) diff --git a/ackhandler/interfaces.go b/ackhandler/interfaces.go index 8167dac25..b3fa7d913 100644 --- a/ackhandler/interfaces.go +++ b/ackhandler/interfaces.go @@ -13,6 +13,8 @@ type SentPacketHandler interface { ReceivedAck(ackFrame *frames.AckFrame) (time.Duration, error) DequeuePacketForRetransmission() (packet *Packet) + + BytesInFlight() uint64 } // ReceivedPacketHandler handles ACKs needed to send for incoming packets diff --git a/ackhandler/packet.go b/ackhandler/packet.go index 3c807dc4a..61db3d2e6 100644 --- a/ackhandler/packet.go +++ b/ackhandler/packet.go @@ -13,6 +13,7 @@ type Packet struct { Frames []frames.Frame EntropyBit bool Entropy EntropyAccumulator + Length uint64 MissingReports uint8 Retransmitted bool // has this Packet ever been retransmitted diff --git a/ackhandler/sent_packet_handler.go b/ackhandler/sent_packet_handler.go index 4336e1bdc..94c944aac 100644 --- a/ackhandler/sent_packet_handler.go +++ b/ackhandler/sent_packet_handler.go @@ -29,9 +29,12 @@ type sentPacketHandler struct { LargestObserved protocol.PacketNumber LargestObservedEntropy EntropyAccumulator + // TODO: Move into separate class as in chromium packetHistory map[protocol.PacketNumber]*Packet retransmissionQueue []*Packet // ToDo: use better data structure + + bytesInFlight uint64 } // NewSentPacketHandler creates a new sentPacketHandler @@ -42,6 +45,9 @@ func NewSentPacketHandler() SentPacketHandler { } func (h *sentPacketHandler) ackPacket(packetNumber protocol.PacketNumber) { + if packet, ok := h.packetHistory[packetNumber]; ok && !packet.Retransmitted { + h.bytesInFlight -= packet.Length + } delete(h.packetHistory, packetNumber) } @@ -66,6 +72,7 @@ func (h *sentPacketHandler) nackPacket(packetNumber protocol.PacketNumber) error } func (h *sentPacketHandler) queuePacketForRetransmission(packet *Packet) { + h.bytesInFlight -= packet.Length h.retransmissionQueue = append(h.retransmissionQueue, packet) packet.Retransmitted = true } @@ -79,6 +86,10 @@ func (h *sentPacketHandler) SentPacket(packet *Packet) error { return errors.New("Packet number must be increased by exactly 1") } packet.sendTime = time.Now() + if packet.Length == 0 { + panic("SentPacketHandler: packet cannot be empty") + } + h.bytesInFlight += packet.Length h.lastSentPacketEntropy.Add(packet.PacketNumber, packet.EntropyBit) packet.Entropy = h.lastSentPacketEntropy @@ -177,3 +188,7 @@ func (h *sentPacketHandler) DequeuePacketForRetransmission() (packet *Packet) { h.retransmissionQueue = h.retransmissionQueue[1:] return packet } + +func (h *sentPacketHandler) BytesInFlight() uint64 { + return h.bytesInFlight +} diff --git a/ackhandler/sent_packet_handler_test.go b/ackhandler/sent_packet_handler_test.go index a3570542d..fcec3160b 100644 --- a/ackhandler/sent_packet_handler_test.go +++ b/ackhandler/sent_packet_handler_test.go @@ -23,8 +23,8 @@ var _ = Describe("SentPacketHandler", func() { Context("SentPacket", func() { It("accepts two consecutive packets", func() { entropy := EntropyAccumulator(0) - packet1 := Packet{PacketNumber: 1, Frames: []frames.Frame{&streamFrame}, EntropyBit: true} - packet2 := Packet{PacketNumber: 2, Frames: []frames.Frame{&streamFrame}, EntropyBit: true} + packet1 := Packet{PacketNumber: 1, Frames: []frames.Frame{&streamFrame}, EntropyBit: true, Length: 1} + packet2 := Packet{PacketNumber: 2, Frames: []frames.Frame{&streamFrame}, EntropyBit: true, Length: 2} err := handler.SentPacket(&packet1) Expect(err).ToNot(HaveOccurred()) err = handler.SentPacket(&packet2) @@ -38,22 +38,24 @@ var _ = Describe("SentPacketHandler", func() { entropy.Add(packet2.PacketNumber, packet2.EntropyBit) Expect(handler.packetHistory[2].PacketNumber).To(Equal(protocol.PacketNumber(2))) Expect(handler.packetHistory[2].Entropy).To(Equal(entropy)) + Expect(handler.BytesInFlight()).To(Equal(uint64(3))) }) It("rejects packets with the same packet number", func() { - packet1 := Packet{PacketNumber: 1, Frames: []frames.Frame{&streamFrame}, EntropyBit: true} - packet2 := Packet{PacketNumber: 1, Frames: []frames.Frame{&streamFrame}, EntropyBit: false} + packet1 := Packet{PacketNumber: 1, Frames: []frames.Frame{&streamFrame}, EntropyBit: true, Length: 1} + packet2 := Packet{PacketNumber: 1, Frames: []frames.Frame{&streamFrame}, EntropyBit: false, Length: 2} err := handler.SentPacket(&packet1) Expect(err).ToNot(HaveOccurred()) err = handler.SentPacket(&packet2) Expect(err).To(HaveOccurred()) Expect(handler.lastSentPacketNumber).To(Equal(protocol.PacketNumber(1))) Expect(handler.packetHistory).To(HaveKey(protocol.PacketNumber(1))) + Expect(handler.BytesInFlight()).To(Equal(uint64(1))) }) It("rejects non-consecutive packets", func() { - packet1 := Packet{PacketNumber: 1, Frames: []frames.Frame{&streamFrame}, EntropyBit: true} - packet2 := Packet{PacketNumber: 3, Frames: []frames.Frame{&streamFrame}, EntropyBit: false} + packet1 := Packet{PacketNumber: 1, Frames: []frames.Frame{&streamFrame}, EntropyBit: true, Length: 1} + packet2 := Packet{PacketNumber: 3, Frames: []frames.Frame{&streamFrame}, EntropyBit: false, Length: 2} err := handler.SentPacket(&packet1) Expect(err).ToNot(HaveOccurred()) err = handler.SentPacket(&packet2) @@ -62,13 +64,15 @@ var _ = Describe("SentPacketHandler", func() { Expect(handler.packetHistory).To(HaveKey(protocol.PacketNumber(1))) Expect(handler.packetHistory).ToNot(HaveKey(protocol.PacketNumber(2))) Expect(handler.packetHistory).ToNot(HaveKey(protocol.PacketNumber(2))) + Expect(handler.BytesInFlight()).To(Equal(uint64(1))) }) It("correctly calculates the entropy, even if the last packet has already been ACKed", func() { - packet1 := Packet{PacketNumber: 1, Frames: []frames.Frame{&streamFrame}, EntropyBit: true} - packet2 := Packet{PacketNumber: 2, Frames: []frames.Frame{&streamFrame}, EntropyBit: true} + packet1 := Packet{PacketNumber: 1, Frames: []frames.Frame{&streamFrame}, EntropyBit: true, Length: 1} + packet2 := Packet{PacketNumber: 2, Frames: []frames.Frame{&streamFrame}, EntropyBit: true, Length: 2} err := handler.SentPacket(&packet1) Expect(err).ToNot(HaveOccurred()) + Expect(handler.BytesInFlight()).To(Equal(uint64(1))) entropy := EntropyAccumulator(0) entropy.Add(packet1.PacketNumber, packet1.EntropyBit) ack := frames.AckFrame{ @@ -77,15 +81,17 @@ var _ = Describe("SentPacketHandler", func() { } _, err = handler.ReceivedAck(&ack) Expect(err).ToNot(HaveOccurred()) + Expect(handler.BytesInFlight()).To(Equal(uint64(0))) err = handler.SentPacket(&packet2) Expect(err).ToNot(HaveOccurred()) + Expect(handler.BytesInFlight()).To(Equal(uint64(2))) Expect(handler.lastSentPacketNumber).To(Equal(protocol.PacketNumber(2))) entropy.Add(packet2.PacketNumber, packet2.EntropyBit) Expect(handler.packetHistory[2].Entropy).To(Equal(entropy)) }) It("stores the sent time", func() { - packet := Packet{PacketNumber: 1, Frames: []frames.Frame{&streamFrame}, EntropyBit: true} + packet := Packet{PacketNumber: 1, Frames: []frames.Frame{&streamFrame}, EntropyBit: true, Length: 1} err := handler.SentPacket(&packet) Expect(err).ToNot(HaveOccurred()) Expect(handler.packetHistory[1].sendTime.Unix()).To(BeNumerically("~", time.Now().Unix(), 1)) @@ -99,16 +105,17 @@ var _ = Describe("SentPacketHandler", func() { BeforeEach(func() { entropy = EntropyAccumulator(0) packets = []*Packet{ - &Packet{PacketNumber: 1, Frames: []frames.Frame{&streamFrame}, EntropyBit: true}, - &Packet{PacketNumber: 2, Frames: []frames.Frame{&streamFrame}, EntropyBit: true}, - &Packet{PacketNumber: 3, Frames: []frames.Frame{&streamFrame}, EntropyBit: true}, - &Packet{PacketNumber: 4, Frames: []frames.Frame{&streamFrame}, EntropyBit: true}, - &Packet{PacketNumber: 5, Frames: []frames.Frame{&streamFrame}, EntropyBit: true}, - &Packet{PacketNumber: 6, Frames: []frames.Frame{&streamFrame}, EntropyBit: true}, + &Packet{PacketNumber: 1, Frames: []frames.Frame{&streamFrame}, EntropyBit: true, Length: 1}, + &Packet{PacketNumber: 2, Frames: []frames.Frame{&streamFrame}, EntropyBit: true, Length: 1}, + &Packet{PacketNumber: 3, Frames: []frames.Frame{&streamFrame}, EntropyBit: true, Length: 1}, + &Packet{PacketNumber: 4, Frames: []frames.Frame{&streamFrame}, EntropyBit: true, Length: 1}, + &Packet{PacketNumber: 5, Frames: []frames.Frame{&streamFrame}, EntropyBit: true, Length: 1}, + &Packet{PacketNumber: 6, Frames: []frames.Frame{&streamFrame}, EntropyBit: true, Length: 1}, } for _, packet := range packets { handler.SentPacket(packet) } + Expect(handler.BytesInFlight()).To(Equal(uint64(6))) }) It("no NACK ranges", func() { @@ -186,6 +193,7 @@ var _ = Describe("SentPacketHandler", func() { } _, err := handler.ReceivedAck(&ack) Expect(err).To(HaveOccurred()) + Expect(handler.BytesInFlight()).To(Equal(uint64(6))) Expect(err).To(Equal(ErrEntropy)) }) @@ -201,6 +209,7 @@ var _ = Describe("SentPacketHandler", func() { } _, err := handler.ReceivedAck(&ack) Expect(err).ToNot(HaveOccurred()) + Expect(handler.BytesInFlight()).To(Equal(uint64(2))) Expect(handler.LargestObserved).To(Equal(protocol.PacketNumber(largestObserved))) Expect(handler.highestInOrderAckedPacketNumber).To(Equal(protocol.PacketNumber(largestObserved))) Expect(handler.packetHistory).ToNot(HaveKey(protocol.PacketNumber(largestObserved - 1))) @@ -226,6 +235,7 @@ var _ = Describe("SentPacketHandler", func() { } _, err := handler.ReceivedAck(&ack) Expect(err).ToNot(HaveOccurred()) + Expect(handler.BytesInFlight()).To(Equal(uint64(2))) Expect(handler.LargestObserved).To(Equal(protocol.PacketNumber(largestObserved))) Expect(handler.highestInOrderAckedPacketNumber).To(Equal(protocol.PacketNumber(2))) Expect(handler.packetHistory).ToNot(HaveKey(protocol.PacketNumber(2))) @@ -241,16 +251,17 @@ var _ = Describe("SentPacketHandler", func() { BeforeEach(func() { packets = []*Packet{ - &Packet{PacketNumber: 1, Frames: []frames.Frame{&streamFrame}, EntropyBit: false}, - &Packet{PacketNumber: 2, Frames: []frames.Frame{&streamFrame}, EntropyBit: false}, - &Packet{PacketNumber: 3, Frames: []frames.Frame{&streamFrame}, EntropyBit: false}, - &Packet{PacketNumber: 4, Frames: []frames.Frame{&streamFrame}, EntropyBit: false}, - &Packet{PacketNumber: 5, Frames: []frames.Frame{&streamFrame}, EntropyBit: false}, - &Packet{PacketNumber: 6, Frames: []frames.Frame{&streamFrame}, EntropyBit: false}, + &Packet{PacketNumber: 1, Frames: []frames.Frame{&streamFrame}, EntropyBit: false, Length: 1}, + &Packet{PacketNumber: 2, Frames: []frames.Frame{&streamFrame}, EntropyBit: false, Length: 1}, + &Packet{PacketNumber: 3, Frames: []frames.Frame{&streamFrame}, EntropyBit: false, Length: 1}, + &Packet{PacketNumber: 4, Frames: []frames.Frame{&streamFrame}, EntropyBit: false, Length: 1}, + &Packet{PacketNumber: 5, Frames: []frames.Frame{&streamFrame}, EntropyBit: false, Length: 1}, + &Packet{PacketNumber: 6, Frames: []frames.Frame{&streamFrame}, EntropyBit: false, Length: 1}, } for _, packet := range packets { handler.SentPacket(packet) } + Expect(handler.BytesInFlight()).To(Equal(uint64(6))) }) Context("ACK validation", func() { @@ -261,9 +272,11 @@ var _ = Describe("SentPacketHandler", func() { } _, err := handler.ReceivedAck(&ack) Expect(err).ToNot(HaveOccurred()) + Expect(handler.BytesInFlight()).To(Equal(uint64(3))) _, err = handler.ReceivedAck(&ack) Expect(err).To(HaveOccurred()) Expect(err).To(Equal(ErrDuplicateOrOutOfOrderAck)) + Expect(handler.BytesInFlight()).To(Equal(uint64(3))) }) It("rejects out of order ACKs", func() { @@ -273,11 +286,13 @@ var _ = Describe("SentPacketHandler", func() { } _, err := handler.ReceivedAck(&ack) Expect(err).ToNot(HaveOccurred()) + Expect(handler.BytesInFlight()).To(Equal(uint64(3))) ack.LargestObserved-- _, err = handler.ReceivedAck(&ack) Expect(err).To(HaveOccurred()) Expect(err).To(Equal(ErrDuplicateOrOutOfOrderAck)) Expect(handler.LargestObserved).To(Equal(protocol.PacketNumber(largestObserved))) + Expect(handler.BytesInFlight()).To(Equal(uint64(3))) }) It("rejects ACKs with a too high LargestObserved packet number", func() { @@ -288,6 +303,7 @@ var _ = Describe("SentPacketHandler", func() { Expect(err).To(HaveOccurred()) Expect(err).To(Equal(errAckForUnsentPacket)) Expect(handler.highestInOrderAckedPacketNumber).To(Equal(protocol.PacketNumber(0))) + Expect(handler.BytesInFlight()).To(Equal(uint64(6))) }) }) @@ -317,16 +333,17 @@ var _ = Describe("SentPacketHandler", func() { retransmissionThreshold = 1 packets = []*Packet{ - &Packet{PacketNumber: 1, Frames: []frames.Frame{&streamFrame}, EntropyBit: false}, - &Packet{PacketNumber: 2, Frames: []frames.Frame{&streamFrame}, EntropyBit: false}, - &Packet{PacketNumber: 3, Frames: []frames.Frame{&streamFrame}, EntropyBit: false}, - &Packet{PacketNumber: 4, Frames: []frames.Frame{&streamFrame}, EntropyBit: false}, - &Packet{PacketNumber: 5, Frames: []frames.Frame{&streamFrame}, EntropyBit: false}, - &Packet{PacketNumber: 6, Frames: []frames.Frame{&streamFrame}, EntropyBit: false}, + &Packet{PacketNumber: 1, Frames: []frames.Frame{&streamFrame}, EntropyBit: false, Length: 1}, + &Packet{PacketNumber: 2, Frames: []frames.Frame{&streamFrame}, EntropyBit: false, Length: 1}, + &Packet{PacketNumber: 3, Frames: []frames.Frame{&streamFrame}, EntropyBit: false, Length: 1}, + &Packet{PacketNumber: 4, Frames: []frames.Frame{&streamFrame}, EntropyBit: false, Length: 1}, + &Packet{PacketNumber: 5, Frames: []frames.Frame{&streamFrame}, EntropyBit: false, Length: 1}, + &Packet{PacketNumber: 6, Frames: []frames.Frame{&streamFrame}, EntropyBit: false, Length: 1}, } for _, packet := range packets { handler.SentPacket(packet) } + Expect(handler.BytesInFlight()).To(Equal(uint64(6))) }) It("queues a packet for retransmission", func() { @@ -379,4 +396,44 @@ var _ = Describe("SentPacketHandler", func() { Expect(handler.highestInOrderAckedPacketNumber).To(Equal(protocol.PacketNumber(2))) }) }) + + Context("calculating bytes in flight", func() { + It("works in a typical retransmission scenanrop", func() { + packet1 := Packet{PacketNumber: 1, Frames: []frames.Frame{&streamFrame}, EntropyBit: false, Length: 1} + packet2 := Packet{PacketNumber: 2, Frames: []frames.Frame{&streamFrame}, EntropyBit: false, Length: 2} + err := handler.SentPacket(&packet1) + Expect(err).NotTo(HaveOccurred()) + err = handler.SentPacket(&packet2) + Expect(err).NotTo(HaveOccurred()) + Expect(handler.BytesInFlight()).To(Equal(uint64(3))) + + // ACK 2, NACK 1 + ack := frames.AckFrame{ + LargestObserved: 2, + NackRanges: []frames.NackRange{frames.NackRange{FirstPacketNumber: 1, LastPacketNumber: 1}}, + } + _, err = handler.ReceivedAck(&ack) + Expect(err).NotTo(HaveOccurred()) + Expect(handler.BytesInFlight()).To(Equal(uint64(1))) + + // Simulate 2 more NACKs + handler.nackPacket(1) + handler.nackPacket(1) + Expect(handler.BytesInFlight()).To(Equal(uint64(0))) + + // Retransmission + packet3 := Packet{PacketNumber: 3, EntropyBit: false, Length: 1} + err = handler.SentPacket(&packet3) + Expect(err).NotTo(HaveOccurred()) + Expect(handler.BytesInFlight()).To(Equal(uint64(1))) + + // ACK + ack = frames.AckFrame{ + LargestObserved: 3, + } + _, err = handler.ReceivedAck(&ack) + Expect(err).NotTo(HaveOccurred()) + Expect(handler.BytesInFlight()).To(Equal(uint64(0))) + }) + }) }) diff --git a/session.go b/session.go index 2c0cb050c..8a527336a 100644 --- a/session.go +++ b/session.go @@ -283,6 +283,7 @@ func (s *Session) sendPacket() error { PacketNumber: packet.number, Frames: packet.frames, EntropyBit: packet.entropyBit, + Length: uint64(len(packet.raw)), }) if err != nil { return err