From 6eeccfd12330b83dbf4d7aa6adf62e4612e3074f Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Wed, 27 Apr 2016 18:56:21 +0700 Subject: [PATCH] retransmit lost packets, send StopWaiting frames --- ackhandler/packet.go | 27 +++++++++++ ackhandler/packet_test.go | 97 +++++++++++++++++++++++++++++++++++++++ session.go | 19 ++++++++ 3 files changed, 143 insertions(+) create mode 100644 ackhandler/packet_test.go diff --git a/ackhandler/packet.go b/ackhandler/packet.go index 008e6c2d5..4f783f620 100644 --- a/ackhandler/packet.go +++ b/ackhandler/packet.go @@ -15,3 +15,30 @@ type Packet struct { MissingReports uint8 Retransmitted bool // has this Packet ever been retransmitted } + +func (p *Packet) GetStreamFramesForRetransmission() []*frames.StreamFrame { + streamFrames := make([]*frames.StreamFrame, 0) + for _, frame := range p.Frames { + if streamFrame, isStreamFrame := frame.(*frames.StreamFrame); isStreamFrame { + streamFrames = append(streamFrames, streamFrame) + } + } + return streamFrames +} + +func (p *Packet) GetControlFramesForRetransmission() []frames.Frame { + controlFrames := make([]frames.Frame, 0) + for _, frame := range p.Frames { + // omit ACKs + if _, isStreamFrame := frame.(*frames.StreamFrame); isStreamFrame { + continue + } + + _, isAck := frame.(*frames.AckFrame) + _, isStopWaiting := frame.(*frames.StopWaitingFrame) + if !isAck && !isStopWaiting { + controlFrames = append(controlFrames, frame) + } + } + return controlFrames +} diff --git a/ackhandler/packet_test.go b/ackhandler/packet_test.go new file mode 100644 index 000000000..dee6cc813 --- /dev/null +++ b/ackhandler/packet_test.go @@ -0,0 +1,97 @@ +package ackhandler + +import ( + "github.com/lucas-clemente/quic-go/frames" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Packet", func() { + Context("getFramesForRetransmission", func() { + var packet Packet + var streamFrame1, streamFrame2 *frames.StreamFrame + var ackFrame1, ackFrame2 *frames.AckFrame + var stopWaitingFrame *frames.StopWaitingFrame + var rstStreamFrame *frames.RstStreamFrame + var windowUpdateFrame *frames.WindowUpdateFrame + + BeforeEach(func() { + streamFrame1 = &frames.StreamFrame{ + StreamID: 5, + Data: []byte{0x13, 0x37}, + } + streamFrame2 = &frames.StreamFrame{ + StreamID: 6, + Data: []byte{0xDE, 0xCA, 0xFB, 0xAD}, + } + ackFrame1 = &frames.AckFrame{ + LargestObserved: 13, + Entropy: 5, + } + ackFrame2 = &frames.AckFrame{ + LargestObserved: 333, + Entropy: 17, + } + rstStreamFrame = &frames.RstStreamFrame{ + StreamID: 555, + ErrorCode: 1337, + } + stopWaitingFrame = &frames.StopWaitingFrame{ + LeastUnacked: 7331, + Entropy: 10, + } + windowUpdateFrame = &frames.WindowUpdateFrame{ + StreamID: 999, + } + packet = Packet{ + PacketNumber: 1337, + Frames: []frames.Frame{windowUpdateFrame, streamFrame1, ackFrame1, streamFrame2, rstStreamFrame, ackFrame2, stopWaitingFrame}, + } + }) + + It("gets all StreamFrames", func() { + streamFrames := packet.GetStreamFramesForRetransmission() + Expect(len(streamFrames)).To(Equal(2)) + Expect(streamFrames).To(ContainElement(streamFrame1)) + Expect(streamFrames).To(ContainElement(streamFrame2)) + }) + + It("gets all control frames", func() { + controlFrames := packet.GetControlFramesForRetransmission() + Expect(len(controlFrames)).To(Equal(2)) + Expect(controlFrames).To(ContainElement(rstStreamFrame)) + Expect(controlFrames).To(ContainElement(windowUpdateFrame)) + }) + + It("does not return any ACK frames", func() { + controlFrames := packet.GetControlFramesForRetransmission() + Expect(controlFrames).ToNot(ContainElement(ackFrame1)) + Expect(controlFrames).ToNot(ContainElement(ackFrame2)) + }) + + It("does not return any ACK frames", func() { + controlFrames := packet.GetControlFramesForRetransmission() + Expect(controlFrames).ToNot(ContainElement(stopWaitingFrame)) + }) + + It("returns an empty slice of StreamFrames if no StreamFrames are queued", func() { + // overwrite the globally defined packet here + packet := Packet{ + PacketNumber: 1337, + Frames: []frames.Frame{ackFrame1, rstStreamFrame}, + } + streamFrames := packet.GetStreamFramesForRetransmission() + Expect(len(streamFrames)).To(Equal(0)) + }) + + It("returns an empty slice of control frames if no applicable control frames are queued", func() { + // overwrite the globally defined packet here + packet := Packet{ + PacketNumber: 1337, + Frames: []frames.Frame{streamFrame1, ackFrame1, stopWaitingFrame}, + } + controlFrames := packet.GetControlFramesForRetransmission() + Expect(len(controlFrames)).To(Equal(0)) + }) + }) +}) diff --git a/session.go b/session.go index ea30aa3d2..1d74444ba 100644 --- a/session.go +++ b/session.go @@ -247,6 +247,25 @@ func (s *Session) closeStreamsWithError(err error) { func (s *Session) sendPacket() error { var controlFrames []frames.Frame + + // check for retransmissions first + // TODO: handle multiple packets retransmissions + retransmitPacket := s.sentPacketHandler.DequeuePacketForRetransmission() + if retransmitPacket != nil { + swf := &frames.StopWaitingFrame{ + LeastUnacked: retransmitPacket.PacketNumber + 1, + Entropy: byte(retransmitPacket.Entropy), + } + controlFrames = append(controlFrames, swf) + + // resend the frames that were in the packet + controlFrames = append(controlFrames, retransmitPacket.GetControlFramesForRetransmission()...) + for _, streamFrame := range retransmitPacket.GetStreamFramesForRetransmission() { + // TODO: add these stream frames with a higher priority + s.packer.AddStreamFrame(*streamFrame) + } + } + ack := s.receivedPacketHandler.DequeueAckFrame() if ack != nil { controlFrames = append(controlFrames, ack)