forked from quic-go/quic-go
use simnet in CONNECTION_CLOSE retransmission test (#5395)
* use simnet in CONNECTION_CLOSE retransmission test * fix race
This commit is contained in:
1
.github/workflows/integration.yml
vendored
1
.github/workflows/integration.yml
vendored
@@ -26,6 +26,7 @@ jobs:
|
|||||||
env:
|
env:
|
||||||
DEBUG: false # set this to true to export qlogs and save them as artifacts
|
DEBUG: false # set this to true to export qlogs and save them as artifacts
|
||||||
TIMESCALE_FACTOR: 3
|
TIMESCALE_FACTOR: 3
|
||||||
|
GOEXPERIMENT: ${{ matrix.go == '1.24.x' && 'synctest' || '' }}
|
||||||
name: "Integration (${{ matrix.os }}, Go ${{ matrix.go }}${{ matrix.race && ', race' || '' }})"
|
name: "Integration (${{ matrix.os }}, Go ${{ matrix.go }}${{ matrix.race && ', race' || '' }})"
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v5
|
- uses: actions/checkout@v5
|
||||||
|
|||||||
@@ -670,7 +670,7 @@ runLoop:
|
|||||||
// Check for loss detection timeout.
|
// Check for loss detection timeout.
|
||||||
// This could cause packets to be declared lost, and retransmissions to be enqueued.
|
// This could cause packets to be declared lost, and retransmissions to be enqueued.
|
||||||
now := monotime.Now()
|
now := monotime.Now()
|
||||||
if timeout := c.sentPacketHandler.GetLossDetectionTimeout(); !timeout.IsZero() && timeout.Before(now) {
|
if timeout := c.sentPacketHandler.GetLossDetectionTimeout(); !timeout.IsZero() && !timeout.After(now) {
|
||||||
if err := c.sentPacketHandler.OnLossDetectionTimeout(now); err != nil {
|
if err := c.sentPacketHandler.OnLossDetectionTimeout(now); err != nil {
|
||||||
c.setCloseError(&closeError{err: err})
|
c.setCloseError(&closeError{err: err})
|
||||||
break runLoop
|
break runLoop
|
||||||
|
|||||||
@@ -3,84 +3,115 @@ package self_test
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
|
"math"
|
||||||
"net"
|
"net"
|
||||||
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/quic-go/quic-go"
|
"github.com/quic-go/quic-go"
|
||||||
quicproxy "github.com/quic-go/quic-go/integrationtests/tools/proxy"
|
|
||||||
"github.com/quic-go/quic-go/internal/protocol"
|
"github.com/quic-go/quic-go/internal/protocol"
|
||||||
|
"github.com/quic-go/quic-go/internal/synctest"
|
||||||
|
"github.com/quic-go/quic-go/testutils/simnet"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type droppingRouter struct {
|
||||||
|
simnet.PerfectRouter
|
||||||
|
|
||||||
|
Drop func(simnet.Packet) bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *droppingRouter) SendPacket(p simnet.Packet) error {
|
||||||
|
if d.Drop(p) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return d.PerfectRouter.SendPacket(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ simnet.Router = &droppingRouter{}
|
||||||
|
|
||||||
func TestConnectionCloseRetransmission(t *testing.T) {
|
func TestConnectionCloseRetransmission(t *testing.T) {
|
||||||
server, err := quic.Listen(
|
synctest.Test(t, func(t *testing.T) {
|
||||||
newUDPConnLocalhost(t),
|
const rtt = 10 * time.Millisecond
|
||||||
getTLSConfig(),
|
serverAddr := &net.UDPAddr{IP: net.ParseIP("1.0.0.2"), Port: 9002}
|
||||||
getQuicConfig(&quic.Config{DisablePathMTUDiscovery: true}),
|
|
||||||
)
|
|
||||||
require.NoError(t, err)
|
|
||||||
defer server.Close()
|
|
||||||
|
|
||||||
var drop atomic.Bool
|
var drop atomic.Bool
|
||||||
dropped := make(chan []byte, 100)
|
var mx sync.Mutex
|
||||||
proxy := &quicproxy.Proxy{
|
var dropped [][]byte
|
||||||
Conn: newUDPConnLocalhost(t),
|
n := &simnet.Simnet{
|
||||||
ServerAddr: server.Addr().(*net.UDPAddr),
|
Router: &droppingRouter{Drop: func(p simnet.Packet) bool {
|
||||||
DelayPacket: func(quicproxy.Direction, net.Addr, net.Addr, []byte) time.Duration {
|
shouldDrop := drop.Load() && p.From.String() == serverAddr.String()
|
||||||
return scaleDuration(5 * time.Millisecond) // 10ms RTT
|
if shouldDrop {
|
||||||
},
|
mx.Lock()
|
||||||
DropPacket: func(dir quicproxy.Direction, _, _ net.Addr, b []byte) bool {
|
dropped = append(dropped, p.Data)
|
||||||
if drop := drop.Load(); drop && dir == quicproxy.DirectionOutgoing {
|
mx.Unlock()
|
||||||
dropped <- b
|
}
|
||||||
return true
|
return shouldDrop
|
||||||
}
|
}},
|
||||||
return false
|
|
||||||
},
|
|
||||||
}
|
|
||||||
require.NoError(t, proxy.Start())
|
|
||||||
defer proxy.Close()
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
|
||||||
defer cancel()
|
|
||||||
conn, err := quic.Dial(ctx, newUDPConnLocalhost(t), proxy.LocalAddr(), getTLSClientConfig(), getQuicConfig(nil))
|
|
||||||
require.NoError(t, err)
|
|
||||||
defer conn.CloseWithError(0, "")
|
|
||||||
|
|
||||||
sconn, err := server.Accept(ctx)
|
|
||||||
require.NoError(t, err)
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
drop.Store(true)
|
|
||||||
sconn.CloseWithError(1337, "closing")
|
|
||||||
|
|
||||||
// send 100 packets
|
|
||||||
for range 100 {
|
|
||||||
str, err := conn.OpenStream()
|
|
||||||
require.NoError(t, err)
|
|
||||||
_, err = str.Write([]byte("foobar"))
|
|
||||||
require.NoError(t, err)
|
|
||||||
time.Sleep(time.Millisecond)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Expect retransmissions of the CONNECTION_CLOSE for the
|
|
||||||
// 1st, 2nd, 4th, 8th, 16th, 32th, 64th packet: 7 in total (+1 for the original packet)
|
|
||||||
var packets [][]byte
|
|
||||||
for range 8 {
|
|
||||||
select {
|
|
||||||
case p := <-dropped:
|
|
||||||
packets = append(packets, p)
|
|
||||||
case <-time.After(time.Second):
|
|
||||||
t.Fatal("timeout waiting for CONNECTION_CLOSE retransmission")
|
|
||||||
}
|
}
|
||||||
}
|
settings := simnet.NodeBiDiLinkSettings{
|
||||||
|
Downlink: simnet.LinkSettings{BitsPerSecond: math.MaxInt, Latency: rtt / 4},
|
||||||
|
Uplink: simnet.LinkSettings{BitsPerSecond: math.MaxInt, Latency: rtt / 4},
|
||||||
|
}
|
||||||
|
clientConn := n.NewEndpoint(&net.UDPAddr{IP: net.ParseIP("1.0.0.1"), Port: 9001}, settings)
|
||||||
|
serverConn := n.NewEndpoint(serverAddr, settings)
|
||||||
|
require.NoError(t, n.Start())
|
||||||
|
defer n.Close()
|
||||||
|
|
||||||
// verify all retransmitted packets were identical
|
tr := &quic.Transport{Conn: serverConn}
|
||||||
for i := 1; i < len(packets); i++ {
|
defer tr.Close()
|
||||||
require.Equal(t, packets[0], packets[i])
|
server, err := tr.Listen(
|
||||||
}
|
getTLSConfig(),
|
||||||
|
getQuicConfig(&quic.Config{DisablePathMTUDiscovery: true}),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||||
|
defer cancel()
|
||||||
|
conn, err := quic.Dial(ctx, clientConn, server.Addr(), getTLSClientConfig(), getQuicConfig(nil))
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer conn.CloseWithError(0, "")
|
||||||
|
|
||||||
|
sconn, err := server.Accept(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
time.Sleep(rtt)
|
||||||
|
|
||||||
|
drop.Store(true)
|
||||||
|
sconn.CloseWithError(1337, "closing")
|
||||||
|
|
||||||
|
// send 100 packets
|
||||||
|
for range 100 {
|
||||||
|
str, err := conn.OpenStream()
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = str.Write([]byte("foobar"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// A closed connection will drop packets if a very short queue overflows.
|
||||||
|
// Waiting for one nanosecond makes synctest process the packet before advancing
|
||||||
|
// the synthetic clock.
|
||||||
|
time.Sleep(time.Nanosecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(rtt)
|
||||||
|
|
||||||
|
mx.Lock()
|
||||||
|
defer mx.Unlock()
|
||||||
|
|
||||||
|
// Expect retransmissions of the CONNECTION_CLOSE for the
|
||||||
|
// 1st, 2nd, 4th, 8th, 16th, 32th, 64th packet: 7 in total (+1 for the original packet)
|
||||||
|
require.Len(t, dropped, 8)
|
||||||
|
|
||||||
|
// verify all retransmitted packets were identical
|
||||||
|
for i := 1; i < len(dropped); i++ {
|
||||||
|
require.Equal(t, dropped[0], dropped[i])
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDrainServerAcceptQueue(t *testing.T) {
|
func TestDrainServerAcceptQueue(t *testing.T) {
|
||||||
|
|||||||
@@ -14,12 +14,18 @@ import (
|
|||||||
|
|
||||||
const ALPN = "quic-go integration tests"
|
const ALPN = "quic-go integration tests"
|
||||||
|
|
||||||
|
// use a very long validity period to cover the synthetic clock used in synctest
|
||||||
|
var (
|
||||||
|
notBefore = time.Date(1990, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||||
|
notAfter = time.Date(2100, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||||
|
)
|
||||||
|
|
||||||
func GenerateCA() (*x509.Certificate, crypto.PrivateKey, error) {
|
func GenerateCA() (*x509.Certificate, crypto.PrivateKey, error) {
|
||||||
certTempl := &x509.Certificate{
|
certTempl := &x509.Certificate{
|
||||||
SerialNumber: big.NewInt(2019),
|
SerialNumber: big.NewInt(2019),
|
||||||
Subject: pkix.Name{},
|
Subject: pkix.Name{},
|
||||||
NotBefore: time.Now(),
|
NotBefore: notBefore,
|
||||||
NotAfter: time.Now().Add(24 * time.Hour),
|
NotAfter: notAfter,
|
||||||
IsCA: true,
|
IsCA: true,
|
||||||
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
|
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
|
||||||
KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
|
KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
|
||||||
@@ -45,8 +51,8 @@ func GenerateLeafCert(ca *x509.Certificate, caPriv crypto.PrivateKey) (*x509.Cer
|
|||||||
SerialNumber: big.NewInt(1),
|
SerialNumber: big.NewInt(1),
|
||||||
DNSNames: []string{"localhost"},
|
DNSNames: []string{"localhost"},
|
||||||
IPAddresses: []net.IP{net.IPv4(127, 0, 0, 1)},
|
IPAddresses: []net.IP{net.IPv4(127, 0, 0, 1)},
|
||||||
NotBefore: time.Now(),
|
NotBefore: notBefore,
|
||||||
NotAfter: time.Now().Add(24 * time.Hour),
|
NotAfter: notAfter,
|
||||||
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
|
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
|
||||||
KeyUsage: x509.KeyUsageDigitalSignature,
|
KeyUsage: x509.KeyUsageDigitalSignature,
|
||||||
}
|
}
|
||||||
@@ -72,8 +78,8 @@ func GenerateTLSConfigWithLongCertChain(ca *x509.Certificate, caPrivateKey crypt
|
|||||||
certTempl := &x509.Certificate{
|
certTempl := &x509.Certificate{
|
||||||
SerialNumber: big.NewInt(2019),
|
SerialNumber: big.NewInt(2019),
|
||||||
Subject: pkix.Name{},
|
Subject: pkix.Name{},
|
||||||
NotBefore: time.Now(),
|
NotBefore: notBefore,
|
||||||
NotAfter: time.Now().Add(24 * time.Hour),
|
NotAfter: notAfter,
|
||||||
IsCA: true,
|
IsCA: true,
|
||||||
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
|
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
|
||||||
KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
|
KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
|
||||||
|
|||||||
Reference in New Issue
Block a user