From cf97a0a39c7fcd90ef0b8f78be772e284873a0b6 Mon Sep 17 00:00:00 2001 From: sukun Date: Tue, 24 Jun 2025 17:25:42 +0530 Subject: [PATCH] fix deadlock when closing the Transport (#5220) * transport: fix deadlock on close The lock ordering is Transport.mutex before connMx. This deadlocks some times in tests. For the latest one: https://github.com/libp2p/go-libp2p/actions/runs/15534001571/job/43728863965?pr=3305 * remove connMx * release mutex for server close * Update server.go --------- Co-authored-by: Marten Seemann --- server.go | 2 ++ transport.go | 62 ++++++++++++++++++++++++---------------------------- 2 files changed, 31 insertions(+), 33 deletions(-) diff --git a/server.go b/server.go index 14ac8f81f..3e72150e2 100644 --- a/server.go +++ b/server.go @@ -347,6 +347,8 @@ func (s *baseServer) Close() error { return nil } +// close closes the server. The Transport mutex must not be held while calling this method. +// This method closes any handshaking connections which requires the tranpsort mutex. func (s *baseServer) close(e error, notifyOnClose bool) { s.closeMx.Lock() if s.closeErr != nil { diff --git a/transport.go b/transport.go index d0775ad7e..cf3245c4a 100644 --- a/transport.go +++ b/transport.go @@ -133,11 +133,10 @@ type Transport struct { // Tracer.Close is called when the transport is closed. Tracer *logging.Tracer - connMx sync.Mutex + mutex sync.Mutex handlers map[protocol.ConnectionID]packetHandler resetTokens map[protocol.StatelessResetToken]packetHandler - mutex sync.Mutex initOnce sync.Once initErr error @@ -320,9 +319,7 @@ func (t *Transport) doDial( logger, version, ) - t.connMx.Lock() t.handlers[srcConnID] = conn - t.connMx.Unlock() t.mutex.Unlock() // The error channel needs to be buffered, as the run loop will continue running @@ -489,8 +486,6 @@ func (t *Transport) closeServer() { t.closeErr = ErrServerClosed } - t.connMx.Lock() - defer t.connMx.Unlock() if len(t.handlers) == 0 { t.maybeStopListening() } @@ -498,16 +493,23 @@ func (t *Transport) closeServer() { func (t *Transport) close(e error) { t.mutex.Lock() - defer t.mutex.Unlock() if t.closeErr != nil { + t.mutex.Unlock() return } e = &errTransportClosed{err: e} + t.closeErr = e + server := t.server + if server != nil { + t.mutex.Unlock() + server.close(e, false) + t.mutex.Lock() + } + // Close existing connections var wg sync.WaitGroup - t.connMx.Lock() for _, handler := range t.handlers { wg.Add(1) go func(handler packetHandler) { @@ -515,16 +517,12 @@ func (t *Transport) close(e error) { wg.Done() }(handler) } - t.connMx.Unlock() + t.mutex.Unlock() // closing connections requires releasing transport mutex wg.Wait() - if t.server != nil { - t.server.close(e, false) - } if t.Tracer != nil && t.Tracer.Close != nil { t.Tracer.Close() } - t.closeErr = e } // only print warnings about the UDP receive buffer size once @@ -670,9 +668,9 @@ func (t *Transport) maybeHandleStatelessReset(data []byte) bool { } token := protocol.StatelessResetToken(data[len(data)-16:]) - t.connMx.Lock() + t.mutex.Lock() conn, ok := t.resetTokens[token] - t.connMx.Unlock() + t.mutex.Unlock() if ok { t.logger.Debugf("Received a stateless reset with token %#x. Closing connection.", token) @@ -745,8 +743,8 @@ type packetHandlerMap Transport var _ connRunner = &packetHandlerMap{} func (h *packetHandlerMap) Add(id protocol.ConnectionID, handler packetHandler) bool /* was added */ { - h.connMx.Lock() - defer h.connMx.Unlock() + h.mutex.Lock() + defer h.mutex.Unlock() if _, ok := h.handlers[id]; ok { h.logger.Debugf("Not adding connection ID %s, as it already exists.", id) @@ -758,27 +756,27 @@ func (h *packetHandlerMap) Add(id protocol.ConnectionID, handler packetHandler) } func (h *packetHandlerMap) Get(connID protocol.ConnectionID) (packetHandler, bool) { - h.connMx.Lock() - defer h.connMx.Unlock() + h.mutex.Lock() + defer h.mutex.Unlock() handler, ok := h.handlers[connID] return handler, ok } func (h *packetHandlerMap) AddResetToken(token protocol.StatelessResetToken, handler packetHandler) { - h.connMx.Lock() + h.mutex.Lock() h.resetTokens[token] = handler - h.connMx.Unlock() + h.mutex.Unlock() } func (h *packetHandlerMap) RemoveResetToken(token protocol.StatelessResetToken) { - h.connMx.Lock() + h.mutex.Lock() delete(h.resetTokens, token) - h.connMx.Unlock() + h.mutex.Unlock() } func (h *packetHandlerMap) AddWithConnID(clientDestConnID, newConnID protocol.ConnectionID, handler packetHandler) bool { - h.connMx.Lock() - defer h.connMx.Unlock() + h.mutex.Lock() + defer h.mutex.Unlock() if _, ok := h.handlers[clientDestConnID]; ok { h.logger.Debugf("Not adding connection ID %s for a new connection, as it already exists.", clientDestConnID) @@ -791,9 +789,9 @@ func (h *packetHandlerMap) AddWithConnID(clientDestConnID, newConnID protocol.Co } func (h *packetHandlerMap) Remove(id protocol.ConnectionID) { - h.connMx.Lock() + h.mutex.Lock() delete(h.handlers, id) - h.connMx.Unlock() + h.mutex.Unlock() h.logger.Debugf("Removing connection ID %s.", id) } @@ -819,25 +817,23 @@ func (h *packetHandlerMap) ReplaceWithClosed(ids []protocol.ConnectionID, connCl handler = newClosedRemoteConn() } - h.connMx.Lock() + h.mutex.Lock() for _, id := range ids { h.handlers[id] = handler } - h.connMx.Unlock() + h.mutex.Unlock() h.logger.Debugf("Replacing connection for connection IDs %s with a closed connection.", ids) time.AfterFunc(expiry, func() { - h.connMx.Lock() + h.mutex.Lock() for _, id := range ids { delete(h.handlers, id) } if len(h.handlers) == 0 { t := (*Transport)(h) - t.mutex.Lock() t.maybeStopListening() - t.mutex.Unlock() } - h.connMx.Unlock() + h.mutex.Unlock() h.logger.Debugf("Removing connection IDs %s for a closed connection after it has been retired.", ids) }) }