forked from quic-go/quic-go
add an integration tests that cancels streams frequently
This commit is contained in:
@@ -631,4 +631,92 @@ var _ = Describe("Stream Cancelations", func() {
|
||||
Expect(server.Close()).To(Succeed())
|
||||
})
|
||||
})
|
||||
|
||||
It("doesn't run into any errors when streams are canceled all the time", func() {
|
||||
const maxIncomingStreams = 1000
|
||||
server, err := quic.ListenAddr(
|
||||
"localhost:0",
|
||||
getTLSConfig(),
|
||||
getQuicConfig(&quic.Config{MaxIncomingStreams: maxIncomingStreams, MaxIdleTimeout: 10 * time.Second}),
|
||||
)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2 * 4 * maxIncomingStreams)
|
||||
handleStream := func(str quic.Stream) {
|
||||
str.SetDeadline(time.Now().Add(time.Second))
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if rand.Int31()%2 == 0 {
|
||||
defer GinkgoRecover()
|
||||
io.ReadAll(str)
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if rand.Int31()%2 == 0 {
|
||||
str.Write([]byte("foobar"))
|
||||
if rand.Int31()%2 == 0 {
|
||||
str.Close()
|
||||
}
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
// Make sure we at least send out *something* for the last stream,
|
||||
// otherwise the peer might never receive this anything for this stream.
|
||||
if rand.Int31()%2 == 0 || str.StreamID() == 4*(maxIncomingStreams-1) {
|
||||
str.CancelWrite(1234)
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if rand.Int31()%2 == 0 {
|
||||
str.CancelRead(1234)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
serverRunning := make(chan struct{})
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
defer close(serverRunning)
|
||||
conn, err := server.Accept(context.Background())
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
for {
|
||||
str, err := conn.AcceptStream(context.Background())
|
||||
if err != nil {
|
||||
// Make sure the session is closed regularly.
|
||||
Expect(err).To(BeAssignableToTypeOf(&quic.ApplicationError{}))
|
||||
return
|
||||
}
|
||||
handleStream(str)
|
||||
}
|
||||
}()
|
||||
|
||||
sess, err := quic.DialAddr(
|
||||
fmt.Sprintf("localhost:%d", server.Addr().(*net.UDPAddr).Port),
|
||||
getTLSClientConfig(),
|
||||
getQuicConfig(&quic.Config{}),
|
||||
)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
for i := 0; i < maxIncomingStreams; i++ {
|
||||
str, err := sess.OpenStreamSync(context.Background())
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
handleStream(str)
|
||||
}
|
||||
|
||||
// We don't expect to accept any stream here.
|
||||
// We're just making sure the session stays open and there's no error.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
||||
defer cancel()
|
||||
_, err = sess.AcceptStream(ctx)
|
||||
Expect(err).To(MatchError(context.DeadlineExceeded))
|
||||
|
||||
wg.Wait()
|
||||
|
||||
Expect(sess.CloseWithError(0, "")).To(Succeed())
|
||||
Eventually(serverRunning).Should(BeClosed())
|
||||
})
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user