From 344feb9ea58b3e7cf1259f8dbd922b2941a13d3a Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 16 Oct 2021 17:45:43 +0200 Subject: [PATCH 1/2] don't unlock the receive stream mutex for copying from STREAM frames --- receive_stream.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/receive_stream.go b/receive_stream.go index cca3f709..f9a1e066 100644 --- a/receive_stream.go +++ b/receive_stream.go @@ -166,13 +166,10 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err return false, bytesRead, fmt.Errorf("BUG: readPosInFrame (%d) > frame.DataLen (%d) in stream.Read", s.readPosInFrame, len(s.currentFrame)) } - s.mutex.Unlock() - m := copy(p[bytesRead:], s.currentFrame[s.readPosInFrame:]) s.readPosInFrame += m bytesRead += m - s.mutex.Lock() // when a RESET_STREAM was received, the was already informed about the final byteOffset for this stream if !s.resetRemotely { s.flowController.AddBytesRead(protocol.ByteCount(m)) From 2c8b939d4c2df4d95165ba49ca8edab3480d6e4f Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 16 Oct 2021 19:00:26 +0200 Subject: [PATCH 2/2] add an integration tests that cancels streams frequently --- integrationtests/self/cancelation_test.go | 88 +++++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/integrationtests/self/cancelation_test.go b/integrationtests/self/cancelation_test.go index 97697c84..651bb06e 100644 --- a/integrationtests/self/cancelation_test.go +++ b/integrationtests/self/cancelation_test.go @@ -631,4 +631,92 @@ var _ = Describe("Stream Cancelations", func() { Expect(server.Close()).To(Succeed()) }) }) + + It("doesn't run into any errors when streams are canceled all the time", func() { + const maxIncomingStreams = 1000 + server, err := quic.ListenAddr( + "localhost:0", + getTLSConfig(), + getQuicConfig(&quic.Config{MaxIncomingStreams: maxIncomingStreams, MaxIdleTimeout: 10 * time.Second}), + ) + Expect(err).ToNot(HaveOccurred()) + + var wg sync.WaitGroup + wg.Add(2 * 4 * maxIncomingStreams) + handleStream := func(str quic.Stream) { + str.SetDeadline(time.Now().Add(time.Second)) + go func() { + defer wg.Done() + if rand.Int31()%2 == 0 { + defer GinkgoRecover() + io.ReadAll(str) + } + }() + go func() { + defer wg.Done() + if rand.Int31()%2 == 0 { + str.Write([]byte("foobar")) + if rand.Int31()%2 == 0 { + str.Close() + } + } + }() + go func() { + defer wg.Done() + // Make sure we at least send out *something* for the last stream, + // otherwise the peer might never receive this anything for this stream. + if rand.Int31()%2 == 0 || str.StreamID() == 4*(maxIncomingStreams-1) { + str.CancelWrite(1234) + } + }() + go func() { + defer wg.Done() + if rand.Int31()%2 == 0 { + str.CancelRead(1234) + } + }() + } + + serverRunning := make(chan struct{}) + go func() { + defer GinkgoRecover() + defer close(serverRunning) + conn, err := server.Accept(context.Background()) + Expect(err).ToNot(HaveOccurred()) + for { + str, err := conn.AcceptStream(context.Background()) + if err != nil { + // Make sure the session is closed regularly. + Expect(err).To(BeAssignableToTypeOf(&quic.ApplicationError{})) + return + } + handleStream(str) + } + }() + + sess, err := quic.DialAddr( + fmt.Sprintf("localhost:%d", server.Addr().(*net.UDPAddr).Port), + getTLSClientConfig(), + getQuicConfig(&quic.Config{}), + ) + Expect(err).ToNot(HaveOccurred()) + + for i := 0; i < maxIncomingStreams; i++ { + str, err := sess.OpenStreamSync(context.Background()) + Expect(err).ToNot(HaveOccurred()) + handleStream(str) + } + + // We don't expect to accept any stream here. + // We're just making sure the session stays open and there's no error. + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + _, err = sess.AcceptStream(ctx) + Expect(err).To(MatchError(context.DeadlineExceeded)) + + wg.Wait() + + Expect(sess.CloseWithError(0, "")).To(Succeed()) + Eventually(serverRunning).Should(BeClosed()) + }) })