add simnet package to simulate a net.PacketConn in memory (#5385)

* Implement simnet

* simnet: remove separate license

* simnet: remove go.mod, use standard require package

* simnet: add README

* simnet: use synctest wrapper in tests

* simnet: minor code cleanup

* simnet: expose Packet.Data

* simnet: expose Simnet.Router

* simnet: remove SimpleFirewallRouter

* simnet: remove stray fmt.Println in tests

* fix deadline check for write deadlines

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* simnet: fix SetReadDeadline logic

---------

Co-authored-by: Marco Munizaga <git@marcopolo.io>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
Marten Seemann
2025-10-18 15:19:46 +08:00
committed by GitHub
parent f07d6939d0
commit 7772755df2
12 changed files with 1483 additions and 0 deletions

1
go.mod
View File

@@ -10,6 +10,7 @@ require (
golang.org/x/net v0.43.0
golang.org/x/sync v0.16.0
golang.org/x/sys v0.35.0
golang.org/x/time v0.12.0
)
require (

2
go.sum
View File

@@ -34,6 +34,8 @@ golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng=
golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE=
golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
golang.org/x/tools v0.36.0 h1:kWS0uv/zsvHEle1LbV5LE8QujrxB3wfQyxHfhOk0Qkg=
golang.org/x/tools v0.36.0/go.mod h1:WBDiHKJK8YgLHlcQPYQzNCkUxUypCaa5ZegCVutKm+s=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@@ -0,0 +1,16 @@
# simnet
This package is based on @MarcoPolo's [simnet](https://github.com/marcopolo/simnet) package.
A small Go library for simulating packet networks in-process. It provides
drop-in `net.PacketConn` endpoints connected through configurable virtual links
with bandwidth, latency, and MTU constraints. Useful for testing networking code
without sockets or root privileges.
- **Drop-in API**: implements `net.PacketConn`
- **Realistic links**: per-direction bandwidth, latency, and MTU
- **Backpressure/buffering**: bandwidth-delay product aware queues
- **Routers**: perfect delivery, fixed-latency, simple firewall/NAT-like routing
- **Deterministic testing**: opt-in `synctest`-based tests for time control

View File

@@ -0,0 +1,56 @@
package simnet
import "sync"
type packetQueue struct {
byteCountLimit int
currentByteCount int
queue []packetWithDeliveryTime
closed bool
cond *sync.Cond
}
func newPacketQ(byteCountLimit int) *packetQueue {
q := &packetQueue{
byteCountLimit: byteCountLimit,
}
q.cond = sync.NewCond(&sync.Mutex{})
return q
}
// Close marks the queue closed and wakes every goroutine blocked in Pop.
func (q *packetQueue) Close() {
	q.cond.L.Lock()
	q.closed = true
	q.cond.Broadcast()
	q.cond.L.Unlock()
}
// Push appends p to the queue and wakes one blocked Pop caller.
// The packet is silently dropped if the queue is already closed, or if
// adding it would exceed the configured byte limit (tail-drop, like a
// full network buffer).
func (q *packetQueue) Push(p packetWithDeliveryTime) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	if q.closed {
		return
	}
	if q.currentByteCount+len(p.Data) > q.byteCountLimit {
		// Queue is full: drop the packet.
		return
	}
	q.queue = append(q.queue, p)
	q.currentByteCount += len(p.Data)
	q.cond.Signal()
}
// Pop removes and returns the oldest packet, blocking while the queue is
// empty. It returns ok=false once the queue has been closed.
//
// Note: packets still buffered when Close is called are discarded — the
// closed check below takes precedence over draining the queue.
func (q *packetQueue) Pop() (packetWithDeliveryTime, bool) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	for len(q.queue) == 0 && !q.closed {
		// Block until a packet is added or the queue is closed.
		q.cond.Wait()
	}
	if q.closed {
		return packetWithDeliveryTime{}, false
	}
	p := q.queue[0]
	q.queue = q.queue[1:]
	q.currentByteCount -= len(p.Data)
	return p, true
}

View File

@@ -0,0 +1,145 @@
package simnet
import (
"bytes"
"sync/atomic"
"testing"
"time"
)
// TestPacketQueue_Basic checks that a single pushed packet is returned
// unchanged by Pop.
func TestPacketQueue_Basic(t *testing.T) {
	q := newPacketQ(1000)
	// Test adding and removing single packet
	testPacket := packetWithDeliveryTime{Packet: Packet{Data: []byte("test packet")}, DeliveryTime: time.Now()}
	q.Push(testPacket)
	got, ok := q.Pop()
	if !ok {
		t.Error("Expected successful Pop, got not ok")
	}
	if !bytes.Equal(got.Data, testPacket.Data) {
		t.Errorf("Expected packet %v, got %v", testPacket, got)
	}
}
// TestPacketQueue_Order checks FIFO ordering across multiple packets.
func TestPacketQueue_Order(t *testing.T) {
	q := newPacketQ(1000)
	packets := []packetWithDeliveryTime{
		{Packet: Packet{Data: []byte("first")}, DeliveryTime: time.Now()},
		{Packet: Packet{Data: []byte("second")}, DeliveryTime: time.Now()},
		{Packet: Packet{Data: []byte("third")}, DeliveryTime: time.Now()},
	}
	for _, p := range packets {
		q.Push(p)
	}
	for i, want := range packets {
		got, ok := q.Pop()
		if !ok {
			t.Errorf("Pop %d: expected success, got not ok", i)
			continue
		}
		if !bytes.Equal(got.Data, want.Data) {
			t.Errorf("Pop %d: expected %v, got %v", i, want, got)
		}
	}
}
// TestPacketQueue_BlockedThenClose verifies that a Pop blocked on an
// empty queue returns not-ok once the queue is closed, and that it
// actually blocked until the close happened.
func TestPacketQueue_BlockedThenClose(t *testing.T) {
	q := newPacketQ(1000)
	go func() {
		time.Sleep(10 * time.Millisecond)
		q.Close()
	}()
	startTime := time.Now()
	// Test Pop on empty queue
	_, ok := q.Pop()
	if ok {
		t.Error("Expected closed queue")
	}
	dur := time.Since(startTime)
	if dur < 10*time.Millisecond {
		t.Errorf("Expected Pop to block for at least 10ms, got %v", dur)
	}
}
// TestPacketQueue_Blocking verifies that Pop blocks until a packet is
// pushed, and then returns exactly that packet.
func TestPacketQueue_Blocking(t *testing.T) {
	q := newPacketQ(1000)
	done := make(chan bool)
	timeout := time.After(100 * time.Millisecond)
	testPacket := Packet{Data: []byte("test packet")}
	var readPacket atomic.Bool
	// Start consumer before pushing any data
	go func() {
		packet, ok := q.Pop()
		if !ok {
			t.Error("Expected successful Pop, got not ok")
			done <- true
			return
		}
		readPacket.Store(true)
		if !bytes.Equal(packet.Data, testPacket.Data) {
			t.Errorf("Expected %v, got %v", testPacket, packet)
		}
		done <- true
	}()
	// Wait a bit to ensure consumer is blocked
	time.Sleep(10 * time.Millisecond)
	if readPacket.Load() {
		t.Error("Consumer should not have read packet")
	}
	// Push data that should unblock consumer
	q.Push(packetWithDeliveryTime{Packet: testPacket, DeliveryTime: time.Now()})
	select {
	case <-done:
		// Success - consumer received the packet
	case <-timeout:
		t.Error("Test timed out - Pop did not unblock after Push")
	}
	if !readPacket.Load() {
		t.Error("Consumer should have read packet")
	}
}
// TestPacketQueue_Concurrent runs one producer and one consumer
// concurrently and waits for both to finish, exercising the queue's
// locking under contention.
func TestPacketQueue_Concurrent(t *testing.T) {
	q := newPacketQ(1000)
	done := make(chan bool)
	// Start producer goroutine
	go func() {
		for i := 0; i < 100; i++ {
			q.Push(packetWithDeliveryTime{Packet: Packet{Data: []byte{byte(i)}}, DeliveryTime: time.Now()})
			time.Sleep(time.Millisecond)
		}
		done <- true
	}()
	// Start consumer goroutine
	go func() {
		count := 0
		for count < 100 {
			_, ok := q.Pop()
			if ok {
				count++
			}
			time.Sleep(time.Millisecond)
		}
		done <- true
	}()
	// Wait for both goroutines to finish
	<-done
	<-done
}

