From 5aa16eb1cff99ce247ea171ff301f696bffb19f5 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Fri, 16 Oct 2020 17:02:00 +0700 Subject: [PATCH] add some randomized tests for the outgoing streams map --- streams_map_outgoing_generic_test.go | 172 ++++++++++++++++++++++++++- 1 file changed, 169 insertions(+), 3 deletions(-) diff --git a/streams_map_outgoing_generic_test.go b/streams_map_outgoing_generic_test.go index a2dabd05c..f073f10ef 100644 --- a/streams_map_outgoing_generic_test.go +++ b/streams_map_outgoing_generic_test.go @@ -3,6 +3,11 @@ package quic import ( "context" "errors" + "fmt" + "math/rand" + "sort" + "sync" + "time" "github.com/golang/mock/gomock" "github.com/lucas-clemente/quic-go/internal/protocol" @@ -20,11 +25,11 @@ var _ = Describe("Streams Map (outgoing)", func() { // waitForEnqueued waits until there are n go routines waiting on OpenStreamSync() waitForEnqueued := func(n int) { - Eventually(func() bool { + Eventually(func() int { m.mutex.Lock() defer m.mutex.Unlock() - return len(m.openQueue) == n - }).Should(BeTrue()) + return len(m.openQueue) + }, 50*time.Millisecond, 50*time.Microsecond).Should(Equal(n)) } BeforeEach(func() { @@ -190,6 +195,45 @@ var _ = Describe("Streams Map (outgoing)", func() { Eventually(done2).Should(BeClosed()) }) + It("opens streams in the right order, when one of the contexts is canceled", func() { + mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes() + done1 := make(chan struct{}) + go func() { + defer GinkgoRecover() + str, err := m.OpenStreamSync(context.Background()) + Expect(err).ToNot(HaveOccurred()) + Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(1))) + close(done1) + }() + waitForEnqueued(1) + + done2 := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + defer GinkgoRecover() + _, err := m.OpenStreamSync(ctx) + Expect(err).To(MatchError(context.Canceled)) + close(done2) + }() + waitForEnqueued(2) + + done3 := make(chan struct{}) + go func() { + defer GinkgoRecover() + str, err := m.OpenStreamSync(context.Background()) + Expect(err).ToNot(HaveOccurred()) + Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(2))) + close(done3) + }() + waitForEnqueued(3) + + cancel() + Eventually(done2).Should(BeClosed()) + m.SetMaxStream(1000) + Eventually(done1).Should(BeClosed()) + Eventually(done3).Should(BeClosed()) + }) + It("unblocks multiple OpenStreamSync calls at the same time", func() { mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes() done := make(chan struct{}) @@ -319,4 +363,126 @@ var _ = Describe("Streams Map (outgoing)", func() { expectTooManyStreamsError(err) }) }) + + Context("randomized tests", func() { + It("opens streams", func() { + rand.Seed(GinkgoRandomSeed()) + const n = 100 + fmt.Fprintf(GinkgoWriter, "Opening %d streams concurrently.\n", n) + + // TODO(#2826): check stream limits sent in STREAMS_BLOCKED frames + mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes() + done := make(map[int]chan struct{}) + for i := 1; i <= n; i++ { + c := make(chan struct{}) + done[i] = c + + go func(doneChan chan struct{}, id protocol.StreamNum) { + defer GinkgoRecover() + defer close(doneChan) + str, err := m.OpenStreamSync(context.Background()) + Expect(err).ToNot(HaveOccurred()) + Expect(str.(*mockGenericStream).num).To(Equal(id)) + }(c, protocol.StreamNum(i)) + waitForEnqueued(i) + } + + var limit int + for limit < n { + limit += rand.Intn(n/5) + 1 + fmt.Fprintf(GinkgoWriter, "Setting stream limit to %d.\n", limit) + m.SetMaxStream(protocol.StreamNum(limit)) + for i := 1; i <= n; i++ { + if i <= limit { + Eventually(done[i]).Should(BeClosed()) + } else { + Expect(done[i]).ToNot(BeClosed()) + } + } + str, err := m.OpenStream() + if limit <= n { + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(Equal(errTooManyOpenStreams.Error())) + } else { + Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(n + 1))) + } + } + }) + + It("opens streams, when some of them are getting canceled", func() { + rand.Seed(GinkgoRandomSeed()) + const n = 100 + fmt.Fprintf(GinkgoWriter, "Opening %d streams concurrently.\n", n) + + // TODO(#2826): check stream limits sent in STREAMS_BLOCKED frames + mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes() + + ctx, cancel := context.WithCancel(context.Background()) + streamsToCancel := make(map[protocol.StreamNum]struct{}) // used as a set + for i := 0; i < 10; i++ { + id := protocol.StreamNum(rand.Intn(n) + 1) + fmt.Fprintf(GinkgoWriter, "Canceling stream %d.\n", id) + streamsToCancel[id] = struct{}{} + } + + streamWillBeCanceled := func(id protocol.StreamNum) bool { + _, ok := streamsToCancel[id] + return ok + } + + var streamIDs []int + var mutex sync.Mutex + done := make(map[int]chan struct{}) + for i := 1; i <= n; i++ { + c := make(chan struct{}) + done[i] = c + + go func(doneChan chan struct{}, id protocol.StreamNum) { + defer GinkgoRecover() + defer close(doneChan) + cont := context.Background() + if streamWillBeCanceled(id) { + cont = ctx + } + str, err := m.OpenStreamSync(cont) + if streamWillBeCanceled(id) { + Expect(err).To(MatchError(context.Canceled)) + return + } + Expect(err).ToNot(HaveOccurred()) + mutex.Lock() + streamIDs = append(streamIDs, int(str.(*mockGenericStream).num)) + mutex.Unlock() + }(c, protocol.StreamNum(i)) + waitForEnqueued(i) + } + + cancel() + for id := range streamsToCancel { + Eventually(done[int(id)]).Should(BeClosed()) + } + var limit int + numStreams := n - len(streamsToCancel) + for limit < numStreams { + limit += rand.Intn(n/5) + 1 + fmt.Fprintf(GinkgoWriter, "Setting stream limit to %d.\n", limit) + m.SetMaxStream(protocol.StreamNum(limit)) + l := limit + if l > numStreams { + l = numStreams + } + Eventually(func() int { + mutex.Lock() + defer mutex.Unlock() + return len(streamIDs) + }).Should(Equal(l)) + // check that all stream IDs were used + Expect(streamIDs).To(HaveLen(l)) + sort.Ints(streamIDs) + for i := 0; i < l; i++ { + Expect(streamIDs[i]).To(Equal(i + 1)) + } + } + }) + }) })