diff --git a/integrationtests/self/cancelation_test.go b/integrationtests/self/cancelation_test.go index 08de80d0..7cd2c9d5 100644 --- a/integrationtests/self/cancelation_test.go +++ b/integrationtests/self/cancelation_test.go @@ -19,8 +19,9 @@ import ( ) var _ = Describe("Stream Cancelations", func() { + const numStreams = 80 + Context("canceling the read side", func() { - const numStreams = 80 var server quic.Listener runServer := func() <-chan int32 { @@ -125,7 +126,6 @@ var _ = Describe("Stream Cancelations", func() { // only read some data from about 1/3 of the streams if rand.Int31()%3 != 0 { length := int(rand.Int31n(int32(len(testserver.PRData) - 1))) - fmt.Fprintf(GinkgoWriter, "Reading %d of %d bytes from stream %d.\n", length, len(testserver.PRData), str.StreamID()) data, err := ioutil.ReadAll(io.LimitReader(str, int64(length))) Expect(err).ToNot(HaveOccurred()) Expect(str.CancelRead(quic.ErrorCode(str.StreamID()))).To(Succeed()) @@ -133,7 +133,6 @@ var _ = Describe("Stream Cancelations", func() { atomic.AddInt32(&canceledCounter, 1) return } - fmt.Fprintf(GinkgoWriter, "Reading all data from stream %d.\n", str.StreamID()) data, err := ioutil.ReadAll(str) Expect(err).ToNot(HaveOccurred()) Expect(data).To(Equal(testserver.PRData)) @@ -153,4 +152,108 @@ var _ = Describe("Stream Cancelations", func() { Expect(numStreams - clientCanceledCounter).To(BeNumerically(">", numStreams/10)) }) }) + + Context("canceling the write side", func() { + runClient := func(server quic.Listener) int32 /* number of canceled streams */ { + sess, err := quic.DialAddr( + fmt.Sprintf("localhost:%d", server.Addr().(*net.UDPAddr).Port), + &tls.Config{RootCAs: testdata.GetRootCA()}, + &quic.Config{MaxIncomingUniStreams: numStreams / 2}, + ) + Expect(err).ToNot(HaveOccurred()) + + var wg sync.WaitGroup + var counter int32 + wg.Add(numStreams) + for i := 0; i < numStreams; i++ { + go func() { + defer GinkgoRecover() + defer wg.Done() + str, err := sess.AcceptUniStream() + Expect(err).ToNot(HaveOccurred()) + data, err := ioutil.ReadAll(str) + if err != nil { + atomic.AddInt32(&counter, 1) + Expect(err).To(MatchError(fmt.Sprintf("Stream %d was reset with error code %d", str.StreamID(), str.StreamID()))) + return + } + Expect(data).To(Equal(testserver.PRData)) + }() + } + wg.Wait() + + streamCount := atomic.LoadInt32(&counter) + fmt.Fprintf(GinkgoWriter, "Canceled writing on %d of %d streams\n", streamCount, numStreams) + Expect(streamCount).To(BeNumerically(">", numStreams/10)) + Expect(numStreams - streamCount).To(BeNumerically(">", numStreams/10)) + Expect(sess.Close()).To(Succeed()) + Expect(server.Close()).To(Succeed()) + return streamCount + } + + It("downloads when the server cancels some streams immediately", func() { + server, err := quic.ListenAddr("localhost:0", testdata.GetTLSConfig(), nil) + Expect(err).ToNot(HaveOccurred()) + + var canceledCounter int32 + go func() { + defer GinkgoRecover() + sess, err := server.Accept() + Expect(err).ToNot(HaveOccurred()) + for i := 0; i < numStreams; i++ { + go func() { + defer GinkgoRecover() + str, err := sess.OpenUniStreamSync() + Expect(err).ToNot(HaveOccurred()) + // cancel about 2/3 of the streams + if rand.Int31()%3 != 0 { + Expect(str.CancelWrite(quic.ErrorCode(str.StreamID()))).To(Succeed()) + atomic.AddInt32(&canceledCounter, 1) + return + } + _, err = str.Write(testserver.PRData) + Expect(err).ToNot(HaveOccurred()) + Expect(str.Close()).To(Succeed()) + }() + } + }() + + clientCanceledStreams := runClient(server) + Expect(clientCanceledStreams).To(Equal(atomic.LoadInt32(&canceledCounter))) + }) + + It("downloads when the server cancels some streams after sending some data", func() { + server, err := quic.ListenAddr("localhost:0", testdata.GetTLSConfig(), nil) + Expect(err).ToNot(HaveOccurred()) + + var canceledCounter int32 + go func() { + defer GinkgoRecover() + sess, err := server.Accept() + Expect(err).ToNot(HaveOccurred()) + for i := 0; i < numStreams; i++ { + go func() { + defer GinkgoRecover() + str, err := sess.OpenUniStreamSync() + Expect(err).ToNot(HaveOccurred()) + // only write some data from about 1/3 of the streams, then cancel + if rand.Int31()%3 != 0 { + length := int(rand.Int31n(int32(len(testserver.PRData) - 1))) + _, err = str.Write(testserver.PRData[:length]) + Expect(err).ToNot(HaveOccurred()) + Expect(str.CancelWrite(quic.ErrorCode(str.StreamID()))).To(Succeed()) + atomic.AddInt32(&canceledCounter, 1) + return + } + _, err = str.Write(testserver.PRData) + Expect(err).ToNot(HaveOccurred()) + Expect(str.Close()).To(Succeed()) + }() + } + }() + + clientCanceledStreams := runClient(server) + Expect(clientCanceledStreams).To(Equal(atomic.LoadInt32(&canceledCounter))) + }) + }) })