From 1e09cc1ef4e7a42acaccbad9ebe73b7416c66cb3 Mon Sep 17 00:00:00 2001
From: Marten Seemann
Date: Wed, 23 Jan 2019 15:55:44 +0700
Subject: [PATCH] add an integration test for canceling the read side of streams

---
 integrationtests/self/cancelation_test.go | 156 ++++++++++++++++++++++
 1 file changed, 156 insertions(+)
 create mode 100644 integrationtests/self/cancelation_test.go

diff --git a/integrationtests/self/cancelation_test.go b/integrationtests/self/cancelation_test.go
new file mode 100644
index 000000000..08de80d04
--- /dev/null
+++ b/integrationtests/self/cancelation_test.go
@@ -0,0 +1,156 @@
+package self_test
+
+import (
+	"crypto/tls"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"math/rand"
+	"net"
+	"sync"
+	"sync/atomic"
+
+	quic "github.com/lucas-clemente/quic-go"
+	"github.com/lucas-clemente/quic-go/integrationtests/tools/testserver"
+	"github.com/lucas-clemente/quic-go/internal/testdata"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+var _ = Describe("Stream Cancelations", func() {
+	Context("canceling the read side", func() {
+		const numStreams = 80
+		var server quic.Listener
+
+		runServer := func() <-chan int32 {
+			numCanceledStreamsChan := make(chan int32)
+			var err error
+			server, err = quic.ListenAddr("localhost:0", testdata.GetTLSConfig(), nil)
+			Expect(err).ToNot(HaveOccurred())
+
+			var canceledCounter int32
+			go func() {
+				defer GinkgoRecover()
+				var wg sync.WaitGroup
+				wg.Add(numStreams)
+				sess, err := server.Accept()
+				Expect(err).ToNot(HaveOccurred())
+				for i := 0; i < numStreams; i++ {
+					go func() {
+						defer GinkgoRecover()
+						defer wg.Done()
+						str, err := sess.OpenUniStreamSync()
+						Expect(err).ToNot(HaveOccurred())
+						if _, err = str.Write(testserver.PRData); err != nil {
+							Expect(err).To(MatchError(fmt.Sprintf("Stream %d was reset with error code %d", str.StreamID(), str.StreamID())))
+							atomic.AddInt32(&canceledCounter, 1)
+							return
+						}
+						Expect(str.Close()).To(Succeed())
+					}()
+				}
+				wg.Wait()
+				numCanceledStreamsChan <- atomic.LoadInt32(&canceledCounter)
+			}()
+			return numCanceledStreamsChan
+		}
+
+		AfterEach(func() {
+			Expect(server.Close()).To(Succeed())
+		})
+
+		It("downloads when the client immediately cancels most streams", func() {
+			serverCanceledCounterChan := runServer()
+			sess, err := quic.DialAddr(
+				fmt.Sprintf("localhost:%d", server.Addr().(*net.UDPAddr).Port),
+				&tls.Config{RootCAs: testdata.GetRootCA()},
+				&quic.Config{MaxIncomingUniStreams: numStreams / 2},
+			)
+			Expect(err).ToNot(HaveOccurred())
+
+			var canceledCounter int32
+			var wg sync.WaitGroup
+			wg.Add(numStreams)
+			for i := 0; i < numStreams; i++ {
+				go func() {
+					defer GinkgoRecover()
+					defer wg.Done()
+					str, err := sess.AcceptUniStream()
+					Expect(err).ToNot(HaveOccurred())
+					// cancel around 2/3 of the streams
+					if rand.Int31()%3 != 0 {
+						atomic.AddInt32(&canceledCounter, 1)
+						Expect(str.CancelRead(quic.ErrorCode(str.StreamID()))).To(Succeed())
+						return
+					}
+					data, err := ioutil.ReadAll(str)
+					Expect(err).ToNot(HaveOccurred())
+					Expect(data).To(Equal(testserver.PRData))
+				}()
+			}
+			wg.Wait()
+
+			var serverCanceledCounter int32
+			Eventually(serverCanceledCounterChan).Should(Receive(&serverCanceledCounter))
+			Expect(sess.Close()).To(Succeed())
+
+			clientCanceledCounter := atomic.LoadInt32(&canceledCounter)
+			// The server will only count a stream as being reset if it learns about the cancelation before it has finished writing all the data.
+			Expect(clientCanceledCounter).To(BeNumerically(">=", serverCanceledCounter))
+			fmt.Fprintf(GinkgoWriter, "Canceled reading on %d of %d streams.\n", clientCanceledCounter, numStreams)
+			Expect(clientCanceledCounter).To(BeNumerically(">", numStreams/10))
+			Expect(numStreams - clientCanceledCounter).To(BeNumerically(">", numStreams/10))
+		})
+
+		It("downloads when the client cancels streams after reading from them for a bit", func() {
+			serverCanceledCounterChan := runServer()
+
+			sess, err := quic.DialAddr(
+				fmt.Sprintf("localhost:%d", server.Addr().(*net.UDPAddr).Port),
+				&tls.Config{RootCAs: testdata.GetRootCA()},
+				&quic.Config{MaxIncomingUniStreams: numStreams / 2},
+			)
+			Expect(err).ToNot(HaveOccurred())
+
+			var canceledCounter int32
+			var wg sync.WaitGroup
+			wg.Add(numStreams)
+			for i := 0; i < numStreams; i++ {
+				go func() {
+					defer GinkgoRecover()
+					defer wg.Done()
+					str, err := sess.AcceptUniStream()
+					Expect(err).ToNot(HaveOccurred())
+					// only read some data from about 2/3 of the streams, then cancel the read
+					if rand.Int31()%3 != 0 {
+						length := int(rand.Int31n(int32(len(testserver.PRData) - 1)))
+						fmt.Fprintf(GinkgoWriter, "Reading %d of %d bytes from stream %d.\n", length, len(testserver.PRData), str.StreamID())
+						data, err := ioutil.ReadAll(io.LimitReader(str, int64(length)))
+						Expect(err).ToNot(HaveOccurred())
+						Expect(str.CancelRead(quic.ErrorCode(str.StreamID()))).To(Succeed())
+						Expect(data).To(Equal(testserver.PRData[:length]))
+						atomic.AddInt32(&canceledCounter, 1)
+						return
+					}
+					fmt.Fprintf(GinkgoWriter, "Reading all data from stream %d.\n", str.StreamID())
+					data, err := ioutil.ReadAll(str)
+					Expect(err).ToNot(HaveOccurred())
+					Expect(data).To(Equal(testserver.PRData))
+				}()
+			}
+			wg.Wait()
+
+			var serverCanceledCounter int32
+			Eventually(serverCanceledCounterChan).Should(Receive(&serverCanceledCounter))
+			Expect(sess.Close()).To(Succeed())
+
+			clientCanceledCounter := atomic.LoadInt32(&canceledCounter)
+			// The server will only count a stream as being reset if it learns about the cancelation before it has finished writing all the data.
+			Expect(clientCanceledCounter).To(BeNumerically(">=", serverCanceledCounter))
+			fmt.Fprintf(GinkgoWriter, "Canceled reading on %d of %d streams.\n", clientCanceledCounter, numStreams)
+			Expect(clientCanceledCounter).To(BeNumerically(">", numStreams/10))
+			Expect(numStreams - clientCanceledCounter).To(BeNumerically(">", numStreams/10))
+		})
+	})
+})
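
For reference, below is a minimal standalone sketch (not part of the patch) of the client-side pattern these tests exercise: accept an incoming unidirectional stream and either abort its read side with CancelRead or drain it completely. It reuses only the quic-go calls that appear in the patch (DialAddr, AcceptUniStream, CancelRead, ErrorCode); the server address and the InsecureSkipVerify TLS config are placeholders, not something the patch sets up.

package main

import (
	"crypto/tls"
	"fmt"
	"io/ioutil"

	quic "github.com/lucas-clemente/quic-go"
)

func main() {
	// Placeholder address and TLS config; a real client would verify the server's certificate.
	sess, err := quic.DialAddr("localhost:4242", &tls.Config{InsecureSkipVerify: true}, nil)
	if err != nil {
		panic(err)
	}

	// Accept the unidirectional stream opened by the server.
	str, err := sess.AcceptUniStream()
	if err != nil {
		panic(err)
	}

	cancel := true // decide whether to abort this stream's read side
	if cancel {
		// Abort the read side. The error code is application-defined;
		// the test uses the stream ID so the server can assert on it.
		if err := str.CancelRead(quic.ErrorCode(str.StreamID())); err != nil {
			panic(err)
		}
		fmt.Printf("canceled reading on stream %d\n", str.StreamID())
		return
	}

	// Otherwise, drain the stream as usual.
	data, err := ioutil.ReadAll(str)
	if err != nil {
		panic(err)
	}
	fmt.Printf("read %d bytes from stream %d\n", len(data), str.StreamID())
}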