From f34890e60887f48a081cc187400367ef7fff885b Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 5 Jun 2016 17:22:30 +0700 Subject: [PATCH] add a simple UDP proxy for integration tests ref #167 --- integrationtests/udp_proxy.go | 121 +++++++++++++++++++++++++ integrationtests/udp_proxy_test.go | 141 +++++++++++++++++++++++++++++ 2 files changed, 262 insertions(+) create mode 100644 integrationtests/udp_proxy.go create mode 100644 integrationtests/udp_proxy_test.go diff --git a/integrationtests/udp_proxy.go b/integrationtests/udp_proxy.go new file mode 100644 index 000000000..26f7a0a16 --- /dev/null +++ b/integrationtests/udp_proxy.go @@ -0,0 +1,121 @@ +package integrationtests + +import ( + "net" + "strconv" + "sync" +) + +// Connection is a UDP connection +type connection struct { + ClientAddr *net.UDPAddr // Address of the client + ServerConn *net.UDPConn // UDP connection to server +} + +// UDPProxy is a UDP proxy +type UDPProxy struct { + serverAddr *net.UDPAddr + mutex sync.Mutex + + proxyConn *net.UDPConn + + // Mapping from client addresses (as host:port) to connection + clientDict map[string]*connection +} + +// NewUDPProxy creates a new UDP proxy +func NewUDPProxy(proxyPort int, serverAddress string, serverPort int) (*UDPProxy, error) { + p := UDPProxy{ + clientDict: make(map[string]*connection), + } + + saddr, err := net.ResolveUDPAddr("udp", ":"+strconv.Itoa(proxyPort)) + if err != nil { + return nil, err + } + pudp, err := net.ListenUDP("udp", saddr) + if err != nil { + return nil, err + } + p.proxyConn = pudp + + srvaddr, err := net.ResolveUDPAddr("udp", serverAddress+":"+strconv.Itoa(serverPort)) + if err != nil { + return nil, err + } + p.serverAddr = srvaddr + + go p.runProxy() + + return &p, nil +} + +// Stop stops the UDP Proxy +func (p *UDPProxy) Stop() { + p.proxyConn.Close() +} + +func (p *UDPProxy) newConnection(cliAddr *net.UDPAddr) (*connection, error) { + var conn connection + conn.ClientAddr = cliAddr + srvudp, err := net.DialUDP("udp", nil, p.serverAddr) + if err != nil { + return nil, err + } + conn.ServerConn = srvudp + return &conn, nil +} + +// runProxy handles inputs to Proxy port +func (p *UDPProxy) runProxy() error { + buffer := make([]byte, 1500) + + for { + n, cliaddr, err := p.proxyConn.ReadFromUDP(buffer[0:]) + if err != nil { + return err + } + + saddr := cliaddr.String() + + p.mutex.Lock() + conn, ok := p.clientDict[saddr] + + if !ok { + conn, err = p.newConnection(cliaddr) + if err != nil { + p.mutex.Unlock() + return err + } + p.clientDict[saddr] = conn + p.mutex.Unlock() + go p.runConnection(conn) + } else { + p.mutex.Unlock() + } + + // Relay to server + _, err = conn.ServerConn.Write(buffer[0:n]) + if err != nil { + return err + } + } +} + +// runConnection handles packets from server to a single client +func (p *UDPProxy) runConnection(conn *connection) error { + buffer := make([]byte, 1500) + + for { + n, err := conn.ServerConn.Read(buffer[0:]) + if err != nil { + return err + } + + // Relay it to client + _, err = p.proxyConn.WriteToUDP(buffer[0:n], conn.ClientAddr) + if err != nil { + return err + } + } +} diff --git a/integrationtests/udp_proxy_test.go b/integrationtests/udp_proxy_test.go new file mode 100644 index 000000000..746d225f5 --- /dev/null +++ b/integrationtests/udp_proxy_test.go @@ -0,0 +1,141 @@ +package integrationtests + +import ( + "net" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +type packetData []byte + +var _ = Describe("Integrationtests", func() { + It("sets up the UDPProxy", func() { + proxy, err := NewUDPProxy(13370, "localhost", 7331) + Expect(err).ToNot(HaveOccurred()) + + // check that port 13370 is in use + addr, err := net.ResolveUDPAddr("udp", ":13370") + Expect(err).ToNot(HaveOccurred()) + _, err = net.ListenUDP("udp", addr) + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError("listen udp :13370: bind: address already in use")) + + proxy.Stop() // stopping is tested in the next test + }) + + It("stops the UDPProxy", func() { + proxy, err := NewUDPProxy(13370, "localhost", 7331) + Expect(err).ToNot(HaveOccurred()) + proxy.Stop() + + // check that port 13370 is not in use anymore + addr, err := net.ResolveUDPAddr("udp", ":13370") + Expect(err).ToNot(HaveOccurred()) + ln, err := net.ListenUDP("udp", addr) + Expect(err).ToNot(HaveOccurred()) + ln.Close() + }) + + Context("Proxy tests", func() { + var serverConn *net.UDPConn + var serverReceivedPackets []packetData + var serverNumPacketsSent int + var clientConn *net.UDPConn + var proxy *UDPProxy + + BeforeEach(func() { + var err error + serverReceivedPackets = serverReceivedPackets[:0] + serverNumPacketsSent = 0 + + // setup a UDP server on port 7331 + serverAddr, err := net.ResolveUDPAddr("udp", ":7331") + Expect(err).ToNot(HaveOccurred()) + serverConn, err = net.ListenUDP("udp", serverAddr) + Expect(err).ToNot(HaveOccurred()) + + // setup the proxy + proxy, err = NewUDPProxy(10001, "localhost", 7331) + Expect(err).ToNot(HaveOccurred()) + proxyAddr, err := net.ResolveUDPAddr("udp", ":10001") + Expect(err).ToNot(HaveOccurred()) + + go func() { + defer GinkgoRecover() + + for { + buf := make([]byte, 1500) + n, addr, err2 := serverConn.ReadFromUDP(buf) + if err2 != nil { + return + } + data := buf[0:n] + serverReceivedPackets = append(serverReceivedPackets, packetData(data)) + + // echo each packet received back to the client + serverConn.WriteToUDP(data, addr) + serverNumPacketsSent++ + } + }() + + // setup the client + localAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") + Expect(err).ToNot(HaveOccurred()) + clientConn, err = net.DialUDP("udp", localAddr, proxyAddr) + Expect(err).ToNot(HaveOccurred()) + }) + + AfterEach(func() { + proxy.Stop() + serverConn.Close() + time.Sleep(time.Millisecond) + }) + + It("relays packets from the client to the server", func() { + _, err := clientConn.Write([]byte("foobar")) + Expect(err).ToNot(HaveOccurred()) + time.Sleep(time.Millisecond) + _, err = clientConn.Write([]byte("decafbad")) + Expect(err).ToNot(HaveOccurred()) + time.Sleep(time.Millisecond) + Expect(serverReceivedPackets).To(HaveLen(2)) + Expect(serverReceivedPackets[0]).To(Equal(packetData("foobar"))) + Expect(serverReceivedPackets[1]).To(Equal(packetData("decafbad"))) + }) + + It("relays packets from the server to the client", func() { + _, err := clientConn.Write([]byte("foobar")) + Expect(err).ToNot(HaveOccurred()) + time.Sleep(time.Millisecond) + _, err = clientConn.Write([]byte("decafbad")) + Expect(err).ToNot(HaveOccurred()) + time.Sleep(time.Millisecond) + + var clientReceivedPackets []packetData + + // receive the packets echoed by the server on client side + go func() { + defer GinkgoRecover() + + for { + buf := make([]byte, 1500) + n, _, err2 := clientConn.ReadFromUDP(buf) + if err2 != nil { + return + } + data := buf[0:n] + clientReceivedPackets = append(clientReceivedPackets, packetData(data)) + } + }() + + time.Sleep(time.Millisecond) + Expect(serverReceivedPackets).To(HaveLen(2)) + Expect(serverNumPacketsSent).To(Equal(2)) + Expect(clientReceivedPackets).To(HaveLen(2)) + Expect(clientReceivedPackets[0]).To(Equal(packetData("foobar"))) + Expect(clientReceivedPackets[1]).To(Equal(packetData("decafbad"))) + }) + }) +})