diff --git a/integrationtests/self/stream_test.go b/integrationtests/self/stream_test.go index f56a8d80f..234c3a980 100644 --- a/integrationtests/self/stream_test.go +++ b/integrationtests/self/stream_test.go @@ -1,134 +1,149 @@ package self_test import ( - "crypto/tls" "fmt" - "io" "io/ioutil" + "net" "sync" - "github.com/lucas-clemente/quic-go/integrationtests/tools/testserver" - quic "github.com/lucas-clemente/quic-go" + "github.com/lucas-clemente/quic-go/integrationtests/tools/testserver" + "github.com/lucas-clemente/quic-go/internal/protocol" "github.com/lucas-clemente/quic-go/internal/testdata" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) var _ = Describe("Stream tests", func() { - var server quic.Listener const numStreams = 300 - BeforeEach(func() { - var err error - server, err = quic.ListenAddr("localhost:0", testdata.GetTLSConfig(), nil) - Expect(err).ToNot(HaveOccurred()) - }) + var ( + server quic.Listener + serverAddr string + qconf *quic.Config + ) - AfterEach(func() { - server.Close() - }) + for _, v := range []protocol.VersionNumber{protocol.Version39, protocol.VersionTLS} { + version := v - runSendingPeer := func(sess quic.Session) { - var wg sync.WaitGroup - wg.Add(numStreams) - for i := 0; i < numStreams; i++ { - str, err := sess.OpenStreamSync() - Expect(err).ToNot(HaveOccurred()) - data := testserver.GeneratePRData(25 * i) - go func() { - defer GinkgoRecover() - _, err := str.Write(data) + Context(fmt.Sprintf("with QUIC %s", version), func() { + BeforeEach(func() { + var err error + qconf = &quic.Config{Versions: []protocol.VersionNumber{version}} + server, err = quic.ListenAddr("localhost:0", testdata.GetTLSConfig(), qconf) Expect(err).ToNot(HaveOccurred()) - Expect(str.Close()).To(Succeed()) - }() - go func() { - defer GinkgoRecover() - defer wg.Done() - dataRead, err := ioutil.ReadAll(str) + serverAddr = fmt.Sprintf("quic.clemente.io:%d", server.Addr().(*net.UDPAddr).Port) + }) + + AfterEach(func() { + server.Close() + }) + + runSendingPeer := func(sess quic.Session) { + var wg sync.WaitGroup + wg.Add(numStreams) + for i := 0; i < numStreams; i++ { + str, err := sess.OpenStreamSync() + Expect(err).ToNot(HaveOccurred()) + data := testserver.GeneratePRData(25 * i) + go func() { + defer GinkgoRecover() + _, err := str.Write(data) + Expect(err).ToNot(HaveOccurred()) + Expect(str.Close()).To(Succeed()) + }() + go func() { + defer GinkgoRecover() + defer wg.Done() + dataRead, err := ioutil.ReadAll(str) + Expect(err).ToNot(HaveOccurred()) + Expect(dataRead).To(Equal(data)) + }() + } + wg.Wait() + } + + runReceivingPeer := func(sess quic.Session) { + var wg sync.WaitGroup + wg.Add(numStreams) + for i := 0; i < numStreams; i++ { + str, err := sess.AcceptStream() + Expect(err).ToNot(HaveOccurred()) + go func() { + defer GinkgoRecover() + defer wg.Done() + // shouldn't use io.Copy here + // we should read from the stream as early as possible, to free flow control credit + data, err := ioutil.ReadAll(str) + Expect(err).ToNot(HaveOccurred()) + _, err = str.Write(data) + Expect(err).ToNot(HaveOccurred()) + Expect(str.Close()).To(Succeed()) + }() + } + wg.Wait() + } + + It(fmt.Sprintf("client opening %d streams to a server", numStreams), func() { + var sess quic.Session + go func() { + defer GinkgoRecover() + var err error + sess, err = server.Accept() + Expect(err).ToNot(HaveOccurred()) + runReceivingPeer(sess) + }() + + client, err := quic.DialAddr(serverAddr, nil, qconf) Expect(err).ToNot(HaveOccurred()) - Expect(dataRead).To(Equal(data)) - }() - } - wg.Wait() + runSendingPeer(client) + }) + + It(fmt.Sprintf("server opening %d streams to a client", numStreams), func() { + go func() { + defer GinkgoRecover() + sess, err := server.Accept() + Expect(err).ToNot(HaveOccurred()) + runSendingPeer(sess) + sess.Close(nil) + }() + + client, err := quic.DialAddr(serverAddr, nil, qconf) + Expect(err).ToNot(HaveOccurred()) + runReceivingPeer(client) + Eventually(client.Context().Done()).Should(BeClosed()) + }) + + It(fmt.Sprintf("client and server opening %d each and sending data to the peer", numStreams), func() { + done1 := make(chan struct{}) + go func() { + defer GinkgoRecover() + sess, err := server.Accept() + Expect(err).ToNot(HaveOccurred()) + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + runReceivingPeer(sess) + close(done) + }() + runSendingPeer(sess) + <-done + close(done1) + }() + + client, err := quic.DialAddr(serverAddr, nil, qconf) + Expect(err).ToNot(HaveOccurred()) + done2 := make(chan struct{}) + go func() { + defer GinkgoRecover() + runSendingPeer(client) + close(done2) + }() + runReceivingPeer(client) + <-done1 + <-done2 + }) + }) } - - runReceivingPeer := func(sess quic.Session) { - var wg sync.WaitGroup - wg.Add(numStreams) - for i := 0; i < numStreams; i++ { - str, err := sess.AcceptStream() - Expect(err).ToNot(HaveOccurred()) - go func() { - defer GinkgoRecover() - defer wg.Done() - _, err := io.Copy(str, str) - Expect(err).ToNot(HaveOccurred()) - Expect(str.Close()).To(Succeed()) - }() - } - wg.Wait() - } - - It(fmt.Sprintf("client opening %d streams to a client", numStreams), func() { - done := make(chan struct{}) - go func() { - defer GinkgoRecover() - sess, err := server.Accept() - Expect(err).ToNot(HaveOccurred()) - runReceivingPeer(sess) - close(done) - }() - - client, err := quic.DialAddr(server.Addr().String(), &tls.Config{InsecureSkipVerify: true}, nil) - Expect(err).ToNot(HaveOccurred()) - runSendingPeer(client) - <-done - }) - - It(fmt.Sprintf("server opening %d streams to a client", numStreams), func() { - done := make(chan struct{}) - go func() { - defer GinkgoRecover() - sess, err := server.Accept() - Expect(err).ToNot(HaveOccurred()) - runSendingPeer(sess) - close(done) - }() - - client, err := quic.DialAddr(server.Addr().String(), &tls.Config{InsecureSkipVerify: true}, nil) - Expect(err).ToNot(HaveOccurred()) - runReceivingPeer(client) - <-done - }) - - It(fmt.Sprintf("client and server opening %d each and sending data to the peer", numStreams), func() { - done1 := make(chan struct{}) - go func() { - defer GinkgoRecover() - sess, err := server.Accept() - Expect(err).ToNot(HaveOccurred()) - done := make(chan struct{}) - go func() { - defer GinkgoRecover() - runReceivingPeer(sess) - close(done) - }() - runSendingPeer(sess) - <-done - close(done1) - }() - - client, err := quic.DialAddr(server.Addr().String(), &tls.Config{InsecureSkipVerify: true}, nil) - Expect(err).ToNot(HaveOccurred()) - done2 := make(chan struct{}) - go func() { - defer GinkgoRecover() - runSendingPeer(client) - close(done2) - }() - runReceivingPeer(client) - <-done1 - <-done2 - }) })