From e629a12d06438a7c8ef11a584257baad39f9e522 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Tue, 24 Jun 2025 21:20:23 +0800 Subject: [PATCH] drain server accept queue when the transport is closed (#5237) * drain server queue on transport close * add integration test for clearing conn queue * improve documentation and test for Transport.Close * move to handshake_test.go --------- Co-authored-by: sukun --- integrationtests/self/handshake_test.go | 66 +++++++++++++++++++++++++ server.go | 19 +++++-- transport.go | 11 +++-- 3 files changed, 90 insertions(+), 6 deletions(-) diff --git a/integrationtests/self/handshake_test.go b/integrationtests/self/handshake_test.go index c701c6711..3e2ddf91b 100644 --- a/integrationtests/self/handshake_test.go +++ b/integrationtests/self/handshake_test.go @@ -745,3 +745,69 @@ func TestNoPacketsSentWhenClientHelloFails(t *testing.T) { // no packets received, as expected } } + +func TestServerTransportClose(t *testing.T) { + tlsServerConf := getTLSConfig() + tr := &quic.Transport{Conn: newUDPConnLocalhost(t)} + server, err := tr.Listen(tlsServerConf, getQuicConfig(nil)) + require.NoError(t, err) + defer server.Close() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + // the first conn is accepted by the server... + conn1, err := quic.Dial( + ctx, + newUDPConnLocalhost(t), + server.Addr(), + getTLSClientConfig(), + getQuicConfig(&quic.Config{MaxIdleTimeout: scaleDuration(50 * time.Millisecond)}), + ) + require.NoError(t, err) + // ...the second conn isn't, it remains in the server's accept queue + conn2, err := quic.Dial( + ctx, + newUDPConnLocalhost(t), + server.Addr(), + getTLSClientConfig(), + getQuicConfig(&quic.Config{MaxIdleTimeout: scaleDuration(50 * time.Millisecond)}), + ) + require.NoError(t, err) + + time.Sleep(scaleDuration(10 * time.Millisecond)) + + sconn, err := server.Accept(ctx) + require.NoError(t, err) + require.Equal(t, conn1.LocalAddr(), sconn.RemoteAddr()) + + // closing the Transport abruptly terminates connections + require.NoError(t, tr.Close()) + + select { + case <-sconn.Context().Done(): + require.ErrorIs(t, context.Cause(sconn.Context()), quic.ErrTransportClosed) + case <-time.After(time.Second): + t.Fatal("timeout") + } + + // no CONNECTION_CLOSE frame is sent to the peers + select { + case <-conn1.Context().Done(): + require.ErrorIs(t, context.Cause(conn1.Context()), &quic.IdleTimeoutError{}) + case <-time.After(time.Second): + t.Fatal("timeout") + } + select { + case <-conn2.Context().Done(): + require.ErrorIs(t, context.Cause(conn1.Context()), &quic.IdleTimeoutError{}) + case <-time.After(time.Second): + t.Fatal("timeout") + } + + // Accept should error after the transport was closed + ctx, cancel = context.WithTimeout(context.Background(), time.Second) + defer cancel() + accepted, err := server.Accept(ctx) + require.ErrorIs(t, err, quic.ErrTransportClosed) + require.Nil(t, accepted) +} diff --git a/server.go b/server.go index 3e72150e2..f6bef2d8d 100644 --- a/server.go +++ b/server.go @@ -343,13 +343,13 @@ func (s *baseServer) accept(ctx context.Context) (*Conn, error) { } func (s *baseServer) Close() error { - s.close(ErrServerClosed, true) + s.close(ErrServerClosed, false) return nil } // close closes the server. The Transport mutex must not be held while calling this method. // This method closes any handshaking connections which requires the tranpsort mutex. -func (s *baseServer) close(e error, notifyOnClose bool) { +func (s *baseServer) close(e error, transportClose bool) { s.closeMx.Lock() if s.closeErr != nil { s.closeMx.Unlock() @@ -360,12 +360,25 @@ func (s *baseServer) close(e error, notifyOnClose bool) { <-s.running s.closeMx.Unlock() - if notifyOnClose { + if !transportClose { s.onClose() } + // wait until all handshakes in flight have terminated s.handshakingCount.Wait() close(s.stopAccepting) + + if transportClose { + // if the transport is closing, drain the connQueue. All connections in the queue + // will be closed by the transport. + for { + select { + case <-s.connQueue: + default: + return + } + } + } } // Addr returns the server's network address diff --git a/transport.go b/transport.go index cf3245c4a..2f105298b 100644 --- a/transport.go +++ b/transport.go @@ -456,8 +456,12 @@ func (t *Transport) runSendQueue() { } // Close stops listening for UDP datagrams on the Transport.Conn. -// If any listener was started, it will be closed as well. -// It is invalid to start new listeners or connections after that. +// It abruptly terminates all existing connections, without sending a CONNECTION_CLOSE +// to the peers. It is the application's responsibility to cleanly terminate existing +// connections prior to calling Close. +// +// If a server was started, it will be closed as well. +// It is not possible to start any new server or dial new connections after that. func (t *Transport) Close() error { // avoid race condition if the transport is currently being initialized t.init(false) @@ -502,9 +506,10 @@ func (t *Transport) close(e error) { e = &errTransportClosed{err: e} t.closeErr = e server := t.server + t.server = nil if server != nil { t.mutex.Unlock() - server.close(e, false) + server.close(e, true) t.mutex.Lock() }