forked from quic-go/quic-go
use synctest to make the datagram queue tests fully deterministic (#5305)
This commit is contained in:
@@ -3,8 +3,8 @@ package quic
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/quic-go/quic-go/internal/synctest"
|
||||
"github.com/quic-go/quic-go/internal/utils"
|
||||
"github.com/quic-go/quic-go/internal/wire"
|
||||
|
||||
@@ -27,43 +27,49 @@ func TestDatagramQueuePeekAndPop(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDatagramQueueSendQueueLength(t *testing.T) {
|
||||
queue := newDatagramQueue(func() {}, utils.DefaultLogger)
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
queue := newDatagramQueue(func() {}, utils.DefaultLogger)
|
||||
|
||||
for i := 0; i < maxDatagramSendQueueLen; i++ {
|
||||
require.NoError(t, queue.Add(&wire.DatagramFrame{Data: []byte{0}}))
|
||||
}
|
||||
errChan := make(chan error, 1)
|
||||
go func() { errChan <- queue.Add(&wire.DatagramFrame{Data: []byte("foobar")}) }()
|
||||
for range maxDatagramSendQueueLen {
|
||||
require.NoError(t, queue.Add(&wire.DatagramFrame{Data: []byte{0}}))
|
||||
}
|
||||
errChan := make(chan error, 1)
|
||||
go func() { errChan <- queue.Add(&wire.DatagramFrame{Data: []byte("foobar")}) }()
|
||||
|
||||
select {
|
||||
case <-errChan:
|
||||
t.Fatal("expected to not receive error")
|
||||
case <-time.After(scaleDuration(10 * time.Millisecond)):
|
||||
}
|
||||
synctest.Wait()
|
||||
|
||||
// peeking doesn't remove the datagram from the queue...
|
||||
require.NotNil(t, queue.Peek())
|
||||
select {
|
||||
case <-errChan:
|
||||
t.Fatal("expected to not receive error")
|
||||
case <-time.After(scaleDuration(10 * time.Millisecond)):
|
||||
}
|
||||
select {
|
||||
case <-errChan:
|
||||
t.Fatal("expected to not receive error")
|
||||
default:
|
||||
}
|
||||
|
||||
// ...but popping does
|
||||
queue.Pop()
|
||||
select {
|
||||
case err := <-errChan:
|
||||
require.NoError(t, err)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout")
|
||||
}
|
||||
// pop all the remaining datagrams
|
||||
for i := 1; i < maxDatagramSendQueueLen; i++ {
|
||||
// peeking doesn't remove the datagram from the queue...
|
||||
require.NotNil(t, queue.Peek())
|
||||
synctest.Wait()
|
||||
select {
|
||||
case <-errChan:
|
||||
t.Fatal("expected to not receive error")
|
||||
default:
|
||||
}
|
||||
|
||||
// ...but popping does
|
||||
queue.Pop()
|
||||
}
|
||||
f := queue.Peek()
|
||||
require.NotNil(t, f)
|
||||
require.Equal(t, &wire.DatagramFrame{Data: []byte("foobar")}, f)
|
||||
synctest.Wait()
|
||||
select {
|
||||
case err := <-errChan:
|
||||
require.NoError(t, err)
|
||||
default:
|
||||
t.Fatal("timeout")
|
||||
}
|
||||
// pop all the remaining datagrams
|
||||
for range maxDatagramSendQueueLen - 1 {
|
||||
queue.Pop()
|
||||
}
|
||||
f := queue.Peek()
|
||||
require.NotNil(t, f)
|
||||
require.Equal(t, &wire.DatagramFrame{Data: []byte("foobar")}, f)
|
||||
})
|
||||
}
|
||||
|
||||
func TestDatagramQueueReceive(t *testing.T) {
|
||||
@@ -81,81 +87,94 @@ func TestDatagramQueueReceive(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDatagramQueueReceiveBlocking(t *testing.T) {
|
||||
queue := newDatagramQueue(func() {}, utils.DefaultLogger)
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
queue := newDatagramQueue(func() {}, utils.DefaultLogger)
|
||||
|
||||
// block until a new frame is received
|
||||
type result struct {
|
||||
data []byte
|
||||
err error
|
||||
}
|
||||
resultChan := make(chan result, 1)
|
||||
go func() {
|
||||
data, err := queue.Receive(context.Background())
|
||||
resultChan <- result{data, err}
|
||||
}()
|
||||
// block until a new frame is received
|
||||
type result struct {
|
||||
data []byte
|
||||
err error
|
||||
}
|
||||
resultChan := make(chan result, 1)
|
||||
go func() {
|
||||
data, err := queue.Receive(context.Background())
|
||||
resultChan <- result{data, err}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-resultChan:
|
||||
t.Fatal("expected to not receive result")
|
||||
case <-time.After(scaleDuration(10 * time.Millisecond)):
|
||||
}
|
||||
queue.HandleDatagramFrame(&wire.DatagramFrame{Data: []byte("foobar")})
|
||||
select {
|
||||
case result := <-resultChan:
|
||||
require.NoError(t, result.err)
|
||||
require.Equal(t, []byte("foobar"), result.data)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout")
|
||||
}
|
||||
synctest.Wait()
|
||||
|
||||
// unblock when the context is canceled
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
errChan := make(chan error, 1)
|
||||
go func() {
|
||||
_, err := queue.Receive(ctx)
|
||||
errChan <- err
|
||||
}()
|
||||
select {
|
||||
case <-errChan:
|
||||
t.Fatal("expected to not receive error")
|
||||
case <-time.After(scaleDuration(10 * time.Millisecond)):
|
||||
}
|
||||
cancel()
|
||||
select {
|
||||
case err := <-errChan:
|
||||
require.ErrorIs(t, err, context.Canceled)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout")
|
||||
}
|
||||
select {
|
||||
case <-resultChan:
|
||||
t.Fatal("expected to not receive result")
|
||||
default:
|
||||
}
|
||||
queue.HandleDatagramFrame(&wire.DatagramFrame{Data: []byte("foobar")})
|
||||
synctest.Wait()
|
||||
select {
|
||||
case result := <-resultChan:
|
||||
require.NoError(t, result.err)
|
||||
require.Equal(t, []byte("foobar"), result.data)
|
||||
default:
|
||||
t.Fatal("should have received a datagram frame")
|
||||
}
|
||||
|
||||
// unblock when the context is canceled
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
errChan := make(chan error, 1)
|
||||
go func() {
|
||||
_, err := queue.Receive(ctx)
|
||||
errChan <- err
|
||||
}()
|
||||
|
||||
synctest.Wait()
|
||||
select {
|
||||
case <-errChan:
|
||||
t.Fatal("expected to not receive error")
|
||||
default:
|
||||
}
|
||||
|
||||
cancel()
|
||||
synctest.Wait()
|
||||
|
||||
select {
|
||||
case err := <-errChan:
|
||||
require.ErrorIs(t, err, context.Canceled)
|
||||
default:
|
||||
t.Fatal("should have received a context canceled error")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestDatagramQueueClose(t *testing.T) {
|
||||
queue := newDatagramQueue(func() {}, utils.DefaultLogger)
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
queue := newDatagramQueue(func() {}, utils.DefaultLogger)
|
||||
|
||||
for i := 0; i < maxDatagramSendQueueLen; i++ {
|
||||
require.NoError(t, queue.Add(&wire.DatagramFrame{Data: []byte{0}}))
|
||||
}
|
||||
errChan1 := make(chan error, 1)
|
||||
go func() { errChan1 <- queue.Add(&wire.DatagramFrame{Data: []byte("foobar")}) }()
|
||||
errChan2 := make(chan error, 1)
|
||||
go func() {
|
||||
_, err := queue.Receive(context.Background())
|
||||
errChan2 <- err
|
||||
}()
|
||||
for range maxDatagramSendQueueLen {
|
||||
require.NoError(t, queue.Add(&wire.DatagramFrame{Data: []byte{0}}))
|
||||
}
|
||||
errChan1 := make(chan error, 1)
|
||||
go func() { errChan1 <- queue.Add(&wire.DatagramFrame{Data: []byte("foobar")}) }()
|
||||
errChan2 := make(chan error, 1)
|
||||
go func() {
|
||||
_, err := queue.Receive(context.Background())
|
||||
errChan2 <- err
|
||||
}()
|
||||
|
||||
queue.CloseWithError(assert.AnError)
|
||||
queue.CloseWithError(assert.AnError)
|
||||
synctest.Wait()
|
||||
|
||||
select {
|
||||
case err := <-errChan1:
|
||||
require.ErrorIs(t, err, assert.AnError)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout")
|
||||
}
|
||||
select {
|
||||
case err := <-errChan1:
|
||||
require.ErrorIs(t, err, assert.AnError)
|
||||
default:
|
||||
t.Fatal("should have received an error")
|
||||
}
|
||||
|
||||
select {
|
||||
case err := <-errChan2:
|
||||
require.ErrorIs(t, err, assert.AnError)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout")
|
||||
}
|
||||
select {
|
||||
case err := <-errChan2:
|
||||
require.ErrorIs(t, err, assert.AnError)
|
||||
default:
|
||||
t.Fatal("should have received an error")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user