From 643f0b4c6726415e065761892c872c7d76eb6f0f Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 15 Feb 2020 16:43:00 +0700 Subject: [PATCH] drop incoming packets when the server's receive queue is full --- internal/protocol/params.go | 3 +++ server.go | 8 +++++-- server_test.go | 46 +++++++++++++++++++++++++++++++++++++ 3 files changed, 55 insertions(+), 2 deletions(-) diff --git a/internal/protocol/params.go b/internal/protocol/params.go index b6d6d2e7..8a7aad07 100644 --- a/internal/protocol/params.go +++ b/internal/protocol/params.go @@ -39,6 +39,9 @@ const DefaultMaxIncomingStreams = 100 // DefaultMaxIncomingUniStreams is the maximum number of unidirectional streams that a peer may open const DefaultMaxIncomingUniStreams = 100 +// MaxServerUnprocessedPackets is the max number of packets stored in the server that are not yet processed. +const MaxServerUnprocessedPackets = 1024 + // MaxSessionUnprocessedPackets is the max number of packets stored in each session that are not yet processed. const MaxSessionUnprocessedPackets = MaxCongestionWindowPackets diff --git a/server.go b/server.go index 65113547..33819eeb 100644 --- a/server.go +++ b/server.go @@ -179,7 +179,7 @@ func listen(conn net.PacketConn, tlsConf *tls.Config, config *Config, acceptEarl zeroRTTQueue: newZeroRTTQueue(), sessionQueue: make(chan quicSession), errorChan: make(chan struct{}), - receivedPackets: make(chan *receivedPacket, 1000), + receivedPackets: make(chan *receivedPacket, protocol.MaxServerUnprocessedPackets), newSession: newSession, logger: utils.DefaultLogger.WithPrefix("server"), acceptEarlySessions: acceptEarly, @@ -285,7 +285,11 @@ func (s *baseServer) Addr() net.Addr { } func (s *baseServer) handlePacket(p *receivedPacket) { - s.receivedPackets <- p + select { + case s.receivedPackets <- p: + default: + s.logger.Debugf("Dropping packet from %s (%d bytes). Server receive queue full.", p.remoteAddr, len(p.data)) + } } func (s *baseServer) handlePacketImpl(p *receivedPacket) bool /* was the packet handled */ { diff --git a/server_test.go b/server_test.go index b3349051..d5fe1403 100644 --- a/server_test.go +++ b/server_test.go @@ -11,6 +11,7 @@ import ( "runtime/pprof" "strings" "sync" + "sync/atomic" "time" "github.com/lucas-clemente/quic-go/qlog" @@ -438,6 +439,51 @@ var _ = Describe("Server", func() { Expect(createdSession).To(BeTrue()) }) + It("drops packets if the receive queue is full", func() { + phm.EXPECT().GetStatelessResetToken(gomock.Any()).AnyTimes() + phm.EXPECT().Add(gomock.Any(), gomock.Any()).AnyTimes() + + serv.config.AcceptToken = func(net.Addr, *Token) bool { return true } + acceptSession := make(chan struct{}) + var counter uint32 // to be used as an atomic, so we query it in Eventually + serv.newSession = func( + _ connection, + runner sessionRunner, + _ protocol.ConnectionID, + _ protocol.ConnectionID, + _ protocol.ConnectionID, + _ protocol.ConnectionID, + _ [16]byte, + _ *Config, + _ *tls.Config, + _ *handshake.TokenGenerator, + _ bool, + _ qlog.Tracer, + _ utils.Logger, + _ protocol.VersionNumber, + ) quicSession { + <-acceptSession + atomic.AddUint32(&counter, 1) + return nil + } + + serv.handlePacket(getInitial(protocol.ConnectionID{1, 2, 3, 4, 5, 6, 7, 8})) + var wg sync.WaitGroup + for i := 0; i < 3*protocol.MaxServerUnprocessedPackets; i++ { + wg.Add(1) + go func() { + defer GinkgoRecover() + defer wg.Done() + serv.handlePacket(getInitial(protocol.ConnectionID{1, 2, 3, 4, 5, 6, 7, 8})) + }() + } + wg.Wait() + + close(acceptSession) + Eventually(func() uint32 { return atomic.LoadUint32(&counter) }).Should(BeEquivalentTo(protocol.MaxServerUnprocessedPackets + 1)) + Consistently(func() uint32 { return atomic.LoadUint32(&counter) }).Should(BeEquivalentTo(protocol.MaxServerUnprocessedPackets + 1)) + }) + It("only creates a single session for a duplicate Initial", func() { serv.config.AcceptToken = func(_ net.Addr, _ *Token) bool { return true } var createdSession bool