forked from quic-go/quic-go
use synctest to make the send queue tests fully deterministic (#5302)
This commit is contained in:
@@ -6,6 +6,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/quic-go/quic-go/internal/protocol"
|
||||
"github.com/quic-go/quic-go/internal/synctest"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
@@ -20,156 +21,173 @@ func getPacketWithContents(b []byte) *packetBuffer {
|
||||
}
|
||||
|
||||
func TestSendQueueSendOnePacket(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
c := NewMockSendConn(mockCtrl)
|
||||
q := newSendQueue(c)
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
c := NewMockSendConn(mockCtrl)
|
||||
q := newSendQueue(c)
|
||||
|
||||
written := make(chan struct{})
|
||||
c.EXPECT().Write([]byte("foobar"), uint16(10), protocol.ECT1).Do(
|
||||
func([]byte, uint16, protocol.ECN) error { close(written); return nil },
|
||||
)
|
||||
written := make(chan struct{})
|
||||
c.EXPECT().Write([]byte("foobar"), uint16(10), protocol.ECT1).Do(
|
||||
func([]byte, uint16, protocol.ECN) error { close(written); return nil },
|
||||
)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
q.Run()
|
||||
close(done)
|
||||
}()
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
q.Run()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
q.Send(getPacketWithContents([]byte("foobar")), 10, protocol.ECT1)
|
||||
q.Send(getPacketWithContents([]byte("foobar")), 10, protocol.ECT1)
|
||||
synctest.Wait()
|
||||
|
||||
select {
|
||||
case <-written:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout")
|
||||
}
|
||||
select {
|
||||
case <-written:
|
||||
default:
|
||||
t.Fatal("write should have returned")
|
||||
}
|
||||
|
||||
q.Close()
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout")
|
||||
}
|
||||
q.Close()
|
||||
synctest.Wait()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
default:
|
||||
t.Fatal("Run should have returned")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestSendQueueBlocking(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
c := NewMockSendConn(mockCtrl)
|
||||
q := newSendQueue(c)
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
c := NewMockSendConn(mockCtrl)
|
||||
q := newSendQueue(c)
|
||||
|
||||
blockWrite := make(chan struct{})
|
||||
written := make(chan struct{}, 1)
|
||||
c.EXPECT().Write(gomock.Any(), gomock.Any(), gomock.Any()).Do(
|
||||
func([]byte, uint16, protocol.ECN) error {
|
||||
select {
|
||||
case written <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
<-blockWrite
|
||||
return nil
|
||||
},
|
||||
).AnyTimes()
|
||||
blockWrite := make(chan struct{})
|
||||
written := make(chan struct{}, 1)
|
||||
c.EXPECT().Write(gomock.Any(), gomock.Any(), gomock.Any()).Do(
|
||||
func([]byte, uint16, protocol.ECN) error {
|
||||
select {
|
||||
case written <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
<-blockWrite
|
||||
return nil
|
||||
},
|
||||
).AnyTimes()
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
q.Run()
|
||||
close(done)
|
||||
}()
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
q.Run()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
// +1, since one packet will be queued in the Write call
|
||||
for i := 0; i < sendQueueCapacity+1; i++ {
|
||||
require.False(t, q.WouldBlock())
|
||||
q.Send(getPacketWithContents([]byte("foobar")), 10, protocol.ECT1)
|
||||
// make sure that the first packet is actually enqueued in the Write call
|
||||
if i == 0 {
|
||||
select {
|
||||
case <-written:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout")
|
||||
// +1, since one packet will be queued in the Write call
|
||||
for i := range sendQueueCapacity + 1 {
|
||||
require.False(t, q.WouldBlock())
|
||||
q.Send(getPacketWithContents([]byte("foobar")), 10, protocol.ECT1)
|
||||
// make sure that the first packet is actually enqueued in the Write call
|
||||
if i == 0 {
|
||||
select {
|
||||
case <-written:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
require.True(t, q.WouldBlock())
|
||||
select {
|
||||
case <-q.Available():
|
||||
t.Fatal("should not be available")
|
||||
default:
|
||||
}
|
||||
require.Panics(t, func() { q.Send(getPacketWithContents([]byte("foobar")), 10, protocol.ECT1) })
|
||||
require.True(t, q.WouldBlock())
|
||||
select {
|
||||
case <-q.Available():
|
||||
t.Fatal("should not be available")
|
||||
default:
|
||||
}
|
||||
require.Panics(t, func() { q.Send(getPacketWithContents([]byte("foobar")), 10, protocol.ECT1) })
|
||||
|
||||
// allow one packet to be sent
|
||||
blockWrite <- struct{}{}
|
||||
select {
|
||||
case <-written:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout")
|
||||
}
|
||||
select {
|
||||
case <-q.Available():
|
||||
require.False(t, q.WouldBlock())
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout")
|
||||
}
|
||||
|
||||
// when calling Close, all packets are first sent out
|
||||
closed := make(chan struct{})
|
||||
go func() {
|
||||
q.Close()
|
||||
close(closed)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-closed:
|
||||
t.Fatal("Close should have blocked")
|
||||
case <-time.After(scaleDuration(10 * time.Millisecond)):
|
||||
}
|
||||
|
||||
for i := 0; i < sendQueueCapacity; i++ {
|
||||
// allow one packet to be sent
|
||||
blockWrite <- struct{}{}
|
||||
}
|
||||
select {
|
||||
case <-closed:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout")
|
||||
}
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout")
|
||||
}
|
||||
select {
|
||||
case <-written:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout")
|
||||
}
|
||||
select {
|
||||
case <-q.Available():
|
||||
require.False(t, q.WouldBlock())
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout")
|
||||
}
|
||||
|
||||
// when calling Close, all packets are first sent out
|
||||
closed := make(chan struct{})
|
||||
go func() {
|
||||
q.Close()
|
||||
close(closed)
|
||||
}()
|
||||
|
||||
synctest.Wait()
|
||||
|
||||
select {
|
||||
case <-closed:
|
||||
t.Fatal("Close should have blocked")
|
||||
default:
|
||||
}
|
||||
|
||||
for range sendQueueCapacity {
|
||||
blockWrite <- struct{}{}
|
||||
}
|
||||
synctest.Wait()
|
||||
|
||||
select {
|
||||
case <-closed:
|
||||
default:
|
||||
t.Fatal("Close should have returned")
|
||||
}
|
||||
select {
|
||||
case <-done:
|
||||
default:
|
||||
t.Fatal("Run should have returned")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestSendQueueWriteError(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
c := NewMockSendConn(mockCtrl)
|
||||
q := newSendQueue(c)
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
c := NewMockSendConn(mockCtrl)
|
||||
q := newSendQueue(c)
|
||||
|
||||
c.EXPECT().Write(gomock.Any(), gomock.Any(), gomock.Any()).Return(assert.AnError)
|
||||
q.Send(getPacketWithContents([]byte("foobar")), 6, protocol.ECNNon)
|
||||
c.EXPECT().Write(gomock.Any(), gomock.Any(), gomock.Any()).Return(assert.AnError)
|
||||
q.Send(getPacketWithContents([]byte("foobar")), 6, protocol.ECNNon)
|
||||
|
||||
errChan := make(chan error, 1)
|
||||
go func() { errChan <- q.Run() }()
|
||||
errChan := make(chan error, 1)
|
||||
go func() { errChan <- q.Run() }()
|
||||
|
||||
select {
|
||||
case err := <-errChan:
|
||||
require.ErrorIs(t, err, assert.AnError)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout")
|
||||
}
|
||||
synctest.Wait()
|
||||
|
||||
// further calls to Send should not block
|
||||
sent := make(chan struct{})
|
||||
go func() {
|
||||
defer close(sent)
|
||||
for i := 0; i < 2*sendQueueCapacity; i++ {
|
||||
q.Send(getPacketWithContents([]byte("raboof")), 6, protocol.ECNNon)
|
||||
select {
|
||||
case err := <-errChan:
|
||||
require.ErrorIs(t, err, assert.AnError)
|
||||
default:
|
||||
t.Fatal("Run should have returned")
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-sent:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout")
|
||||
}
|
||||
// further calls to Send should not block
|
||||
sent := make(chan struct{})
|
||||
go func() {
|
||||
defer close(sent)
|
||||
for range 2 * sendQueueCapacity {
|
||||
q.Send(getPacketWithContents([]byte("raboof")), 6, protocol.ECNNon)
|
||||
}
|
||||
}()
|
||||
|
||||
synctest.Wait()
|
||||
|
||||
select {
|
||||
case <-sent:
|
||||
default:
|
||||
t.Fatal("Send should have returned")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestSendQueueSendProbe(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user