From 22b12f199ecf2655c7112e56adaedd80a1003e92 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 1 Sep 2019 13:59:44 +0700 Subject: [PATCH 1/2] replace locally closed sessions with a dedicated closed session --- closed_session.go | 92 +++++++++++++++ closed_session_test.go | 53 +++++++++ mock_packet_handler_manager_test.go | 12 ++ mock_session_runner_test.go | 12 ++ packet_handler_map.go | 20 +++- server.go | 2 + session.go | 40 ++----- session_test.go | 174 ++++++++++------------------ 8 files changed, 259 insertions(+), 146 deletions(-) create mode 100644 closed_session.go create mode 100644 closed_session_test.go diff --git a/closed_session.go b/closed_session.go new file mode 100644 index 00000000..1ab000bf --- /dev/null +++ b/closed_session.go @@ -0,0 +1,92 @@ +package quic + +import ( + "sync" + + "github.com/lucas-clemente/quic-go/internal/protocol" + "github.com/lucas-clemente/quic-go/internal/utils" +) + +type closedLocalSession struct { + conn connection + connClosePacket []byte + + closeOnce sync.Once + closeChan chan struct{} // is closed when the session is closed or destroyed + + receivedPackets chan *receivedPacket + counter uint64 // number of packets received + + perspective protocol.Perspective + + logger utils.Logger +} + +var _ packetHandler = &closedLocalSession{} + +// newClosedLocalSession creates a new closedLocalSession and runs it. +func newClosedLocalSession( + conn connection, + connClosePacket []byte, + perspective protocol.Perspective, + logger utils.Logger, +) packetHandler { + s := &closedLocalSession{ + conn: conn, + connClosePacket: connClosePacket, + perspective: perspective, + logger: logger, + closeChan: make(chan struct{}), + receivedPackets: make(chan *receivedPacket, 64), + } + go s.run() + return s +} + +func (s *closedLocalSession) run() { + for { + select { + case p := <-s.receivedPackets: + s.handlePacketImpl(p) + case <-s.closeChan: + return + } + } +} + +func (s *closedLocalSession) handlePacket(p *receivedPacket) { + select { + case s.receivedPackets <- p: + default: + } +} + +func (s *closedLocalSession) handlePacketImpl(p *receivedPacket) { + s.counter++ + // exponential backoff + // only send a CONNECTION_CLOSE for the 1st, 2nd, 4th, 8th, 16th, ... packet arriving + for n := s.counter; n > 1; n = n / 2 { + if n%2 != 0 { + return + } + } + s.logger.Debugf("Received %d packets after sending CONNECTION_CLOSE. Retransmitting.", s.counter) + if err := s.conn.Write(s.connClosePacket); err != nil { + s.logger.Debugf("Error retransmitting CONNECTION_CLOSE: %s", err) + } +} + +func (s *closedLocalSession) Close() error { + s.destroy(nil) + return nil +} + +func (s *closedLocalSession) destroy(error) { + s.closeOnce.Do(func() { + close(s.closeChan) + }) +} + +func (s *closedLocalSession) getPerspective() protocol.Perspective { + return s.perspective +} diff --git a/closed_session_test.go b/closed_session_test.go new file mode 100644 index 00000000..9cabfeec --- /dev/null +++ b/closed_session_test.go @@ -0,0 +1,53 @@ +package quic + +import ( + "errors" + "time" + + "github.com/lucas-clemente/quic-go/internal/protocol" + "github.com/lucas-clemente/quic-go/internal/utils" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Closed local session", func() { + var ( + sess packetHandler + mconn *mockConnection + ) + + BeforeEach(func() { + mconn = newMockConnection() + sess = newClosedLocalSession(mconn, []byte("close"), protocol.PerspectiveClient, utils.DefaultLogger) + }) + + AfterEach(func() { + Eventually(areClosedSessionsRunning).Should(BeFalse()) + }) + + It("tells its perspective", func() { + Expect(sess.getPerspective()).To(Equal(protocol.PerspectiveClient)) + // stop the session + Expect(sess.Close()).To(Succeed()) + }) + + It("repeats the packet containing the CONNECTION_CLOSE frame", func() { + for i := 1; i <= 20; i++ { + sess.handlePacket(&receivedPacket{}) + if i == 1 || i == 2 || i == 4 || i == 8 || i == 16 { + Eventually(mconn.written).Should(Receive(Equal([]byte("close")))) // receive the CONNECTION_CLOSE + } else { + Consistently(mconn.written, 10*time.Millisecond).Should(HaveLen(0)) + } + } + // stop the session + Expect(sess.Close()).To(Succeed()) + }) + + It("destroys sessions", func() { + Expect(areClosedSessionsRunning()).To(BeTrue()) + sess.destroy(errors.New("destroy")) + Eventually(areClosedSessionsRunning).Should(BeFalse()) + }) +}) diff --git a/mock_packet_handler_manager_test.go b/mock_packet_handler_manager_test.go index 3793d270..8d057df6 100644 --- a/mock_packet_handler_manager_test.go +++ b/mock_packet_handler_manager_test.go @@ -122,6 +122,18 @@ func (mr *MockPacketHandlerManagerMockRecorder) RemoveResetToken(arg0 interface{ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveResetToken", reflect.TypeOf((*MockPacketHandlerManager)(nil).RemoveResetToken), arg0) } +// ReplaceWithClosed mocks base method +func (m *MockPacketHandlerManager) ReplaceWithClosed(arg0 protocol.ConnectionID, arg1 packetHandler) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ReplaceWithClosed", arg0, arg1) +} + +// ReplaceWithClosed indicates an expected call of ReplaceWithClosed +func (mr *MockPacketHandlerManagerMockRecorder) ReplaceWithClosed(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReplaceWithClosed", reflect.TypeOf((*MockPacketHandlerManager)(nil).ReplaceWithClosed), arg0, arg1) +} + // Retire mocks base method func (m *MockPacketHandlerManager) Retire(arg0 protocol.ConnectionID) { m.ctrl.T.Helper() diff --git a/mock_session_runner_test.go b/mock_session_runner_test.go index 9bb54a05..d9f5d3f4 100644 --- a/mock_session_runner_test.go +++ b/mock_session_runner_test.go @@ -70,6 +70,18 @@ func (mr *MockSessionRunnerMockRecorder) RemoveResetToken(arg0 interface{}) *gom return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveResetToken", reflect.TypeOf((*MockSessionRunner)(nil).RemoveResetToken), arg0) } +// ReplaceWithClosed mocks base method +func (m *MockSessionRunner) ReplaceWithClosed(arg0 protocol.ConnectionID, arg1 packetHandler) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ReplaceWithClosed", arg0, arg1) +} + +// ReplaceWithClosed indicates an expected call of ReplaceWithClosed +func (mr *MockSessionRunnerMockRecorder) ReplaceWithClosed(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReplaceWithClosed", reflect.TypeOf((*MockSessionRunner)(nil).ReplaceWithClosed), arg0, arg1) +} + // Retire mocks base method func (m *MockSessionRunner) Retire(arg0 protocol.ConnectionID) { m.ctrl.T.Helper() diff --git a/packet_handler_map.go b/packet_handler_map.go index 57a77f7a..600f752d 100644 --- a/packet_handler_map.go +++ b/packet_handler_map.go @@ -65,18 +65,30 @@ func newPacketHandlerMap( func (h *packetHandlerMap) Add(id protocol.ConnectionID, handler packetHandler) { h.mutex.Lock() - h.handlers[string(id)] = handler + h.addLocked(id, handler) h.mutex.Unlock() } +func (h *packetHandlerMap) addLocked(id protocol.ConnectionID, handler packetHandler) { + h.handlers[string(id)] = handler +} + func (h *packetHandlerMap) Remove(id protocol.ConnectionID) { + h.mutex.Lock() h.removeByConnectionIDAsString(string(id)) + h.mutex.Unlock() +} + +func (h *packetHandlerMap) ReplaceWithClosed(id protocol.ConnectionID, handler packetHandler) { + h.mutex.Lock() + h.removeByConnectionIDAsString(string(id)) + h.addLocked(id, handler) + h.mutex.Unlock() + h.retireByConnectionIDAsString(string(id)) } func (h *packetHandlerMap) removeByConnectionIDAsString(id string) { - h.mutex.Lock() delete(h.handlers, id) - h.mutex.Unlock() } func (h *packetHandlerMap) Retire(id protocol.ConnectionID) { @@ -85,7 +97,9 @@ func (h *packetHandlerMap) Retire(id protocol.ConnectionID) { func (h *packetHandlerMap) retireByConnectionIDAsString(id string) { time.AfterFunc(h.deleteRetiredSessionsAfter, func() { + h.mutex.Lock() h.removeByConnectionIDAsString(id) + h.mutex.Unlock() }) } diff --git a/server.go b/server.go index 1e713cd6..91d375a5 100644 --- a/server.go +++ b/server.go @@ -37,6 +37,7 @@ type packetHandlerManager interface { Add(protocol.ConnectionID, packetHandler) Retire(protocol.ConnectionID) Remove(protocol.ConnectionID) + ReplaceWithClosed(protocol.ConnectionID, packetHandler) AddResetToken([16]byte, packetHandler) RemoveResetToken([16]byte) GetStatelessResetToken(protocol.ConnectionID) [16]byte @@ -59,6 +60,7 @@ type quicSession interface { type sessionRunner interface { Retire(protocol.ConnectionID) Remove(protocol.ConnectionID) + ReplaceWithClosed(protocol.ConnectionID, packetHandler) AddResetToken([16]byte, packetHandler) RemoveResetToken([16]byte) } diff --git a/session.go b/session.go index 19e2cc3a..c0ccbdcb 100644 --- a/session.go +++ b/session.go @@ -132,11 +132,8 @@ type session struct { sendingScheduled chan struct{} closeOnce sync.Once - closed utils.AtomicBool // closeChan is used to notify the run loop that it should terminate - closeChan chan closeError - connectionClosePacket *packedPacket - packetsReceivedAfterClose int + closeChan chan closeError ctx context.Context ctxCancel context.CancelFunc @@ -487,7 +484,6 @@ runLoop: } s.handleCloseError(closeErr) - s.closed.Set(true) s.logger.Infof("Connection %s closed.", s.srcConnID) s.cryptoStreamHandler.Close() s.sendQueue.Close() @@ -803,9 +799,6 @@ func (s *session) handleFrame(f wire.Frame, pn protocol.PacketNumber, encLevel p // handlePacket is called by the server with a new packet func (s *session) handlePacket(p *receivedPacket) { - if s.closed.Get() { - s.handlePacketAfterClosed(p) - } // Discard packets once the amount of queued packets is larger than // the channel size, protocol.MaxSessionUnprocessedPackets select { @@ -814,24 +807,6 @@ func (s *session) handlePacket(p *receivedPacket) { } } -func (s *session) handlePacketAfterClosed(p *receivedPacket) { - s.packetsReceivedAfterClose++ - if s.connectionClosePacket == nil { - return - } - // exponential backoff - // only send a CONNECTION_CLOSE for the 1st, 2nd, 4th, 8th, 16th, ... packet arriving - for n := s.packetsReceivedAfterClose; n > 1; n = n / 2 { - if n%2 != 0 { - return - } - } - s.logger.Debugf("Received %d packets after sending CONNECTION_CLOSE. Retransmitting.", s.packetsReceivedAfterClose) - if err := s.conn.Write(s.connectionClosePacket.raw); err != nil { - s.logger.Debugf("Error retransmitting CONNECTION_CLOSE: %s", err) - } -} - func (s *session) handleConnectionCloseFrame(frame *wire.ConnectionCloseFrame) { var e error if frame.IsApplicationError { @@ -946,7 +921,6 @@ func (s *session) closeLocal(e error) { } else { s.logger.Errorf("Closing session with error: %s", e) } - s.sessionRunner.Retire(s.srcConnID) s.closeChan <- closeError{err: e, sendClose: true, remote: false} }) } @@ -1019,9 +993,12 @@ func (s *session) handleCloseError(closeErr closeError) { if closeErr.remote { return } - if err := s.sendConnectionClose(quicErr); err != nil { + connClosePacket, err := s.sendConnectionClose(quicErr) + if err != nil { s.logger.Debugf("Error sending CONNECTION_CLOSE: %s", err) } + cs := newClosedLocalSession(s.conn, connClosePacket, s.perspective, s.logger) + s.sessionRunner.ReplaceWithClosed(s.srcConnID, cs) } func (s *session) dropEncryptionLevel(encLevel protocol.EncryptionLevel) { @@ -1206,7 +1183,7 @@ func (s *session) sendPackedPacket(packet *packedPacket) { s.sendQueue.Send(packet) } -func (s *session) sendConnectionClose(quicErr *qerr.QuicError) error { +func (s *session) sendConnectionClose(quicErr *qerr.QuicError) ([]byte, error) { var reason string // don't send details of crypto errors if !quicErr.IsCryptoError() { @@ -1217,11 +1194,10 @@ func (s *session) sendConnectionClose(quicErr *qerr.QuicError) error { ReasonPhrase: reason, }) if err != nil { - return err + return nil, err } - s.connectionClosePacket = packet s.logPacket(packet) - return s.conn.Write(packet.raw) + return packet.raw, s.conn.Write(packet.raw) } func (s *session) logPacket(packet *packedPacket) { diff --git a/session_test.go b/session_test.go index 1c5ed3ef..5c92a4b2 100644 --- a/session_test.go +++ b/session_test.go @@ -64,6 +64,12 @@ func areSessionsRunning() bool { return strings.Contains(b.String(), "quic-go.(*session).run") } +func areClosedSessionsRunning() bool { + var b bytes.Buffer + pprof.Lookup("goroutine").WriteTo(&b, 1) + return strings.Contains(b.String(), "quic-go.(*closedLocalSession).run") +} + var _ = Describe("Session", func() { var ( sess *session @@ -85,6 +91,13 @@ var _ = Describe("Session", func() { } } + expectReplaceWithClosed := func() { + sessionRunner.EXPECT().ReplaceWithClosed(sess.srcConnID, gomock.Any()).Do(func(_ protocol.ConnectionID, s packetHandler) { + Expect(s.Close()).To(Succeed()) + Eventually(areClosedSessionsRunning).Should(BeFalse()) + }) + } + BeforeEach(func() { Eventually(areSessionsRunning).Should(BeFalse()) @@ -404,7 +417,7 @@ var _ = Describe("Session", func() { It("shuts down without error", func() { streamManager.EXPECT().CloseWithError(qerr.Error(qerr.NoError, "")) - sessionRunner.EXPECT().Retire(gomock.Any()) + expectReplaceWithClosed() cryptoSetup.EXPECT().Close() packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{raw: []byte("connection close")}, nil) Expect(sess.Close()).To(Succeed()) @@ -416,7 +429,7 @@ var _ = Describe("Session", func() { It("only closes once", func() { streamManager.EXPECT().CloseWithError(qerr.Error(qerr.NoError, "")) - sessionRunner.EXPECT().Retire(gomock.Any()) + expectReplaceWithClosed() cryptoSetup.EXPECT().Close() packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) Expect(sess.Close()).To(Succeed()) @@ -429,7 +442,7 @@ var _ = Describe("Session", func() { It("closes streams with proper error", func() { testErr := errors.New("test error") streamManager.EXPECT().CloseWithError(qerr.Error(0x1337, testErr.Error())) - sessionRunner.EXPECT().Retire(gomock.Any()) + expectReplaceWithClosed() cryptoSetup.EXPECT().Close() packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) sess.CloseWithError(0x1337, testErr.Error()) @@ -460,7 +473,7 @@ var _ = Describe("Session", func() { It("cancels the context when the run loop exists", func() { streamManager.EXPECT().CloseWithError(gomock.Any()) - sessionRunner.EXPECT().Retire(gomock.Any()) + expectReplaceWithClosed() cryptoSetup.EXPECT().Close() packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) returned := make(chan struct{}) @@ -475,24 +488,6 @@ var _ = Describe("Session", func() { sess.Close() Eventually(returned).Should(BeClosed()) }) - - It("retransmits the CONNECTION_CLOSE packet if packets are arriving late", func() { - streamManager.EXPECT().CloseWithError(gomock.Any()) - sessionRunner.EXPECT().Retire(gomock.Any()) - cryptoSetup.EXPECT().Close() - packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{raw: []byte("foobar")}, nil) - sess.Close() - Expect(mconn.written).To(Receive(Equal([]byte("foobar")))) // receive the CONNECTION_CLOSE - Eventually(sess.Context().Done()).Should(BeClosed()) - for i := 1; i <= 20; i++ { - sess.handlePacket(&receivedPacket{}) - if i == 1 || i == 2 || i == 4 || i == 8 || i == 16 { - Expect(mconn.written).To(Receive(Equal([]byte("foobar")))) // receive the CONNECTION_CLOSE - } else { - Expect(mconn.written).To(HaveLen(0)) - } - } - }) }) Context("receiving packets", func() { @@ -574,7 +569,7 @@ var _ = Describe("Session", func() { cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) sess.run() }() - sessionRunner.EXPECT().Retire(gomock.Any()) + expectReplaceWithClosed() sess.handlePacket(getPacket(&wire.ExtendedHeader{ Header: wire.Header{DestConnectionID: sess.srcConnID}, PacketNumberLen: protocol.PacketNumberLen1, @@ -599,7 +594,7 @@ var _ = Describe("Session", func() { Expect(err.(*qerr.QuicError).ErrorCode).To(Equal(qerr.ProtocolViolation)) close(done) }() - sessionRunner.EXPECT().Retire(gomock.Any()) + expectReplaceWithClosed() sess.handlePacket(getPacket(&wire.ExtendedHeader{ Header: wire.Header{DestConnectionID: sess.srcConnID}, PacketNumberLen: protocol.PacketNumberLen1, @@ -619,7 +614,7 @@ var _ = Describe("Session", func() { cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) runErr <- sess.run() }() - sessionRunner.EXPECT().Retire(gomock.Any()) + expectReplaceWithClosed() sess.handlePacket(getPacket(&wire.ExtendedHeader{ Header: wire.Header{DestConnectionID: sess.srcConnID}, PacketNumberLen: protocol.PacketNumberLen1, @@ -646,7 +641,7 @@ var _ = Describe("Session", func() { Expect(err).To(MatchError("PROTOCOL_VIOLATION: empty packet")) close(done) }() - sessionRunner.EXPECT().Retire(gomock.Any()) + expectReplaceWithClosed() sess.handlePacket(getPacket(&wire.ExtendedHeader{ Header: wire.Header{DestConnectionID: sess.srcConnID}, PacketNumberLen: protocol.PacketNumberLen1, @@ -847,7 +842,7 @@ var _ = Describe("Session", func() { AfterEach(func() { streamManager.EXPECT().CloseWithError(gomock.Any()) packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) - sessionRunner.EXPECT().Retire(gomock.Any()) + expectReplaceWithClosed() cryptoSetup.EXPECT().Close() sess.Close() Eventually(sess.Context().Done()).Should(BeClosed()) @@ -946,6 +941,15 @@ var _ = Describe("Session", func() { streamManager.EXPECT().CloseWithError(gomock.Any()) }) + AfterEach(func() { + // make the go routine return + packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) + expectReplaceWithClosed() + cryptoSetup.EXPECT().Close() + Expect(sess.Close()).To(Succeed()) + Eventually(sess.Context().Done()).Should(BeClosed()) + }) + It("sends multiple packets one by one immediately", func() { sph.EXPECT().SentPacket(gomock.Any()).Times(2) sph.EXPECT().ShouldSendNumPackets().Return(1).Times(2) @@ -954,22 +958,14 @@ var _ = Describe("Session", func() { sph.EXPECT().SendMode().Return(ackhandler.SendAny).Times(2) // allow 2 packets... packer.EXPECT().PackPacket().Return(getPacket(10), nil) packer.EXPECT().PackPacket().Return(getPacket(11), nil) - done := make(chan struct{}) go func() { defer GinkgoRecover() cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) sess.run() - close(done) }() sess.scheduleSending() Eventually(mconn.written).Should(HaveLen(2)) Consistently(mconn.written).Should(HaveLen(2)) - // make the go routine return - packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) - sessionRunner.EXPECT().Retire(gomock.Any()) - cryptoSetup.EXPECT().Close() - sess.Close() - Eventually(done).Should(BeClosed()) }) // when becoming congestion limited, at some point the SendMode will change from SendAny to SendAck @@ -981,22 +977,14 @@ var _ = Describe("Session", func() { sph.EXPECT().SendMode().Return(ackhandler.SendAny) sph.EXPECT().SendMode().Return(ackhandler.SendAck) packer.EXPECT().PackPacket().Return(getPacket(100), nil) - done := make(chan struct{}) go func() { defer GinkgoRecover() cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) sess.run() - close(done) }() sess.scheduleSending() Eventually(mconn.written).Should(HaveLen(1)) Consistently(mconn.written).Should(HaveLen(1)) - // make the go routine return - packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) - sessionRunner.EXPECT().Retire(gomock.Any()) - cryptoSetup.EXPECT().Close() - sess.Close() - Eventually(done).Should(BeClosed()) }) It("paces packets", func() { @@ -1009,23 +997,15 @@ var _ = Describe("Session", func() { sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes() packer.EXPECT().PackPacket().Return(getPacket(100), nil) packer.EXPECT().PackPacket().Return(getPacket(101), nil) - done := make(chan struct{}) go func() { defer GinkgoRecover() cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) sess.run() - close(done) }() sess.scheduleSending() Eventually(mconn.written).Should(HaveLen(1)) Consistently(mconn.written, pacingDelay/2).Should(HaveLen(1)) Eventually(mconn.written, 2*pacingDelay).Should(HaveLen(2)) - // make the go routine return - packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) - sessionRunner.EXPECT().Retire(gomock.Any()) - cryptoSetup.EXPECT().Close() - sess.Close() - Eventually(done).Should(BeClosed()) }) It("sends multiple packets at once", func() { @@ -1037,21 +1017,13 @@ var _ = Describe("Session", func() { packer.EXPECT().PackPacket().Return(getPacket(1000), nil) packer.EXPECT().PackPacket().Return(getPacket(1001), nil) packer.EXPECT().PackPacket().Return(getPacket(1002), nil) - done := make(chan struct{}) go func() { defer GinkgoRecover() cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) sess.run() - close(done) }() sess.scheduleSending() Eventually(mconn.written).Should(HaveLen(3)) - // make the go routine return - packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) - sessionRunner.EXPECT().Retire(gomock.Any()) - cryptoSetup.EXPECT().Close() - sess.Close() - Eventually(done).Should(BeClosed()) }) It("doesn't set a pacing timer when there is no data to send", func() { @@ -1059,21 +1031,13 @@ var _ = Describe("Session", func() { sph.EXPECT().ShouldSendNumPackets().Return(1) sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes() packer.EXPECT().PackPacket() - done := make(chan struct{}) go func() { defer GinkgoRecover() cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) sess.run() - close(done) }() sess.scheduleSending() // no packet will get sent Consistently(mconn.written).ShouldNot(Receive()) - // make the go routine return - sessionRunner.EXPECT().Retire(gomock.Any()) - packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) - cryptoSetup.EXPECT().Close() - sess.Close() - Eventually(done).Should(BeClosed()) }) }) @@ -1097,7 +1061,7 @@ var _ = Describe("Session", func() { sess.scheduleSending() Eventually(mconn.written).Should(Receive()) // make the go routine return - sessionRunner.EXPECT().Retire(gomock.Any()) + expectReplaceWithClosed() streamManager.EXPECT().CloseWithError(gomock.Any()) packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) cryptoSetup.EXPECT().Close() @@ -1131,7 +1095,7 @@ var _ = Describe("Session", func() { Eventually(mconn.written).Should(Receive()) // make sure the go routine returns packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) - sessionRunner.EXPECT().Retire(gomock.Any()) + expectReplaceWithClosed() streamManager.EXPECT().CloseWithError(gomock.Any()) cryptoSetup.EXPECT().Close() sess.Close() @@ -1153,9 +1117,9 @@ var _ = Describe("Session", func() { Consistently(handshakeCtx.Done()).ShouldNot(BeClosed()) close(finishHandshake) Eventually(handshakeCtx.Done()).Should(BeClosed()) - //make sure the go routine returns + // make sure the go routine returns streamManager.EXPECT().CloseWithError(gomock.Any()) - sessionRunner.EXPECT().Retire(gomock.Any()) + expectReplaceWithClosed() packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) cryptoSetup.EXPECT().Close() Expect(sess.Close()).To(Succeed()) @@ -1165,7 +1129,7 @@ var _ = Describe("Session", func() { It("doesn't cancel the HandshakeComplete context when the handshake fails", func() { packer.EXPECT().PackPacket().AnyTimes() streamManager.EXPECT().CloseWithError(gomock.Any()) - sessionRunner.EXPECT().Retire(gomock.Any()) + expectReplaceWithClosed() packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) cryptoSetup.EXPECT().Close() go func() { @@ -1197,9 +1161,9 @@ var _ = Describe("Session", func() { sess.run() }() Eventually(done).Should(BeClosed()) - //make sure the go routine returns + // make sure the go routine returns streamManager.EXPECT().CloseWithError(gomock.Any()) - sessionRunner.EXPECT().Retire(gomock.Any()) + expectReplaceWithClosed() packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) cryptoSetup.EXPECT().Close() Expect(sess.Close()).To(Succeed()) @@ -1215,7 +1179,7 @@ var _ = Describe("Session", func() { close(done) }() streamManager.EXPECT().CloseWithError(gomock.Any()) - sessionRunner.EXPECT().Retire(gomock.Any()) + expectReplaceWithClosed() packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) cryptoSetup.EXPECT().Close() Expect(sess.Close()).To(Succeed()) @@ -1233,7 +1197,7 @@ var _ = Describe("Session", func() { close(done) }() streamManager.EXPECT().CloseWithError(gomock.Any()) - sessionRunner.EXPECT().Retire(gomock.Any()) + expectReplaceWithClosed() packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) cryptoSetup.EXPECT().Close() Expect(sess.CloseWithError(0x1337, testErr.Error())).To(Succeed()) @@ -1250,7 +1214,7 @@ var _ = Describe("Session", func() { Expect(err.Error()).To(ContainSubstring("transport parameter")) }() streamManager.EXPECT().CloseWithError(gomock.Any()) - sessionRunner.EXPECT().Retire(gomock.Any()) + expectReplaceWithClosed() packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) cryptoSetup.EXPECT().Close() sess.processTransportParameters([]byte("invalid")) @@ -1278,7 +1242,7 @@ var _ = Describe("Session", func() { // make the go routine return streamManager.EXPECT().CloseWithError(gomock.Any()) - sessionRunner.EXPECT().Retire(gomock.Any()) + expectReplaceWithClosed() packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) cryptoSetup.EXPECT().Close() sess.Close() @@ -1295,6 +1259,16 @@ var _ = Describe("Session", func() { sess.peerParams = &handshake.TransportParameters{IdleTimeout: remoteIdleTimeout} }) + AfterEach(func() { + // make the go routine return + expectReplaceWithClosed() + streamManager.EXPECT().CloseWithError(gomock.Any()) + packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) + cryptoSetup.EXPECT().Close() + Expect(sess.Close()).To(Succeed()) + Eventually(sess.Context().Done()).Should(BeClosed()) + }) + It("sends a PING as a keep-alive", func() { sess.handshakeComplete = true sess.config.KeepAlive = true @@ -1304,63 +1278,36 @@ var _ = Describe("Session", func() { close(sent) return nil, nil }) - done := make(chan struct{}) go func() { defer GinkgoRecover() cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) sess.run() - close(done) }() Eventually(sent).Should(BeClosed()) - // make the go routine return - sessionRunner.EXPECT().Retire(gomock.Any()) - streamManager.EXPECT().CloseWithError(gomock.Any()) - packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) - cryptoSetup.EXPECT().Close() - sess.Close() - Eventually(done).Should(BeClosed()) }) It("doesn't send a PING packet if keep-alive is disabled", func() { sess.handshakeComplete = true sess.config.KeepAlive = false sess.lastPacketReceivedTime = time.Now().Add(-remoteIdleTimeout / 2) - done := make(chan struct{}) go func() { defer GinkgoRecover() cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) sess.run() - close(done) }() Consistently(mconn.written).ShouldNot(Receive()) - // make the go routine return - sessionRunner.EXPECT().Retire(gomock.Any()) - streamManager.EXPECT().CloseWithError(gomock.Any()) - packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) - cryptoSetup.EXPECT().Close() - sess.Close() - Eventually(done).Should(BeClosed()) }) It("doesn't send a PING if the handshake isn't completed yet", func() { sess.handshakeComplete = false sess.config.KeepAlive = true sess.lastPacketReceivedTime = time.Now().Add(-remoteIdleTimeout / 2) - done := make(chan struct{}) go func() { defer GinkgoRecover() cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) sess.run() - close(done) }() Consistently(mconn.written).ShouldNot(Receive()) - // make the go routine return - sessionRunner.EXPECT().Retire(gomock.Any()) - streamManager.EXPECT().CloseWithError(gomock.Any()) - packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) - cryptoSetup.EXPECT().Close() - sess.Close() - Eventually(done).Should(BeClosed()) }) }) @@ -1422,7 +1369,7 @@ var _ = Describe("Session", func() { }() Consistently(sess.Context().Done()).ShouldNot(BeClosed()) // make the go routine return - sessionRunner.EXPECT().Retire(gomock.Any()) + expectReplaceWithClosed() cryptoSetup.EXPECT().Close() sess.Close() Eventually(sess.Context().Done()).Should(BeClosed()) @@ -1461,7 +1408,7 @@ var _ = Describe("Session", func() { Consistently(sess.Context().Done()).ShouldNot(BeClosed()) // make the go routine return packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) - sessionRunner.EXPECT().Retire(gomock.Any()) + expectReplaceWithClosed() cryptoSetup.EXPECT().Close() sess.Close() Eventually(sess.Context().Done()).Should(BeClosed()) @@ -1563,6 +1510,13 @@ var _ = Describe("Client Session", func() { } } + expectReplaceWithClosed := func() { + sessionRunner.EXPECT().ReplaceWithClosed(sess.srcConnID, gomock.Any()).Do(func(_ protocol.ConnectionID, s packetHandler) { + Expect(s.Close()).To(Succeed()) + Eventually(areClosedSessionsRunning).Should(BeFalse()) + }) + } + BeforeEach(func() { quicConf = populateClientConfig(&Config{}, true) }) @@ -1625,7 +1579,7 @@ var _ = Describe("Client Session", func() { }, []byte{0}))).To(BeTrue()) // make sure the go routine returns packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) - sessionRunner.EXPECT().Retire(gomock.Any()) + expectReplaceWithClosed() cryptoSetup.EXPECT().Close() Expect(sess.Close()).To(Succeed()) Eventually(sess.Context().Done()).Should(BeClosed()) @@ -1707,8 +1661,7 @@ var _ = Describe("Client Session", func() { Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("transport parameter")) }() - // streamManager.EXPECT().CloseWithError(gomock.Any()) - sessionRunner.EXPECT().Retire(gomock.Any()) + expectReplaceWithClosed() packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&packedPacket{}, nil) cryptoSetup.EXPECT().Close() sess.processTransportParameters([]byte("invalid")) @@ -1803,7 +1756,6 @@ var _ = Describe("Client Session", func() { // Illustrates that an injected Initial with an ACK frame for an unsent packet causes // the connection to immediately break down It("fails on Initial-level ACK for unsent packet", func() { - sessionRunner.EXPECT().Retire(gomock.Any()) ackFrame := testutils.ComposeAckFrame(0, 0) initialPacket := testutils.ComposeInitialPacket(sess.destConnID, sess.srcConnID, sess.version, sess.destConnID, []wire.Frame{ackFrame}) Expect(sess.handlePacketImpl(wrapPacket(initialPacket))).To(BeFalse()) From b8c5ed2a146a6cd90c50fa14f144e13e922780cd Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 1 Sep 2019 14:11:44 +0700 Subject: [PATCH 2/2] ignore reordered packets for remote closed sessions --- closed_session.go | 21 +++++++++++++++++++++ session.go | 2 +- session_test.go | 11 ++++++++--- 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/closed_session.go b/closed_session.go index 1ab000bf..3a0728f1 100644 --- a/closed_session.go +++ b/closed_session.go @@ -7,6 +7,9 @@ import ( "github.com/lucas-clemente/quic-go/internal/utils" ) +// A closedLocalSession is a session that we closed locally. +// When receiving packets for such a session, we need to retransmit the packet containing the CONNECTION_CLOSE frame, +// with an exponential backoff. type closedLocalSession struct { conn connection connClosePacket []byte @@ -90,3 +93,21 @@ func (s *closedLocalSession) destroy(error) { func (s *closedLocalSession) getPerspective() protocol.Perspective { return s.perspective } + +// A closedRemoteSession is a session that was closed remotely. +// For such a session, we might receive reordered packets that were sent before the CONNECTION_CLOSE. +// We can just ignore those packets. +type closedRemoteSession struct { + perspective protocol.Perspective +} + +var _ packetHandler = &closedRemoteSession{} + +func newClosedRemoteSession(pers protocol.Perspective) packetHandler { + return &closedRemoteSession{perspective: pers} +} + +func (s *closedRemoteSession) handlePacket(*receivedPacket) {} +func (s *closedRemoteSession) Close() error { return nil } +func (s *closedRemoteSession) destroy(error) {} +func (s *closedRemoteSession) getPerspective() protocol.Perspective { return s.perspective } diff --git a/session.go b/session.go index c0ccbdcb..baf2ad85 100644 --- a/session.go +++ b/session.go @@ -954,7 +954,7 @@ func (s *session) closeForRecreating() protocol.PacketNumber { func (s *session) closeRemote(e error) { s.closeOnce.Do(func() { s.logger.Errorf("Peer closed session with error: %s", e) - s.sessionRunner.Remove(s.srcConnID) + s.sessionRunner.ReplaceWithClosed(s.srcConnID, newClosedRemoteSession(s.perspective)) s.closeChan <- closeError{err: e, remote: true} }) } diff --git a/session_test.go b/session_test.go index 5c92a4b2..c5dd26e9 100644 --- a/session_test.go +++ b/session_test.go @@ -93,6 +93,7 @@ var _ = Describe("Session", func() { expectReplaceWithClosed := func() { sessionRunner.EXPECT().ReplaceWithClosed(sess.srcConnID, gomock.Any()).Do(func(_ protocol.ConnectionID, s packetHandler) { + Expect(s).To(BeAssignableToTypeOf(&closedLocalSession{})) Expect(s.Close()).To(Succeed()) Eventually(areClosedSessionsRunning).Should(BeFalse()) }) @@ -350,7 +351,9 @@ var _ = Describe("Session", func() { It("handles CONNECTION_CLOSE frames, with a transport error code", func() { testErr := qerr.Error(qerr.StreamLimitError, "foobar") streamManager.EXPECT().CloseWithError(testErr) - sessionRunner.EXPECT().Remove(gomock.Any()) + sessionRunner.EXPECT().ReplaceWithClosed(sess.srcConnID, gomock.Any()).Do(func(_ protocol.ConnectionID, s packetHandler) { + Expect(s).To(BeAssignableToTypeOf(&closedRemoteSession{})) + }) cryptoSetup.EXPECT().Close() go func() { @@ -369,7 +372,9 @@ var _ = Describe("Session", func() { It("handles CONNECTION_CLOSE frames, with an application error code", func() { testErr := qerr.ApplicationError(0x1337, "foobar") streamManager.EXPECT().CloseWithError(testErr) - sessionRunner.EXPECT().Remove(gomock.Any()) + sessionRunner.EXPECT().ReplaceWithClosed(sess.srcConnID, gomock.Any()).Do(func(_ protocol.ConnectionID, s packetHandler) { + Expect(s).To(BeAssignableToTypeOf(&closedRemoteSession{})) + }) cryptoSetup.EXPECT().Close() go func() { @@ -1764,7 +1769,7 @@ var _ = Describe("Client Session", func() { // Illustrates that an injected Initial with a CONNECTION_CLOSE frame causes // the connection to immediately break down It("fails on Initial-level CONNECTION_CLOSE frame", func() { - sessionRunner.EXPECT().Remove(gomock.Any()) + sessionRunner.EXPECT().ReplaceWithClosed(gomock.Any(), gomock.Any()) connCloseFrame := testutils.ComposeConnCloseFrame() initialPacket := testutils.ComposeInitialPacket(sess.destConnID, sess.srcConnID, sess.version, sess.destConnID, []wire.Frame{connCloseFrame}) Expect(sess.handlePacketImpl(wrapPacket(initialPacket))).To(BeTrue())