149
testutils/simnet/router.go Normal file
View File

@@ -0,0 +1,149 @@
package simnet
import (
"errors"
"net"
"net/netip"
"sync"
"time"
)
type ipPortKey struct {
ip string
port uint16
isUDP bool
}
func (k *ipPortKey) FromNetAddr(addr net.Addr) error {
switch addr := addr.(type) {
case *net.UDPAddr:
*k = ipPortKey{
ip: string(addr.IP),
port: uint16(addr.Port),
isUDP: true,
}
return nil
case *net.TCPAddr:
*k = ipPortKey{
ip: string(addr.IP),
port: uint16(addr.Port),
isUDP: false,
}
return nil
default:
ip, err := netip.ParseAddrPort(addr.String())
if err != nil {
return err
}
*k = ipPortKey{
ip: string(ip.Addr().AsSlice()),
port: ip.Port(),
isUDP: addr.Network() == "udp",
}
return nil
}
}
// addrMap is a mutex-guarded map from network addresses to values of
// type V. Addresses are normalized into comparable keys via ipPortKey.
// The zero value is ready to use.
type addrMap[V any] struct {
	mu    sync.Mutex
	nodes map[ipPortKey]V
}

// Get returns the value registered for addr, if any. Unparseable
// addresses simply report "not found".
func (m *addrMap[V]) Get(addr net.Addr) (V, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	var zero V
	if len(m.nodes) == 0 {
		return zero, false
	}
	var k ipPortKey
	if err := k.FromNetAddr(addr); err != nil {
		return zero, false
	}
	v, ok := m.nodes[k]
	return v, ok
}

// Set registers v for addr, allocating the map on first use.
func (m *addrMap[V]) Set(addr net.Addr, v V) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.nodes == nil {
		m.nodes = make(map[ipPortKey]V)
	}
	var k ipPortKey
	if err := k.FromNetAddr(addr); err != nil {
		return err
	}
	m.nodes[k] = v
	return nil
}

// Delete removes the value registered for addr, if any.
func (m *addrMap[V]) Delete(addr net.Addr) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	var k ipPortKey
	if err := k.FromNetAddr(addr); err != nil {
		return err
	}
	// delete on a nil map is a no-op, so there is no need to allocate
	// the map just to delete from it.
	delete(m.nodes, k)
	return nil
}
// PerfectRouter is a router that has no latency or jitter and can route
// to every node.
type PerfectRouter struct {
	nodes addrMap[PacketReceiver]
}

var _ Router = &PerfectRouter{}

// SendPacket implements Router. It synchronously delivers p to the
// receiver registered for p.To.
func (r *PerfectRouter) SendPacket(p Packet) error {
	receiver, found := r.nodes.Get(p.To)
	if !found {
		return errors.New("unknown destination")
	}
	receiver.RecvPacket(p)
	return nil
}

// AddNode registers conn as the receiver for packets addressed to addr.
func (r *PerfectRouter) AddNode(addr net.Addr, conn PacketReceiver) {
	// error deliberately ignored: the addr types used here always parse
	_ = r.nodes.Set(addr, conn)
}

// RemoveNode unregisters the receiver for addr.
func (r *PerfectRouter) RemoveNode(addr net.Addr) {
	_ = r.nodes.Delete(addr)
}
// DelayedPacketReceiver wraps a PacketReceiver and delays every packet
// by a fixed duration before handing it to the wrapped receiver.
type DelayedPacketReceiver struct {
	inner PacketReceiver
	delay time.Duration
}

// RecvPacket schedules delivery of p after the configured delay.
func (r *DelayedPacketReceiver) RecvPacket(p Packet) {
	time.AfterFunc(r.delay, func() { r.inner.RecvPacket(p) })
}

// FixedLatencyRouter routes like PerfectRouter, but adds a fixed
// latency to every delivered packet.
type FixedLatencyRouter struct {
	PerfectRouter
	latency time.Duration
}

var _ Router = &FixedLatencyRouter{}

// SendPacket implements Router.
func (r *FixedLatencyRouter) SendPacket(p Packet) error {
	return r.PerfectRouter.SendPacket(p)
}

// AddNode wraps conn so that packets addressed to addr are delayed by
// the router's latency before delivery.
func (r *FixedLatencyRouter) AddNode(addr net.Addr, conn PacketReceiver) {
	wrapped := &DelayedPacketReceiver{inner: conn, delay: r.latency}
	r.PerfectRouter.AddNode(addr, wrapped)
}

250
testutils/simnet/simconn.go Normal file
View File

@@ -0,0 +1,250 @@
package simnet
import (
"errors"
"net"
"slices"
"sync"
"sync/atomic"
"time"
)
// ErrDeadlineExceeded is returned by ReadFrom/WriteTo when a deadline
// has passed.
// NOTE(review): net.PacketConn implementations conventionally return
// os.ErrDeadlineExceeded (a net.Error whose Timeout() is true).
// Callers checking errors.Is(err, os.ErrDeadlineExceeded) or net.Error
// timeouts will not match this sentinel — confirm downstream
// expectations before relying on it.
var ErrDeadlineExceeded = errors.New("deadline exceeded")

// PacketReceiver is anything that can accept a delivered packet: the
// receiving side of a simulated connection or link.
type PacketReceiver interface {
	RecvPacket(p Packet)
}

// Router handles routing of packets between simulated connections.
// Implementations are responsible for delivering packets to their destinations.
type Router interface {
	SendPacket(p Packet) error
	AddNode(addr net.Addr, receiver PacketReceiver)
}

