From 10a541bfc0a0c2046842bb2e386be8162e158c48 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Tue, 28 Jan 2025 04:54:04 +0100 Subject: [PATCH] use a ringbuffer to store received packets in the connection (#4928) Reduces memory usage by replacing a fixed-capacity channel (256) with a ring buffer (initial capacity: 8). The buffer grows to 256 only when many packets are enqueued. --- connection.go | 111 +++++++++++++++++++++++++++++++------------------- 1 file changed, 69 insertions(+), 42 deletions(-) diff --git a/connection.go b/connection.go index 5202ff58c..623295691 100644 --- a/connection.go +++ b/connection.go @@ -19,6 +19,7 @@ import ( "github.com/quic-go/quic-go/internal/protocol" "github.com/quic-go/quic-go/internal/qerr" "github.com/quic-go/quic-go/internal/utils" + "github.com/quic-go/quic-go/internal/utils/ringbuffer" "github.com/quic-go/quic-go/internal/wire" "github.com/quic-go/quic-go/logging" ) @@ -154,8 +155,10 @@ type connection struct { oneRTTStream *cryptoStream // only set for the server cryptoStreamHandler cryptoStreamHandler - receivedPackets chan receivedPacket - sendingScheduled chan struct{} + notifyReceivedPacket chan struct{} + sendingScheduled chan struct{} + receivedPacketMx sync.Mutex + receivedPackets ringbuffer.RingBuffer[receivedPacket] // closeChan is used to notify the run loop that it should terminate closeChan chan struct{} @@ -478,7 +481,8 @@ func (s *connection) preSetup() { s.perspective, ) s.framer = newFramer(s.connFlowController) - s.receivedPackets = make(chan receivedPacket, protocol.MaxConnUnprocessedPackets) + s.receivedPackets.Init(8) + s.notifyReceivedPacket = make(chan struct{}, 1) s.closeChan = make(chan struct{}, 1) s.sendingScheduled = make(chan struct{}, 1) s.handshakeCompleteChan = make(chan struct{}) @@ -496,18 +500,14 @@ func (s *connection) run() (err error) { defer func() { s.ctxCancel(err) }() defer func() { - // Drain queued packets that will never be processed. - for { - select { - case p, ok := <-s.receivedPackets: - if !ok { - return - } - p.buffer.Decrement() - p.buffer.MaybeRelease() - default: - return - } + // drain queued packets that will never be processed + s.receivedPacketMx.Lock() + defer s.receivedPacketMx.Unlock() + + for !s.receivedPackets.Empty() { + p := s.receivedPackets.PopFront() + p.buffer.Decrement() + p.buffer.MaybeRelease() } }() @@ -574,8 +574,8 @@ runLoop: // We do all the interesting stuff after the switch statement, so // nothing to see here. case <-sendQueueAvailable: - case firstPacket := <-s.receivedPackets: - wasProcessed, err := s.handlePackets(firstPacket) + case <-s.notifyReceivedPacket: + wasProcessed, err := s.handlePackets() if err != nil { s.setCloseError(&closeError{err: err}) break runLoop @@ -784,32 +784,45 @@ func (s *connection) handleHandshakeConfirmed(now time.Time) error { return nil } -func (s *connection) handlePackets(firstPacket receivedPacket) (wasProcessed bool, _ error) { - wasProcessed, err := s.handleOnePacket(firstPacket) - if err != nil { - return false, err - } - // only process a single packet at a time before handshake completion - if !s.handshakeComplete { - return wasProcessed, nil - } +func (s *connection) handlePackets() (wasProcessed bool, _ error) { // Now process all packets in the receivedPackets channel. // Limit the number of packets to the length of the receivedPackets channel, // so we eventually get a chance to send out an ACK when receiving a lot of packets. - numPackets := len(s.receivedPackets) -receiveLoop: + s.receivedPacketMx.Lock() + numPackets := s.receivedPackets.Len() + if numPackets == 0 { + s.receivedPacketMx.Unlock() + return false, nil + } + + var hasMorePackets bool for i := 0; i < numPackets; i++ { + if i > 0 { + s.receivedPacketMx.Lock() + } + p := s.receivedPackets.PopFront() + hasMorePackets = !s.receivedPackets.Empty() + s.receivedPacketMx.Unlock() + + processed, err := s.handleOnePacket(p) + if err != nil { + return false, err + } + if processed { + wasProcessed = true + } + if !hasMorePackets { + break + } + // only process a single packet at a time before handshake completion + if !s.handshakeComplete { + break + } + } + if hasMorePackets { select { - case p := <-s.receivedPackets: - processed, err := s.handleOnePacket(p) - if err != nil { - return false, err - } - if processed { - wasProcessed = true - } + case s.notifyReceivedPacket <- struct{}{}: default: - break receiveLoop } } return wasProcessed, nil @@ -1392,14 +1405,22 @@ func (s *connection) handleFrame( // handlePacket is called by the server with a new packet func (s *connection) handlePacket(p receivedPacket) { + s.receivedPacketMx.Lock() // Discard packets once the amount of queued packets is larger than // the channel size, protocol.MaxConnUnprocessedPackets - select { - case s.receivedPackets <- p: - default: + if s.receivedPackets.Len() >= protocol.MaxConnUnprocessedPackets { if s.tracer != nil && s.tracer.DroppedPacket != nil { s.tracer.DroppedPacket(logging.PacketTypeNotDetermined, protocol.InvalidPacketNumber, p.Size(), logging.PacketDropDOSPrevention) } + s.receivedPacketMx.Unlock() + return + } + s.receivedPackets.PushBack(p) + s.receivedPacketMx.Unlock() + + select { + case s.notifyReceivedPacket <- struct{}{}: + default: } } @@ -1978,7 +1999,10 @@ func (s *connection) sendPacketsWithoutGSO(now time.Time) error { return nil } // Prioritize receiving of packets over sending out more packets. - if len(s.receivedPackets) > 0 { + s.receivedPacketMx.Lock() + hasPackets := !s.receivedPackets.Empty() + s.receivedPacketMx.Unlock() + if hasPackets { s.pacingDeadline = deadlineSendImmediately return nil } @@ -2036,7 +2060,10 @@ func (s *connection) sendPacketsWithGSO(now time.Time) error { } // Prioritize receiving of packets over sending out more packets. - if len(s.receivedPackets) > 0 { + s.receivedPacketMx.Lock() + hasPackets := !s.receivedPackets.Empty() + s.receivedPacketMx.Unlock() + if hasPackets { s.pacingDeadline = deadlineSendImmediately return nil }