From b022d04b0fa7de5d2815a5935c11916bcb5bdbfa Mon Sep 17 00:00:00 2001
From: Marten Seemann
Date: Sat, 2 Jan 2021 15:49:08 +0800
Subject: [PATCH] make the OpenStreamSync cancelation test less flaky

---
 integrationtests/self/cancelation_test.go | 41 ++++++++++++-----------
 1 file changed, 21 insertions(+), 20 deletions(-)

diff --git a/integrationtests/self/cancelation_test.go b/integrationtests/self/cancelation_test.go
index 2c6ffbcaf..6280236af 100644
--- a/integrationtests/self/cancelation_test.go
+++ b/integrationtests/self/cancelation_test.go
@@ -432,7 +432,7 @@ var _ = Describe("Stream Cancelations", func() {
 
 	Context("canceling the context", func() {
 		It("downloads data when the receiving peer cancels the context for accepting streams", func() {
-			server, err := quic.ListenAddr("localhost:0", getTLSConfig(), nil)
+			server, err := quic.ListenAddr("localhost:0", getTLSConfig(), getQuicConfig(nil))
 			Expect(err).ToNot(HaveOccurred())
 
 			go func() {
@@ -499,31 +499,33 @@ var _ = Describe("Stream Cancelations", func() {
 		})
 
 		It("downloads data when the sending peer cancels the context for opening streams", func() {
-			server, err := quic.ListenAddr("localhost:0", getTLSConfig(), nil)
+			const (
+				numStreams         = 15
+				maxIncomingStreams = 5
+			)
+			server, err := quic.ListenAddr("localhost:0", getTLSConfig(), getQuicConfig(nil))
 			Expect(err).ToNot(HaveOccurred())
 
+			msg := make(chan struct{}, 1)
 			var numCanceled int32
 			go func() {
 				defer GinkgoRecover()
+				defer close(msg)
 				sess, err := server.Accept(context.Background())
 				Expect(err).ToNot(HaveOccurred())
 				var numOpened int
-				ticker := time.NewTicker(250 * time.Microsecond)
 				for numOpened < numStreams {
-					<-ticker.C
-					ctx, cancel := context.WithCancel(context.Background())
+					ctx, cancel := context.WithTimeout(context.Background(), scaleDuration(20*time.Millisecond))
 					defer cancel()
-					// cancel accepting half of the streams
-					shouldCancel := rand.Int31()%2 == 0
-
-					if shouldCancel {
-						time.AfterFunc(5*time.Millisecond, cancel)
-					}
 					str, err := sess.OpenUniStreamSync(ctx)
 					if err != nil {
+						Expect(err).To(MatchError(context.DeadlineExceeded))
 						atomic.AddInt32(&numCanceled, 1)
-						Expect(err).To(MatchError("context canceled"))
+						select {
+						case msg <- struct{}{}:
+						default:
+						}
 						continue
 					}
 					numOpened++
@@ -539,30 +541,29 @@
 			sess, err := quic.DialAddr(
 				fmt.Sprintf("localhost:%d", server.Addr().(*net.UDPAddr).Port),
 				getTLSClientConfig(),
-				getQuicConfig(&quic.Config{MaxIncomingUniStreams: 5}),
+				getQuicConfig(&quic.Config{MaxIncomingUniStreams: maxIncomingStreams}),
 			)
 			Expect(err).ToNot(HaveOccurred())
 
 			var wg sync.WaitGroup
 			wg.Add(numStreams)
-			ticker := time.NewTicker(10 * time.Millisecond)
 			for i := 0; i < numStreams; i++ {
-				<-ticker.C
-				go func() {
+				<-msg
+				str, err := sess.AcceptUniStream(context.Background())
+				Expect(err).ToNot(HaveOccurred())
+				go func(str quic.ReceiveStream) {
 					defer GinkgoRecover()
-					str, err := sess.AcceptUniStream(context.Background())
-					Expect(err).ToNot(HaveOccurred())
 					data, err := ioutil.ReadAll(str)
 					Expect(err).ToNot(HaveOccurred())
 					Expect(data).To(Equal(PRData))
 					wg.Done()
-				}()
+				}(str)
 			}
 			wg.Wait()
 
 			count := atomic.LoadInt32(&numCanceled)
 			fmt.Fprintf(GinkgoWriter, "Canceled OpenStreamSync %d times\n", count)
-			Expect(count).To(BeNumerically(">", numStreams/5))
+			Expect(count).To(BeNumerically(">=", numStreams-maxIncomingStreams))
 			Expect(sess.CloseWithError(0, "")).To(Succeed())
 			Expect(server.Close()).To(Succeed())
 		})
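
Note (illustration only, not part of the patch): the fix replaces the ticker-driven, randomly canceled OpenUniStreamSync calls with a deterministic scheme. Each open attempt gets a short context.WithTimeout; every timeout is counted and reported to the accepting side through a buffered channel using a non-blocking send, and the acceptor only proceeds after such a signal (or after the channel is closed when the opener is done). That is why at least numStreams - maxIncomingStreams cancelations are guaranteed. The following self-contained Go sketch shows the same coordination pattern without quic-go; openStream and tokens are hypothetical stand-ins for OpenUniStreamSync and the peer's stream credit, not library API.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// openStream is a stand-in for OpenUniStreamSync: it blocks until a flow-control
// token is available or the context expires. Hypothetical helper, not quic-go API.
func openStream(ctx context.Context, tokens chan struct{}) error {
	select {
	case <-tokens:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	const (
		numStreams         = 15
		maxIncomingStreams = 5 // mirrors MaxIncomingUniStreams in the test
	)

	// Stand-in for the peer's stream credit: the opener may open this many
	// streams before it has to wait for the acceptor.
	tokens := make(chan struct{}, maxIncomingStreams)
	for i := 0; i < maxIncomingStreams; i++ {
		tokens <- struct{}{}
	}

	msg := make(chan struct{}, 1) // opener -> acceptor signal, as in the patch
	var numCanceled int32

	// Opener side: retry with a short per-attempt timeout; on every timeout,
	// wake the acceptor via a non-blocking send so the open loop never stalls.
	go func() {
		defer close(msg) // unblocks the acceptor once all streams are open
		var numOpened int
		for numOpened < numStreams {
			ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
			err := openStream(ctx, tokens)
			cancel()
			if err != nil {
				if !errors.Is(err, context.DeadlineExceeded) {
					panic(err) // the only expected failure is the timeout
				}
				atomic.AddInt32(&numCanceled, 1)
				select {
				case msg <- struct{}{}:
				default:
				}
				continue
			}
			numOpened++
		}
	}()

	// Acceptor side: only hand back credit after the opener has actually run
	// into a timeout, so cancelations happen deterministically.
	for i := 0; i < numStreams; i++ {
		<-msg
		tokens <- struct{}{}
	}
	fmt.Printf("open attempts canceled %d times (at least %d expected)\n",
		atomic.LoadInt32(&numCanceled), numStreams-maxIncomingStreams)
}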