// Packet is a single datagram travelling through the simulated network.
type Packet struct {
	To   net.Addr
	From net.Addr
	Data []byte
}
// SimConn is a simulated network connection that implements net.PacketConn.
// It provides packet-based communication through a Router for testing and
// simulation purposes. All send/recv operations are handled through the
// Router's packet delivery mechanism.
type SimConn struct {
mu sync.Mutex
closed bool
closedChan chan struct{}
deadlineUpdated chan struct{}
packetsSent atomic.Uint64
packetsRcvd atomic.Uint64
bytesSent atomic.Int64
bytesRcvd atomic.Int64
router Router
myAddr *net.UDPAddr
myLocalAddr net.Addr
packetsToRead chan Packet
// Controls whether to block when receiving packets if our buffer is full.
// If false, drops packets.
recvBackPressure bool
readDeadline time.Time
writeDeadline time.Time
}
var _ net.PacketConn = &SimConn{}
// NewSimConn creates a new simulated connection that drops packets if
// the receive buffer is full.
func NewSimConn(addr *net.UDPAddr, rtr Router) *SimConn {
	return newSimConn(addr, rtr, false)
}

// NewBlockingSimConn creates a new simulated connection that blocks if
// the receive buffer is full. Does not drop packets.
func NewBlockingSimConn(addr *net.UDPAddr, rtr Router) *SimConn {
	return newSimConn(addr, rtr, true)
}

// newSimConn builds the connection and registers it with the router.
func newSimConn(addr *net.UDPAddr, rtr Router, block bool) *SimConn {
	conn := &SimConn{
		recvBackPressure: block,
		router:           rtr,
		myAddr:           addr,
		packetsToRead:    make(chan Packet, 32),
		closedChan:       make(chan struct{}),
		deadlineUpdated:  make(chan struct{}, 1),
	}
	rtr.AddNode(addr, conn)
	return conn
}
// ConnStats is a snapshot of a connection's transfer counters.
type ConnStats struct {
	BytesSent   int
	BytesRcvd   int
	PacketsSent int
	PacketsRcvd int
}

// Stats returns the current transfer counters for this connection.
func (c *SimConn) Stats() ConnStats {
	return ConnStats{
		BytesSent:   int(c.bytesSent.Load()),
		BytesRcvd:   int(c.bytesRcvd.Load()),
		PacketsSent: int(c.packetsSent.Load()),
		PacketsRcvd: int(c.packetsRcvd.Load()),
	}
}

// SetReadBuffer only exists to quell the warning message from quic-go
func (c *SimConn) SetReadBuffer(n int) error {
	return nil
}

// SetWriteBuffer only exists to quell the warning message from quic-go
func (c *SimConn) SetWriteBuffer(n int) error {
	return nil
}
// RecvPacket implements PacketReceiver. It enqueues p for a future
// ReadFrom call. In backpressure mode it blocks until there is room in
// the receive buffer (or the connection is closed); otherwise it drops
// the packet when the buffer is full.
func (c *SimConn) RecvPacket(p Packet) {
	c.mu.Lock()
	if c.closed {
		c.mu.Unlock()
		return
	}
	c.mu.Unlock()
	if c.recvBackPressure {
		select {
		case c.packetsToRead <- p:
		case <-c.closedChan:
			// if the connection is closed, drop the packet
			return
		}
	} else {
		select {
		case c.packetsToRead <- p:
		default:
			// drop the packet if the channel is full
			return
		}
	}
	// Count only packets that were actually buffered, so Stats reflects
	// what the connection can deliver to the reader rather than what was
	// offered to it (previously dropped packets inflated these counters).
	c.packetsRcvd.Add(1)
	c.bytesRcvd.Add(int64(len(p.Data)))
}
// Close implements net.PacketConn. It is safe to call multiple times;
// only the first call closes the connection.
func (c *SimConn) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.closed {
		c.closed = true
		close(c.closedChan)
	}
	return nil
}
// ReadFrom implements net.PacketConn. It blocks until a packet is
// available, the connection is closed, the read deadline expires, or a
// deadline update forces a re-check (in which case it restarts with the
// new deadline).
func (c *SimConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
	c.mu.Lock()
	if c.closed {
		c.mu.Unlock()
		return 0, nil, net.ErrClosed
	}
	deadline := c.readDeadline
	c.mu.Unlock()
	// Fail fast if the deadline has already passed.
	if !deadline.IsZero() && !time.Now().Before(deadline) {
		return 0, nil, ErrDeadlineExceeded
	}
	var pkt Packet
	// A nil channel blocks forever, so no timer case fires when there is
	// no deadline.
	var deadlineTimer <-chan time.Time
	if !deadline.IsZero() {
		deadlineTimer = time.After(time.Until(deadline))
	}
	select {
	case pkt = <-c.packetsToRead:
	case <-c.closedChan:
		return 0, nil, net.ErrClosed
	case <-c.deadlineUpdated:
		// The deadline changed while blocked: restart so the new value
		// takes effect.
		return c.ReadFrom(p)
	case <-deadlineTimer:
		return 0, nil, ErrDeadlineExceeded
	}
	n = copy(p, pkt.Data)
	// if the provided buffer is not enough to read the whole packet, we drop
	// the rest of the data. this is similar to what `recvfrom` does on Linux
	// and macOS.
	return n, pkt.From, nil
}
// WriteTo implements net.PacketConn. It never blocks: the payload is
// cloned and handed to the router synchronously.
// NOTE(review): the sent counters are incremented before
// router.SendPacket runs, so a routing failure is still counted as a
// sent packet — confirm this is the intended stats semantics.
func (c *SimConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
	c.mu.Lock()
	if c.closed {
		c.mu.Unlock()
		return 0, net.ErrClosed
	}
	deadline := c.writeDeadline
	c.mu.Unlock()
	// Fail fast if the write deadline has already passed.
	if !deadline.IsZero() && !time.Now().Before(deadline) {
		return 0, ErrDeadlineExceeded
	}
	c.packetsSent.Add(1)
	c.bytesSent.Add(int64(len(p)))
	pkt := Packet{
		From: c.myAddr,
		To:   addr,
		// clone so the caller may reuse p after WriteTo returns
		Data: slices.Clone(p),
	}
	return len(p), c.router.SendPacket(pkt)
}
// UnicastAddr returns the connection's own UDP address.
func (c *SimConn) UnicastAddr() net.Addr { return c.myAddr }

// LocalAddr implements net.PacketConn. It returns the explicitly set
// local address if one exists, and the unicast address otherwise.
func (c *SimConn) LocalAddr() net.Addr {
	if c.myLocalAddr == nil {
		return c.myAddr
	}
	return c.myLocalAddr
}
// SetDeadline implements net.PacketConn: it sets both the read and the
// write deadline.
func (c *SimConn) SetDeadline(t time.Time) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.readDeadline = t
	c.writeDeadline = t
	// Wake a blocked ReadFrom so it picks up the new deadline. The
	// channel has capacity 1, so this never blocks.
	select {
	case c.deadlineUpdated <- struct{}{}:
	default:
	}
	return nil
}

// SetReadDeadline implements net.PacketConn.
func (c *SimConn) SetReadDeadline(t time.Time) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.readDeadline = t
	// Wake a blocked ReadFrom so it re-evaluates the deadline.
	select {
	case c.deadlineUpdated <- struct{}{}:
	default:
	}
	return nil
}

// SetWriteDeadline implements net.PacketConn. No notification is needed
// here, because WriteTo never blocks.
func (c *SimConn) SetWriteDeadline(t time.Time) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.writeDeadline = t
	return nil
}

