drain server accept queue when the transport is closed (#5237)

* drain server queue on transport close

* add integration test for clearing conn queue

* improve documentation and test for Transport.Close

* move to handshake_test.go

---------

Co-authored-by: sukun <sukunrt@gmail.com>
This commit is contained in:
Marten Seemann
2025-06-24 21:20:23 +08:00
committed by GitHub
parent cf97a0a39c
commit e629a12d06
3 changed files with 90 additions and 6 deletions

View File

@@ -745,3 +745,69 @@ func TestNoPacketsSentWhenClientHelloFails(t *testing.T) {
// no packets received, as expected
}
}
func TestServerTransportClose(t *testing.T) {
tlsServerConf := getTLSConfig()
tr := &quic.Transport{Conn: newUDPConnLocalhost(t)}
server, err := tr.Listen(tlsServerConf, getQuicConfig(nil))
require.NoError(t, err)
defer server.Close()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// the first conn is accepted by the server...
conn1, err := quic.Dial(
ctx,
newUDPConnLocalhost(t),
server.Addr(),
getTLSClientConfig(),
getQuicConfig(&quic.Config{MaxIdleTimeout: scaleDuration(50 * time.Millisecond)}),
)
require.NoError(t, err)
// ...the second conn isn't, it remains in the server's accept queue
conn2, err := quic.Dial(
ctx,
newUDPConnLocalhost(t),
server.Addr(),
getTLSClientConfig(),
getQuicConfig(&quic.Config{MaxIdleTimeout: scaleDuration(50 * time.Millisecond)}),
)
require.NoError(t, err)
time.Sleep(scaleDuration(10 * time.Millisecond))
sconn, err := server.Accept(ctx)
require.NoError(t, err)
require.Equal(t, conn1.LocalAddr(), sconn.RemoteAddr())
// closing the Transport abruptly terminates connections
require.NoError(t, tr.Close())
select {
case <-sconn.Context().Done():
require.ErrorIs(t, context.Cause(sconn.Context()), quic.ErrTransportClosed)
case <-time.After(time.Second):
t.Fatal("timeout")
}
// no CONNECTION_CLOSE frame is sent to the peers
select {
case <-conn1.Context().Done():
require.ErrorIs(t, context.Cause(conn1.Context()), &quic.IdleTimeoutError{})
case <-time.After(time.Second):
t.Fatal("timeout")
}
select {
case <-conn2.Context().Done():
require.ErrorIs(t, context.Cause(conn1.Context()), &quic.IdleTimeoutError{})
case <-time.After(time.Second):
t.Fatal("timeout")
}
// Accept should error after the transport was closed
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
defer cancel()
accepted, err := server.Accept(ctx)
require.ErrorIs(t, err, quic.ErrTransportClosed)
require.Nil(t, accepted)
}

View File

@@ -343,13 +343,13 @@ func (s *baseServer) accept(ctx context.Context) (*Conn, error) {
}
func (s *baseServer) Close() error {
s.close(ErrServerClosed, true)
s.close(ErrServerClosed, false)
return nil
}
// close closes the server. The Transport mutex must not be held while calling this method.
// This method closes any handshaking connections which requires the tranpsort mutex.
func (s *baseServer) close(e error, notifyOnClose bool) {
func (s *baseServer) close(e error, transportClose bool) {
s.closeMx.Lock()
if s.closeErr != nil {
s.closeMx.Unlock()
@@ -360,12 +360,25 @@ func (s *baseServer) close(e error, notifyOnClose bool) {
<-s.running
s.closeMx.Unlock()
if notifyOnClose {
if !transportClose {
s.onClose()
}
// wait until all handshakes in flight have terminated
s.handshakingCount.Wait()
close(s.stopAccepting)
if transportClose {
// if the transport is closing, drain the connQueue. All connections in the queue
// will be closed by the transport.
for {
select {
case <-s.connQueue:
default:
return
}
}
}
}
// Addr returns the server's network address

View File

@@ -456,8 +456,12 @@ func (t *Transport) runSendQueue() {
}
// Close stops listening for UDP datagrams on the Transport.Conn.
// If any listener was started, it will be closed as well.
// It is invalid to start new listeners or connections after that.
// It abruptly terminates all existing connections, without sending a CONNECTION_CLOSE
// to the peers. It is the application's responsibility to cleanly terminate existing
// connections prior to calling Close.
//
// If a server was started, it will be closed as well.
// It is not possible to start any new server or dial new connections after that.
func (t *Transport) Close() error {
// avoid race condition if the transport is currently being initialized
t.init(false)
@@ -502,9 +506,10 @@ func (t *Transport) close(e error) {
e = &errTransportClosed{err: e}
t.closeErr = e
server := t.server
t.server = nil
if server != nil {
t.mutex.Unlock()
server.close(e, false)
server.close(e, true)
t.mutex.Lock()
}