From 00d5d6f839aa37b0626eb10f3fe8106ed6ff02b4 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 12 Jan 2025 00:14:25 -0800 Subject: [PATCH] migrate the send queue tests away from Ginkgo (#4861) --- send_queue_test.go | 285 ++++++++++++++++++++++----------------------- 1 file changed, 141 insertions(+), 144 deletions(-) diff --git a/send_queue_test.go b/send_queue_test.go index f5513062a..d8d903b51 100644 --- a/send_queue_test.go +++ b/send_queue_test.go @@ -2,174 +2,171 @@ package quic import ( "errors" + "testing" + "time" "github.com/quic-go/quic-go/internal/protocol" + "github.com/stretchr/testify/require" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" "go.uber.org/mock/gomock" ) -var _ = Describe("Send Queue", func() { - var q sender - var c *MockSendConn +func getPacketWithContents(b []byte) *packetBuffer { + buf := getPacketBuffer() + buf.Data = buf.Data[:len(b)] + copy(buf.Data, b) + return buf +} - BeforeEach(func() { - c = NewMockSendConn(mockCtrl) - q = newSendQueue(c) - }) +func TestSendQueueSendOnePacket(t *testing.T) { + mockCtrl := gomock.NewController(t) + c := NewMockSendConn(mockCtrl) + q := newSendQueue(c) - getPacket := func(b []byte) *packetBuffer { - buf := getPacketBuffer() - buf.Data = buf.Data[:len(b)] - copy(buf.Data, b) - return buf + written := make(chan struct{}) + c.EXPECT().Write([]byte("foobar"), uint16(10), protocol.ECT1).Do( + func([]byte, uint16, protocol.ECN) error { close(written); return nil }, + ) + + done := make(chan struct{}) + go func() { + q.Run() + close(done) + }() + + q.Send(getPacketWithContents([]byte("foobar")), 10, protocol.ECT1) + + select { + case <-written: + case <-time.After(time.Second): + t.Fatal("timeout") } - It("sends a packet", func() { - p := getPacket([]byte("foobar")) - q.Send(p, 10, protocol.ECT1) // make sure the packet size is passed through to the conn + q.Close() + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout") + } +} - written := make(chan struct{}) - c.EXPECT().Write([]byte("foobar"), uint16(10), protocol.ECT1).Do(func([]byte, uint16, protocol.ECN) error { close(written); return nil }) - done := make(chan struct{}) - go func() { - defer GinkgoRecover() - q.Run() - close(done) - }() +func TestSendQueueBlocking(t *testing.T) { + mockCtrl := gomock.NewController(t) + c := NewMockSendConn(mockCtrl) + q := newSendQueue(c) - Eventually(written).Should(BeClosed()) - q.Close() - Eventually(done).Should(BeClosed()) - }) - - It("panics when Send() is called although there's no space in the queue", func() { - for i := 0; i < sendQueueCapacity; i++ { - Expect(q.WouldBlock()).To(BeFalse()) - q.Send(getPacket([]byte("foobar")), 6, protocol.ECNNon) - } - Expect(q.WouldBlock()).To(BeTrue()) - Expect(func() { q.Send(getPacket([]byte("raboof")), 6, protocol.ECNNon) }).To(Panic()) - }) - - It("signals when sending is possible again", func() { - Expect(q.WouldBlock()).To(BeFalse()) - q.Send(getPacket([]byte("foobar1")), 6, protocol.ECNNon) - Consistently(q.Available()).ShouldNot(Receive()) - - // now start sending out packets. This should free up queue space. - c.EXPECT().Write(gomock.Any(), gomock.Any(), protocol.ECNNon).MinTimes(1).MaxTimes(2) - done := make(chan struct{}) - go func() { - defer GinkgoRecover() - q.Run() - close(done) - }() - - Eventually(q.Available()).Should(Receive()) - Expect(q.WouldBlock()).To(BeFalse()) - Expect(func() { q.Send(getPacket([]byte("foobar2")), 7, protocol.ECNNon) }).ToNot(Panic()) - - q.Close() - Eventually(done).Should(BeClosed()) - }) - - It("signals when sending is possible again, when the first write succeeded", func() { - write := make(chan struct{}, 1) - written := make(chan struct{}, 100) - // now start sending out packets. This should free up queue space. - c.EXPECT().Write(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func([]byte, uint16, protocol.ECN) error { - written <- struct{}{} - <-write + blockWrite := make(chan struct{}) + written := make(chan struct{}, 1) + c.EXPECT().Write(gomock.Any(), gomock.Any(), gomock.Any()).Do( + func([]byte, uint16, protocol.ECN) error { + select { + case written <- struct{}{}: + default: + } + <-blockWrite return nil - }).AnyTimes() - // allow the first packet to be sent immediately - write <- struct{}{} + }, + ).AnyTimes() - done := make(chan struct{}) - go func() { - defer GinkgoRecover() - q.Run() - close(done) - }() + done := make(chan struct{}) + go func() { + q.Run() + close(done) + }() - q.Send(getPacket([]byte("foobar")), 6, protocol.ECNNon) - <-written - - // now fill up the send queue - for i := 0; i < sendQueueCapacity; i++ { - Expect(q.WouldBlock()).To(BeFalse()) - q.Send(getPacket([]byte("foobar")), 6, protocol.ECNNon) + // +1, since one packet will be queued in the Write call + for i := 0; i < sendQueueCapacity+1; i++ { + require.False(t, q.WouldBlock()) + q.Send(getPacketWithContents([]byte("foobar")), 10, protocol.ECT1) + // make sure that the first packet is actually enqueued in the Write call + if i == 0 { + select { + case <-written: + case <-time.After(time.Second): + t.Fatal("timeout") + } } - // One more packet is queued when it's picked up by Run and written to the connection. - // In this test, it's blocked on write channel in the mocked Write call. - <-written - Eventually(q.WouldBlock()).Should(BeFalse()) - q.Send(getPacket([]byte("foobar")), 6, protocol.ECNNon) + } + require.True(t, q.WouldBlock()) + select { + case <-q.Available(): + t.Fatal("should not be available") + default: + } + require.Panics(t, func() { q.Send(getPacketWithContents([]byte("foobar")), 10, protocol.ECT1) }) - Expect(q.WouldBlock()).To(BeTrue()) - Consistently(q.Available()).ShouldNot(Receive()) - write <- struct{}{} - Eventually(q.Available()).Should(Receive()) - - // test shutdown - for i := 0; i < sendQueueCapacity; i++ { - write <- struct{}{} - } + // allow one packet to be sent + blockWrite <- struct{}{} + select { + case <-written: + case <-time.After(time.Second): + t.Fatal("timeout") + } + select { + case <-q.Available(): + require.False(t, q.WouldBlock()) + case <-time.After(time.Second): + t.Fatal("timeout") + } + // when calling Close, all packets are first sent out + closed := make(chan struct{}) + go func() { q.Close() - Eventually(done).Should(BeClosed()) - }) + close(closed) + }() - It("does not block pending send after the queue has stopped running", func() { - done := make(chan struct{}) - go func() { - defer GinkgoRecover() - q.Run() - close(done) - }() + select { + case <-closed: + t.Fatal("Close should have blocked") + case <-time.After(scaleDuration(10 * time.Millisecond)): + } - // the run loop exits if there is a write error - testErr := errors.New("test error") - c.EXPECT().Write(gomock.Any(), gomock.Any(), gomock.Any()).Return(testErr) - q.Send(getPacket([]byte("foobar")), 6, protocol.ECNNon) - Eventually(done).Should(BeClosed()) + for i := 0; i < sendQueueCapacity; i++ { + blockWrite <- struct{}{} + } + select { + case <-closed: + case <-time.After(time.Second): + t.Fatal("timeout") + } + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout") + } +} - sent := make(chan struct{}) - go func() { - defer GinkgoRecover() - q.Send(getPacket([]byte("raboof")), 6, protocol.ECNNon) - q.Send(getPacket([]byte("quux")), 4, protocol.ECNNon) - close(sent) - }() +func TestSendQueueWriteError(t *testing.T) { + mockCtrl := gomock.NewController(t) + c := NewMockSendConn(mockCtrl) + q := newSendQueue(c) - Eventually(sent).Should(BeClosed()) - }) + c.EXPECT().Write(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("test error")) + q.Send(getPacketWithContents([]byte("foobar")), 6, protocol.ECNNon) - It("blocks Close() until the packet has been sent out", func() { - written := make(chan []byte) - c.EXPECT().Write(gomock.Any(), gomock.Any(), gomock.Any()).Do(func(p []byte, _ uint16, _ protocol.ECN) error { written <- p; return nil }) - done := make(chan struct{}) - go func() { - defer GinkgoRecover() - q.Run() - close(done) - }() + errChan := make(chan error, 1) + go func() { errChan <- q.Run() }() - q.Send(getPacket([]byte("foobar")), 6, protocol.ECNNon) + select { + case err := <-errChan: + require.EqualError(t, err, "test error") + case <-time.After(time.Second): + t.Fatal("timeout") + } - closed := make(chan struct{}) - go func() { - defer GinkgoRecover() - q.Close() - close(closed) - }() + // further calls to Send should not block + sent := make(chan struct{}) + go func() { + defer close(sent) + for i := 0; i < 2*sendQueueCapacity; i++ { + q.Send(getPacketWithContents([]byte("raboof")), 6, protocol.ECNNon) + } + }() - Consistently(closed).ShouldNot(BeClosed()) - // now write the packet - Expect(written).To(Receive()) - Eventually(done).Should(BeClosed()) - Eventually(closed).Should(BeClosed()) - }) -}) + select { + case <-sent: + case <-time.After(time.Second): + t.Fatal("timeout") + } +}