From da27fcf33f2afcf60a56c2cdf102953b5f10f998 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Wed, 13 Aug 2025 16:45:14 +0200 Subject: [PATCH] expose basic connection stats via Conn.ConnectionStats (#5281) * Add ConnectionStats * remove for loop * Add comments * Update comments --------- Co-authored-by: Marco Munizaga --- connection.go | 60 ++++++++++++++- internal/ackhandler/ackhandler.go | 3 +- internal/ackhandler/sent_packet_handler.go | 9 +++ .../ackhandler/sent_packet_handler_test.go | 19 ++++- internal/congestion/cubic_sender.go | 8 ++ internal/congestion/cubic_sender_test.go | 4 + internal/protocol/protocol.go | 3 + internal/utils/connstats.go | 14 ++++ internal/utils/rtt_stats.go | 77 +++++++++++++------ 9 files changed, 169 insertions(+), 28 deletions(-) create mode 100644 internal/utils/connstats.go diff --git a/connection.go b/connection.go index 710125f0b..1560bb151 100644 --- a/connection.go +++ b/connection.go @@ -127,7 +127,8 @@ type Conn struct { connIDManager *connIDManager connIDGenerator *connIDGenerator - rttStats *utils.RTTStats + rttStats *utils.RTTStats + connStats utils.ConnectionStats cryptoStreamManager *cryptoStreamManager sentPacketHandler ackhandler.SentPacketHandler @@ -286,6 +287,7 @@ var newConnection = func( 0, protocol.ByteCount(s.config.InitialPacketSize), s.rttStats, + &s.connStats, clientAddressValidated, s.conn.capabilities().ECN, s.perspective, @@ -400,6 +402,7 @@ var newClientConnection = func( initialPacketNumber, protocol.ByteCount(s.config.InitialPacketSize), s.rttStats, + &s.connStats, false, // has no effect s.conn.capabilities().ECN, s.perspective, @@ -733,6 +736,61 @@ func (c *Conn) ConnectionState() ConnectionState { return c.connState } +// ConnectionStats contains statistics about the QUIC connection +type ConnectionStats struct { + // MinRTT is the estimate of the minimum RTT observed on the active network + // path. + MinRTT time.Duration + // LatestRTT is the last RTT sample observed on the active network path. + LatestRTT time.Duration + // SmoothedRTT is an exponentially weighted moving average of an endpoint's + // RTT samples. See https://www.rfc-editor.org/rfc/rfc9002#section-5.3 + SmoothedRTT time.Duration + // MeanDeviation estimates the variation in the RTT samples using a mean + // variation. See https://www.rfc-editor.org/rfc/rfc9002#section-5.3 + MeanDeviation time.Duration + + // BytesSent is the number of bytes sent on the underlying connection, + // including retransmissions. Does not include UDP or any other outer + // framing. + BytesSent uint64 + // PacketsSent is the number of packets sent on the underlying connection, + // including those that are determined to have been lost. + PacketsSent uint64 + // BytesReceived is the number of total bytes received on the underlying + // connection, including duplicate data for streams. Does not include UDP or + // any other outer framing. + BytesReceived uint64 + // PacketsReceived is the number of total packets received on the underlying + // connection, including packets that were not processable. + PacketsReceived uint64 + // BytesLost is the number of bytes lost on the underlying connection (does + // not monotonically increase, because packets that are declared lost can + // subsequently be received). Does not include UDP or any other outer + // framing. + BytesLost uint64 + // PacketsLost is the number of packets lost on the underlying connection + // (does not monotonically increase, because packets that are declared lost + // can subsequently be received). + PacketsLost uint64 +} + +func (c *Conn) ConnectionStats() ConnectionStats { + return ConnectionStats{ + MinRTT: c.rttStats.MinRTT(), + LatestRTT: c.rttStats.LatestRTT(), + SmoothedRTT: c.rttStats.SmoothedRTT(), + MeanDeviation: c.rttStats.MeanDeviation(), + + BytesSent: c.connStats.BytesSent.Load(), + PacketsSent: c.connStats.PacketsSent.Load(), + BytesReceived: c.connStats.BytesReceived.Load(), + PacketsReceived: c.connStats.PacketsReceived.Load(), + BytesLost: c.connStats.BytesLost.Load(), + PacketsLost: c.connStats.PacketsLost.Load(), + } +} + // Time when the connection should time out func (c *Conn) nextIdleTimeoutTime() time.Time { idleTimeout := max(c.idleTimeout, c.rttStats.PTO(true)*3) diff --git a/internal/ackhandler/ackhandler.go b/internal/ackhandler/ackhandler.go index 6f890b4d3..f1bac3276 100644 --- a/internal/ackhandler/ackhandler.go +++ b/internal/ackhandler/ackhandler.go @@ -13,12 +13,13 @@ func NewAckHandler( initialPacketNumber protocol.PacketNumber, initialMaxDatagramSize protocol.ByteCount, rttStats *utils.RTTStats, + connStats *utils.ConnectionStats, clientAddressValidated bool, enableECN bool, pers protocol.Perspective, tracer *logging.ConnectionTracer, logger utils.Logger, ) (SentPacketHandler, ReceivedPacketHandler) { - sph := newSentPacketHandler(initialPacketNumber, initialMaxDatagramSize, rttStats, clientAddressValidated, enableECN, pers, tracer, logger) + sph := newSentPacketHandler(initialPacketNumber, initialMaxDatagramSize, rttStats, connStats, clientAddressValidated, enableECN, pers, tracer, logger) return sph, newReceivedPacketHandler(sph, logger) } diff --git a/internal/ackhandler/sent_packet_handler.go b/internal/ackhandler/sent_packet_handler.go index a53be7d70..90f1dec71 100644 --- a/internal/ackhandler/sent_packet_handler.go +++ b/internal/ackhandler/sent_packet_handler.go @@ -90,6 +90,7 @@ type sentPacketHandler struct { congestion congestion.SendAlgorithmWithDebugInfos rttStats *utils.RTTStats + connStats *utils.ConnectionStats // The number of times a PTO has been sent without receiving an ack. ptoCount uint32 @@ -121,6 +122,7 @@ func newSentPacketHandler( initialPN protocol.PacketNumber, initialMaxDatagramSize protocol.ByteCount, rttStats *utils.RTTStats, + connStats *utils.ConnectionStats, clientAddressValidated bool, enableECN bool, pers protocol.Perspective, @@ -130,6 +132,7 @@ func newSentPacketHandler( congestion := congestion.NewCubicSender( congestion.DefaultClock{}, rttStats, + connStats, initialMaxDatagramSize, true, // use Reno tracer, @@ -142,6 +145,7 @@ func newSentPacketHandler( handshakePackets: newPacketNumberSpace(0, false), appDataPackets: newPacketNumberSpace(0, true), rttStats: rttStats, + connStats: connStats, congestion: congestion, perspective: pers, tracer: tracer, @@ -216,6 +220,7 @@ func (h *sentPacketHandler) DropPackets(encLevel protocol.EncryptionLevel, now t } func (h *sentPacketHandler) ReceivedBytes(n protocol.ByteCount, t time.Time) { + h.connStats.BytesReceived.Add(uint64(n)) wasAmplificationLimit := h.isAmplificationLimited() h.bytesReceived += n if wasAmplificationLimit && !h.isAmplificationLimited() { @@ -224,6 +229,7 @@ func (h *sentPacketHandler) ReceivedBytes(n protocol.ByteCount, t time.Time) { } func (h *sentPacketHandler) ReceivedPacket(l protocol.EncryptionLevel, t time.Time) { + h.connStats.PacketsReceived.Add(1) if h.perspective == protocol.PerspectiveServer && l == protocol.EncryptionHandshake && !h.peerAddressValidated { h.peerAddressValidated = true h.setLossDetectionTimer(t) @@ -253,6 +259,8 @@ func (h *sentPacketHandler) SentPacket( isPathProbePacket bool, ) { h.bytesSent += size + h.connStats.BytesSent.Add(uint64(size)) + h.connStats.PacketsSent.Add(1) pnSpace := h.getPacketNumberSpace(encLevel) if h.logger.Debug() && (pnSpace.history.HasOutstandingPackets() || pnSpace.history.HasOutstandingPathProbes()) { @@ -992,6 +1000,7 @@ func (h *sentPacketHandler) MigratedPath(now time.Time, initialMaxDatagramSize p h.congestion = congestion.NewCubicSender( congestion.DefaultClock{}, h.rttStats, + h.connStats, initialMaxDatagramSize, true, // use Reno h.tracer, diff --git a/internal/ackhandler/sent_packet_handler_test.go b/internal/ackhandler/sent_packet_handler_test.go index 54d5e6335..65309ffbc 100644 --- a/internal/ackhandler/sent_packet_handler_test.go +++ b/internal/ackhandler/sent_packet_handler_test.go @@ -107,6 +107,7 @@ func testSentPacketHandlerSendAndAcknowledge(t *testing.T, encLevel protocol.Enc 0, 1200, &utils.RTTStats{}, + &utils.ConnectionStats{}, false, false, protocol.PerspectiveClient, @@ -160,6 +161,7 @@ func TestSentPacketHandlerAcknowledgeSkippedPacket(t *testing.T) { 0, 1200, &utils.RTTStats{}, + &utils.ConnectionStats{}, false, false, protocol.PerspectiveClient, @@ -214,6 +216,7 @@ func testSentPacketHandlerRTTs(t *testing.T, encLevel protocol.EncryptionLevel, 0, 1200, &rttStats, + &utils.ConnectionStats{}, false, false, protocol.PerspectiveClient, @@ -253,7 +256,7 @@ func testSentPacketHandlerRTTs(t *testing.T, encLevel protocol.EncryptionLevel, for i := 0; i < 5; i++ { packets = append(packets, sendPacket(now)) } - expectedRTTStatsNoAckDelay := expectedRTTStats + expectedRTTStatsNoAckDelay := expectedRTTStats.Clone() for i := 0; i < 5; i++ { const ackDelay = 500 * time.Millisecond expectedRTTStats.UpdateRTT(time.Duration(i+1)*time.Second, ackDelay) @@ -302,6 +305,7 @@ func testSentPacketHandlerAmplificationLimitServer(t *testing.T, addressValidate 0, 1200, &utils.RTTStats{}, + &utils.ConnectionStats{}, addressValidated, false, protocol.PerspectiveServer, @@ -371,6 +375,7 @@ func testSentPacketHandlerAmplificationLimitClient(t *testing.T, dropHandshake b 0, 1200, &utils.RTTStats{}, + &utils.ConnectionStats{}, true, false, protocol.PerspectiveClient, @@ -425,6 +430,7 @@ func TestSentPacketHandlerDelayBasedLossDetection(t *testing.T) { 0, 1200, &rttStats, + &utils.ConnectionStats{}, true, false, protocol.PerspectiveServer, @@ -477,6 +483,7 @@ func TestSentPacketHandlerPacketBasedLossDetection(t *testing.T) { 0, 1200, &rttStats, + &utils.ConnectionStats{}, true, false, protocol.PerspectiveServer, @@ -541,6 +548,7 @@ func testSentPacketHandlerPTO(t *testing.T, encLevel protocol.EncryptionLevel, p 0, 1200, &rttStats, + &utils.ConnectionStats{}, true, false, protocol.PerspectiveServer, @@ -680,6 +688,7 @@ func TestSentPacketHandlerPacketNumberSpacesPTO(t *testing.T) { 0, 1200, &rttStats, + &utils.ConnectionStats{}, true, false, protocol.PerspectiveServer, @@ -770,6 +779,7 @@ func TestSentPacketHandler0RTT(t *testing.T) { 0, 1200, &utils.RTTStats{}, + &utils.ConnectionStats{}, true, false, protocol.PerspectiveClient, @@ -819,6 +829,7 @@ func TestSentPacketHandlerCongestion(t *testing.T) { 0, 1200, &rttStats, + &utils.ConnectionStats{}, true, false, protocol.PerspectiveServer, @@ -918,6 +929,7 @@ func testSentPacketHandlerRetry(t *testing.T, rtt, expectedRTT time.Duration) { 0, 1200, &rttStats, + &utils.ConnectionStats{}, true, false, protocol.PerspectiveClient, @@ -968,6 +980,7 @@ func TestSentPacketHandlerRetryAfterPTO(t *testing.T) { 0, 1200, &rttStats, + &utils.ConnectionStats{}, true, false, protocol.PerspectiveClient, @@ -1011,6 +1024,7 @@ func TestSentPacketHandlerECN(t *testing.T) { 0, 1200, &utils.RTTStats{}, + &utils.ConnectionStats{}, true, false, protocol.PerspectiveClient, @@ -1113,6 +1127,7 @@ func TestSentPacketHandlerPathProbe(t *testing.T) { 0, 1200, &rttStats, + &utils.ConnectionStats{}, true, false, protocol.PerspectiveClient, @@ -1191,6 +1206,7 @@ func TestSentPacketHandlerPathProbeAckAndLoss(t *testing.T) { 0, 1200, &rttStats, + &utils.ConnectionStats{}, true, false, protocol.PerspectiveClient, @@ -1265,6 +1281,7 @@ func testSentPacketHandlerRandomized(t *testing.T, seed uint64) { 0, 1200, &rttStats, + &utils.ConnectionStats{}, true, false, protocol.PerspectiveClient, diff --git a/internal/congestion/cubic_sender.go b/internal/congestion/cubic_sender.go index 075b08e00..faac12d78 100644 --- a/internal/congestion/cubic_sender.go +++ b/internal/congestion/cubic_sender.go @@ -22,6 +22,7 @@ const ( type cubicSender struct { hybridSlowStart HybridSlowStart rttStats *utils.RTTStats + connStats *utils.ConnectionStats cubic *Cubic pacer *pacer clock Clock @@ -68,6 +69,7 @@ var ( func NewCubicSender( clock Clock, rttStats *utils.RTTStats, + connStats *utils.ConnectionStats, initialMaxDatagramSize protocol.ByteCount, reno bool, tracer *logging.ConnectionTracer, @@ -75,6 +77,7 @@ func NewCubicSender( return newCubicSender( clock, rttStats, + connStats, reno, initialMaxDatagramSize, initialCongestionWindow*initialMaxDatagramSize, @@ -86,6 +89,7 @@ func NewCubicSender( func newCubicSender( clock Clock, rttStats *utils.RTTStats, + connStats *utils.ConnectionStats, reno bool, initialMaxDatagramSize, initialCongestionWindow, @@ -94,6 +98,7 @@ func newCubicSender( ) *cubicSender { c := &cubicSender{ rttStats: rttStats, + connStats: connStats, largestSentPacketNumber: protocol.InvalidPacketNumber, largestAckedPacketNumber: protocol.InvalidPacketNumber, largestSentAtLastCutback: protocol.InvalidPacketNumber, @@ -189,6 +194,9 @@ func (c *cubicSender) OnPacketAcked( } func (c *cubicSender) OnCongestionEvent(packetNumber protocol.PacketNumber, lostBytes, priorInFlight protocol.ByteCount) { + c.connStats.PacketsLost.Add(1) + c.connStats.BytesLost.Add(uint64(lostBytes)) + // TCP NewReno (RFC6582) says that once a loss occurs, any losses in packets // already sent should be treated as a single loss event, since it's expected. if packetNumber <= c.largestSentAtLastCutback { diff --git a/internal/congestion/cubic_sender_test.go b/internal/congestion/cubic_sender_test.go index 82866d28e..dded2d25a 100644 --- a/internal/congestion/cubic_sender_test.go +++ b/internal/congestion/cubic_sender_test.go @@ -47,6 +47,7 @@ func newTestCubicSender(cubic bool) *testCubicSender { sender: newCubicSender( &clock, &rttStats, + &utils.ConnectionStats{}, !cubic, protocol.InitialPacketSize, initialCongestionWindowPackets*maxDatagramSize, @@ -494,6 +495,7 @@ func TestCubicSenderSlowStartsUpToMaximumCongestionWindow(t *testing.T) { sender := newCubicSender( &clock, &rttStats, + &utils.ConnectionStats{}, true, protocol.InitialPacketSize, initialCongestionWindowPackets*maxDatagramSize, @@ -520,6 +522,7 @@ func TestCubicSenderSlowStartsPacketSizeIncrease(t *testing.T) { sender := newCubicSender( &clock, &rttStats, + &utils.ConnectionStats{}, true, protocol.InitialPacketSize, initialCongestionWindowPackets*maxDatagramSize, @@ -543,6 +546,7 @@ func TestCubicSenderLimitCwndIncreaseInCongestionAvoidance(t *testing.T) { sender := newCubicSender( &clock, &rttStats, + &utils.ConnectionStats{}, false, protocol.InitialPacketSize, initialCongestionWindowPackets*maxDatagramSize, diff --git a/internal/protocol/protocol.go b/internal/protocol/protocol.go index b1109ff43..a8813a662 100644 --- a/internal/protocol/protocol.go +++ b/internal/protocol/protocol.go @@ -2,6 +2,7 @@ package protocol import ( "fmt" + "sync/atomic" "time" ) @@ -95,6 +96,8 @@ func (e ECN) String() string { // A ByteCount in QUIC type ByteCount int64 +type AtomicByteCount atomic.Int64 + // MaxByteCount is the maximum value of a ByteCount const MaxByteCount = ByteCount(1<<62 - 1) diff --git a/internal/utils/connstats.go b/internal/utils/connstats.go new file mode 100644 index 000000000..19d883126 --- /dev/null +++ b/internal/utils/connstats.go @@ -0,0 +1,14 @@ +package utils + +import "sync/atomic" + +// ConnectionStats stores stats for the connection. See the public +// ConnectionStats struct in connection.go for more information +type ConnectionStats struct { + BytesSent atomic.Uint64 + PacketsSent atomic.Uint64 + BytesReceived atomic.Uint64 + PacketsReceived atomic.Uint64 + BytesLost atomic.Uint64 + PacketsLost atomic.Uint64 +} diff --git a/internal/utils/rtt_stats.go b/internal/utils/rtt_stats.go index cfa9d5b0d..61cbef5c3 100644 --- a/internal/utils/rtt_stats.go +++ b/internal/utils/rtt_stats.go @@ -1,6 +1,7 @@ package utils import ( + "sync/atomic" "time" "github.com/quic-go/quic-go/internal/protocol" @@ -19,31 +20,41 @@ const ( type RTTStats struct { hasMeasurement bool - minRTT time.Duration - latestRTT time.Duration - smoothedRTT time.Duration - meanDeviation time.Duration + minRTT atomic.Int64 // nanoseconds + latestRTT atomic.Int64 // nanoseconds + smoothedRTT atomic.Int64 // nanoseconds + meanDeviation atomic.Int64 // nanoseconds - maxAckDelay time.Duration + maxAckDelay atomic.Int64 // nanoseconds } // MinRTT Returns the minRTT for the entire connection. // May return Zero if no valid updates have occurred. -func (r *RTTStats) MinRTT() time.Duration { return r.minRTT } +func (r *RTTStats) MinRTT() time.Duration { + return time.Duration(r.minRTT.Load()) +} // LatestRTT returns the most recent rtt measurement. // May return Zero if no valid updates have occurred. -func (r *RTTStats) LatestRTT() time.Duration { return r.latestRTT } +func (r *RTTStats) LatestRTT() time.Duration { + return time.Duration(r.latestRTT.Load()) +} // SmoothedRTT returns the smoothed RTT for the connection. // May return Zero if no valid updates have occurred. -func (r *RTTStats) SmoothedRTT() time.Duration { return r.smoothedRTT } +func (r *RTTStats) SmoothedRTT() time.Duration { + return time.Duration(r.smoothedRTT.Load()) +} // MeanDeviation gets the mean deviation -func (r *RTTStats) MeanDeviation() time.Duration { return r.meanDeviation } +func (r *RTTStats) MeanDeviation() time.Duration { + return time.Duration(r.meanDeviation.Load()) +} // MaxAckDelay gets the max_ack_delay advertised by the peer -func (r *RTTStats) MaxAckDelay() time.Duration { return r.maxAckDelay } +func (r *RTTStats) MaxAckDelay() time.Duration { + return time.Duration(r.maxAckDelay.Load()) +} // PTO gets the probe timeout duration. func (r *RTTStats) PTO(includeMaxAckDelay bool) time.Duration { @@ -67,32 +78,37 @@ func (r *RTTStats) UpdateRTT(sendDelta, ackDelay time.Duration) { // ackDelay but the raw observed sendDelta, since poor clock granularity at // the client may cause a high ackDelay to result in underestimation of the // r.minRTT. - if r.minRTT == 0 || r.minRTT > sendDelta { - r.minRTT = sendDelta + minRTT := time.Duration(r.minRTT.Load()) + if minRTT == 0 || minRTT > sendDelta { + minRTT = sendDelta + r.minRTT.Store(int64(sendDelta)) } // Correct for ackDelay if information received from the peer results in a // an RTT sample at least as large as minRTT. Otherwise, only use the // sendDelta. sample := sendDelta - if sample-r.minRTT >= ackDelay { + if sample-minRTT >= ackDelay { sample -= ackDelay } - r.latestRTT = sample + r.latestRTT.Store(int64(sample)) // First time call. if !r.hasMeasurement { r.hasMeasurement = true - r.smoothedRTT = sample - r.meanDeviation = sample / 2 + r.smoothedRTT.Store(int64(sample)) + r.meanDeviation.Store(int64(sample / 2)) } else { - r.meanDeviation = time.Duration(oneMinusBeta*float32(r.meanDeviation/time.Microsecond)+rttBeta*float32((r.smoothedRTT-sample).Abs()/time.Microsecond)) * time.Microsecond - r.smoothedRTT = time.Duration((float32(r.smoothedRTT/time.Microsecond)*oneMinusAlpha)+(float32(sample/time.Microsecond)*rttAlpha)) * time.Microsecond + smoothedRTT := r.SmoothedRTT() + meanDev := time.Duration(oneMinusBeta*float32(r.MeanDeviation()/time.Microsecond)+rttBeta*float32((smoothedRTT-sample).Abs()/time.Microsecond)) * time.Microsecond + newSmoothedRTT := time.Duration((float32(smoothedRTT/time.Microsecond)*oneMinusAlpha)+(float32(sample/time.Microsecond)*rttAlpha)) * time.Microsecond + r.meanDeviation.Store(int64(meanDev)) + r.smoothedRTT.Store(int64(newSmoothedRTT)) } } // SetMaxAckDelay sets the max_ack_delay func (r *RTTStats) SetMaxAckDelay(mad time.Duration) { - r.maxAckDelay = mad + r.maxAckDelay.Store(int64(mad)) } // SetInitialRTT sets the initial RTT. @@ -105,15 +121,26 @@ func (r *RTTStats) SetInitialRTT(t time.Duration) { if r.hasMeasurement { return } - r.smoothedRTT = t - r.latestRTT = t + r.smoothedRTT.Store(int64(t)) + r.latestRTT.Store(int64(t)) } func (r *RTTStats) ResetForPathMigration() { r.hasMeasurement = false - r.minRTT = 0 - r.latestRTT = 0 - r.smoothedRTT = 0 - r.meanDeviation = 0 + r.minRTT.Store(0) + r.latestRTT.Store(0) + r.smoothedRTT.Store(0) + r.meanDeviation.Store(0) // max_ack_delay remains valid } + +func (r *RTTStats) Clone() *RTTStats { + out := &RTTStats{} + out.hasMeasurement = r.hasMeasurement + out.minRTT.Store(r.minRTT.Load()) + out.latestRTT.Store(r.latestRTT.Load()) + out.smoothedRTT.Store(r.smoothedRTT.Load()) + out.meanDeviation.Store(r.meanDeviation.Load()) + out.maxAckDelay.Store(r.maxAckDelay.Load()) + return out +}