add an integration tests where both sides of the stream are canceled

This commit is contained in:
Marten Seemann
2019-01-23 18:58:03 +07:00
parent 5447625e79
commit 665d6d495e

View File

@@ -256,4 +256,168 @@ var _ = Describe("Stream Cancelations", func() {
Expect(clientCanceledStreams).To(Equal(atomic.LoadInt32(&canceledCounter)))
})
})
Context("canceling both read and write side", func() {
It("downloads data when both sides cancel streams immediately", func() {
server, err := quic.ListenAddr("localhost:0", testdata.GetTLSConfig(), nil)
Expect(err).ToNot(HaveOccurred())
done := make(chan struct{})
go func() {
defer GinkgoRecover()
var wg sync.WaitGroup
wg.Add(numStreams)
sess, err := server.Accept()
Expect(err).ToNot(HaveOccurred())
for i := 0; i < numStreams; i++ {
go func() {
defer GinkgoRecover()
defer wg.Done()
str, err := sess.OpenUniStreamSync()
Expect(err).ToNot(HaveOccurred())
// cancel about half of the streams
if rand.Int31()%2 == 0 {
Expect(str.CancelWrite(quic.ErrorCode(str.StreamID()))).To(Succeed())
return
}
if _, err = str.Write(testserver.PRData); err != nil {
Expect(err).To(MatchError(fmt.Sprintf("Stream %d was reset with error code %d", str.StreamID(), str.StreamID())))
return
}
Expect(str.Close()).To(Succeed())
}()
}
wg.Wait()
close(done)
}()
sess, err := quic.DialAddr(
fmt.Sprintf("localhost:%d", server.Addr().(*net.UDPAddr).Port),
&tls.Config{RootCAs: testdata.GetRootCA()},
&quic.Config{MaxIncomingUniStreams: numStreams / 2},
)
Expect(err).ToNot(HaveOccurred())
var wg sync.WaitGroup
var counter int32
wg.Add(numStreams)
for i := 0; i < numStreams; i++ {
go func() {
defer GinkgoRecover()
defer wg.Done()
str, err := sess.AcceptUniStream()
Expect(err).ToNot(HaveOccurred())
// cancel around half of the streams
if rand.Int31()%2 == 0 {
Expect(str.CancelRead(quic.ErrorCode(str.StreamID()))).To(Succeed())
return
}
data, err := ioutil.ReadAll(str)
if err != nil {
Expect(err).To(MatchError(fmt.Sprintf("Stream %d was reset with error code %d", str.StreamID(), str.StreamID())))
return
}
atomic.AddInt32(&counter, 1)
Expect(data).To(Equal(testserver.PRData))
}()
}
wg.Wait()
count := atomic.LoadInt32(&counter)
Expect(count).To(BeNumerically(">", numStreams/15))
fmt.Fprintf(GinkgoWriter, "Successfully read from %d of %d streams.\n", count, numStreams)
Expect(sess.Close()).To(Succeed())
Eventually(done).Should(BeClosed())
Expect(server.Close()).To(Succeed())
})
It("downloads data when both sides cancel streams after a while", func() {
server, err := quic.ListenAddr("localhost:0", testdata.GetTLSConfig(), nil)
Expect(err).ToNot(HaveOccurred())
done := make(chan struct{})
go func() {
defer GinkgoRecover()
var wg sync.WaitGroup
wg.Add(numStreams)
sess, err := server.Accept()
Expect(err).ToNot(HaveOccurred())
for i := 0; i < numStreams; i++ {
go func() {
defer GinkgoRecover()
defer wg.Done()
str, err := sess.OpenUniStreamSync()
Expect(err).ToNot(HaveOccurred())
// cancel about half of the streams
length := len(testserver.PRData)
if rand.Int31()%2 == 0 {
length = int(rand.Int31n(int32(len(testserver.PRData) - 1)))
}
if _, err = str.Write(testserver.PRData[:length]); err != nil {
Expect(err).To(MatchError(fmt.Sprintf("Stream %d was reset with error code %d", str.StreamID(), str.StreamID())))
return
}
if length < len(testserver.PRData) {
Expect(str.CancelWrite(quic.ErrorCode(str.StreamID()))).To(Succeed())
} else {
Expect(str.Close()).To(Succeed())
}
}()
}
wg.Wait()
close(done)
}()
sess, err := quic.DialAddr(
fmt.Sprintf("localhost:%d", server.Addr().(*net.UDPAddr).Port),
&tls.Config{RootCAs: testdata.GetRootCA()},
&quic.Config{MaxIncomingUniStreams: numStreams / 2},
)
Expect(err).ToNot(HaveOccurred())
var wg sync.WaitGroup
var counter int32
wg.Add(numStreams)
for i := 0; i < numStreams; i++ {
go func() {
defer GinkgoRecover()
defer wg.Done()
str, err := sess.AcceptUniStream()
Expect(err).ToNot(HaveOccurred())
r := io.Reader(str)
length := len(testserver.PRData)
// cancel around half of the streams
if rand.Int31()%2 == 0 {
length = int(rand.Int31n(int32(len(testserver.PRData) - 1)))
r = io.LimitReader(str, int64(length))
}
data, err := ioutil.ReadAll(r)
if err != nil {
Expect(err).To(MatchError(fmt.Sprintf("Stream %d was reset with error code %d", str.StreamID(), str.StreamID())))
return
}
Expect(data).To(Equal(testserver.PRData[:length]))
if length < len(testserver.PRData) {
Expect(str.CancelRead(quic.ErrorCode(str.StreamID()))).To(Succeed())
return
}
atomic.AddInt32(&counter, 1)
Expect(data).To(Equal(testserver.PRData))
}()
}
wg.Wait()
count := atomic.LoadInt32(&counter)
Expect(count).To(BeNumerically(">", numStreams/15))
fmt.Fprintf(GinkgoWriter, "Successfully read from %d of %d streams.\n", count, numStreams)
Expect(sess.Close()).To(Succeed())
Eventually(done).Should(BeClosed())
Expect(server.Close()).To(Succeed())
})
})
})