forked from quic-go/quic-go
add an integration test for datagram transfers
This commit is contained in:
141
integrationtests/self/datagram_test.go
Normal file
141
integrationtests/self/datagram_test.go
Normal file
@@ -0,0 +1,141 @@
|
|||||||
|
package self_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
|
mrand "math/rand"
|
||||||
|
"net"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/lucas-clemente/quic-go"
|
||||||
|
quicproxy "github.com/lucas-clemente/quic-go/integrationtests/tools/proxy"
|
||||||
|
"github.com/lucas-clemente/quic-go/internal/protocol"
|
||||||
|
|
||||||
|
. "github.com/onsi/ginkgo"
|
||||||
|
. "github.com/onsi/gomega"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ = Describe("Datagram test", func() {
|
||||||
|
for _, v := range protocol.SupportedVersions {
|
||||||
|
version := v
|
||||||
|
|
||||||
|
Context(fmt.Sprintf("with QUIC version %s", version), func() {
|
||||||
|
const num = 100
|
||||||
|
|
||||||
|
var (
|
||||||
|
proxy *quicproxy.QuicProxy
|
||||||
|
serverConn, clientConn *net.UDPConn
|
||||||
|
dropped, total int32
|
||||||
|
)
|
||||||
|
|
||||||
|
startServerAndProxy := func() {
|
||||||
|
addr, err := net.ResolveUDPAddr("udp", "localhost:0")
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
serverConn, err = net.ListenUDP("udp", addr)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
ln, err := quic.Listen(
|
||||||
|
serverConn,
|
||||||
|
getTLSConfig(),
|
||||||
|
getQuicConfig(&quic.Config{
|
||||||
|
EnableDatagrams: true,
|
||||||
|
Versions: []protocol.VersionNumber{version},
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
go func() {
|
||||||
|
defer GinkgoRecover()
|
||||||
|
sess, err := ln.Accept(context.Background())
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(sess.ConnectionState().SupportsDatagrams).To(BeTrue())
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(num)
|
||||||
|
for i := 0; i < num; i++ {
|
||||||
|
go func(i int) {
|
||||||
|
defer GinkgoRecover()
|
||||||
|
defer wg.Done()
|
||||||
|
b := make([]byte, 8)
|
||||||
|
binary.BigEndian.PutUint64(b, uint64(i))
|
||||||
|
Expect(sess.SendMessage(b)).To(Succeed())
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}()
|
||||||
|
serverPort := ln.Addr().(*net.UDPAddr).Port
|
||||||
|
proxy, err = quicproxy.NewQuicProxy("localhost:0", &quicproxy.Opts{
|
||||||
|
RemoteAddr: fmt.Sprintf("localhost:%d", serverPort),
|
||||||
|
// drop 10% of Short Header packets sent from the server
|
||||||
|
DropPacket: func(dir quicproxy.Direction, packet []byte) bool {
|
||||||
|
if dir == quicproxy.DirectionIncoming {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// don't drop Long Header packets
|
||||||
|
if packet[0]&0x80 == 1 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
drop := mrand.Int()%10 == 0
|
||||||
|
if drop {
|
||||||
|
atomic.AddInt32(&dropped, 1)
|
||||||
|
}
|
||||||
|
atomic.AddInt32(&total, 1)
|
||||||
|
return drop
|
||||||
|
},
|
||||||
|
})
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
}
|
||||||
|
|
||||||
|
BeforeEach(func() {
|
||||||
|
addr, err := net.ResolveUDPAddr("udp", "localhost:0")
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
clientConn, err = net.ListenUDP("udp", addr)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
AfterEach(func() {
|
||||||
|
Expect(proxy.Close()).To(Succeed())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("sends datagrams", func() {
|
||||||
|
startServerAndProxy()
|
||||||
|
raddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", proxy.LocalPort()))
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
sess, err := quic.Dial(
|
||||||
|
clientConn,
|
||||||
|
raddr,
|
||||||
|
fmt.Sprintf("localhost:%d", proxy.LocalPort()),
|
||||||
|
getTLSClientConfig(),
|
||||||
|
getQuicConfig(&quic.Config{
|
||||||
|
EnableDatagrams: true,
|
||||||
|
Versions: []protocol.VersionNumber{version},
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(sess.ConnectionState().SupportsDatagrams).To(BeTrue())
|
||||||
|
var counter int
|
||||||
|
for {
|
||||||
|
// Close the session if no message is received for 100 ms.
|
||||||
|
timer := time.AfterFunc(scaleDuration(100*time.Millisecond), func() {
|
||||||
|
sess.CloseWithError(0, "")
|
||||||
|
})
|
||||||
|
if _, err := sess.ReceiveMessage(); err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
timer.Stop()
|
||||||
|
counter++
|
||||||
|
}
|
||||||
|
|
||||||
|
numDropped := int(atomic.LoadInt32(&dropped))
|
||||||
|
expVal := num - numDropped
|
||||||
|
fmt.Fprintf(GinkgoWriter, "Dropped %d out of %d packets.\n", numDropped, atomic.LoadInt32(&total))
|
||||||
|
fmt.Fprintf(GinkgoWriter, "Received %d out of %d sent datagrams.\n", counter, num)
|
||||||
|
Expect(counter).To(And(
|
||||||
|
BeNumerically(">", expVal*9/10),
|
||||||
|
BeNumerically("<", num),
|
||||||
|
))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
@@ -15,6 +15,7 @@ import (
|
|||||||
"math/big"
|
"math/big"
|
||||||
mrand "math/rand"
|
mrand "math/rand"
|
||||||
"os"
|
"os"
|
||||||
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@@ -317,6 +318,15 @@ func debugLog() bool {
|
|||||||
return len(logFileName) > 0
|
return len(logFileName) > 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func scaleDuration(d time.Duration) time.Duration {
|
||||||
|
scaleFactor := 1
|
||||||
|
if f, err := strconv.Atoi(os.Getenv("TIMESCALE_FACTOR")); err == nil { // parsing "" errors, so this works fine if the env is not set
|
||||||
|
scaleFactor = f
|
||||||
|
}
|
||||||
|
Expect(scaleFactor).ToNot(BeZero())
|
||||||
|
return time.Duration(scaleFactor) * d
|
||||||
|
}
|
||||||
|
|
||||||
func TestSelf(t *testing.T) {
|
func TestSelf(t *testing.T) {
|
||||||
RegisterFailHandler(Fail)
|
RegisterFailHandler(Fail)
|
||||||
RunSpecs(t, "Self integration tests")
|
RunSpecs(t, "Self integration tests")
|
||||||
|
|||||||
@@ -8,9 +8,7 @@ import (
|
|||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
mrand "math/rand"
|
mrand "math/rand"
|
||||||
"net"
|
"net"
|
||||||
"os"
|
|
||||||
"runtime/pprof"
|
"runtime/pprof"
|
||||||
"strconv"
|
|
||||||
"strings"
|
"strings"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
@@ -181,15 +179,6 @@ var _ = Describe("Timeout tests", func() {
|
|||||||
Context("timing out at the right time", func() {
|
Context("timing out at the right time", func() {
|
||||||
var idleTimeout time.Duration
|
var idleTimeout time.Duration
|
||||||
|
|
||||||
scaleDuration := func(d time.Duration) time.Duration {
|
|
||||||
scaleFactor := 1
|
|
||||||
if f, err := strconv.Atoi(os.Getenv("TIMESCALE_FACTOR")); err == nil { // parsing "" errors, so this works fine if the env is not set
|
|
||||||
scaleFactor = f
|
|
||||||
}
|
|
||||||
Expect(scaleFactor).ToNot(BeZero())
|
|
||||||
return time.Duration(scaleFactor) * d
|
|
||||||
}
|
|
||||||
|
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
idleTimeout = scaleDuration(100 * time.Millisecond)
|
idleTimeout = scaleDuration(100 * time.Millisecond)
|
||||||
})
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user