View File

@@ -0,0 +1,189 @@
package simnet
import (
"crypto/rand"
"net"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
)
// randomPublicIPv4 returns a random 4-byte IPv4 address that is neither
// private, loopback, nor link-local, so it resembles a routable public
// address. It retries until such an address is drawn.
func randomPublicIPv4() net.IP {
	for {
		ip := make(net.IP, 4)
		if _, err := rand.Read(ip); err != nil {
			// crypto/rand is documented to never fail on supported platforms
			panic(err)
		}
		if ip.IsPrivate() || ip.IsLoopback() || ip.IsLinkLocalUnicast() {
			continue
		}
		return ip
	}
}
// TestSimConnBasicConnectivity checks that a packet written on one
// connection arrives unchanged on the other, and that both sides'
// transfer stats are updated.
func TestSimConnBasicConnectivity(t *testing.T) {
	router := &PerfectRouter{}
	// Create two endpoints
	addr1 := &net.UDPAddr{IP: randomPublicIPv4(), Port: 1234}
	addr2 := &net.UDPAddr{IP: randomPublicIPv4(), Port: 1234}
	conn1 := NewSimConn(addr1, router)
	conn2 := NewSimConn(addr2, router)
	// Test sending data from conn1 to conn2
	testData := []byte("hello world")
	n, err := conn1.WriteTo(testData, addr2)
	require.NoError(t, err)
	require.Equal(t, len(testData), n)
	// Read data from conn2
	buf := make([]byte, 1024)
	n, addr, err := conn2.ReadFrom(buf)
	require.NoError(t, err)
	require.Equal(t, testData, buf[:n])
	require.Equal(t, addr1, addr)
	// Check stats
	stats1 := conn1.Stats()
	require.Equal(t, len(testData), stats1.BytesSent)
	require.Equal(t, 1, stats1.PacketsSent)
	stats2 := conn2.Stats()
	require.Equal(t, len(testData), stats2.BytesRcvd)
	require.Equal(t, 1, stats2.PacketsRcvd)
}
// TestSimConnDeadlines checks that expired read and write deadlines
// cause ReadFrom/WriteTo to fail with ErrDeadlineExceeded.
func TestSimConnDeadlines(t *testing.T) {
	router := &PerfectRouter{}
	addr1 := &net.UDPAddr{IP: randomPublicIPv4(), Port: 1234}
	conn := NewSimConn(addr1, router)
	t.Run("read deadline", func(t *testing.T) {
		deadline := time.Now().Add(10 * time.Millisecond)
		err := conn.SetReadDeadline(deadline)
		require.NoError(t, err)
		buf := make([]byte, 1024)
		_, _, err = conn.ReadFrom(buf)
		require.ErrorIs(t, err, ErrDeadlineExceeded)
	})
	t.Run("write deadline", func(t *testing.T) {
		deadline := time.Now().Add(-time.Second) // Already expired
		err := conn.SetWriteDeadline(deadline)
		require.NoError(t, err)
		_, err = conn.WriteTo([]byte("test"), &net.UDPAddr{})
		require.ErrorIs(t, err, ErrDeadlineExceeded)
	})
}
// TestSimConnClose verifies that operations fail with net.ErrClosed
// after Close, and that Close is idempotent.
func TestSimConnClose(t *testing.T) {
	router := &PerfectRouter{}
	addr1 := &net.UDPAddr{IP: randomPublicIPv4(), Port: 1234}
	conn := NewSimConn(addr1, router)
	err := conn.Close()
	require.NoError(t, err)
	// Verify operations fail after close
	_, err = conn.WriteTo([]byte("test"), addr1)
	require.ErrorIs(t, err, net.ErrClosed)
	buf := make([]byte, 1024)
	_, _, err = conn.ReadFrom(buf)
	require.ErrorIs(t, err, net.ErrClosed)
	// Second close should not error
	err = conn.Close()
	require.NoError(t, err)
}
// TestSimConnDeadlinesWithLatency exercises read/write deadlines on a
// router that adds 100ms of latency, covering both the success and the
// timeout paths.
func TestSimConnDeadlinesWithLatency(t *testing.T) {
	router := &FixedLatencyRouter{
		PerfectRouter: PerfectRouter{},
		latency:       100 * time.Millisecond,
	}
	addr1 := &net.UDPAddr{IP: randomPublicIPv4(), Port: 1234}
	addr2 := &net.UDPAddr{IP: randomPublicIPv4(), Port: 1234}
	conn1 := NewSimConn(addr1, router)
	conn2 := NewSimConn(addr2, router)
	// reset re-registers fresh connections so each subtest starts from a
	// clean state.
	reset := func() {
		router.RemoveNode(addr1)
		router.RemoveNode(addr2)
		conn1 = NewSimConn(addr1, router)
		conn2 = NewSimConn(addr2, router)
	}
	t.Run("write succeeds within deadline", func(t *testing.T) {
		deadline := time.Now().Add(200 * time.Millisecond)
		err := conn1.SetWriteDeadline(deadline)
		require.NoError(t, err)
		n, err := conn1.WriteTo([]byte("test"), addr2)
		require.NoError(t, err)
		require.Equal(t, 4, n)
		reset()
	})
	t.Run("write fails after past deadline", func(t *testing.T) {
		deadline := time.Now().Add(-time.Second) // Already expired
		err := conn1.SetWriteDeadline(deadline)
		require.NoError(t, err)
		_, err = conn1.WriteTo([]byte("test"), addr2)
		require.ErrorIs(t, err, ErrDeadlineExceeded)
		reset()
	})
	t.Run("read succeeds within deadline", func(t *testing.T) {
		// Reset deadline and send a message
		conn2.SetReadDeadline(time.Time{})
		testData := []byte("hello")
		deadline := time.Now().Add(200 * time.Millisecond)
		conn1.SetWriteDeadline(deadline)
		_, err := conn1.WriteTo(testData, addr2)
		require.NoError(t, err)
		// Set read deadline and try to read
		deadline = time.Now().Add(200 * time.Millisecond)
		err = conn2.SetReadDeadline(deadline)
		require.NoError(t, err)
		buf := make([]byte, 1024)
		n, addr, err := conn2.ReadFrom(buf)
		require.NoError(t, err)
		require.Equal(t, addr1, addr)
		require.Equal(t, testData, buf[:n])
		reset()
	})
	t.Run("read fails after deadline", func(t *testing.T) {
		defer reset()
		// Set a short deadline
		deadline := time.Now().Add(50 * time.Millisecond) // Less than router latency
		err := conn2.SetReadDeadline(deadline)
		require.NoError(t, err)
		var wg sync.WaitGroup
		defer wg.Wait()
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Send data after setting deadline
			_, err := conn1.WriteTo([]byte("test"), addr2)
			require.NoError(t, err)
		}()
		// Read should fail due to deadline
		buf := make([]byte, 1024)
		_, _, err = conn2.ReadFrom(buf)
		require.ErrorIs(t, err, ErrDeadlineExceeded)
	})
}

239
testutils/simnet/simlink.go Normal file
View File

