From 936a29ff357adb4caf70a2f574b42d0ea3c4c785 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 25 Feb 2017 16:40:53 +0700 Subject: [PATCH] retransmit packets with the encryption level they were sent with --- session.go | 77 ++++++---- session_test.go | 365 +++++++++++++++++++++++++++--------------------- 2 files changed, 256 insertions(+), 186 deletions(-) diff --git a/session.go b/session.go index c3f9efd6..c83a87f4 100644 --- a/session.go +++ b/session.go @@ -582,21 +582,39 @@ func (s *session) sendPacket() error { } utils.Debugf("\tDequeueing retransmission for packet 0x%x", retransmitPacket.PacketNumber) - // resend the frames that were in the packet - for _, frame := range retransmitPacket.GetFramesForRetransmission() { - switch frame.(type) { - case *frames.StreamFrame: - s.streamFramer.AddFrameForRetransmission(frame.(*frames.StreamFrame)) - case *frames.WindowUpdateFrame: - // only retransmit WindowUpdates if the stream is not yet closed and the we haven't sent another WindowUpdate with a higher ByteOffset for the stream - var currentOffset protocol.ByteCount - f := frame.(*frames.WindowUpdateFrame) - currentOffset, err = s.flowControlManager.GetReceiveWindow(f.StreamID) - if err == nil && f.ByteOffset >= currentOffset { + if retransmitPacket.EncryptionLevel != protocol.EncryptionForwardSecure { + utils.Debugf("\tDequeueing handshake retransmission for packet 0x%x", retransmitPacket.PacketNumber) + stopWaitingFrame := s.sentPacketHandler.GetStopWaitingFrame(true) + var packet *packedPacket + packet, err = s.packer.RetransmitNonForwardSecurePacket(stopWaitingFrame, retransmitPacket) + if err != nil { + return err + } + if packet == nil { + continue + } + err = s.sendPackedPacket(packet) + if err != nil { + return err + } + continue + } else { + // resend the frames that were in the packet + for _, frame := range retransmitPacket.GetFramesForRetransmission() { + switch frame.(type) { + case *frames.StreamFrame: + s.streamFramer.AddFrameForRetransmission(frame.(*frames.StreamFrame)) + case *frames.WindowUpdateFrame: + // only retransmit WindowUpdates if the stream is not yet closed and the we haven't sent another WindowUpdate with a higher ByteOffset for the stream + var currentOffset protocol.ByteCount + f := frame.(*frames.WindowUpdateFrame) + currentOffset, err = s.flowControlManager.GetReceiveWindow(f.StreamID) + if err == nil && f.ByteOffset >= currentOffset { + controlFrames = append(controlFrames, frame) + } + default: controlFrames = append(controlFrames, frame) } - default: - controlFrames = append(controlFrames, frame) } } } @@ -622,20 +640,7 @@ func (s *session) sendPacket() error { s.packer.QueueControlFrameForNextPacket(f) } - err = s.sentPacketHandler.SentPacket(&ackhandler.Packet{ - PacketNumber: packet.number, - Frames: packet.frames, - Length: protocol.ByteCount(len(packet.raw)), - EncryptionLevel: packet.encryptionLevel, - }) - if err != nil { - return err - } - - s.logPacket(packet) - - err = s.conn.Write(packet.raw) - putPacketBuffer(packet.raw) + err = s.sendPackedPacket(packet) if err != nil { return err } @@ -643,6 +648,24 @@ func (s *session) sendPacket() error { } } +func (s *session) sendPackedPacket(packet *packedPacket) error { + err := s.sentPacketHandler.SentPacket(&ackhandler.Packet{ + PacketNumber: packet.number, + Frames: packet.frames, + Length: protocol.ByteCount(len(packet.raw)), + EncryptionLevel: packet.encryptionLevel, + }) + if err != nil { + return err + } + + s.logPacket(packet) + + err = s.conn.Write(packet.raw) + putPacketBuffer(packet.raw) + return err +} + func (s *session) sendConnectionClose(quicErr *qerr.QuicError) error { packet, err := s.packer.PackConnectionClose(&frames.ConnectionCloseFrame{ErrorCode: quicErr.ErrorCode, ReasonPhrase: quicErr.ErrorMessage}, s.sentPacketHandler.GetLeastUnacked()) if err != nil { diff --git a/session_test.go b/session_test.go index 0075a3d4..674714c6 100644 --- a/session_test.go +++ b/session_test.go @@ -825,189 +825,236 @@ var _ = Describe("Session", func() { }) Context("retransmissions", func() { + var sph *mockSentPacketHandler BeforeEach(func() { + // a StopWaitingFrame is added, so make sure the packet number of the new package is higher than the packet number of the retransmitted packet + sess.packer.packetNumberGenerator.next = 0x1337 + 10 + sph = newMockSentPacketHandler().(*mockSentPacketHandler) + sess.sentPacketHandler = sph sess.packer.cryptoSetup = &mockCryptoSetup{encLevelSeal: protocol.EncryptionForwardSecure} }) - It("sends a StreamFrame from a packet queued for retransmission", func() { - // a StopWaitingFrame is added, so make sure the packet number of the new package is higher than the packet number of the retransmitted packet - sess.packer.packetNumberGenerator.next = 0x1337 + 9 + Context("for handshake packets", func() { + It("retransmits an unencrypted packet", func() { + sf := &frames.StreamFrame{StreamID: 1, Data: []byte("foobar")} + sph.retransmissionQueue = []*ackhandler.Packet{{ + Frames: []frames.Frame{sf}, + EncryptionLevel: protocol.EncryptionUnencrypted, + }} + err := sess.sendPacket() + Expect(err).ToNot(HaveOccurred()) + Expect(mconn.written).To(HaveLen(1)) + sentPackets := sph.sentPackets + Expect(sentPackets).To(HaveLen(1)) + Expect(sentPackets[0].EncryptionLevel).To(Equal(protocol.EncryptionUnencrypted)) + Expect(sentPackets[0].Frames).To(HaveLen(2)) + Expect(sentPackets[0].Frames[1]).To(Equal(sf)) + swf := sentPackets[0].Frames[0].(*frames.StopWaitingFrame) + Expect(swf.LeastUnacked).To(Equal(protocol.PacketNumber(0x1337))) + }) - f := frames.StreamFrame{ - StreamID: 0x5, - Data: []byte("foobar1234567"), - } - p := ackhandler.Packet{ - PacketNumber: 0x1337, - Frames: []frames.Frame{&f}, - } - sph := newMockSentPacketHandler() - sph.(*mockSentPacketHandler).retransmissionQueue = []*ackhandler.Packet{&p} - sess.sentPacketHandler = sph + It("doesn't retransmit non-retransmittable packets", func() { + sph.retransmissionQueue = []*ackhandler.Packet{{ + Frames: []frames.Frame{ + &frames.AckFrame{}, + &frames.StopWaitingFrame{}, + }, + EncryptionLevel: protocol.EncryptionUnencrypted, + }} + err := sess.sendPacket() + Expect(err).ToNot(HaveOccurred()) + Expect(mconn.written).To(BeEmpty()) + }) - err := sess.sendPacket() - Expect(err).NotTo(HaveOccurred()) - Expect(mconn.written).To(HaveLen(1)) - Expect(sph.(*mockSentPacketHandler).requestedStopWaiting).To(BeTrue()) - Expect(mconn.written[0]).To(ContainSubstring("foobar1234567")) + It("retransmit a packet encrypted with the initial encryption", func() { + sf := &frames.StreamFrame{StreamID: 1, Data: []byte("foobar")} + sph.retransmissionQueue = []*ackhandler.Packet{{ + Frames: []frames.Frame{sf}, + EncryptionLevel: protocol.EncryptionSecure, + }} + err := sess.sendPacket() + Expect(err).ToNot(HaveOccurred()) + Expect(mconn.written).To(HaveLen(1)) + sentPackets := sph.sentPackets + Expect(sentPackets).To(HaveLen(1)) + Expect(sentPackets[0].EncryptionLevel).To(Equal(protocol.EncryptionSecure)) + Expect(sentPackets[0].Frames).To(HaveLen(2)) + Expect(sentPackets[0].Frames).To(ContainElement(sf)) + }) }) - It("sends a StreamFrame from a packet queued for retransmission", func() { - // a StopWaitingFrame is added, so make sure the packet number of the new package is higher than the packet number of the retransmitted packet - sess.packer.packetNumberGenerator.next = 0x1337 + 9 + Context("for packets after the handshake", func() { + BeforeEach(func() { + sess.packer.SetForwardSecure() + }) - f1 := frames.StreamFrame{ - StreamID: 0x5, - Data: []byte("foobar"), - } - f2 := frames.StreamFrame{ - StreamID: 0x7, - Data: []byte("loremipsum"), - } - p1 := ackhandler.Packet{ - PacketNumber: 0x1337, - Frames: []frames.Frame{&f1}, - } - p2 := ackhandler.Packet{ - PacketNumber: 0x1338, - Frames: []frames.Frame{&f2}, - } - sph := newMockSentPacketHandler() - sph.(*mockSentPacketHandler).retransmissionQueue = []*ackhandler.Packet{&p1, &p2} - sess.sentPacketHandler = sph + It("sends a StreamFrame from a packet queued for retransmission", func() { + f := frames.StreamFrame{ + StreamID: 0x5, + Data: []byte("foobar1234567"), + } + p := ackhandler.Packet{ + PacketNumber: 0x1337, + Frames: []frames.Frame{&f}, + EncryptionLevel: protocol.EncryptionForwardSecure, + } + sph.retransmissionQueue = []*ackhandler.Packet{&p} - err := sess.sendPacket() - Expect(err).NotTo(HaveOccurred()) - Expect(mconn.written).To(HaveLen(1)) - Expect(mconn.written[0]).To(ContainSubstring("foobar")) - Expect(mconn.written[0]).To(ContainSubstring("loremipsum")) - }) + err := sess.sendPacket() + Expect(err).NotTo(HaveOccurred()) + Expect(mconn.written).To(HaveLen(1)) + Expect(sph.requestedStopWaiting).To(BeTrue()) + Expect(mconn.written[0]).To(ContainSubstring("foobar1234567")) + }) - It("always attaches a StopWaiting to a packet that contains a retransmission", func() { - // make sure the packet number of the new package is higher than the packet number of the retransmitted packet - sess.packer.packetNumberGenerator.next = 0x1337 + 9 + It("sends a StreamFrame from a packet queued for retransmission", func() { + f1 := frames.StreamFrame{ + StreamID: 0x5, + Data: []byte("foobar"), + } + f2 := frames.StreamFrame{ + StreamID: 0x7, + Data: []byte("loremipsum"), + } + p1 := ackhandler.Packet{ + PacketNumber: 0x1337, + Frames: []frames.Frame{&f1}, + EncryptionLevel: protocol.EncryptionForwardSecure, + } + p2 := ackhandler.Packet{ + PacketNumber: 0x1338, + Frames: []frames.Frame{&f2}, + EncryptionLevel: protocol.EncryptionForwardSecure, + } + sph.retransmissionQueue = []*ackhandler.Packet{&p1, &p2} - f := &frames.StreamFrame{ - StreamID: 0x5, - Data: bytes.Repeat([]byte{'f'}, int(1.5*float32(protocol.MaxPacketSize))), - } - sess.streamFramer.AddFrameForRetransmission(f) + err := sess.sendPacket() + Expect(err).NotTo(HaveOccurred()) + Expect(mconn.written).To(HaveLen(1)) + Expect(mconn.written[0]).To(ContainSubstring("foobar")) + Expect(mconn.written[0]).To(ContainSubstring("loremipsum")) + }) - sph := newMockSentPacketHandler() - sess.sentPacketHandler = sph + It("always attaches a StopWaiting to a packet that contains a retransmission", func() { + f := &frames.StreamFrame{ + StreamID: 0x5, + Data: bytes.Repeat([]byte{'f'}, int(1.5*float32(protocol.MaxPacketSize))), + } + sess.streamFramer.AddFrameForRetransmission(f) - err := sess.sendPacket() - Expect(err).NotTo(HaveOccurred()) - Expect(mconn.written).To(HaveLen(2)) - sentPackets := sph.(*mockSentPacketHandler).sentPackets - Expect(sentPackets).To(HaveLen(2)) - _, ok := sentPackets[0].Frames[0].(*frames.StopWaitingFrame) - Expect(ok).To(BeTrue()) - _, ok = sentPackets[1].Frames[0].(*frames.StopWaitingFrame) - Expect(ok).To(BeTrue()) - }) + err := sess.sendPacket() + Expect(err).NotTo(HaveOccurred()) + Expect(mconn.written).To(HaveLen(2)) + sentPackets := sph.sentPackets + Expect(sentPackets).To(HaveLen(2)) + _, ok := sentPackets[0].Frames[0].(*frames.StopWaitingFrame) + Expect(ok).To(BeTrue()) + _, ok = sentPackets[1].Frames[0].(*frames.StopWaitingFrame) + Expect(ok).To(BeTrue()) + }) - It("calls MaybeQueueRTOs even if congestion blocked, so that bytesInFlight is updated", func() { - sph := newMockSentPacketHandler() - sph.(*mockSentPacketHandler).congestionLimited = true - sess.sentPacketHandler = sph - err := sess.sendPacket() - Expect(err).NotTo(HaveOccurred()) - Expect(sph.(*mockSentPacketHandler).maybeQueueRTOsCalled).To(BeTrue()) - }) + It("calls MaybeQueueRTOs even if congestion blocked, so that bytesInFlight is updated", func() { + sph.congestionLimited = true + sess.sentPacketHandler = sph + err := sess.sendPacket() + Expect(err).NotTo(HaveOccurred()) + Expect(sph.maybeQueueRTOsCalled).To(BeTrue()) + }) - It("retransmits a WindowUpdates if it hasn't already sent a WindowUpdate with a higher ByteOffset", func() { - _, err := sess.GetOrOpenStream(5) - Expect(err).ToNot(HaveOccurred()) - fc := newMockFlowControlHandler() - fc.receiveWindow = 0x1000 - sess.flowControlManager = fc - sph := newMockSentPacketHandler() - sess.sentPacketHandler = sph - wuf := &frames.WindowUpdateFrame{ - StreamID: 5, - ByteOffset: 0x1000, - } - sph.(*mockSentPacketHandler).retransmissionQueue = []*ackhandler.Packet{{ - Frames: []frames.Frame{wuf}, - }} - err = sess.sendPacket() - Expect(err).ToNot(HaveOccurred()) - sentPackets := sph.(*mockSentPacketHandler).sentPackets - Expect(sentPackets).To(HaveLen(1)) - Expect(sentPackets[0].Frames).To(ContainElement(wuf)) - }) - - It("doesn't retransmit WindowUpdates if it already sent a WindowUpdate with a higher ByteOffset", func() { - _, err := sess.GetOrOpenStream(5) - Expect(err).ToNot(HaveOccurred()) - fc := newMockFlowControlHandler() - fc.receiveWindow = 0x2000 - sess.flowControlManager = fc - sph := newMockSentPacketHandler() - sess.sentPacketHandler = sph - sph.(*mockSentPacketHandler).retransmissionQueue = []*ackhandler.Packet{{ - Frames: []frames.Frame{&frames.WindowUpdateFrame{ + It("retransmits a WindowUpdates if it hasn't already sent a WindowUpdate with a higher ByteOffset", func() { + _, err := sess.GetOrOpenStream(5) + Expect(err).ToNot(HaveOccurred()) + fc := newMockFlowControlHandler() + fc.receiveWindow = 0x1000 + sess.flowControlManager = fc + wuf := &frames.WindowUpdateFrame{ StreamID: 5, ByteOffset: 0x1000, - }}, - }} - err = sess.sendPacket() - Expect(err).ToNot(HaveOccurred()) - Expect(sph.(*mockSentPacketHandler).sentPackets).To(BeEmpty()) - }) + } + sph.retransmissionQueue = []*ackhandler.Packet{{ + Frames: []frames.Frame{wuf}, + EncryptionLevel: protocol.EncryptionForwardSecure, + }} + err = sess.sendPacket() + Expect(err).ToNot(HaveOccurred()) + Expect(sph.sentPackets).To(HaveLen(1)) + Expect(sph.sentPackets[0].Frames).To(ContainElement(wuf)) + }) - It("doesn't retransmit WindowUpdates for closed streams", func() { - str, err := sess.GetOrOpenStream(5) - Expect(err).ToNot(HaveOccurred()) - // close the stream - str.(*stream).sentFin() - str.Close() - str.(*stream).RegisterRemoteError(nil) - sess.garbageCollectStreams() - _, err = sess.flowControlManager.SendWindowSize(5) - Expect(err).To(MatchError("Error accessing the flowController map.")) - sph := newMockSentPacketHandler() - sess.sentPacketHandler = sph - sph.(*mockSentPacketHandler).retransmissionQueue = []*ackhandler.Packet{{ - Frames: []frames.Frame{&frames.WindowUpdateFrame{ - StreamID: 5, - ByteOffset: 0x1337, - }}, - }} - err = sess.sendPacket() - Expect(err).ToNot(HaveOccurred()) - sentPackets := sph.(*mockSentPacketHandler).sentPackets - Expect(sentPackets).To(BeEmpty()) - }) + It("doesn't retransmit WindowUpdates if it already sent a WindowUpdate with a higher ByteOffset", func() { + _, err := sess.GetOrOpenStream(5) + Expect(err).ToNot(HaveOccurred()) + fc := newMockFlowControlHandler() + fc.receiveWindow = 0x2000 + sess.flowControlManager = fc + sph.retransmissionQueue = []*ackhandler.Packet{{ + Frames: []frames.Frame{&frames.WindowUpdateFrame{ + StreamID: 5, + ByteOffset: 0x1000, + }}, + EncryptionLevel: protocol.EncryptionForwardSecure, + }} + err = sess.sendPacket() + Expect(err).ToNot(HaveOccurred()) + Expect(sph.sentPackets).To(BeEmpty()) + }) - It("retransmits RTO packets", func() { - // We simulate consistently low RTTs, so that the test works faster - n := protocol.PacketNumber(10) - for p := protocol.PacketNumber(1); p < n; p++ { - err := sess.sentPacketHandler.SentPacket(&ackhandler.Packet{PacketNumber: p, Length: 1}) - Expect(err).NotTo(HaveOccurred()) - time.Sleep(time.Microsecond) - ack := &frames.AckFrame{} - ack.LargestAcked = p - err = sess.sentPacketHandler.ReceivedAck(ack, p, time.Now()) - Expect(err).NotTo(HaveOccurred()) - } - sess.packer.packetNumberGenerator.next = n + 1 - // Now, we send a single packet, and expect that it was retransmitted later + It("doesn't retransmit WindowUpdates for closed streams", func() { + str, err := sess.GetOrOpenStream(5) + Expect(err).ToNot(HaveOccurred()) + // close the stream + str.(*stream).sentFin() + str.Close() + str.(*stream).RegisterRemoteError(nil) + sess.garbageCollectStreams() + _, err = sess.flowControlManager.SendWindowSize(5) + Expect(err).To(MatchError("Error accessing the flowController map.")) + sph.retransmissionQueue = []*ackhandler.Packet{{ + Frames: []frames.Frame{&frames.WindowUpdateFrame{ + StreamID: 5, + ByteOffset: 0x1337, + }}, + EncryptionLevel: protocol.EncryptionForwardSecure, + }} + err = sess.sendPacket() + Expect(err).ToNot(HaveOccurred()) + Expect(sph.sentPackets).To(BeEmpty()) + }) + }) + }) + + It("retransmits RTO packets", func() { + sess.packer.cryptoSetup = &mockCryptoSetup{encLevelSeal: protocol.EncryptionForwardSecure} + // We simulate consistently low RTTs, so that the test works faster + n := protocol.PacketNumber(10) + for p := protocol.PacketNumber(1); p < n; p++ { err := sess.sentPacketHandler.SentPacket(&ackhandler.Packet{ - PacketNumber: n, - Length: 1, - Frames: []frames.Frame{&frames.StreamFrame{ - Data: []byte("foobar"), - }}, + PacketNumber: p, + Length: 1, + EncryptionLevel: protocol.EncryptionForwardSecure, }) Expect(err).NotTo(HaveOccurred()) - go sess.run() - sess.scheduleSending() - Eventually(func() [][]byte { return mconn.written }).ShouldNot(BeEmpty()) - Expect(mconn.written[0]).To(ContainSubstring("foobar")) + time.Sleep(time.Microsecond) + ack := &frames.AckFrame{} + ack.LargestAcked = p + err = sess.sentPacketHandler.ReceivedAck(ack, p, time.Now()) + Expect(err).NotTo(HaveOccurred()) + } + sess.packer.packetNumberGenerator.next = n + 1 + // Now, we send a single packet, and expect that it was retransmitted later + err := sess.sentPacketHandler.SentPacket(&ackhandler.Packet{ + PacketNumber: n, + Length: 1, + Frames: []frames.Frame{&frames.StreamFrame{ + Data: []byte("foobar"), + }}, + EncryptionLevel: protocol.EncryptionForwardSecure, }) + Expect(err).NotTo(HaveOccurred()) + go sess.run() + sess.scheduleSending() + Eventually(func() [][]byte { return mconn.written }).ShouldNot(BeEmpty()) + Expect(mconn.written[0]).To(ContainSubstring("foobar")) }) Context("scheduling sending", func() {