forked from quic-go/quic-go
Merge pull request #3290 from lucas-clemente/fix-receive-stream-mutex
don't unlock the receive stream mutex for copying from STREAM frames
This commit is contained in:
@@ -631,4 +631,92 @@ var _ = Describe("Stream Cancelations", func() {
|
||||
Expect(server.Close()).To(Succeed())
|
||||
})
|
||||
})
|
||||
|
||||
It("doesn't run into any errors when streams are canceled all the time", func() {
|
||||
const maxIncomingStreams = 1000
|
||||
server, err := quic.ListenAddr(
|
||||
"localhost:0",
|
||||
getTLSConfig(),
|
||||
getQuicConfig(&quic.Config{MaxIncomingStreams: maxIncomingStreams, MaxIdleTimeout: 10 * time.Second}),
|
||||
)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2 * 4 * maxIncomingStreams)
|
||||
handleStream := func(str quic.Stream) {
|
||||
str.SetDeadline(time.Now().Add(time.Second))
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if rand.Int31()%2 == 0 {
|
||||
defer GinkgoRecover()
|
||||
io.ReadAll(str)
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if rand.Int31()%2 == 0 {
|
||||
str.Write([]byte("foobar"))
|
||||
if rand.Int31()%2 == 0 {
|
||||
str.Close()
|
||||
}
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
// Make sure we at least send out *something* for the last stream,
|
||||
// otherwise the peer might never receive this anything for this stream.
|
||||
if rand.Int31()%2 == 0 || str.StreamID() == 4*(maxIncomingStreams-1) {
|
||||
str.CancelWrite(1234)
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if rand.Int31()%2 == 0 {
|
||||
str.CancelRead(1234)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
serverRunning := make(chan struct{})
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
defer close(serverRunning)
|
||||
conn, err := server.Accept(context.Background())
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
for {
|
||||
str, err := conn.AcceptStream(context.Background())
|
||||
if err != nil {
|
||||
// Make sure the session is closed regularly.
|
||||
Expect(err).To(BeAssignableToTypeOf(&quic.ApplicationError{}))
|
||||
return
|
||||
}
|
||||
handleStream(str)
|
||||
}
|
||||
}()
|
||||
|
||||
sess, err := quic.DialAddr(
|
||||
fmt.Sprintf("localhost:%d", server.Addr().(*net.UDPAddr).Port),
|
||||
getTLSClientConfig(),
|
||||
getQuicConfig(&quic.Config{}),
|
||||
)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
for i := 0; i < maxIncomingStreams; i++ {
|
||||
str, err := sess.OpenStreamSync(context.Background())
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
handleStream(str)
|
||||
}
|
||||
|
||||
// We don't expect to accept any stream here.
|
||||
// We're just making sure the session stays open and there's no error.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
||||
defer cancel()
|
||||
_, err = sess.AcceptStream(ctx)
|
||||
Expect(err).To(MatchError(context.DeadlineExceeded))
|
||||
|
||||
wg.Wait()
|
||||
|
||||
Expect(sess.CloseWithError(0, "")).To(Succeed())
|
||||
Eventually(serverRunning).Should(BeClosed())
|
||||
})
|
||||
})
|
||||
|
||||
@@ -166,13 +166,10 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err
|
||||
return false, bytesRead, fmt.Errorf("BUG: readPosInFrame (%d) > frame.DataLen (%d) in stream.Read", s.readPosInFrame, len(s.currentFrame))
|
||||
}
|
||||
|
||||
s.mutex.Unlock()
|
||||
|
||||
m := copy(p[bytesRead:], s.currentFrame[s.readPosInFrame:])
|
||||
s.readPosInFrame += m
|
||||
bytesRead += m
|
||||
|
||||
s.mutex.Lock()
|
||||
// when a RESET_STREAM was received, the was already informed about the final byteOffset for this stream
|
||||
if !s.resetRemotely {
|
||||
s.flowController.AddBytesRead(protocol.ByteCount(m))
|
||||
|
||||
Reference in New Issue
Block a user