@@ -0,0 +1,239 @@
package simnet
import (
"context"
"math"
"net"
"sync"
"time"
"golang.org/x/time/rate"
)
// newRateLimiter builds a rate.Limiter for the given bandwidth
// (in bits/sec) and burst size (in bytes).
func newRateLimiter(bandwidth int, burstSize int) *rate.Limiter {
	// The limiter operates on bytes, so convert bits/sec to bytes/sec.
	bytesPerSecond := rate.Limit(float64(bandwidth) / 8.0)
	return rate.NewLimiter(bytesPerSecond, burstSize)
}
// LinkSettings defines the network characteristics for a simulated link direction.
// These settings control bandwidth, latency, and MTU for either uplink or downlink traffic.
type LinkSettings struct {
	// BitsPerSecond specifies the bandwidth limit in bits per second.
	// This controls the rate at which data can be transmitted over the link.
	BitsPerSecond int
	// Latency specifies the network delay to add to each packet.
	// This simulates the time it takes for a packet to travel across the network.
	Latency time.Duration
	// MTU (Maximum Transmission Unit) specifies the maximum packet size in bytes.
	// Packets larger than this size will be dropped by the simulated link.
	MTU int
}

// packetWithDeliveryTime pairs a packet with the wall-clock instant at
// which the latency simulator should deliver it.
type packetWithDeliveryTime struct {
	Packet
	DeliveryTime time.Time
}
// latencyLink holds packets until their delivery time has passed and
// then hands them to Out. Packets are fed in through the In channel.
type latencyLink struct {
	// Out is invoked for each packet once its delivery time is reached.
	Out func(p Packet)
	In  chan *packetWithDeliveryTime
	// q is technically unbounded here, but in practice is bounded by the bandwidth-delay product
	q []*packetWithDeliveryTime
}

// newLatencyLink returns a latencyLink that delivers packets to out.
// Start must be run (in its own goroutine) to begin processing.
func newLatencyLink(out func(p Packet)) *latencyLink {
	return &latencyLink{
		In:  make(chan *packetWithDeliveryTime),
		Out: out,
	}
}
// Start runs the delivery loop until the In channel is closed. It is
// meant to be run in its own goroutine; wg is marked done on return.
func (l *latencyLink) Start(wg *sync.WaitGroup) {
	defer wg.Done()
	// A single reusable timer tracks the delivery time of the head of
	// the queue; it starts stopped.
	nextEvent := time.NewTimer(time.Second)
	nextEvent.Stop()
	for {
		select {
		case p, ok := <-l.In:
			if !ok {
				return
			}
			if !time.Now().Before(p.DeliveryTime) {
				// Already due: deliver immediately.
				// NOTE(review): this bypasses packets still sitting in q,
				// which could reorder delivery if delivery times were ever
				// non-monotonic — confirm callers always enqueue in
				// delivery-time order.
				l.Out(p.Packet)
				continue
			}
			l.q = append(l.q, p)
			if len(l.q) == 1 {
				// The queue was empty, so the timer was idle: arm it for
				// the new head.
				nextEvent.Reset(time.Until(l.q[0].DeliveryTime))
			}
		case <-nextEvent.C:
			if len(l.q) == 0 {
				continue
			}
			nextPacket := l.q[0]
			if nextPacket.DeliveryTime.After(time.Now()) {
				// Timer fired early for this head (stale timer): re-arm.
				nextEvent.Reset(time.Until(nextPacket.DeliveryTime))
				continue
			}
			l.Out(nextPacket.Packet)
			l.q = l.q[1:]
			if len(l.q) > 0 {
				nextEvent.Reset(time.Until(l.q[0].DeliveryTime))
			}
		}
	}
}
// SimulatedLink simulates a bidirectional network link with configurable bandwidth,
// latency, and MTU settings for both uplink and downlink directions.
//
// The link provides realistic network behavior by:
// - Rate limiting packets based on bandwidth settings
// - Adding configurable latency to packet delivery
// - Enforcing MTU limits (dropping oversized packets)
// - Buffering packets up to the bandwidth-delay product
//
// Usage:
//
//	link := &SimulatedLink{
//	    UplinkSettings: LinkSettings{BitsPerSecond: 1000000, Latency: 50*time.Millisecond, MTU: 1400},
//	    DownlinkSettings: LinkSettings{BitsPerSecond: 1000000, Latency: 50*time.Millisecond, MTU: 1400},
//	    UploadPacket: upstream,
//	    DownloadPacket: downstream,
//	}
//	link.Start()
//	defer link.Close()
type SimulatedLink struct {
	// Internal state for lifecycle management
	closed chan struct{} // signals shutdown to background goroutines; nil until Start
	wg     sync.WaitGroup // ensures clean shutdown of all goroutines

	// Packet queues with buffering based on bandwidth-delay product
	downstream *packetQueue // buffers packets flowing to DownloadPacket
	upstream   *packetQueue // buffers packets flowing to UploadPacket

	// Rate limiters enforce bandwidth constraints
	upLimiter   *rate.Limiter // limits uplink bandwidth
	downLimiter *rate.Limiter // limits downlink bandwidth

	// Latency simulators add realistic network delays
	upLatency   *latencyLink // adds latency to uplink packets
	downLatency *latencyLink // adds latency to downlink packets

	// Configuration for link characteristics
	UplinkSettings   LinkSettings // bandwidth, latency, MTU for uplink direction
	DownlinkSettings LinkSettings // bandwidth, latency, MTU for downlink direction

	// Packet routing interfaces
	UploadPacket   Router         // Handles packets sent out
	downloadPacket PacketReceiver // Handles packets received; set via AddNode
}
// delayPacketHandling blocks until the limiter permits len(p.Data)
// bytes, pacing packets to the configured bandwidth.
func delayPacketHandling(limiter *rate.Limiter, p packetWithDeliveryTime) {
	// The background context is never canceled; WaitN can only fail if
	// the request exceeds the limiter's burst, which the MTU check on
	// enqueue prevents.
	_ = limiter.WaitN(context.Background(), len(p.Data))
}
// backgroundDownlink pops packets from the downstream queue, paces them
// through the downlink rate limiter, and hands them to the latency
// simulator. It exits — closing the latency link's input — once the
// queue is closed.
func (l *SimulatedLink) backgroundDownlink() {
	defer l.wg.Done()
	defer close(l.downLatency.In)
	for {
		p, ok := l.downstream.Pop()
		if !ok {
			return
		}
		delayPacketHandling(l.downLimiter, p)
		l.downLatency.In <- &p
	}
}

