From 1aa35722a164142c03376438b137befb7572d3ed Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Fri, 16 Oct 2020 11:26:08 +0700 Subject: [PATCH 1/3] speed up the unit tests in the outgoing streams map --- streams_map_outgoing_generic_test.go | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/streams_map_outgoing_generic_test.go b/streams_map_outgoing_generic_test.go index 49dac7811..a2dabd05c 100644 --- a/streams_map_outgoing_generic_test.go +++ b/streams_map_outgoing_generic_test.go @@ -18,6 +18,15 @@ var _ = Describe("Streams Map (outgoing)", func() { mockSender *MockStreamSender ) + // waitForEnqueued waits until there are n go routines waiting on OpenStreamSync() + waitForEnqueued := func(n int) { + Eventually(func() bool { + m.mutex.Lock() + defer m.mutex.Unlock() + return len(m.openQueue) == n + }).Should(BeTrue()) + } + BeforeEach(func() { newItem = func(num protocol.StreamNum) item { return &mockGenericStream{num: num} @@ -124,8 +133,8 @@ var _ = Describe("Streams Map (outgoing)", func() { Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(1))) close(done) }() + waitForEnqueued(1) - Consistently(done).ShouldNot(BeClosed()) m.SetMaxStream(1) Eventually(done).Should(BeClosed()) }) @@ -140,12 +149,12 @@ var _ = Describe("Streams Map (outgoing)", func() { Expect(err).To(MatchError("context canceled")) close(done) }() + waitForEnqueued(1) - Consistently(done).ShouldNot(BeClosed()) cancel() Eventually(done).Should(BeClosed()) - // make sure that the next stream openend is stream 1 + // make sure that the next stream opened is stream 1 m.SetMaxStream(1000) str, err := m.OpenStream() Expect(err).ToNot(HaveOccurred()) @@ -162,7 +171,8 @@ var _ = Describe("Streams Map (outgoing)", func() { Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(1))) close(done1) }() - Consistently(done1).ShouldNot(BeClosed()) + waitForEnqueued(1) + done2 := make(chan struct{}) go func() { defer GinkgoRecover() @@ -171,7 +181,7 @@ var _ = Describe("Streams Map (outgoing)", func() { Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(2))) close(done2) }() - Consistently(done2).ShouldNot(BeClosed()) + waitForEnqueued(2) m.SetMaxStream(1) Eventually(done1).Should(BeClosed()) @@ -195,14 +205,14 @@ var _ = Describe("Streams Map (outgoing)", func() { Expect(err).ToNot(HaveOccurred()) done <- struct{}{} }() - Consistently(done).ShouldNot(Receive()) + waitForEnqueued(2) go func() { defer GinkgoRecover() _, err := m.OpenStreamSync(context.Background()) Expect(err).To(MatchError("test done")) done <- struct{}{} }() - Consistently(done).ShouldNot(Receive()) + waitForEnqueued(3) m.SetMaxStream(2) Eventually(done).Should(Receive()) @@ -223,7 +233,7 @@ var _ = Describe("Streams Map (outgoing)", func() { Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(1))) close(openedSync) }() - Consistently(openedSync).ShouldNot(BeClosed()) + waitForEnqueued(1) start := make(chan struct{}) openend := make(chan struct{}) From e94b5e8234c81a43092ce300e5a19a038bdc2754 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Fri, 16 Oct 2020 11:47:11 +0700 Subject: [PATCH 2/3] fix busy-looping in OpenStreamSync --- streams_map_outgoing_bidi.go | 12 +++++++++--- streams_map_outgoing_generic.go | 12 +++++++++--- streams_map_outgoing_uni.go | 12 +++++++++--- 3 files changed, 27 insertions(+), 9 deletions(-) diff --git a/streams_map_outgoing_bidi.go b/streams_map_outgoing_bidi.go index 963d4ea0e..62524abe4 100644 --- a/streams_map_outgoing_bidi.go +++ b/streams_map_outgoing_bidi.go @@ -106,6 +106,7 @@ func (m *outgoingBidiStreamsMap) OpenStreamSync(ctx context.Context) (streamI, e } str := m.openStream() delete(m.openQueue, queuePos) + m.lowestInQueue = queuePos + 1 m.unblockOpenSync() return str, nil } @@ -172,8 +173,10 @@ func (m *outgoingBidiStreamsMap) SetMaxStream(num protocol.StreamNum) { m.maxStream = num m.blockedSent = false m.unblockOpenSync() + // TODO(#2826): it might be necessary to send a STREAMS_BLOCKED frame } +// unblockOpenSync unblocks the next OpenStreamSync go-routine to open a new stream func (m *outgoingBidiStreamsMap) unblockOpenSync() { if len(m.openQueue) == 0 { return @@ -183,9 +186,12 @@ func (m *outgoingBidiStreamsMap) unblockOpenSync() { if !ok { // entry was deleted because the context was canceled continue } - close(c) - m.openQueue[qp] = nil - m.lowestInQueue = qp + 1 + // unblockOpenSync is called both from OpenStreamSync and from SetMaxStream. + // It's sufficient to only unblock OpenStreamSync once. + select { + case c <- struct{}{}: + default: + } return } } diff --git a/streams_map_outgoing_generic.go b/streams_map_outgoing_generic.go index 23f1c6de2..ed34be6be 100644 --- a/streams_map_outgoing_generic.go +++ b/streams_map_outgoing_generic.go @@ -104,6 +104,7 @@ func (m *outgoingItemsMap) OpenStreamSync(ctx context.Context) (item, error) { } str := m.openStream() delete(m.openQueue, queuePos) + m.lowestInQueue = queuePos + 1 m.unblockOpenSync() return str, nil } @@ -170,8 +171,10 @@ func (m *outgoingItemsMap) SetMaxStream(num protocol.StreamNum) { m.maxStream = num m.blockedSent = false m.unblockOpenSync() + // TODO(#2826): it might be necessary to send a STREAMS_BLOCKED frame } +// unblockOpenSync unblocks the next OpenStreamSync go-routine to open a new stream func (m *outgoingItemsMap) unblockOpenSync() { if len(m.openQueue) == 0 { return @@ -181,9 +184,12 @@ func (m *outgoingItemsMap) unblockOpenSync() { if !ok { // entry was deleted because the context was canceled continue } - close(c) - m.openQueue[qp] = nil - m.lowestInQueue = qp + 1 + // unblockOpenSync is called both from OpenStreamSync and from SetMaxStream. + // It's sufficient to only unblock OpenStreamSync once. + select { + case c <- struct{}{}: + default: + } return } } diff --git a/streams_map_outgoing_uni.go b/streams_map_outgoing_uni.go index 4f4520898..bffd21493 100644 --- a/streams_map_outgoing_uni.go +++ b/streams_map_outgoing_uni.go @@ -106,6 +106,7 @@ func (m *outgoingUniStreamsMap) OpenStreamSync(ctx context.Context) (sendStreamI } str := m.openStream() delete(m.openQueue, queuePos) + m.lowestInQueue = queuePos + 1 m.unblockOpenSync() return str, nil } @@ -172,8 +173,10 @@ func (m *outgoingUniStreamsMap) SetMaxStream(num protocol.StreamNum) { m.maxStream = num m.blockedSent = false m.unblockOpenSync() + // TODO(#2826): it might be necessary to send a STREAMS_BLOCKED frame } +// unblockOpenSync unblocks the next OpenStreamSync go-routine to open a new stream func (m *outgoingUniStreamsMap) unblockOpenSync() { if len(m.openQueue) == 0 { return @@ -183,9 +186,12 @@ func (m *outgoingUniStreamsMap) unblockOpenSync() { if !ok { // entry was deleted because the context was canceled continue } - close(c) - m.openQueue[qp] = nil - m.lowestInQueue = qp + 1 + // unblockOpenSync is called both from OpenStreamSync and from SetMaxStream. + // It's sufficient to only unblock OpenStreamSync once. + select { + case c <- struct{}{}: + default: + } return } } From 5aa16eb1cff99ce247ea171ff301f696bffb19f5 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Fri, 16 Oct 2020 17:02:00 +0700 Subject: [PATCH 3/3] add some randomized tests for the outgoing streams map --- streams_map_outgoing_generic_test.go | 172 ++++++++++++++++++++++++++- 1 file changed, 169 insertions(+), 3 deletions(-) diff --git a/streams_map_outgoing_generic_test.go b/streams_map_outgoing_generic_test.go index a2dabd05c..f073f10ef 100644 --- a/streams_map_outgoing_generic_test.go +++ b/streams_map_outgoing_generic_test.go @@ -3,6 +3,11 @@ package quic import ( "context" "errors" + "fmt" + "math/rand" + "sort" + "sync" + "time" "github.com/golang/mock/gomock" "github.com/lucas-clemente/quic-go/internal/protocol" @@ -20,11 +25,11 @@ var _ = Describe("Streams Map (outgoing)", func() { // waitForEnqueued waits until there are n go routines waiting on OpenStreamSync() waitForEnqueued := func(n int) { - Eventually(func() bool { + Eventually(func() int { m.mutex.Lock() defer m.mutex.Unlock() - return len(m.openQueue) == n - }).Should(BeTrue()) + return len(m.openQueue) + }, 50*time.Millisecond, 50*time.Microsecond).Should(Equal(n)) } BeforeEach(func() { @@ -190,6 +195,45 @@ var _ = Describe("Streams Map (outgoing)", func() { Eventually(done2).Should(BeClosed()) }) + It("opens streams in the right order, when one of the contexts is canceled", func() { + mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes() + done1 := make(chan struct{}) + go func() { + defer GinkgoRecover() + str, err := m.OpenStreamSync(context.Background()) + Expect(err).ToNot(HaveOccurred()) + Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(1))) + close(done1) + }() + waitForEnqueued(1) + + done2 := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + defer GinkgoRecover() + _, err := m.OpenStreamSync(ctx) + Expect(err).To(MatchError(context.Canceled)) + close(done2) + }() + waitForEnqueued(2) + + done3 := make(chan struct{}) + go func() { + defer GinkgoRecover() + str, err := m.OpenStreamSync(context.Background()) + Expect(err).ToNot(HaveOccurred()) + Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(2))) + close(done3) + }() + waitForEnqueued(3) + + cancel() + Eventually(done2).Should(BeClosed()) + m.SetMaxStream(1000) + Eventually(done1).Should(BeClosed()) + Eventually(done3).Should(BeClosed()) + }) + It("unblocks multiple OpenStreamSync calls at the same time", func() { mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes() done := make(chan struct{}) @@ -319,4 +363,126 @@ var _ = Describe("Streams Map (outgoing)", func() { expectTooManyStreamsError(err) }) }) + + Context("randomized tests", func() { + It("opens streams", func() { + rand.Seed(GinkgoRandomSeed()) + const n = 100 + fmt.Fprintf(GinkgoWriter, "Opening %d streams concurrently.\n", n) + + // TODO(#2826): check stream limits sent in STREAMS_BLOCKED frames + mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes() + done := make(map[int]chan struct{}) + for i := 1; i <= n; i++ { + c := make(chan struct{}) + done[i] = c + + go func(doneChan chan struct{}, id protocol.StreamNum) { + defer GinkgoRecover() + defer close(doneChan) + str, err := m.OpenStreamSync(context.Background()) + Expect(err).ToNot(HaveOccurred()) + Expect(str.(*mockGenericStream).num).To(Equal(id)) + }(c, protocol.StreamNum(i)) + waitForEnqueued(i) + } + + var limit int + for limit < n { + limit += rand.Intn(n/5) + 1 + fmt.Fprintf(GinkgoWriter, "Setting stream limit to %d.\n", limit) + m.SetMaxStream(protocol.StreamNum(limit)) + for i := 1; i <= n; i++ { + if i <= limit { + Eventually(done[i]).Should(BeClosed()) + } else { + Expect(done[i]).ToNot(BeClosed()) + } + } + str, err := m.OpenStream() + if limit <= n { + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(Equal(errTooManyOpenStreams.Error())) + } else { + Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(n + 1))) + } + } + }) + + It("opens streams, when some of them are getting canceled", func() { + rand.Seed(GinkgoRandomSeed()) + const n = 100 + fmt.Fprintf(GinkgoWriter, "Opening %d streams concurrently.\n", n) + + // TODO(#2826): check stream limits sent in STREAMS_BLOCKED frames + mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes() + + ctx, cancel := context.WithCancel(context.Background()) + streamsToCancel := make(map[protocol.StreamNum]struct{}) // used as a set + for i := 0; i < 10; i++ { + id := protocol.StreamNum(rand.Intn(n) + 1) + fmt.Fprintf(GinkgoWriter, "Canceling stream %d.\n", id) + streamsToCancel[id] = struct{}{} + } + + streamWillBeCanceled := func(id protocol.StreamNum) bool { + _, ok := streamsToCancel[id] + return ok + } + + var streamIDs []int + var mutex sync.Mutex + done := make(map[int]chan struct{}) + for i := 1; i <= n; i++ { + c := make(chan struct{}) + done[i] = c + + go func(doneChan chan struct{}, id protocol.StreamNum) { + defer GinkgoRecover() + defer close(doneChan) + cont := context.Background() + if streamWillBeCanceled(id) { + cont = ctx + } + str, err := m.OpenStreamSync(cont) + if streamWillBeCanceled(id) { + Expect(err).To(MatchError(context.Canceled)) + return + } + Expect(err).ToNot(HaveOccurred()) + mutex.Lock() + streamIDs = append(streamIDs, int(str.(*mockGenericStream).num)) + mutex.Unlock() + }(c, protocol.StreamNum(i)) + waitForEnqueued(i) + } + + cancel() + for id := range streamsToCancel { + Eventually(done[int(id)]).Should(BeClosed()) + } + var limit int + numStreams := n - len(streamsToCancel) + for limit < numStreams { + limit += rand.Intn(n/5) + 1 + fmt.Fprintf(GinkgoWriter, "Setting stream limit to %d.\n", limit) + m.SetMaxStream(protocol.StreamNum(limit)) + l := limit + if l > numStreams { + l = numStreams + } + Eventually(func() int { + mutex.Lock() + defer mutex.Unlock() + return len(streamIDs) + }).Should(Equal(l)) + // check that all stream IDs were used + Expect(streamIDs).To(HaveLen(l)) + sort.Ints(streamIDs) + for i := 0; i < l; i++ { + Expect(streamIDs[i]).To(Equal(i + 1)) + } + } + }) + }) })