implement closing of paths probed for connection migration (#4979)

This commit is contained in:
Marten Seemann
2025-04-13 19:59:43 +08:00
committed by GitHub
parent 26ba8d978f
commit 97da280142
3 changed files with 145 additions and 20 deletions

View File

@@ -12,6 +12,8 @@ import (
type pathID int64
const invalidPathID pathID = -1
const maxPaths = 3
type path struct {

View File

@@ -14,8 +14,12 @@ import (
"github.com/quic-go/quic-go/internal/wire"
)
// ErrPathNotValidated is returned when trying to use a path before path probing has completed.
var ErrPathNotValidated = errors.New("path not yet validated")
var (
// ErrPathClosed is returned when trying to switch to a path that has been closed.
ErrPathClosed = errors.New("path closed")
// ErrPathNotValidated is returned when trying to use a path before path probing has completed.
ErrPathNotValidated = errors.New("path not yet validated")
)
var errPathDoesNotExist = errors.New("path does not exist")
@@ -28,6 +32,7 @@ type Path struct {
enablePath func()
validated atomic.Bool
abandon chan struct{}
}
func (p *Path) Probe(ctx context.Context) error {
@@ -47,6 +52,8 @@ func (p *Path) Probe(ctx context.Context) error {
case <-timerChan:
p.pathManager.enqueueProbe(p)
case <-path.ProbeSent():
case <-p.abandon:
return ErrPathClosed
}
if timer != nil {
@@ -62,11 +69,37 @@ func (p *Path) Probe(ctx context.Context) error {
// It immediately stops sending on the old path, and sends on this new path.
func (p *Path) Switch() error {
if err := p.pathManager.switchToPath(p.id); err != nil {
if errors.Is(err, errPathDoesNotExist) && !p.validated.Load() {
return ErrPathNotValidated
switch {
case errors.Is(err, ErrPathNotValidated):
return err
case errors.Is(err, errPathDoesNotExist) && !p.validated.Load():
select {
case <-p.abandon:
return ErrPathClosed
default:
return ErrPathNotValidated
}
default:
return ErrPathClosed
}
}
return nil
}
// Close abandons a path.
// It is not possible to close the path thats currently active.
// After closing, it is not possible to probe this path again.
func (p *Path) Close() error {
select {
case <-p.abandon:
return nil
default:
}
if err := p.pathManager.removePath(p.id); err != nil {
return err
}
close(p.abandon)
return nil
}
@@ -88,6 +121,7 @@ type pathManagerOutgoing struct {
scheduleSending func()
mx sync.Mutex
activePath pathID
pathsToProbe []pathID
paths map[pathID]*pathOutgoing
nextPathID pathID
@@ -100,6 +134,8 @@ func newPathManagerOutgoing(
scheduleSending func(),
) *pathManagerOutgoing {
return &pathManagerOutgoing{
activePath: 0, // at initialization time, we're guaranteed to be using the handshake path
nextPathID: 1,
getConnID: getConnID,
retireConnID: retireConnID,
scheduleSending: scheduleSending,
@@ -134,6 +170,32 @@ func (pm *pathManagerOutgoing) enqueueProbe(p *Path) {
pm.scheduleSending()
}
func (pm *pathManagerOutgoing) removePath(id pathID) error {
if err := pm.removePathImpl(id); err != nil {
return err
}
pm.scheduleSending()
return nil
}
func (pm *pathManagerOutgoing) removePathImpl(id pathID) error {
pm.mx.Lock()
defer pm.mx.Unlock()
if id == pm.activePath {
return errors.New("cannot close active path")
}
p, ok := pm.paths[id]
if !ok {
return nil
}
if len(p.pathChallenges) > 0 {
pm.retireConnID(id)
}
delete(pm.paths, id)
return nil
}
func (pm *pathManagerOutgoing) switchToPath(id pathID) error {
pm.mx.Lock()
defer pm.mx.Unlock()
@@ -146,6 +208,7 @@ func (pm *pathManagerOutgoing) switchToPath(id pathID) error {
return ErrPathNotValidated
}
pm.pathToSwitchTo = p
pm.activePath = id
return nil
}
@@ -161,6 +224,7 @@ func (pm *pathManagerOutgoing) NewPath(t *Transport, initialRTT time.Duration, e
tr: t,
enablePath: enablePath,
initialRTT: initialRTT,
abandon: make(chan struct{}),
}
}
@@ -169,21 +233,19 @@ func (pm *pathManagerOutgoing) NextPathToProbe() (_ protocol.ConnectionID, _ ack
defer pm.mx.Unlock()
var p *pathOutgoing
var id pathID
for {
if len(pm.pathsToProbe) == 0 {
return protocol.ConnectionID{}, ackhandler.Frame{}, nil, false
}
id = pm.pathsToProbe[0]
pm.pathsToProbe = pm.pathsToProbe[1:]
id := invalidPathID
for _, pID := range pm.pathsToProbe {
var ok bool
// if the path doesn't exist in the map, it might have been abandoned
p, ok = pm.paths[id]
p, ok = pm.paths[pID]
if ok {
id = pID
break
}
// if the path doesn't exist in the map, it might have been abandoned
pm.pathsToProbe = pm.pathsToProbe[1:]
}
if id == invalidPathID {
return protocol.ConnectionID{}, ackhandler.Frame{}, nil, false
}
connID, ok := pm.getConnID(id)
@@ -195,6 +257,7 @@ func (pm *pathManagerOutgoing) NextPathToProbe() (_ protocol.ConnectionID, _ ack
_, _ = rand.Read(b[:])
p.pathChallenges = append(p.pathChallenges, b)
pm.pathsToProbe = pm.pathsToProbe[1:]
p.enablePath()
select {
case p.probeSent <- struct{}{}:

View File

@@ -14,12 +14,14 @@ import (
func TestPathManagerOutgoingPathProbing(t *testing.T) {
connIDs := []protocol.ConnectionID{
protocol.ParseConnectionID([]byte{1, 2, 3, 4, 5, 6, 7, 8}),
protocol.ParseConnectionID([]byte{2, 3, 4, 5, 6, 7, 8, 9}),
}
var retiredConnIDs []protocol.ConnectionID
pm := newPathManagerOutgoing(
func(id pathID) (protocol.ConnectionID, bool) { return connIDs[id], true },
func(id pathID) { retiredConnIDs = append(retiredConnIDs, connIDs[id]) },
func(id pathID) (protocol.ConnectionID, bool) {
connID := connIDs[0]
connIDs = connIDs[1:]
return connID, true
},
func(id pathID) { t.Fatal("didn't expect any connection ID to be retired") },
func() {},
)
@@ -41,7 +43,7 @@ func TestPathManagerOutgoingPathProbing(t *testing.T) {
connID, f, tr, ok := pm.NextPathToProbe()
require.True(t, ok)
require.Equal(t, tr1, tr)
require.Equal(t, connIDs[0], connID)
require.Equal(t, protocol.ParseConnectionID([]byte{1, 2, 3, 4, 5, 6, 7, 8}), connID)
require.IsType(t, &wire.PathChallengeFrame{}, f.Frame)
pc := f.Frame.(*wire.PathChallengeFrame)
require.True(t, enabled)
@@ -93,6 +95,8 @@ func TestPathManagerOutgoingPathProbing(t *testing.T) {
_, ok = pm.ShouldSwitchPath()
require.False(t, ok)
require.NoError(t, p.Switch())
// the active path can't be closed
require.EqualError(t, p.Close(), "cannot close active path")
switchToTransport, ok := pm.ShouldSwitchPath()
require.True(t, ok)
require.Equal(t, tr1, switchToTransport)
@@ -199,3 +203,59 @@ func TestPathManagerOutgoingRetransmissions(t *testing.T) {
t.Fatal("timeout")
}
}
func TestPathManagerOutgoingAbandonPath(t *testing.T) {
connIDs := []protocol.ConnectionID{
protocol.ParseConnectionID([]byte{1, 2, 3, 4, 5, 6, 7, 8}),
}
var retiredPaths []pathID
pm := newPathManagerOutgoing(
func(id pathID) (protocol.ConnectionID, bool) {
connID := connIDs[0]
connIDs = connIDs[1:]
return connID, true
},
func(id pathID) { retiredPaths = append(retiredPaths, id) },
func() {},
)
// path abandoned before the PATH_CHALLENGE is sent out
p1 := pm.NewPath(&Transport{}, time.Second, func() {})
errChan := make(chan error, 1)
go func() { errChan <- p1.Probe(context.Background()) }()
// wait for the path to be queued for probing
time.Sleep(scaleDuration(5 * time.Millisecond))
require.NoError(t, p1.Close())
// closing the path multiple times is ok
require.NoError(t, p1.Close())
require.NoError(t, p1.Close())
_, _, _, ok := pm.NextPathToProbe()
require.False(t, ok)
select {
case err := <-errChan:
require.ErrorIs(t, err, ErrPathClosed)
case <-time.After(time.Second):
t.Fatal("timeout")
}
require.Empty(t, retiredPaths)
p2 := pm.NewPath(&Transport{}, time.Second, func() {})
go func() { errChan <- p2.Probe(context.Background()) }()
// wait for the path to be queued for probing
time.Sleep(scaleDuration(5 * time.Millisecond))
connID, f, _, ok := pm.NextPathToProbe()
require.True(t, ok)
require.Equal(t, protocol.ParseConnectionID([]byte{1, 2, 3, 4, 5, 6, 7, 8}), connID)
require.NoError(t, p2.Close())
require.Equal(t, []pathID{p2.id}, retiredPaths)
pm.HandlePathResponseFrame(&wire.PathResponseFrame{Data: f.Frame.(*wire.PathChallengeFrame).Data})
_, _, _, ok = pm.NextPathToProbe()
require.False(t, ok)
// it's not possible to switch to an abandoned path
require.ErrorIs(t, p2.Switch(), ErrPathClosed)
}