add a simple UDP proxy for integration tests

ref #167
This commit is contained in:
Marten Seemann
2016-06-05 17:22:30 +07:00
parent 981b465287
commit f34890e608
2 changed files with 262 additions and 0 deletions

View File

@@ -0,0 +1,121 @@
package integrationtests
import (
"net"
"strconv"
"sync"
)
// Connection is a UDP connection
type connection struct {
ClientAddr *net.UDPAddr // Address of the client
ServerConn *net.UDPConn // UDP connection to server
}
// UDPProxy is a UDP proxy
type UDPProxy struct {
serverAddr *net.UDPAddr
mutex sync.Mutex
proxyConn *net.UDPConn
// Mapping from client addresses (as host:port) to connection
clientDict map[string]*connection
}
// NewUDPProxy creates a new UDP proxy
func NewUDPProxy(proxyPort int, serverAddress string, serverPort int) (*UDPProxy, error) {
p := UDPProxy{
clientDict: make(map[string]*connection),
}
saddr, err := net.ResolveUDPAddr("udp", ":"+strconv.Itoa(proxyPort))
if err != nil {
return nil, err
}
pudp, err := net.ListenUDP("udp", saddr)
if err != nil {
return nil, err
}
p.proxyConn = pudp
srvaddr, err := net.ResolveUDPAddr("udp", serverAddress+":"+strconv.Itoa(serverPort))
if err != nil {
return nil, err
}
p.serverAddr = srvaddr
go p.runProxy()
return &p, nil
}
// Stop stops the UDP Proxy
func (p *UDPProxy) Stop() {
p.proxyConn.Close()
}
func (p *UDPProxy) newConnection(cliAddr *net.UDPAddr) (*connection, error) {
var conn connection
conn.ClientAddr = cliAddr
srvudp, err := net.DialUDP("udp", nil, p.serverAddr)
if err != nil {
return nil, err
}
conn.ServerConn = srvudp
return &conn, nil
}
// runProxy handles inputs to Proxy port
func (p *UDPProxy) runProxy() error {
buffer := make([]byte, 1500)
for {
n, cliaddr, err := p.proxyConn.ReadFromUDP(buffer[0:])
if err != nil {
return err
}
saddr := cliaddr.String()
p.mutex.Lock()
conn, ok := p.clientDict[saddr]
if !ok {
conn, err = p.newConnection(cliaddr)
if err != nil {
p.mutex.Unlock()
return err
}
p.clientDict[saddr] = conn
p.mutex.Unlock()
go p.runConnection(conn)
} else {
p.mutex.Unlock()
}
// Relay to server
_, err = conn.ServerConn.Write(buffer[0:n])
if err != nil {
return err
}
}
}
// runConnection handles packets from server to a single client
func (p *UDPProxy) runConnection(conn *connection) error {
buffer := make([]byte, 1500)
for {
n, err := conn.ServerConn.Read(buffer[0:])
if err != nil {
return err
}
// Relay it to client
_, err = p.proxyConn.WriteToUDP(buffer[0:n], conn.ClientAddr)
if err != nil {
return err
}
}
}

View File

@@ -0,0 +1,141 @@
package integrationtests
import (
"net"
"time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
type packetData []byte
var _ = Describe("Integrationtests", func() {
It("sets up the UDPProxy", func() {
proxy, err := NewUDPProxy(13370, "localhost", 7331)
Expect(err).ToNot(HaveOccurred())
// check that port 13370 is in use
addr, err := net.ResolveUDPAddr("udp", ":13370")
Expect(err).ToNot(HaveOccurred())
_, err = net.ListenUDP("udp", addr)
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError("listen udp :13370: bind: address already in use"))
proxy.Stop() // stopping is tested in the next test
})
It("stops the UDPProxy", func() {
proxy, err := NewUDPProxy(13370, "localhost", 7331)
Expect(err).ToNot(HaveOccurred())
proxy.Stop()
// check that port 13370 is not in use anymore
addr, err := net.ResolveUDPAddr("udp", ":13370")
Expect(err).ToNot(HaveOccurred())
ln, err := net.ListenUDP("udp", addr)
Expect(err).ToNot(HaveOccurred())
ln.Close()
})
Context("Proxy tests", func() {
var serverConn *net.UDPConn
var serverReceivedPackets []packetData
var serverNumPacketsSent int
var clientConn *net.UDPConn
var proxy *UDPProxy
BeforeEach(func() {
var err error
serverReceivedPackets = serverReceivedPackets[:0]
serverNumPacketsSent = 0
// setup a UDP server on port 7331
serverAddr, err := net.ResolveUDPAddr("udp", ":7331")
Expect(err).ToNot(HaveOccurred())
serverConn, err = net.ListenUDP("udp", serverAddr)
Expect(err).ToNot(HaveOccurred())
// setup the proxy
proxy, err = NewUDPProxy(10001, "localhost", 7331)
Expect(err).ToNot(HaveOccurred())
proxyAddr, err := net.ResolveUDPAddr("udp", ":10001")
Expect(err).ToNot(HaveOccurred())
go func() {
defer GinkgoRecover()
for {
buf := make([]byte, 1500)
n, addr, err2 := serverConn.ReadFromUDP(buf)
if err2 != nil {
return
}
data := buf[0:n]
serverReceivedPackets = append(serverReceivedPackets, packetData(data))
// echo each packet received back to the client
serverConn.WriteToUDP(data, addr)
serverNumPacketsSent++
}
}()
// setup the client
localAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
Expect(err).ToNot(HaveOccurred())
clientConn, err = net.DialUDP("udp", localAddr, proxyAddr)
Expect(err).ToNot(HaveOccurred())
})
AfterEach(func() {
proxy.Stop()
serverConn.Close()
time.Sleep(time.Millisecond)
})
It("relays packets from the client to the server", func() {
_, err := clientConn.Write([]byte("foobar"))
Expect(err).ToNot(HaveOccurred())
time.Sleep(time.Millisecond)
_, err = clientConn.Write([]byte("decafbad"))
Expect(err).ToNot(HaveOccurred())
time.Sleep(time.Millisecond)
Expect(serverReceivedPackets).To(HaveLen(2))
Expect(serverReceivedPackets[0]).To(Equal(packetData("foobar")))
Expect(serverReceivedPackets[1]).To(Equal(packetData("decafbad")))
})
It("relays packets from the server to the client", func() {
_, err := clientConn.Write([]byte("foobar"))
Expect(err).ToNot(HaveOccurred())
time.Sleep(time.Millisecond)
_, err = clientConn.Write([]byte("decafbad"))
Expect(err).ToNot(HaveOccurred())
time.Sleep(time.Millisecond)
var clientReceivedPackets []packetData
// receive the packets echoed by the server on client side
go func() {
defer GinkgoRecover()
for {
buf := make([]byte, 1500)
n, _, err2 := clientConn.ReadFromUDP(buf)
if err2 != nil {
return
}
data := buf[0:n]
clientReceivedPackets = append(clientReceivedPackets, packetData(data))
}
}()
time.Sleep(time.Millisecond)
Expect(serverReceivedPackets).To(HaveLen(2))
Expect(serverNumPacketsSent).To(Equal(2))
Expect(clientReceivedPackets).To(HaveLen(2))
Expect(clientReceivedPackets[0]).To(Equal(packetData("foobar")))
Expect(clientReceivedPackets[1]).To(Equal(packetData("decafbad")))
})
})
})