fix deadlock when closing the Transport (#5220)

* transport: fix deadlock on close

The lock ordering is Transport.mutex before connMx. This sometimes
deadlocks in tests.
For the latest one:
https://github.com/libp2p/go-libp2p/actions/runs/15534001571/job/43728863965?pr=3305

* remove connMx

* release mutex for server close

* Update server.go

---------

Co-authored-by: Marten Seemann <martenseemann@gmail.com>
This commit is contained in:
sukun
2025-06-24 17:25:42 +05:30
committed by GitHub
parent 92aa7b41d5
commit cf97a0a39c
2 changed files with 31 additions and 33 deletions

View File

@@ -347,6 +347,8 @@ func (s *baseServer) Close() error {
return nil
}
// close closes the server. The Transport mutex must not be held while calling this method.
// This method closes any handshaking connections, which requires the transport mutex.
func (s *baseServer) close(e error, notifyOnClose bool) {
s.closeMx.Lock()
if s.closeErr != nil {

View File

@@ -133,11 +133,10 @@ type Transport struct {
// Tracer.Close is called when the transport is closed.
Tracer *logging.Tracer
connMx sync.Mutex
mutex sync.Mutex
handlers map[protocol.ConnectionID]packetHandler
resetTokens map[protocol.StatelessResetToken]packetHandler
mutex sync.Mutex
initOnce sync.Once
initErr error
@@ -320,9 +319,7 @@ func (t *Transport) doDial(
logger,
version,
)
t.connMx.Lock()
t.handlers[srcConnID] = conn
t.connMx.Unlock()
t.mutex.Unlock()
// The error channel needs to be buffered, as the run loop will continue running
@@ -489,8 +486,6 @@ func (t *Transport) closeServer() {
t.closeErr = ErrServerClosed
}
t.connMx.Lock()
defer t.connMx.Unlock()
if len(t.handlers) == 0 {
t.maybeStopListening()
}
@@ -498,16 +493,23 @@ func (t *Transport) closeServer() {
func (t *Transport) close(e error) {
t.mutex.Lock()
defer t.mutex.Unlock()
if t.closeErr != nil {
t.mutex.Unlock()
return
}
e = &errTransportClosed{err: e}
t.closeErr = e
server := t.server
if server != nil {
t.mutex.Unlock()
server.close(e, false)
t.mutex.Lock()
}
// Close existing connections
var wg sync.WaitGroup
t.connMx.Lock()
for _, handler := range t.handlers {
wg.Add(1)
go func(handler packetHandler) {
@@ -515,16 +517,12 @@ func (t *Transport) close(e error) {
wg.Done()
}(handler)
}
t.connMx.Unlock()
t.mutex.Unlock() // closing connections requires releasing transport mutex
wg.Wait()
if t.server != nil {
t.server.close(e, false)
}
if t.Tracer != nil && t.Tracer.Close != nil {
t.Tracer.Close()
}
t.closeErr = e
}
// only print warnings about the UDP receive buffer size once
@@ -670,9 +668,9 @@ func (t *Transport) maybeHandleStatelessReset(data []byte) bool {
}
token := protocol.StatelessResetToken(data[len(data)-16:])
t.connMx.Lock()
t.mutex.Lock()
conn, ok := t.resetTokens[token]
t.connMx.Unlock()
t.mutex.Unlock()
if ok {
t.logger.Debugf("Received a stateless reset with token %#x. Closing connection.", token)
@@ -745,8 +743,8 @@ type packetHandlerMap Transport
var _ connRunner = &packetHandlerMap{}
func (h *packetHandlerMap) Add(id protocol.ConnectionID, handler packetHandler) bool /* was added */ {
h.connMx.Lock()
defer h.connMx.Unlock()
h.mutex.Lock()
defer h.mutex.Unlock()
if _, ok := h.handlers[id]; ok {
h.logger.Debugf("Not adding connection ID %s, as it already exists.", id)
@@ -758,27 +756,27 @@ func (h *packetHandlerMap) Add(id protocol.ConnectionID, handler packetHandler)
}
func (h *packetHandlerMap) Get(connID protocol.ConnectionID) (packetHandler, bool) {
h.connMx.Lock()
defer h.connMx.Unlock()
h.mutex.Lock()
defer h.mutex.Unlock()
handler, ok := h.handlers[connID]
return handler, ok
}
func (h *packetHandlerMap) AddResetToken(token protocol.StatelessResetToken, handler packetHandler) {
h.connMx.Lock()
h.mutex.Lock()
h.resetTokens[token] = handler
h.connMx.Unlock()
h.mutex.Unlock()
}
func (h *packetHandlerMap) RemoveResetToken(token protocol.StatelessResetToken) {
h.connMx.Lock()
h.mutex.Lock()
delete(h.resetTokens, token)
h.connMx.Unlock()
h.mutex.Unlock()
}
func (h *packetHandlerMap) AddWithConnID(clientDestConnID, newConnID protocol.ConnectionID, handler packetHandler) bool {
h.connMx.Lock()
defer h.connMx.Unlock()
h.mutex.Lock()
defer h.mutex.Unlock()
if _, ok := h.handlers[clientDestConnID]; ok {
h.logger.Debugf("Not adding connection ID %s for a new connection, as it already exists.", clientDestConnID)
@@ -791,9 +789,9 @@ func (h *packetHandlerMap) AddWithConnID(clientDestConnID, newConnID protocol.Co
}
func (h *packetHandlerMap) Remove(id protocol.ConnectionID) {
h.connMx.Lock()
h.mutex.Lock()
delete(h.handlers, id)
h.connMx.Unlock()
h.mutex.Unlock()
h.logger.Debugf("Removing connection ID %s.", id)
}
@@ -819,25 +817,23 @@ func (h *packetHandlerMap) ReplaceWithClosed(ids []protocol.ConnectionID, connCl
handler = newClosedRemoteConn()
}
h.connMx.Lock()
h.mutex.Lock()
for _, id := range ids {
h.handlers[id] = handler
}
h.connMx.Unlock()
h.mutex.Unlock()
h.logger.Debugf("Replacing connection for connection IDs %s with a closed connection.", ids)
time.AfterFunc(expiry, func() {
h.connMx.Lock()
h.mutex.Lock()
for _, id := range ids {
delete(h.handlers, id)
}
if len(h.handlers) == 0 {
t := (*Transport)(h)
t.mutex.Lock()
t.maybeStopListening()
t.mutex.Unlock()
}
h.connMx.Unlock()
h.mutex.Unlock()
h.logger.Debugf("Removing connection IDs %s for a closed connection after it has been retired.", ids)
})
}