// backgroundUplink mirrors backgroundDownlink for the uplink direction.
func (l *SimulatedLink) backgroundUplink() {
	defer l.wg.Done()
	defer close(l.upLatency.In)
	for {
		p, ok := l.upstream.Pop()
		if !ok {
			return
		}
		delayPacketHandling(l.upLimiter, p)
		l.upLatency.In <- &p
	}
}
func calculateBDP(mtu, bandwidth int, latency time.Duration) int {
bdpBytes := (float64(bandwidth) / 8) * float64(latency.Seconds())
// If we straddle the packet boundary, round up to the nearest MTU
mtusWorth := int(math.Ceil(bdpBytes / float64(mtu)))
return mtusWorth * mtu
}
// AddNode implements Router. The link supports a single receiver for
// all downlink traffic; a later call replaces the previous one, and
// addr is ignored.
func (l *SimulatedLink) AddNode(addr net.Addr, receiver PacketReceiver) {
	l.downloadPacket = receiver
}
// Start initializes the queues, rate limiters, and latency simulators,
// and launches the four background goroutines that move packets through
// the link. AddNode must have been called first to set the packet
// receiver; otherwise Start panics.
func (l *SimulatedLink) Start() {
	if l.downloadPacket == nil {
		panic("SimulatedLink.Start() called without having added a packet receiver")
	}
	l.closed = make(chan struct{})
	// Sane defaults
	if l.DownlinkSettings.MTU == 0 {
		l.DownlinkSettings.MTU = 1400
	}
	if l.UplinkSettings.MTU == 0 {
		l.UplinkSettings.MTU = 1400
	}
	// Queue capacity is the bandwidth-delay product, so the link buffers
	// roughly one latency's worth of in-flight data.
	downBDP := calculateBDP(l.DownlinkSettings.MTU, l.DownlinkSettings.BitsPerSecond, l.DownlinkSettings.Latency)
	upBDP := calculateBDP(l.UplinkSettings.MTU, l.UplinkSettings.BitsPerSecond, l.UplinkSettings.Latency)
	l.downstream = newPacketQ(downBDP)
	l.upstream = newPacketQ(upBDP)
	const burstSizeInPackets = 16
	l.upLimiter = newRateLimiter(l.UplinkSettings.BitsPerSecond, l.UplinkSettings.MTU*burstSizeInPackets)
	l.downLimiter = newRateLimiter(l.DownlinkSettings.BitsPerSecond, l.DownlinkSettings.MTU*burstSizeInPackets)
	l.upLatency = newLatencyLink(func(p Packet) { _ = l.UploadPacket.SendPacket(p) })
	l.downLatency = newLatencyLink(func(p Packet) { l.downloadPacket.RecvPacket(p) })
	l.wg.Add(4)
	// TODO: Can we coalesce these into a single goroutine? Is it worth it?
	go l.upLatency.Start(&l.wg)
	go l.downLatency.Start(&l.wg)
	go l.backgroundDownlink()
	go l.backgroundUplink()
}
// Close shuts the link down and waits for all background goroutines to
// exit. Packets still buffered in the queues are discarded. Close is a
// no-op if Start was never called, and is safe to call more than once
// (the original panicked in both cases: close of a nil channel, and
// close of an already-closed channel).
func (l *SimulatedLink) Close() error {
	if l.closed == nil {
		// Start was never called; there is nothing to shut down.
		return nil
	}
	select {
	case <-l.closed:
		// already closed
		return nil
	default:
	}
	close(l.closed)
	l.downstream.Close()
	l.upstream.Close()
	l.wg.Wait()
	return nil
}
// SendPacket implements Router for the uplink direction. Packets larger
// than the uplink MTU are silently dropped (returning nil), mirroring
// how an oversized datagram disappears on a real link.
func (l *SimulatedLink) SendPacket(p Packet) error {
	if len(p.Data) > l.UplinkSettings.MTU {
		// Dropping packet if it's too large for the link
		return nil
	}
	l.upstream.Push(packetWithDeliveryTime{Packet: p, DeliveryTime: time.Now().Add(l.UplinkSettings.Latency)})
	return nil
}

// RecvPacket implements PacketReceiver for the downlink direction.
// Oversized packets are silently dropped, as in SendPacket.
func (l *SimulatedLink) RecvPacket(p Packet) {
	if len(p.Data) > l.DownlinkSettings.MTU {
		// Dropping packet if it's too large for the link
		return
	}
	l.downstream.Push(packetWithDeliveryTime{Packet: p, DeliveryTime: time.Now().Add(l.DownlinkSettings.Latency)})
}

View File

@@ -0,0 +1,219 @@
package simnet
import (
"fmt"
"math"
"net"
"testing"
"time"
"github.com/quic-go/quic-go/internal/synctest"
)
// testRouter is a minimal Router for tests that forwards sent and
// received packets to configurable callbacks.
type testRouter struct {
	onSend func(p Packet)
	onRecv func(p Packet)
}

// SendPacket implements Router.
func (r *testRouter) SendPacket(p Packet) error {
	r.onSend(p)
	return nil
}

// RecvPacket implements PacketReceiver.
func (r *testRouter) RecvPacket(p Packet) {
	r.onRecv(p)
}

// AddNode implements Router; only a single receiver is tracked, and
// addr is ignored.
func (r *testRouter) AddNode(addr net.Addr, receiver PacketReceiver) {
	r.onRecv = receiver.RecvPacket
}

// NOTE(review): despite the name, the value is 1e6 — a decimal megabit
// (Mbps), not a mebibit (which would be 1<<20).
const Mibps = 1_000_000
// TestBandwidthLimiterAndLatency pushes 10 MiB through a single
// SimulatedLink (upload and download as separate subtests) and checks
// that the observed throughput and one-way latency are within 20% of
// the configured values. synctest provides deterministic virtual time.
func TestBandwidthLimiterAndLatency(t *testing.T) {
	for _, testUpload := range []bool{true, false} {
		t.Run(fmt.Sprintf("testing upload=%t", testUpload), func(t *testing.T) {
			synctest.Test(t, func(t *testing.T) {
				const expectedSpeed = 10 * Mibps
				const expectedLatency = 10 * time.Millisecond
				const MTU = 1400
				linkSettings := LinkSettings{
					BitsPerSecond: expectedSpeed,
					MTU:           MTU,
					Latency:       expectedLatency,
				}
				recvStartTimeChan := make(chan time.Time, 1)
				recvStarted := false
				bytesRead := 0
				// packetHandler records when the first packet arrives and
				// counts the received bytes.
				packetHandler := func(p Packet) {
					if !recvStarted {
						recvStarted = true
						recvStartTimeChan <- time.Now()
					}
					bytesRead += len(p.Data)
				}
				router := &testRouter{}
				if testUpload {
					router.onSend = packetHandler
				} else {
					router.onRecv = packetHandler
				}
				link := SimulatedLink{
					UplinkSettings:   linkSettings,
					DownlinkSettings: linkSettings,
					UploadPacket:     router,
					downloadPacket:   router,
				}
				link.Start()
				// Send 10MiB of data
				chunk := make([]byte, MTU)
				bytesSent := 0
				sendStartTime := time.Now()
				{
					totalBytes := 10 << 20
					// Blast a bunch of packets
					for bytesSent < totalBytes {
						// This sleep shouldn't limit the speed. 1400 Bytes/100us = 14KB/ms = 14MB/s = 14*8 Mbps
						// but it acts as a simple pacer to avoid just dropping the packets when the link is saturated.
						time.Sleep(100 * time.Microsecond)
						if testUpload {
							_ = link.SendPacket(Packet{Data: chunk})
						} else {
							link.RecvPacket(Packet{Data: chunk})
						}
						bytesSent += len(chunk)
					}
				}
				// Wait for delayed packets to be sent
				time.Sleep(40 * time.Millisecond)
				// NOTE(review): leftover debug output — consider t.Logf.
				fmt.Printf("sent: %d\n", bytesSent)
				link.Close()
				fmt.Printf("bytesRead: %d\n", bytesRead)
				recvStartTime := <-recvStartTimeChan
				duration := time.Since(recvStartTime)
				observedLatency := recvStartTime.Sub(sendStartTime)
				percentErrorLatency := math.Abs(observedLatency.Seconds()-expectedLatency.Seconds()) / expectedLatency.Seconds()
				t.Logf("observed latency: %s, expected latency: %s, percent error: %f\n", observedLatency, expectedLatency, percentErrorLatency)
				if percentErrorLatency > 0.20 {
					t.Fatalf("observed latency %s is wrong", observedLatency)
				}
				observedSpeed := 8 * float64(bytesRead) / duration.Seconds()
				t.Logf("observed speed: %f Mbps over %s\n", observedSpeed/Mibps, duration)
				percentErrorSpeed := math.Abs(observedSpeed-float64(expectedSpeed)) / float64(expectedSpeed)
				t.Logf("observed speed: %f Mbps, expected speed: %d Mbps, percent error: %f\n", observedSpeed/Mibps, expectedSpeed/Mibps, percentErrorSpeed)
				if percentErrorSpeed > 0.20 {
					t.Fatalf("observed speed %f Mbps is too far from expected speed %d Mbps. Percent error: %f", observedSpeed/Mibps, expectedSpeed/Mibps, percentErrorSpeed)
				}
			})
		})
	}
}
// linkAdapter adapts a PacketReceiver so it can serve as the upstream
// Router of a SimulatedLink, forwarding every sent packet into the
// wrapped link.
type linkAdapter struct {
	link PacketReceiver
}

