retransmit packets with the encryption level they were sent with

This commit is contained in:
Marten Seemann
2017-02-25 16:40:53 +07:00
parent 5e91f139ce
commit 936a29ff35
2 changed files with 256 additions and 186 deletions

View File

@@ -582,21 +582,39 @@ func (s *session) sendPacket() error {
}
utils.Debugf("\tDequeueing retransmission for packet 0x%x", retransmitPacket.PacketNumber)
// resend the frames that were in the packet
for _, frame := range retransmitPacket.GetFramesForRetransmission() {
switch frame.(type) {
case *frames.StreamFrame:
s.streamFramer.AddFrameForRetransmission(frame.(*frames.StreamFrame))
case *frames.WindowUpdateFrame:
// only retransmit WindowUpdates if the stream is not yet closed and the we haven't sent another WindowUpdate with a higher ByteOffset for the stream
var currentOffset protocol.ByteCount
f := frame.(*frames.WindowUpdateFrame)
currentOffset, err = s.flowControlManager.GetReceiveWindow(f.StreamID)
if err == nil && f.ByteOffset >= currentOffset {
if retransmitPacket.EncryptionLevel != protocol.EncryptionForwardSecure {
utils.Debugf("\tDequeueing handshake retransmission for packet 0x%x", retransmitPacket.PacketNumber)
stopWaitingFrame := s.sentPacketHandler.GetStopWaitingFrame(true)
var packet *packedPacket
packet, err = s.packer.RetransmitNonForwardSecurePacket(stopWaitingFrame, retransmitPacket)
if err != nil {
return err
}
if packet == nil {
continue
}
err = s.sendPackedPacket(packet)
if err != nil {
return err
}
continue
} else {
// resend the frames that were in the packet
for _, frame := range retransmitPacket.GetFramesForRetransmission() {
switch frame.(type) {
case *frames.StreamFrame:
s.streamFramer.AddFrameForRetransmission(frame.(*frames.StreamFrame))
case *frames.WindowUpdateFrame:
// only retransmit WindowUpdates if the stream is not yet closed and the we haven't sent another WindowUpdate with a higher ByteOffset for the stream
var currentOffset protocol.ByteCount
f := frame.(*frames.WindowUpdateFrame)
currentOffset, err = s.flowControlManager.GetReceiveWindow(f.StreamID)
if err == nil && f.ByteOffset >= currentOffset {
controlFrames = append(controlFrames, frame)
}
default:
controlFrames = append(controlFrames, frame)
}
default:
controlFrames = append(controlFrames, frame)
}
}
}
@@ -622,20 +640,7 @@ func (s *session) sendPacket() error {
s.packer.QueueControlFrameForNextPacket(f)
}
err = s.sentPacketHandler.SentPacket(&ackhandler.Packet{
PacketNumber: packet.number,
Frames: packet.frames,
Length: protocol.ByteCount(len(packet.raw)),
EncryptionLevel: packet.encryptionLevel,
})
if err != nil {
return err
}
s.logPacket(packet)
err = s.conn.Write(packet.raw)
putPacketBuffer(packet.raw)
err = s.sendPackedPacket(packet)
if err != nil {
return err
}
@@ -643,6 +648,24 @@ func (s *session) sendPacket() error {
}
}
func (s *session) sendPackedPacket(packet *packedPacket) error {
err := s.sentPacketHandler.SentPacket(&ackhandler.Packet{
PacketNumber: packet.number,
Frames: packet.frames,
Length: protocol.ByteCount(len(packet.raw)),
EncryptionLevel: packet.encryptionLevel,
})
if err != nil {
return err
}
s.logPacket(packet)
err = s.conn.Write(packet.raw)
putPacketBuffer(packet.raw)
return err
}
func (s *session) sendConnectionClose(quicErr *qerr.QuicError) error {
packet, err := s.packer.PackConnectionClose(&frames.ConnectionCloseFrame{ErrorCode: quicErr.ErrorCode, ReasonPhrase: quicErr.ErrorMessage}, s.sentPacketHandler.GetLeastUnacked())
if err != nil {

View File

@@ -825,189 +825,236 @@ var _ = Describe("Session", func() {
})
Context("retransmissions", func() {
var sph *mockSentPacketHandler
BeforeEach(func() {
// a StopWaitingFrame is added, so make sure the packet number of the new package is higher than the packet number of the retransmitted packet
sess.packer.packetNumberGenerator.next = 0x1337 + 10
sph = newMockSentPacketHandler().(*mockSentPacketHandler)
sess.sentPacketHandler = sph
sess.packer.cryptoSetup = &mockCryptoSetup{encLevelSeal: protocol.EncryptionForwardSecure}
})
It("sends a StreamFrame from a packet queued for retransmission", func() {
// a StopWaitingFrame is added, so make sure the packet number of the new package is higher than the packet number of the retransmitted packet
sess.packer.packetNumberGenerator.next = 0x1337 + 9
Context("for handshake packets", func() {
It("retransmits an unencrypted packet", func() {
sf := &frames.StreamFrame{StreamID: 1, Data: []byte("foobar")}
sph.retransmissionQueue = []*ackhandler.Packet{{
Frames: []frames.Frame{sf},
EncryptionLevel: protocol.EncryptionUnencrypted,
}}
err := sess.sendPacket()
Expect(err).ToNot(HaveOccurred())
Expect(mconn.written).To(HaveLen(1))
sentPackets := sph.sentPackets
Expect(sentPackets).To(HaveLen(1))
Expect(sentPackets[0].EncryptionLevel).To(Equal(protocol.EncryptionUnencrypted))
Expect(sentPackets[0].Frames).To(HaveLen(2))
Expect(sentPackets[0].Frames[1]).To(Equal(sf))
swf := sentPackets[0].Frames[0].(*frames.StopWaitingFrame)
Expect(swf.LeastUnacked).To(Equal(protocol.PacketNumber(0x1337)))
})
f := frames.StreamFrame{
StreamID: 0x5,
Data: []byte("foobar1234567"),
}
p := ackhandler.Packet{
PacketNumber: 0x1337,
Frames: []frames.Frame{&f},
}
sph := newMockSentPacketHandler()
sph.(*mockSentPacketHandler).retransmissionQueue = []*ackhandler.Packet{&p}
sess.sentPacketHandler = sph
It("doesn't retransmit non-retransmittable packets", func() {
sph.retransmissionQueue = []*ackhandler.Packet{{
Frames: []frames.Frame{
&frames.AckFrame{},
&frames.StopWaitingFrame{},
},
EncryptionLevel: protocol.EncryptionUnencrypted,
}}
err := sess.sendPacket()
Expect(err).ToNot(HaveOccurred())
Expect(mconn.written).To(BeEmpty())
})
err := sess.sendPacket()
Expect(err).NotTo(HaveOccurred())
Expect(mconn.written).To(HaveLen(1))
Expect(sph.(*mockSentPacketHandler).requestedStopWaiting).To(BeTrue())
Expect(mconn.written[0]).To(ContainSubstring("foobar1234567"))
It("retransmit a packet encrypted with the initial encryption", func() {
sf := &frames.StreamFrame{StreamID: 1, Data: []byte("foobar")}
sph.retransmissionQueue = []*ackhandler.Packet{{
Frames: []frames.Frame{sf},
EncryptionLevel: protocol.EncryptionSecure,
}}
err := sess.sendPacket()
Expect(err).ToNot(HaveOccurred())
Expect(mconn.written).To(HaveLen(1))
sentPackets := sph.sentPackets
Expect(sentPackets).To(HaveLen(1))
Expect(sentPackets[0].EncryptionLevel).To(Equal(protocol.EncryptionSecure))
Expect(sentPackets[0].Frames).To(HaveLen(2))
Expect(sentPackets[0].Frames).To(ContainElement(sf))
})
})
It("sends a StreamFrame from a packet queued for retransmission", func() {
// a StopWaitingFrame is added, so make sure the packet number of the new package is higher than the packet number of the retransmitted packet
sess.packer.packetNumberGenerator.next = 0x1337 + 9
Context("for packets after the handshake", func() {
BeforeEach(func() {
sess.packer.SetForwardSecure()
})
f1 := frames.StreamFrame{
StreamID: 0x5,
Data: []byte("foobar"),
}
f2 := frames.StreamFrame{
StreamID: 0x7,
Data: []byte("loremipsum"),
}
p1 := ackhandler.Packet{
PacketNumber: 0x1337,
Frames: []frames.Frame{&f1},
}
p2 := ackhandler.Packet{
PacketNumber: 0x1338,
Frames: []frames.Frame{&f2},
}
sph := newMockSentPacketHandler()
sph.(*mockSentPacketHandler).retransmissionQueue = []*ackhandler.Packet{&p1, &p2}
sess.sentPacketHandler = sph
It("sends a StreamFrame from a packet queued for retransmission", func() {
f := frames.StreamFrame{
StreamID: 0x5,
Data: []byte("foobar1234567"),
}
p := ackhandler.Packet{
PacketNumber: 0x1337,
Frames: []frames.Frame{&f},
EncryptionLevel: protocol.EncryptionForwardSecure,
}
sph.retransmissionQueue = []*ackhandler.Packet{&p}
err := sess.sendPacket()
Expect(err).NotTo(HaveOccurred())
Expect(mconn.written).To(HaveLen(1))
Expect(mconn.written[0]).To(ContainSubstring("foobar"))
Expect(mconn.written[0]).To(ContainSubstring("loremipsum"))
})
err := sess.sendPacket()
Expect(err).NotTo(HaveOccurred())
Expect(mconn.written).To(HaveLen(1))
Expect(sph.requestedStopWaiting).To(BeTrue())
Expect(mconn.written[0]).To(ContainSubstring("foobar1234567"))
})
It("always attaches a StopWaiting to a packet that contains a retransmission", func() {
// make sure the packet number of the new package is higher than the packet number of the retransmitted packet
sess.packer.packetNumberGenerator.next = 0x1337 + 9
It("sends a StreamFrame from a packet queued for retransmission", func() {
f1 := frames.StreamFrame{
StreamID: 0x5,
Data: []byte("foobar"),
}
f2 := frames.StreamFrame{
StreamID: 0x7,
Data: []byte("loremipsum"),
}
p1 := ackhandler.Packet{
PacketNumber: 0x1337,
Frames: []frames.Frame{&f1},
EncryptionLevel: protocol.EncryptionForwardSecure,
}
p2 := ackhandler.Packet{
PacketNumber: 0x1338,
Frames: []frames.Frame{&f2},
EncryptionLevel: protocol.EncryptionForwardSecure,
}
sph.retransmissionQueue = []*ackhandler.Packet{&p1, &p2}
f := &frames.StreamFrame{
StreamID: 0x5,
Data: bytes.Repeat([]byte{'f'}, int(1.5*float32(protocol.MaxPacketSize))),
}
sess.streamFramer.AddFrameForRetransmission(f)
err := sess.sendPacket()
Expect(err).NotTo(HaveOccurred())
Expect(mconn.written).To(HaveLen(1))
Expect(mconn.written[0]).To(ContainSubstring("foobar"))
Expect(mconn.written[0]).To(ContainSubstring("loremipsum"))
})
sph := newMockSentPacketHandler()
sess.sentPacketHandler = sph
It("always attaches a StopWaiting to a packet that contains a retransmission", func() {
f := &frames.StreamFrame{
StreamID: 0x5,
Data: bytes.Repeat([]byte{'f'}, int(1.5*float32(protocol.MaxPacketSize))),
}
sess.streamFramer.AddFrameForRetransmission(f)
err := sess.sendPacket()
Expect(err).NotTo(HaveOccurred())
Expect(mconn.written).To(HaveLen(2))
sentPackets := sph.(*mockSentPacketHandler).sentPackets
Expect(sentPackets).To(HaveLen(2))
_, ok := sentPackets[0].Frames[0].(*frames.StopWaitingFrame)
Expect(ok).To(BeTrue())
_, ok = sentPackets[1].Frames[0].(*frames.StopWaitingFrame)
Expect(ok).To(BeTrue())
})
err := sess.sendPacket()
Expect(err).NotTo(HaveOccurred())
Expect(mconn.written).To(HaveLen(2))
sentPackets := sph.sentPackets
Expect(sentPackets).To(HaveLen(2))
_, ok := sentPackets[0].Frames[0].(*frames.StopWaitingFrame)
Expect(ok).To(BeTrue())
_, ok = sentPackets[1].Frames[0].(*frames.StopWaitingFrame)
Expect(ok).To(BeTrue())
})
It("calls MaybeQueueRTOs even if congestion blocked, so that bytesInFlight is updated", func() {
sph := newMockSentPacketHandler()
sph.(*mockSentPacketHandler).congestionLimited = true
sess.sentPacketHandler = sph
err := sess.sendPacket()
Expect(err).NotTo(HaveOccurred())
Expect(sph.(*mockSentPacketHandler).maybeQueueRTOsCalled).To(BeTrue())
})
It("calls MaybeQueueRTOs even if congestion blocked, so that bytesInFlight is updated", func() {
sph.congestionLimited = true
sess.sentPacketHandler = sph
err := sess.sendPacket()
Expect(err).NotTo(HaveOccurred())
Expect(sph.maybeQueueRTOsCalled).To(BeTrue())
})
It("retransmits a WindowUpdates if it hasn't already sent a WindowUpdate with a higher ByteOffset", func() {
_, err := sess.GetOrOpenStream(5)
Expect(err).ToNot(HaveOccurred())
fc := newMockFlowControlHandler()
fc.receiveWindow = 0x1000
sess.flowControlManager = fc
sph := newMockSentPacketHandler()
sess.sentPacketHandler = sph
wuf := &frames.WindowUpdateFrame{
StreamID: 5,
ByteOffset: 0x1000,
}
sph.(*mockSentPacketHandler).retransmissionQueue = []*ackhandler.Packet{{
Frames: []frames.Frame{wuf},
}}
err = sess.sendPacket()
Expect(err).ToNot(HaveOccurred())
sentPackets := sph.(*mockSentPacketHandler).sentPackets
Expect(sentPackets).To(HaveLen(1))
Expect(sentPackets[0].Frames).To(ContainElement(wuf))
})
It("doesn't retransmit WindowUpdates if it already sent a WindowUpdate with a higher ByteOffset", func() {
_, err := sess.GetOrOpenStream(5)
Expect(err).ToNot(HaveOccurred())
fc := newMockFlowControlHandler()
fc.receiveWindow = 0x2000
sess.flowControlManager = fc
sph := newMockSentPacketHandler()
sess.sentPacketHandler = sph
sph.(*mockSentPacketHandler).retransmissionQueue = []*ackhandler.Packet{{
Frames: []frames.Frame{&frames.WindowUpdateFrame{
It("retransmits a WindowUpdates if it hasn't already sent a WindowUpdate with a higher ByteOffset", func() {
_, err := sess.GetOrOpenStream(5)
Expect(err).ToNot(HaveOccurred())
fc := newMockFlowControlHandler()
fc.receiveWindow = 0x1000
sess.flowControlManager = fc
wuf := &frames.WindowUpdateFrame{
StreamID: 5,
ByteOffset: 0x1000,
}},
}}
err = sess.sendPacket()
Expect(err).ToNot(HaveOccurred())
Expect(sph.(*mockSentPacketHandler).sentPackets).To(BeEmpty())
})
}
sph.retransmissionQueue = []*ackhandler.Packet{{
Frames: []frames.Frame{wuf},
EncryptionLevel: protocol.EncryptionForwardSecure,
}}
err = sess.sendPacket()
Expect(err).ToNot(HaveOccurred())
Expect(sph.sentPackets).To(HaveLen(1))
Expect(sph.sentPackets[0].Frames).To(ContainElement(wuf))
})
It("doesn't retransmit WindowUpdates for closed streams", func() {
str, err := sess.GetOrOpenStream(5)
Expect(err).ToNot(HaveOccurred())
// close the stream
str.(*stream).sentFin()
str.Close()
str.(*stream).RegisterRemoteError(nil)
sess.garbageCollectStreams()
_, err = sess.flowControlManager.SendWindowSize(5)
Expect(err).To(MatchError("Error accessing the flowController map."))
sph := newMockSentPacketHandler()
sess.sentPacketHandler = sph
sph.(*mockSentPacketHandler).retransmissionQueue = []*ackhandler.Packet{{
Frames: []frames.Frame{&frames.WindowUpdateFrame{
StreamID: 5,
ByteOffset: 0x1337,
}},
}}
err = sess.sendPacket()
Expect(err).ToNot(HaveOccurred())
sentPackets := sph.(*mockSentPacketHandler).sentPackets
Expect(sentPackets).To(BeEmpty())
})
It("doesn't retransmit WindowUpdates if it already sent a WindowUpdate with a higher ByteOffset", func() {
_, err := sess.GetOrOpenStream(5)
Expect(err).ToNot(HaveOccurred())
fc := newMockFlowControlHandler()
fc.receiveWindow = 0x2000
sess.flowControlManager = fc
sph.retransmissionQueue = []*ackhandler.Packet{{
Frames: []frames.Frame{&frames.WindowUpdateFrame{
StreamID: 5,
ByteOffset: 0x1000,
}},
EncryptionLevel: protocol.EncryptionForwardSecure,
}}
err = sess.sendPacket()
Expect(err).ToNot(HaveOccurred())
Expect(sph.sentPackets).To(BeEmpty())
})
It("retransmits RTO packets", func() {
// We simulate consistently low RTTs, so that the test works faster
n := protocol.PacketNumber(10)
for p := protocol.PacketNumber(1); p < n; p++ {
err := sess.sentPacketHandler.SentPacket(&ackhandler.Packet{PacketNumber: p, Length: 1})
Expect(err).NotTo(HaveOccurred())
time.Sleep(time.Microsecond)
ack := &frames.AckFrame{}
ack.LargestAcked = p
err = sess.sentPacketHandler.ReceivedAck(ack, p, time.Now())
Expect(err).NotTo(HaveOccurred())
}
sess.packer.packetNumberGenerator.next = n + 1
// Now, we send a single packet, and expect that it was retransmitted later
It("doesn't retransmit WindowUpdates for closed streams", func() {
str, err := sess.GetOrOpenStream(5)
Expect(err).ToNot(HaveOccurred())
// close the stream
str.(*stream).sentFin()
str.Close()
str.(*stream).RegisterRemoteError(nil)
sess.garbageCollectStreams()
_, err = sess.flowControlManager.SendWindowSize(5)
Expect(err).To(MatchError("Error accessing the flowController map."))
sph.retransmissionQueue = []*ackhandler.Packet{{
Frames: []frames.Frame{&frames.WindowUpdateFrame{
StreamID: 5,
ByteOffset: 0x1337,
}},
EncryptionLevel: protocol.EncryptionForwardSecure,
}}
err = sess.sendPacket()
Expect(err).ToNot(HaveOccurred())
Expect(sph.sentPackets).To(BeEmpty())
})
})
})
It("retransmits RTO packets", func() {
sess.packer.cryptoSetup = &mockCryptoSetup{encLevelSeal: protocol.EncryptionForwardSecure}
// We simulate consistently low RTTs, so that the test works faster
n := protocol.PacketNumber(10)
for p := protocol.PacketNumber(1); p < n; p++ {
err := sess.sentPacketHandler.SentPacket(&ackhandler.Packet{
PacketNumber: n,
Length: 1,
Frames: []frames.Frame{&frames.StreamFrame{
Data: []byte("foobar"),
}},
PacketNumber: p,
Length: 1,
EncryptionLevel: protocol.EncryptionForwardSecure,
})
Expect(err).NotTo(HaveOccurred())
go sess.run()
sess.scheduleSending()
Eventually(func() [][]byte { return mconn.written }).ShouldNot(BeEmpty())
Expect(mconn.written[0]).To(ContainSubstring("foobar"))
time.Sleep(time.Microsecond)
ack := &frames.AckFrame{}
ack.LargestAcked = p
err = sess.sentPacketHandler.ReceivedAck(ack, p, time.Now())
Expect(err).NotTo(HaveOccurred())
}
sess.packer.packetNumberGenerator.next = n + 1
// Now, we send a single packet, and expect that it was retransmitted later
err := sess.sentPacketHandler.SentPacket(&ackhandler.Packet{
PacketNumber: n,
Length: 1,
Frames: []frames.Frame{&frames.StreamFrame{
Data: []byte("foobar"),
}},
EncryptionLevel: protocol.EncryptionForwardSecure,
})
Expect(err).NotTo(HaveOccurred())
go sess.run()
sess.scheduleSending()
Eventually(func() [][]byte { return mconn.written }).ShouldNot(BeEmpty())
Expect(mconn.written[0]).To(ContainSubstring("foobar"))
})
Context("scheduling sending", func() {