proxy: add source and destination address to delay and drop callbacks (#4964)

This commit is contained in:
Marten Seemann
2025-02-22 12:21:40 +01:00
committed by GitHub
parent eb2f986a06
commit 6033030017
19 changed files with 65 additions and 53 deletions

View File

@@ -139,10 +139,10 @@ func (d Direction) Is(dir Direction) bool {
}
// DropCallback is a callback that determines which packet gets dropped.
type DropCallback func(dir Direction, packet []byte) bool
type DropCallback func(dir Direction, from, to net.Addr, packet []byte) bool
// DelayCallback is a callback that determines how much delay to apply to a packet.
type DelayCallback func(dir Direction, packet []byte) time.Duration
type DelayCallback func(dir Direction, from, to net.Addr, packet []byte) time.Duration
// Proxy is a QUIC proxy that can drop and delay packets.
type Proxy struct {
@@ -267,7 +267,7 @@ func (p *Proxy) runProxy() error {
}
p.mutex.Unlock()
if p.DropPacket != nil && p.DropPacket(DirectionIncoming, raw) {
if p.DropPacket != nil && p.DropPacket(DirectionIncoming, cliaddr, conn.ServerAddr, raw) {
if p.logger.Debug() {
p.logger.Debugf("dropping incoming packet(%d bytes)", n)
}
@@ -276,7 +276,7 @@ func (p *Proxy) runProxy() error {
var delay time.Duration
if p.DelayPacket != nil {
delay = p.DelayPacket(DirectionIncoming, raw)
delay = p.DelayPacket(DirectionIncoming, cliaddr, conn.ServerAddr, raw)
}
if delay == 0 {
if p.logger.Debug() {
@@ -301,7 +301,7 @@ func (p *Proxy) runOutgoingConnection(conn *connection) error {
go func() {
for {
buffer := make([]byte, protocol.MaxPacketBufferSize)
n, err := conn.GetServerConn().Read(buffer)
n, addr, err := conn.GetServerConn().ReadFrom(buffer)
if err != nil {
// when the connection is switched out, we set a deadline on the old connection,
// in order to return it immediately
@@ -312,7 +312,7 @@ func (p *Proxy) runOutgoingConnection(conn *connection) error {
}
raw := buffer[0:n]
if p.DropPacket != nil && p.DropPacket(DirectionOutgoing, raw) {
if p.DropPacket != nil && p.DropPacket(DirectionOutgoing, addr, conn.ClientAddr, raw) {
if p.logger.Debug() {
p.logger.Debugf("dropping outgoing packet(%d bytes)", n)
}
@@ -321,7 +321,7 @@ func (p *Proxy) runOutgoingConnection(conn *connection) error {
var delay time.Duration
if p.DelayPacket != nil {
delay = p.DelayPacket(DirectionOutgoing, raw)
delay = p.DelayPacket(DirectionOutgoing, addr, conn.ClientAddr, raw)
}
if delay == 0 {
if p.logger.Debug() {

View File

@@ -142,13 +142,16 @@ func TestDropIncomingPackets(t *testing.T) {
const numPackets = 6
serverAddr, serverReceivedPackets := runServer(t)
var counter atomic.Int32
var fromAddr, toAddr atomic.Pointer[net.Addr]
proxy := Proxy{
Conn: newUPDConnLocalhost(t),
ServerAddr: serverAddr,
DropPacket: func(d Direction, _ []byte) bool {
DropPacket: func(d Direction, from, to net.Addr, _ []byte) bool {
if d != DirectionIncoming {
return false
}
fromAddr.Store(&from)
toAddr.Store(&to)
return counter.Add(1)%2 == 1
},
}
@@ -174,19 +177,25 @@ func TestDropIncomingPackets(t *testing.T) {
t.Fatalf("received unexpected packet")
case <-time.After(100 * time.Millisecond):
}
require.Equal(t, *fromAddr.Load(), clientConn.LocalAddr())
require.Equal(t, *toAddr.Load(), serverAddr)
}
func TestDropOutgoingPackets(t *testing.T) {
const numPackets = 6
serverAddr, serverReceivedPackets := runServer(t)
var counter atomic.Int32
var fromAddr, toAddr atomic.Pointer[net.Addr]
proxy := Proxy{
Conn: newUPDConnLocalhost(t),
ServerAddr: serverAddr,
DropPacket: func(d Direction, _ []byte) bool {
DropPacket: func(d Direction, from, to net.Addr, _ []byte) bool {
if d != DirectionOutgoing {
return false
}
fromAddr.Store(&from)
toAddr.Store(&to)
return counter.Add(1)%2 == 1
},
}
@@ -225,6 +234,9 @@ func TestDropOutgoingPackets(t *testing.T) {
case <-time.After(100 * time.Millisecond):
}
require.Len(t, serverReceivedPackets, numPackets)
require.Equal(t, *fromAddr.Load(), serverAddr)
require.Equal(t, *toAddr.Load(), clientConn.LocalAddr())
}
func TestDelayIncomingPackets(t *testing.T) {
@@ -235,7 +247,7 @@ func TestDelayIncomingPackets(t *testing.T) {
proxy := Proxy{
Conn: newUPDConnLocalhost(t),
ServerAddr: serverAddr,
DelayPacket: func(d Direction, _ []byte) time.Duration {
DelayPacket: func(d Direction, _, _ net.Addr, _ []byte) time.Duration {
// delay packet 1 by 200 ms
// delay packet 2 by 400 ms
// ...
@@ -282,7 +294,7 @@ func TestPacketReordering(t *testing.T) {
proxy := Proxy{
Conn: newUPDConnLocalhost(t),
ServerAddr: serverAddr,
DelayPacket: func(d Direction, _ []byte) time.Duration {
DelayPacket: func(d Direction, _, _ net.Addr, _ []byte) time.Duration {
// delay packet 1 by 600 ms
// delay packet 2 by 400 ms
// delay packet 3 by 200 ms
@@ -321,7 +333,7 @@ func TestConstantDelay(t *testing.T) { // no reordering expected here
proxy := Proxy{
Conn: newUPDConnLocalhost(t),
ServerAddr: serverAddr,
DelayPacket: func(d Direction, _ []byte) time.Duration {
DelayPacket: func(d Direction, _, _ net.Addr, _ []byte) time.Duration {
if d == DirectionOutgoing {
return 0
}
@@ -359,7 +371,7 @@ func TestDelayOutgoingPackets(t *testing.T) {
proxy := Proxy{
Conn: newUPDConnLocalhost(t),
ServerAddr: serverAddr,
DelayPacket: func(d Direction, _ []byte) time.Duration {
DelayPacket: func(d Direction, _, _ net.Addr, _ []byte) time.Duration {
// delay packet 1 by 200 ms
// delay packet 2 by 400 ms
// ...