var _ Router = &linkAdapter{}

// AddNode implements Router.
func (c *linkAdapter) AddNode(addr net.Addr, receiver PacketReceiver) {
	c.link = receiver
}

// SendPacket implements Router.
func (c *linkAdapter) SendPacket(p Packet) error {
	c.link.RecvPacket(p)
	return nil
}
// TestBandwidthLimiterAndLatencyConnectedLinks chains two simulated
// links (sender link -> receiver link) and verifies the end-to-end
// throughput and the summed latency of both hops.
func TestBandwidthLimiterAndLatencyConnectedLinks(t *testing.T) {
	synctest.Test(t, func(t *testing.T) {
		const expectedSpeed = 100 * Mibps
		const latencyOfOneLink = 10 * time.Millisecond
		const expectedLatency = 2 * latencyOfOneLink
		const MTU = 1400
		linkSettings := LinkSettings{
			BitsPerSecond: expectedSpeed,
			MTU:           MTU,
			Latency:       latencyOfOneLink,
		}
		recvStartTimeChan := make(chan time.Time, 1)
		recvStarted := false
		bytesRead := 0
		// packetHandler records first-arrival time and counts bytes.
		packetHandler := func(p Packet) {
			if !recvStarted {
				recvStarted = true
				recvStartTimeChan <- time.Now()
			}
			bytesRead += len(p.Data)
		}
		r := &testRouter{
			onRecv: packetHandler,
		}
		link2 := SimulatedLink{
			UplinkSettings:   linkSettings,
			DownlinkSettings: linkSettings,
			downloadPacket:   r,
		}
		link1 := SimulatedLink{
			UplinkSettings:   linkSettings,
			DownlinkSettings: linkSettings,
			UploadPacket:     &linkAdapter{link: &link2},
			downloadPacket:   &testRouter{},
		}
		link1.Start()
		link2.Start()
		// Send 10MiB of data
		chunk := make([]byte, MTU)
		bytesSent := 0
		sendStartTime := time.Now()
		{
			totalBytes := 10 << 20
			// Blast a bunch of packets
			for bytesSent < totalBytes {
				time.Sleep(100 * time.Microsecond)
				_ = link1.SendPacket(Packet{Data: chunk})
				bytesSent += len(chunk)
			}
		}
		// Wait for delayed packets to be sent
		time.Sleep(40 * time.Millisecond)
		t.Logf("sent: %d", bytesSent)
		link1.Close()
		link2.Close()
		t.Logf("bytesRead: %d", bytesRead)
		recvStartTime := <-recvStartTimeChan
		duration := time.Since(recvStartTime)
		observedLatency := recvStartTime.Sub(sendStartTime)
		percentErrorLatency := math.Abs(observedLatency.Seconds()-expectedLatency.Seconds()) / expectedLatency.Seconds()
		t.Logf("observed latency: %s, expected latency: %s, percent error: %f\n", observedLatency, expectedLatency, percentErrorLatency)
		if percentErrorLatency > 0.20 {
			t.Fatalf("observed latency %s is wrong", observedLatency)
		}
		observedSpeed := 8 * float64(bytesRead) / duration.Seconds()
		t.Logf("observed speed: %f Mbps over %s\n", observedSpeed/Mibps, duration)
		percentErrorSpeed := math.Abs(observedSpeed-float64(expectedSpeed)) / float64(expectedSpeed)
		t.Logf("observed speed: %f Mbps, expected speed: %d Mbps, percent error: %f\n", observedSpeed/Mibps, expectedSpeed/Mibps, percentErrorSpeed)
		if percentErrorSpeed > 0.20 {
			t.Fatalf("observed speed %f Mbps is too far from expected speed %d Mbps. Percent error: %f", observedSpeed/Mibps, expectedSpeed/Mibps, percentErrorSpeed)
		}
	})
}

View File

@@ -0,0 +1,59 @@
package simnet
import (
"errors"
"fmt"
"net"
)
// Simnet is a simulated network that manages connections between nodes
// with configurable network conditions.
type Simnet struct {
	// Router delivers packets between the endpoints' links.
	Router Router
	// links holds one SimulatedLink per endpoint; started by Start.
	links []*SimulatedLink
}

// NodeBiDiLinkSettings defines the bidirectional link settings for a network node.
// It specifies separate configurations for downlink (incoming) and uplink (outgoing)
// traffic, allowing asymmetric network conditions to be simulated.
type NodeBiDiLinkSettings struct {
	// Downlink configures the settings for incoming traffic to this node
	Downlink LinkSettings
	// Uplink configures the settings for outgoing traffic from this node
	Uplink LinkSettings
}
// Start brings up every link registered with the network. It always
// returns nil; the error return exists to fit common lifecycle APIs.
func (n *Simnet) Start() error {
	for i := range n.links {
		n.links[i].Start()
	}
	return nil
}
// Close shuts down all links in the network. It attempts to close every
// link even if some fail, and returns the combined failures, if any.
func (n *Simnet) Close() error {
	var closeErrs []error
	for _, l := range n.links {
		if err := l.Close(); err != nil {
			closeErrs = append(closeErrs, err)
		}
	}
	if joined := errors.Join(closeErrs...); joined != nil {
		return fmt.Errorf("failed to close some links: %w", joined)
	}
	return nil
}
// NewEndpoint creates a simulated connection bound to addr with the given
// per-direction link settings, registers its link with the router, and
// returns the connection.
func (n *Simnet) NewEndpoint(addr *net.UDPAddr, linkSettings NodeBiDiLinkSettings) *SimConn {
	l := &SimulatedLink{
		DownlinkSettings: linkSettings.Downlink,
		UplinkSettings:   linkSettings.Uplink,
		// Outgoing packets are handed to the router for delivery.
		UploadPacket: n.Router,
	}
	n.links = append(n.links, l)
	n.Router.AddNode(addr, l)
	return NewBlockingSimConn(addr, l)
}

