diff --git a/integrationtests/self/cancelation_test.go b/integrationtests/self/cancelation_test.go index 97697c84..651bb06e 100644 --- a/integrationtests/self/cancelation_test.go +++ b/integrationtests/self/cancelation_test.go @@ -631,4 +631,92 @@ var _ = Describe("Stream Cancelations", func() { Expect(server.Close()).To(Succeed()) }) }) + + It("doesn't run into any errors when streams are canceled all the time", func() { + const maxIncomingStreams = 1000 + server, err := quic.ListenAddr( + "localhost:0", + getTLSConfig(), + getQuicConfig(&quic.Config{MaxIncomingStreams: maxIncomingStreams, MaxIdleTimeout: 10 * time.Second}), + ) + Expect(err).ToNot(HaveOccurred()) + + var wg sync.WaitGroup + wg.Add(2 * 4 * maxIncomingStreams) + handleStream := func(str quic.Stream) { + str.SetDeadline(time.Now().Add(time.Second)) + go func() { + defer wg.Done() + if rand.Int31()%2 == 0 { + defer GinkgoRecover() + io.ReadAll(str) + } + }() + go func() { + defer wg.Done() + if rand.Int31()%2 == 0 { + str.Write([]byte("foobar")) + if rand.Int31()%2 == 0 { + str.Close() + } + } + }() + go func() { + defer wg.Done() + // Make sure we at least send out *something* for the last stream, + // otherwise the peer might never receive this anything for this stream. + if rand.Int31()%2 == 0 || str.StreamID() == 4*(maxIncomingStreams-1) { + str.CancelWrite(1234) + } + }() + go func() { + defer wg.Done() + if rand.Int31()%2 == 0 { + str.CancelRead(1234) + } + }() + } + + serverRunning := make(chan struct{}) + go func() { + defer GinkgoRecover() + defer close(serverRunning) + conn, err := server.Accept(context.Background()) + Expect(err).ToNot(HaveOccurred()) + for { + str, err := conn.AcceptStream(context.Background()) + if err != nil { + // Make sure the session is closed regularly. + Expect(err).To(BeAssignableToTypeOf(&quic.ApplicationError{})) + return + } + handleStream(str) + } + }() + + sess, err := quic.DialAddr( + fmt.Sprintf("localhost:%d", server.Addr().(*net.UDPAddr).Port), + getTLSClientConfig(), + getQuicConfig(&quic.Config{}), + ) + Expect(err).ToNot(HaveOccurred()) + + for i := 0; i < maxIncomingStreams; i++ { + str, err := sess.OpenStreamSync(context.Background()) + Expect(err).ToNot(HaveOccurred()) + handleStream(str) + } + + // We don't expect to accept any stream here. + // We're just making sure the session stays open and there's no error. + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + _, err = sess.AcceptStream(ctx) + Expect(err).To(MatchError(context.DeadlineExceeded)) + + wg.Wait() + + Expect(sess.CloseWithError(0, "")).To(Succeed()) + Eventually(serverRunning).Should(BeClosed()) + }) })