refactor sending of ACK-only packets in a separate function

No functional change expected.
This commit is contained in:
Marten Seemann
2017-12-28 09:24:34 +07:00
parent f8e5a13c7d
commit 46384c4845
2 changed files with 110 additions and 67 deletions

View File

@@ -396,9 +396,16 @@ runLoop:
s.keepAlivePingSent = true
}
if err := s.sendPacket(); err != nil {
s.closeLocal(err)
if !s.sentPacketHandler.SendingAllowed() { // if congestion limited, at least try sending an ACK frame
if err := s.maybeSendAckOnlyPacket(); err != nil {
s.closeLocal(err)
}
} else {
if err := s.sendPacket(); err != nil {
s.closeLocal(err)
}
}
if !s.receivedTooManyUndecrytablePacketsTime.IsZero() && s.receivedTooManyUndecrytablePacketsTime.Add(protocol.PublicResetTimeout).Before(now) && len(s.undecryptablePackets) != 0 {
s.closeLocal(qerr.Error(qerr.DecryptionFailure, "too many undecryptable packets received"))
}
@@ -697,6 +704,25 @@ func (s *session) processTransportParameters(params *handshake.TransportParamete
// so we don't need to update stream flow control windows
}
func (s *session) maybeSendAckOnlyPacket() error {
ack := s.receivedPacketHandler.GetAckFrame()
if ack == nil {
return nil
}
s.packer.QueueControlFrame(ack)
if !s.version.UsesIETFFrameFormat() { // for gQUIC, maybe add a STOP_WAITING
if swf := s.sentPacketHandler.GetStopWaitingFrame(false); swf != nil {
s.packer.QueueControlFrame(swf)
}
}
packet, err := s.packer.PackAckPacket()
if err != nil {
return err
}
return s.sendPackedPacket(packet)
}
func (s *session) sendPacket() error {
s.packer.SetLeastUnacked(s.sentPacketHandler.GetLeastUnacked())
@@ -716,20 +742,7 @@ func (s *session) sendPacket() error {
// Repeatedly try sending until we don't have any more data, or run out of the congestion window
for {
if !s.sentPacketHandler.SendingAllowed() {
if ack == nil {
return nil
}
// If we aren't allowed to send, at least try sending an ACK frame
if !s.version.UsesIETFFrameFormat() {
if swf := s.sentPacketHandler.GetStopWaitingFrame(false); swf != nil {
s.packer.QueueControlFrame(swf)
}
}
packet, err := s.packer.PackAckPacket()
if err != nil {
return err
}
return s.sendPackedPacket(packet)
return nil
}
// check for retransmissions first
@@ -752,7 +765,7 @@ func (s *session) sendPacket() error {
if err != nil {
return err
}
if err = s.sendPackedPacket(packet); err != nil {
if err := s.sendPackedPacket(packet); err != nil {
return err
}
} else {
@@ -787,7 +800,6 @@ func (s *session) sendPacket() error {
if err = s.sendPackedPacket(packet); err != nil {
return err
}
ack = nil
}
}

View File

@@ -745,51 +745,19 @@ var _ = Describe("Session", func() {
Expect(mconn.written).To(Receive(ContainSubstring(string([]byte{0x03, 0x5e}))))
})
It("sends ACK frames when congestion limited", func() {
swf := &wire.StopWaitingFrame{LeastUnacked: 10}
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().GetLeastUnacked().AnyTimes()
sph.EXPECT().SendingAllowed().Return(false)
sph.EXPECT().GetStopWaitingFrame(false).Return(swf)
sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) {
Expect(p.Frames).To(HaveLen(2))
Expect(p.Frames[0]).To(BeAssignableToTypeOf(&wire.AckFrame{}))
Expect(p.Frames[1]).To(Equal(swf))
})
sess.sentPacketHandler = sph
sess.packer.packetNumberGenerator.next = 0x1338
sess.receivedPacketHandler.ReceivedPacket(1, true)
err := sess.sendPacket()
Expect(err).NotTo(HaveOccurred())
Expect(mconn.written).To(HaveLen(1))
})
It("doesn't include a STOP_WAITING for an ACK-only packet for IETF QUIC", func() {
sess.version = versionIETFFrames
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().GetLeastUnacked().AnyTimes()
sph.EXPECT().SendingAllowed().Return(false)
sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) {
Expect(p.Frames).To(HaveLen(1))
Expect(p.Frames[0]).To(BeAssignableToTypeOf(&wire.AckFrame{}))
})
sess.sentPacketHandler = sph
sess.packer.packetNumberGenerator.next = 0x1338
sess.receivedPacketHandler.ReceivedPacket(1, true)
err := sess.sendPacket()
Expect(err).NotTo(HaveOccurred())
Expect(mconn.written).To(HaveLen(1))
})
It("sends a retransmittable packet when required by the SentPacketHandler", func() {
sess.packer.QueueControlFrame(&wire.AckFrame{LargestAcked: 1000})
ack := &wire.AckFrame{LargestAcked: 1000}
sess.packer.QueueControlFrame(ack)
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().GetLeastUnacked().AnyTimes()
sph.EXPECT().SendingAllowed().Return(true)
sph.EXPECT().SendingAllowed().Return(false)
sph.EXPECT().DequeuePacketForRetransmission()
sph.EXPECT().ShouldSendRetransmittablePacket().Return(true)
sph.EXPECT().SentPacket(gomock.Any())
sph.EXPECT().SendingAllowed().Return(true)
sph.EXPECT().SendingAllowed()
sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) {
Expect(p.Frames).To(HaveLen(2))
Expect(p.Frames).To(ContainElement(ack))
})
sess.sentPacketHandler = sph
err := sess.sendPacket()
Expect(err).ToNot(HaveOccurred())
@@ -803,10 +771,10 @@ var _ = Describe("Session", func() {
sess.connFlowController = fc
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().GetLeastUnacked().AnyTimes()
sph.EXPECT().SendingAllowed().Return(true)
sph.EXPECT().SendingAllowed()
sph.EXPECT().DequeuePacketForRetransmission()
sph.EXPECT().ShouldSendRetransmittablePacket()
sph.EXPECT().SendingAllowed().Return(true)
sph.EXPECT().SendingAllowed()
sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) {
Expect(p.Frames).To(Equal([]wire.Frame{
&wire.MaxDataFrame{ByteOffset: 0x1337},
@@ -824,10 +792,10 @@ var _ = Describe("Session", func() {
})
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().GetLeastUnacked().AnyTimes()
sph.EXPECT().SendingAllowed().Return(true)
sph.EXPECT().SendingAllowed()
sph.EXPECT().DequeuePacketForRetransmission()
sph.EXPECT().ShouldSendRetransmittablePacket()
sph.EXPECT().SendingAllowed().Return(true)
sph.EXPECT().SendingAllowed()
sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) {
Expect(p.Frames).To(ContainElement(&wire.MaxStreamDataFrame{StreamID: 2, ByteOffset: 20}))
})
@@ -843,10 +811,10 @@ var _ = Describe("Session", func() {
sess.connFlowController = fc
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().GetLeastUnacked().AnyTimes()
sph.EXPECT().SendingAllowed().Return(true)
sph.EXPECT().SendingAllowed()
sph.EXPECT().DequeuePacketForRetransmission()
sph.EXPECT().ShouldSendRetransmittablePacket()
sph.EXPECT().SendingAllowed().Return(true)
sph.EXPECT().SendingAllowed()
sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) {
Expect(p.Frames).To(Equal([]wire.Frame{
&wire.BlockedFrame{Offset: 1337},
@@ -873,10 +841,10 @@ var _ = Describe("Session", func() {
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().GetLeastUnacked().AnyTimes()
sph.EXPECT().GetStopWaitingFrame(gomock.Any())
sph.EXPECT().SendingAllowed().Return(true)
sph.EXPECT().SendingAllowed().Return(false)
sph.EXPECT().DequeuePacketForRetransmission()
sph.EXPECT().ShouldSendRetransmittablePacket()
sph.EXPECT().SendingAllowed().Return(true)
sph.EXPECT().SendingAllowed()
sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) {
sentPacket = p
})
@@ -897,6 +865,70 @@ var _ = Describe("Session", func() {
})
})
Context("sending ACK only packets", func() {
It("doesn't do anything if there's no ACK to be sent", func() {
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
sess.sentPacketHandler = sph
err := sess.maybeSendAckOnlyPacket()
Expect(err).ToNot(HaveOccurred())
Expect(mconn.written).To(BeEmpty())
})
It("sends ACK only packets", func() {
swf := &wire.StopWaitingFrame{LeastUnacked: 10}
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().GetLeastUnacked()
sph.EXPECT().GetAlarmTimeout().AnyTimes()
sph.EXPECT().SendingAllowed()
sph.EXPECT().GetStopWaitingFrame(false).Return(swf)
sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) {
Expect(p.Frames).To(HaveLen(2))
Expect(p.Frames[0]).To(BeAssignableToTypeOf(&wire.AckFrame{}))
Expect(p.Frames[1]).To(Equal(swf))
})
sess.sentPacketHandler = sph
sess.packer.packetNumberGenerator.next = 0x1338
sess.receivedPacketHandler.ReceivedPacket(1, true)
done := make(chan struct{})
go func() {
defer GinkgoRecover()
sess.run()
close(done)
}()
sess.scheduleSending()
Eventually(mconn.written).Should(HaveLen(1))
// make sure that the go routine returns
sess.Close(nil)
Eventually(done).Should(BeClosed())
})
It("doesn't include a STOP_WAITING for an ACK-only packet for IETF QUIC", func() {
sess.version = versionIETFFrames
done := make(chan struct{})
sph := mockackhandler.NewMockSentPacketHandler(mockCtrl)
sph.EXPECT().GetLeastUnacked()
sph.EXPECT().GetAlarmTimeout().AnyTimes()
sph.EXPECT().SendingAllowed()
sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) {
Expect(p.Frames).To(HaveLen(1))
Expect(p.Frames[0]).To(BeAssignableToTypeOf(&wire.AckFrame{}))
})
sess.sentPacketHandler = sph
sess.packer.packetNumberGenerator.next = 0x1338
sess.receivedPacketHandler.ReceivedPacket(1, true)
go func() {
defer GinkgoRecover()
sess.run()
close(done)
}()
sess.scheduleSending()
Eventually(mconn.written).Should(HaveLen(1))
// make sure that the go routine returns
sess.Close(nil)
Eventually(done).Should(BeClosed())
})
})
Context("retransmissions", func() {
var sph *mockackhandler.MockSentPacketHandler
BeforeEach(func() {
@@ -928,7 +960,6 @@ var _ = Describe("Session", func() {
err := sess.sendPacket()
Expect(err).ToNot(HaveOccurred())
Expect(mconn.written).To(HaveLen(1))
})
It("retransmits an unencrypted packet, and doesn't add a STOP_WAITING frame (for IETF QUIC)", func() {