View File

@@ -0,0 +1,158 @@
package simnet
import (
"math"
"net"
"testing"
"time"
"github.com/quic-go/quic-go/internal/synctest"
"github.com/stretchr/testify/require"
)
// oneMbps is one megabit per second, expressed in bits per second.
const oneMbps = 1_000_000
// newConn is a test helper that registers a new endpoint on the simulated
// network with the given address and bidirectional link settings.
func newConn(simnet *Simnet, address *net.UDPAddr, linkSettings NodeBiDiLinkSettings) *SimConn {
	return simnet.NewEndpoint(address, linkSettings)
}
// TestSimpleSimNet verifies that a single packet sent between two simulated
// endpoints arrives intact with the correct source address, and that the
// observed one-way latency is within 30% of the configured link latency.
func TestSimpleSimNet(t *testing.T) {
	synctest.Test(t, func(t *testing.T) {
		router := &Simnet{Router: &PerfectRouter{}}
		const bandwidth = 10 * oneMbps
		const latency = 10 * time.Millisecond
		// Split the latency evenly between uplink and downlink so the
		// end-to-end one-way latency equals the configured value.
		linkSettings := NodeBiDiLinkSettings{
			Downlink: LinkSettings{
				BitsPerSecond: bandwidth,
				Latency:       latency / 2,
			},
			Uplink: LinkSettings{
				BitsPerSecond: bandwidth,
				Latency:       latency / 2,
			},
		}
		addressA := net.UDPAddr{
			IP:   net.ParseIP("1.0.0.1"),
			Port: 8000,
		}
		connA := newConn(router, &addressA, linkSettings)
		addressB := net.UDPAddr{
			IP:   net.ParseIP("1.0.0.2"),
			Port: 8000,
		}
		connB := newConn(router, &addressB, linkSettings)
		// Check the Start error, consistent with TestSimNetBandwidth.
		require.NoError(t, router.Start())
		defer router.Close()

		start := time.Now()
		_, err := connA.WriteTo([]byte("hello"), &addressB)
		require.NoError(t, err)

		buf := make([]byte, 1024)
		n, from, err := connB.ReadFrom(buf)
		require.NoError(t, err)
		require.Equal(t, "hello", string(buf[:n]))
		require.Equal(t, addressA.String(), from.String())

		observedLatency := time.Since(start)
		expectedLatency := latency
		percentDiff := math.Abs(float64(observedLatency-expectedLatency) / float64(expectedLatency))
		t.Logf("observed latency: %v, expected latency: %v, percent diff: %v", observedLatency, expectedLatency, percentDiff)
		if percentDiff > 0.30 {
			t.Fatalf("latency is wrong: %v. percent off: %v", observedLatency, percentDiff)
		}
	})
}
// TestSimNetBandwidth blasts ~10 MiB of MTU-sized packets from one endpoint
// to another and checks that both the observed first-packet latency and the
// observed receive bandwidth match the configured link settings (within
// 30% and 20% respectively).
func TestSimNetBandwidth(t *testing.T) {
	synctest.Test(t, func(t *testing.T) {
		router := &Simnet{Router: &PerfectRouter{}}
		const bandwidth = 40 * oneMbps
		const latency = 10 * time.Millisecond
		const MTU = 1200
		// Latency is split evenly so the end-to-end one-way latency
		// equals the configured value.
		linkSettings := NodeBiDiLinkSettings{
			Downlink: LinkSettings{
				BitsPerSecond: bandwidth,
				MTU:           MTU,
				Latency:       latency / 2,
			},
			Uplink: LinkSettings{
				BitsPerSecond: bandwidth,
				MTU:           MTU,
				Latency:       latency / 2,
			},
		}
		addressA := net.UDPAddr{
			IP:   net.ParseIP("1.0.0.1"),
			Port: 8000,
		}
		connA := newConn(router, &addressA, linkSettings)
		addressB := net.UDPAddr{
			IP:   net.ParseIP("1.0.0.2"),
			Port: 8000,
		}
		connB := newConn(router, &addressB, linkSettings)
		err := router.Start()
		require.NoError(t, err)
		defer router.Close()
		// The reader goroutine publishes these before closing readDone,
		// so the main goroutine may safely read them after <-readDone.
		readDone := make(chan struct{})
		bytesRead := 0
		start := time.Now()
		var observedLatency time.Duration
		var readDuration time.Duration
		go func() {
			defer close(readDone)
			buf := make([]byte, MTU)
			var startReadTime time.Time
			for {
				n, _, err := connB.ReadFrom(buf)
				// Record the arrival of the first packet: latency is
				// measured from test start to first successful read.
				if observedLatency == 0 {
					startReadTime = time.Now()
					observedLatency = time.Since(start)
				}
				// Count bytes before checking err so a final partial
				// read is still included in the total.
				bytesRead += n
				if err != nil {
					// Reads fail once connB is closed below; bandwidth
					// is measured over the first-read-to-close window.
					readDuration = time.Since(startReadTime)
					return
				}
			}
		}()
		totalBytes := 10 << 20
		bytesSent := 0
		chunk := make([]byte, MTU)
		for bytesSent < totalBytes {
			// Pace the sender slightly; the simulated link enforces the
			// actual bandwidth limit.
			time.Sleep(100 * time.Microsecond)
			connA.WriteTo(chunk, &addressB)
			bytesSent += len(chunk)
		}
		// Closing the receiver unblocks ReadFrom and ends the goroutine.
		connB.Close()
		<-readDone
		expectedLatency := latency
		percentDiff := math.Abs(float64(observedLatency-expectedLatency) / float64(expectedLatency))
		t.Logf("observed latency: %v, expected latency: %v, percent diff: %v", observedLatency, expectedLatency, percentDiff)
		if percentDiff > 0.30 {
			t.Fatalf("latency is wrong: %v. percent off: %v", observedLatency, percentDiff)
		}
		// Bandwidth in bits per second over the receive window.
		observedBandwidth := float64(bytesRead*8) / readDuration.Seconds()
		expectedBandwidth := float64(bandwidth)
		t.Logf("sent bytes: %d", bytesSent)
		t.Logf("read bytes: %d", bytesRead)
		percentDiffBandwidth := math.Abs(observedBandwidth-expectedBandwidth) / expectedBandwidth
		t.Logf("observed bandwidth: %v mbps, expected bandwidth: %v mbps, percent diff: %v", observedBandwidth/oneMbps, expectedBandwidth/oneMbps, percentDiffBandwidth)
		if percentDiffBandwidth > 0.20 {
			t.Fatalf("bandwidth is wrong: %v. percent off: %v", observedBandwidth, percentDiffBandwidth)
		}
	})
}