diff --git a/internal/ackhandler/interfaces.go b/internal/ackhandler/interfaces.go index ec36226df..eb74e4031 100644 --- a/internal/ackhandler/interfaces.go +++ b/internal/ackhandler/interfaces.go @@ -14,11 +14,8 @@ type SentPacketHandler interface { ReceivedAck(ackFrame *wire.AckFrame, withPacketNumber protocol.PacketNumber, encLevel protocol.EncryptionLevel, recvTime time.Time) error SetHandshakeComplete() - // SendingAllowed says if a packet can be sent. - // Sending packets might not be possible because: - // * we're congestion limited - // * we're tracking the maximum number of sent packets - SendingAllowed() bool + // The SendMode determines if and what kind of packets can be sent. + SendMode() SendMode // TimeUntilSend is the time when the next packet should be sent. // It is used for pacing packets. TimeUntilSend() time.Time diff --git a/internal/ackhandler/send_mode.go b/internal/ackhandler/send_mode.go new file mode 100644 index 000000000..3ece73522 --- /dev/null +++ b/internal/ackhandler/send_mode.go @@ -0,0 +1,32 @@ +package ackhandler + +import "fmt" + +// The SendMode says what kind of packets can be sent. +type SendMode uint8 + +const ( + // SendNone means that no packets should be sent + SendNone SendMode = iota + // SendAck means an ACK-only packet should be sent + SendAck + // SendRetransmission means that retransmissions should be sent + SendRetransmission + // SendAny packet should be sent + SendAny +) + +func (s SendMode) String() string { + switch s { + case SendNone: + return "none" + case SendAck: + return "ack" + case SendRetransmission: + return "retransmission" + case SendAny: + return "any" + default: + return fmt.Sprintf("invalid send mode: %d", s) + } +} diff --git a/internal/ackhandler/send_mode_test.go b/internal/ackhandler/send_mode_test.go new file mode 100644 index 000000000..1c55f85f0 --- /dev/null +++ b/internal/ackhandler/send_mode_test.go @@ -0,0 +1,16 @@ +package ackhandler + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Send Mode", func() { + It("has a string representation", func() { + Expect(SendNone.String()).To(Equal("none")) + Expect(SendAny.String()).To(Equal("any")) + Expect(SendAck.String()).To(Equal("ack")) + Expect(SendRetransmission.String()).To(Equal("retransmission")) + Expect(SendMode(123).String()).To(Equal("invalid send mode: 123")) + }) +}) diff --git a/internal/ackhandler/sent_packet_handler.go b/internal/ackhandler/sent_packet_handler.go index 0d5d38930..98a04dd7b 100644 --- a/internal/ackhandler/sent_packet_handler.go +++ b/internal/ackhandler/sent_packet_handler.go @@ -357,18 +357,31 @@ func (h *sentPacketHandler) GetStopWaitingFrame(force bool) *wire.StopWaitingFra return h.stopWaitingManager.GetStopWaitingFrame(force) } -func (h *sentPacketHandler) SendingAllowed() bool { - cwnd := h.congestion.GetCongestionWindow() - congestionLimited := h.bytesInFlight > cwnd - maxTrackedLimited := protocol.PacketNumber(len(h.retransmissionQueue)+h.packetHistory.Len()) >= protocol.MaxTrackedSentPackets - if congestionLimited { - utils.Debugf("Congestion limited: bytes in flight %d, window %d", h.bytesInFlight, cwnd) +func (h *sentPacketHandler) SendMode() SendMode { + numTrackedPackets := len(h.retransmissionQueue) + h.packetHistory.Len() + + // Don't send any packets if we're keeping track of the maximum number of packets. + // Note that since MaxOutstandingSentPackets is smaller than MaxTrackedSentPackets, + // we will stop sending out new data when reaching MaxOutstandingSentPackets, + // but still allow sending of retransmissions and ACKs. + if numTrackedPackets >= protocol.MaxTrackedSentPackets { + utils.Debugf("Limited by the number of tracked packets: tracking %d packets, maximum %d", numTrackedPackets, protocol.MaxTrackedSentPackets) + return SendNone } - // Workaround for #555: - // Always allow sending of retransmissions. This should probably be limited - // to RTOs, but we currently don't have a nice way of distinguishing them. - haveRetransmissions := len(h.retransmissionQueue) > 0 - return !maxTrackedLimited && (!congestionLimited || haveRetransmissions) + // Send retransmissions first, if there are any. + if len(h.retransmissionQueue) > 0 { + return SendRetransmission + } + // Only send ACKs if we're congestion limited. + if cwnd := h.congestion.GetCongestionWindow(); h.bytesInFlight > cwnd { + utils.Debugf("Congestion limited: bytes in flight %d, window %d", h.bytesInFlight, cwnd) + return SendAck + } + if numTrackedPackets >= protocol.MaxOutstandingSentPackets { + utils.Debugf("Max outstanding limited: tracking %d packets, maximum: %d", numTrackedPackets, protocol.MaxOutstandingSentPackets) + return SendAck + } + return SendAny } func (h *sentPacketHandler) TimeUntilSend() time.Time { diff --git a/internal/ackhandler/sent_packet_handler_test.go b/internal/ackhandler/sent_packet_handler_test.go index 6b0ccbac2..00a638234 100644 --- a/internal/ackhandler/sent_packet_handler_test.go +++ b/internal/ackhandler/sent_packet_handler_test.go @@ -694,28 +694,39 @@ var _ = Describe("SentPacketHandler", func() { handler.OnAlarm() // RTO, meaning 2 lost packets }) - It("allows or denies sending based on congestion", func() { + It("only allows sending of ACKs when congestion limited", func() { handler.bytesInFlight = 100 cong.EXPECT().GetCongestionWindow().Return(protocol.ByteCount(200)) - Expect(handler.SendingAllowed()).To(BeTrue()) + Expect(handler.SendMode()).To(Equal(SendAny)) cong.EXPECT().GetCongestionWindow().Return(protocol.ByteCount(75)) - Expect(handler.SendingAllowed()).To(BeFalse()) + Expect(handler.SendMode()).To(Equal(SendAck)) }) - It("allows or denies sending based on the number of tracked packets", func() { - cong.EXPECT().GetCongestionWindow().Times(2) - Expect(handler.SendingAllowed()).To(BeTrue()) - handler.retransmissionQueue = make([]*Packet, protocol.MaxTrackedSentPackets) - Expect(handler.SendingAllowed()).To(BeFalse()) + It("only allows sending of ACKs when we're keeping track of MaxOutstandingSentPackets packets", func() { + cong.EXPECT().GetCongestionWindow().Return(protocol.MaxByteCount).AnyTimes() + cong.EXPECT().TimeUntilSend(gomock.Any()).AnyTimes() + cong.EXPECT().OnPacketSent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + for i := 1; i < protocol.MaxOutstandingSentPackets; i++ { + handler.SentPacket(retransmittablePacket(protocol.PacketNumber(i))) + Expect(handler.SendMode()).To(Equal(SendAny)) + } + handler.SentPacket(retransmittablePacket(protocol.MaxOutstandingSentPackets)) + Expect(handler.SendMode()).To(Equal(SendAck)) }) - It("allows sending if there are retransmisisons outstanding", func() { - cong.EXPECT().GetCongestionWindow().Times(2) - handler.bytesInFlight = 100 - Expect(handler.retransmissionQueue).To(BeEmpty()) - Expect(handler.SendingAllowed()).To(BeFalse()) + It("allows sending retransmissions", func() { + // note that we don't EXPECT a call to GetCongestionWindow + // that means retransmissions are sent without considering the congestion window handler.retransmissionQueue = []*Packet{{PacketNumber: 3}} - Expect(handler.SendingAllowed()).To(BeTrue()) + Expect(handler.SendMode()).To(Equal(SendRetransmission)) + }) + + It("allow retransmissions, if we're keeping track of between MaxOutstandingSentPackets and MaxTrackedSentPackets packets", func() { + Expect(protocol.MaxOutstandingSentPackets).To(BeNumerically("<", protocol.MaxTrackedSentPackets)) + handler.retransmissionQueue = make([]*Packet, protocol.MaxOutstandingSentPackets+10) + Expect(handler.SendMode()).To(Equal(SendRetransmission)) + handler.retransmissionQueue = make([]*Packet, protocol.MaxTrackedSentPackets) + Expect(handler.SendMode()).To(Equal(SendNone)) }) It("gets the pacing delay", func() { diff --git a/internal/mocks/ackhandler/sent_packet_handler.go b/internal/mocks/ackhandler/sent_packet_handler.go index 2c0b51324..0a883102f 100644 --- a/internal/mocks/ackhandler/sent_packet_handler.go +++ b/internal/mocks/ackhandler/sent_packet_handler.go @@ -119,16 +119,16 @@ func (mr *MockSentPacketHandlerMockRecorder) ReceivedAck(arg0, arg1, arg2, arg3 return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReceivedAck", reflect.TypeOf((*MockSentPacketHandler)(nil).ReceivedAck), arg0, arg1, arg2, arg3) } -// SendingAllowed mocks base method -func (m *MockSentPacketHandler) SendingAllowed() bool { - ret := m.ctrl.Call(m, "SendingAllowed") - ret0, _ := ret[0].(bool) +// SendMode mocks base method +func (m *MockSentPacketHandler) SendMode() ackhandler.SendMode { + ret := m.ctrl.Call(m, "SendMode") + ret0, _ := ret[0].(ackhandler.SendMode) return ret0 } -// SendingAllowed indicates an expected call of SendingAllowed -func (mr *MockSentPacketHandlerMockRecorder) SendingAllowed() *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendingAllowed", reflect.TypeOf((*MockSentPacketHandler)(nil).SendingAllowed)) +// SendMode indicates an expected call of SendMode +func (mr *MockSentPacketHandlerMockRecorder) SendMode() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMode", reflect.TypeOf((*MockSentPacketHandler)(nil).SendMode)) } // SentPacket mocks base method diff --git a/internal/protocol/server_parameters.go b/internal/protocol/server_parameters.go index 0b295f24b..96ebbdcd7 100644 --- a/internal/protocol/server_parameters.go +++ b/internal/protocol/server_parameters.go @@ -81,8 +81,15 @@ const MaxTrackedSkippedPackets = 10 // CookieExpiryTime is the valid time of a cookie const CookieExpiryTime = 24 * time.Hour -// MaxTrackedSentPackets is maximum number of sent packets saved for either later retransmission or entropy calculation -const MaxTrackedSentPackets = 2 * DefaultMaxCongestionWindow +// MaxOutstandingSentPackets is maximum number of packets saved for retransmission. +// When reached, it imposes a soft limit on sending new packets: +// Sending ACKs and retransmission is still allowed, but now new regular packets can be sent. +const MaxOutstandingSentPackets = 2 * DefaultMaxCongestionWindow + +// MaxTrackedSentPackets is maximum number of sent packets saved for retransmission. +// When reached, no more packets will be sent. +// This value *must* be larger than MaxOutstandingSentPackets. +const MaxTrackedSentPackets = MaxOutstandingSentPackets * 5 / 4 // MaxTrackedReceivedAckRanges is the maximum number of ACK ranges tracked const MaxTrackedReceivedAckRanges = DefaultMaxCongestionWindow diff --git a/session.go b/session.go index d9e5bf133..8e9915633 100644 --- a/session.go +++ b/session.go @@ -766,40 +766,56 @@ func (s *session) processTransportParameters(params *handshake.TransportParamete func (s *session) sendPackets() error { s.pacingDeadline = time.Time{} - if !s.sentPacketHandler.SendingAllowed() { // if congestion limited, at least try sending an ACK frame - return s.maybeSendAckOnlyPacket() + + sendMode := s.sentPacketHandler.SendMode() + if sendMode == ackhandler.SendNone { // shortcut: return immediately if there's nothing to send + return nil } + numPackets := s.sentPacketHandler.ShouldSendNumPackets() var numPacketsSent int - // Send retransmissions, until - // * we're congestion limited, or - // * there are no more retransmissions, or - // * the maximum number of packets was reached - for ; numPacketsSent < numPackets; numPacketsSent++ { - sentPacket, err := s.maybeSendRetransmission() - if err != nil { - return err +sendLoop: + for { + switch sendMode { + case ackhandler.SendNone: + break sendLoop + case ackhandler.SendAck: + // We can at most send a single ACK only packet. + // There will only be a new ACK after receiving new packets. + // SendAck is only returned when we're congestion limited, so we don't need to set the pacingt timer. + return s.maybeSendAckOnlyPacket() + case ackhandler.SendRetransmission: + sentPacket, err := s.maybeSendRetransmission() + if err != nil { + return err + } + if sentPacket { + numPacketsSent++ + // This can happen if a retransmission queued, but it wasn't necessary to send it. + // e.g. when an Initial is queued, but we already received a packet from the server. + } + case ackhandler.SendAny: + sentPacket, err := s.sendPacket() + if err != nil { + return err + } + if !sentPacket { + break sendLoop + } + numPacketsSent++ + default: + return fmt.Errorf("BUG: invalid send mode %d", sendMode) } - if !sentPacket { // no more retransmission to send. Proceed to send new data. + if numPacketsSent >= numPackets { break } - if !s.sentPacketHandler.SendingAllowed() { - return nil - } - } - for ; numPacketsSent < numPackets; numPacketsSent++ { - sentPacket, err := s.sendPacket() - if err != nil { - return err - } - // If no packet was sent, or we're congestion limit, we're done here. - if !sentPacket || !s.sentPacketHandler.SendingAllowed() { - return nil - } + sendMode = s.sentPacketHandler.SendMode() } // Only start the pacing timer if we sent as many packets as we were allowed. // There will probably be more to send when calling sendPacket again. - s.pacingDeadline = s.sentPacketHandler.TimeUntilSend() + if numPacketsSent == numPackets { + s.pacingDeadline = s.sentPacketHandler.TimeUntilSend() + } return nil } @@ -837,7 +853,6 @@ func (s *session) maybeSendRetransmission() (bool, error) { // As soon as we receive one response, we don't need to send any more Initials. if s.receivedFirstPacket && retransmitPacket.PacketType == protocol.PacketTypeInitial { utils.Debugf("Skipping retransmission of packet %d. Already received a response to an Initial.", retransmitPacket.PacketNumber) - retransmitPacket = nil continue } break diff --git a/session_test.go b/session_test.go index 019f65621..18fed4e79 100644 --- a/session_test.go +++ b/session_test.go @@ -754,10 +754,10 @@ var _ = Describe("Session", func() { PacketNumber: 10, PacketType: protocol.PacketTypeHandshake, }) - sph.EXPECT().DequeuePacketForRetransmission() - sph.EXPECT().SendingAllowed().Return(true).Times(2) - sph.EXPECT().SendingAllowed() + sph.EXPECT().SendMode().Return(ackhandler.SendRetransmission) + sph.EXPECT().SendMode().Return(ackhandler.SendAny) sph.EXPECT().ShouldSendNumPackets().Return(2) + sph.EXPECT().TimeUntilSend() sph.EXPECT().GetStopWaitingFrame(gomock.Any()).Return(&wire.StopWaitingFrame{}) sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) { // retransmitted packet @@ -773,6 +773,14 @@ var _ = Describe("Session", func() { err := sess.sendPackets() Expect(err).ToNot(HaveOccurred()) }) + + It("doesn't send when the SentPacketHandler doesn't allow it", func() { + sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) + sph.EXPECT().SendMode().Return(ackhandler.SendNone) + sess.sentPacketHandler = sph + err := sess.sendPackets() + Expect(err).ToNot(HaveOccurred()) + }) }) Context("packet pacing", func() { @@ -792,12 +800,11 @@ var _ = Describe("Session", func() { sph.EXPECT().SentPacket(gomock.Any()).Times(2) sph.EXPECT().ShouldSendNumPackets().Return(1).Times(2) sph.EXPECT().TimeUntilSend().Return(time.Now()).Times(2) - sph.EXPECT().SendingAllowed().Do(func() { + sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour)) + sph.EXPECT().SendMode().Return(ackhandler.SendAny).Do(func() { // make sure there's something to send sess.packer.QueueControlFrame(&wire.MaxDataFrame{ByteOffset: 1}) - }).Return(true).Times(3) // allow 2 packets... - // ...then report that we're congestion limited - sph.EXPECT().SendingAllowed() + }).Times(2) // allow 2 packets... done := make(chan struct{}) go func() { defer GinkgoRecover() @@ -812,6 +819,29 @@ var _ = Describe("Session", func() { Eventually(done).Should(BeClosed()) }) + // when becoming congestion limited, at some point the SendMode will change from SendAny to SendAck + // we shouldn't send the ACK in the same run + It("doesn't send an ACK right after becoming congestion limited", func() { + sess.packer.QueueControlFrame(&wire.MaxDataFrame{ByteOffset: 1}) + sph.EXPECT().SentPacket(gomock.Any()) + sph.EXPECT().ShouldSendNumPackets().Return(1000) + sph.EXPECT().TimeUntilSend().Return(time.Now()) + sph.EXPECT().SendMode().Return(ackhandler.SendAny) + sph.EXPECT().SendMode().Return(ackhandler.SendAck) + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + sess.run() + close(done) + }() + sess.scheduleSending() + Eventually(mconn.written).Should(HaveLen(1)) + Consistently(mconn.written).Should(HaveLen(1)) + // make the go routine return + sess.Close(nil) + Eventually(done).Should(BeClosed()) + }) + It("paces packets", func() { pacingDelay := scaleDuration(100 * time.Millisecond) sess.packer.QueueControlFrame(&wire.MaxDataFrame{ByteOffset: 1}) @@ -820,10 +850,10 @@ var _ = Describe("Session", func() { sph.EXPECT().TimeUntilSend().Return(time.Now().Add(pacingDelay)) // send one sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour)) sph.EXPECT().ShouldSendNumPackets().Times(2).Return(1) - sph.EXPECT().SendingAllowed().Do(func() { // after sending the first packet + sph.EXPECT().SendMode().Return(ackhandler.SendAny).Do(func() { // after sending the first packet // make sure there's something to send sess.packer.QueueControlFrame(&wire.MaxDataFrame{ByteOffset: 2}) - }).Return(true).AnyTimes() + }).AnyTimes() done := make(chan struct{}) go func() { defer GinkgoRecover() @@ -844,10 +874,10 @@ var _ = Describe("Session", func() { sph.EXPECT().ShouldSendNumPackets().Return(3) sph.EXPECT().TimeUntilSend().Return(time.Now()) sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour)) - sph.EXPECT().SendingAllowed().Do(func() { + sph.EXPECT().SendMode().Return(ackhandler.SendAny).Do(func() { // make sure there's something to send sess.packer.QueueControlFrame(&wire.MaxDataFrame{ByteOffset: 1}) - }).Return(true).Times(4) + }).Times(3) done := make(chan struct{}) go func() { defer GinkgoRecover() @@ -864,7 +894,7 @@ var _ = Describe("Session", func() { It("doesn't set a pacing timer when there is no data to send", func() { sph.EXPECT().TimeUntilSend().Return(time.Now()) sph.EXPECT().ShouldSendNumPackets().Return(1) - sph.EXPECT().SendingAllowed().Return(true).AnyTimes() + sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes() done := make(chan struct{}) go func() { defer GinkgoRecover() @@ -896,7 +926,8 @@ var _ = Describe("Session", func() { sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) sph.EXPECT().GetPacketNumberLen(gomock.Any()).Return(protocol.PacketNumberLen2).AnyTimes() sph.EXPECT().GetAlarmTimeout().AnyTimes() - sph.EXPECT().SendingAllowed() + sph.EXPECT().SendMode().Return(ackhandler.SendAck) + sph.EXPECT().ShouldSendNumPackets().Return(1000) sph.EXPECT().GetStopWaitingFrame(false).Return(swf) sph.EXPECT().TimeUntilSend() sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) { @@ -927,7 +958,8 @@ var _ = Describe("Session", func() { sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) sph.EXPECT().GetPacketNumberLen(gomock.Any()).Return(protocol.PacketNumberLen2).AnyTimes() sph.EXPECT().GetAlarmTimeout().AnyTimes() - sph.EXPECT().SendingAllowed() + sph.EXPECT().SendMode().Return(ackhandler.SendAck) + sph.EXPECT().ShouldSendNumPackets().Return(1000) sph.EXPECT().TimeUntilSend() sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) { Expect(p.Frames).To(HaveLen(1)) @@ -1105,22 +1137,13 @@ var _ = Describe("Session", func() { It("sends when scheduleSending is called", func() { sess.packer.packetNumberGenerator.next = 10000 - f := &wire.StreamFrame{ - StreamID: 0x5, - Data: []byte("foobar"), - } + sess.packer.QueueControlFrame(&wire.BlockedFrame{}) sph := mockackhandler.NewMockSentPacketHandler(mockCtrl) sph.EXPECT().GetAlarmTimeout().AnyTimes() sph.EXPECT().TimeUntilSend().AnyTimes() - sph.EXPECT().SendingAllowed().AnyTimes().Return(true) + sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes() sph.EXPECT().ShouldSendNumPackets().AnyTimes().Return(1) sph.EXPECT().GetPacketNumberLen(gomock.Any()).Return(protocol.PacketNumberLen2).AnyTimes() - sph.EXPECT().GetStopWaitingFrame(true).Return(&wire.StopWaitingFrame{LeastUnacked: 10}) - sph.EXPECT().DequeuePacketForRetransmission().Return(&ackhandler.Packet{ - PacketNumber: 0x1337, - Frames: []wire.Frame{f}, - EncryptionLevel: protocol.EncryptionForwardSecure, - }) sph.EXPECT().SentPacket(gomock.Any()) sess.sentPacketHandler = sph @@ -1144,8 +1167,7 @@ var _ = Describe("Session", func() { sph.EXPECT().TimeUntilSend().Return(time.Now()) sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour)) sph.EXPECT().GetAlarmTimeout().AnyTimes() - sph.EXPECT().SendingAllowed().Return(true).AnyTimes() - sph.EXPECT().DequeuePacketForRetransmission() + sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes() sph.EXPECT().GetStopWaitingFrame(gomock.Any()) sph.EXPECT().ShouldSendNumPackets().Return(1) sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) {