From 6b9921bbfcb9bc95efb665ad37dd13d9fd134560 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Tue, 28 Jan 2025 05:08:05 +0100 Subject: [PATCH] refactor connection timer logic (#4927) --- connection.go | 46 +++++++++++++++++++++++++++----------- connection_test.go | 48 ++++++++++++++++++++++------------------ connection_timer_test.go | 11 --------- 3 files changed, 59 insertions(+), 46 deletions(-) diff --git a/connection.go b/connection.go index 62329569..2d67c151 100644 --- a/connection.go +++ b/connection.go @@ -544,10 +544,15 @@ runLoop: default: } - s.maybeResetTimer() + // no need to set a timer if we can send packets immediately + if s.pacingDeadline != deadlineSendImmediately { + s.maybeResetTimer() + } - var processedUndecryptablePacket bool + // 1st: handle undecryptable packets, if any. + // This can only occur before completion of the handshake. if len(s.undecryptablePacketsToProcess) > 0 { + var processedUndecryptablePacket bool queue := s.undecryptablePacketsToProcess s.undecryptablePacketsToProcess = nil for _, p := range queue { @@ -560,19 +565,36 @@ runLoop: processedUndecryptablePacket = true } } + if processedUndecryptablePacket { + // if we processed any undecryptable packets, jump to the resetting of the timers directly + continue + } } - // If we processed any undecryptable packets, jump to the resetting of the timers directly. - if !processedUndecryptablePacket { + + // 2nd: receive packets. + processed, err := s.handlePackets() // don't check receivedPackets.Len() in the run loop to avoid locking the mutex + if err != nil { + s.setCloseError(&closeError{err: err}) + break runLoop + } + + // We don't need to wait for new events if: + // * we processed packets: we probably need to send an ACK, and potentially more data + // * the pacer allows us to send more packets immediately + shouldProceedImmediately := sendQueueAvailable == nil && (processed || s.pacingDeadline == deadlineSendImmediately) + if !shouldProceedImmediately { + // 3rd: wait for something to happen: + // * closing of the connection + // * timer firing + // * sending scheduled + // * send queue available + // * received packets select { case <-s.closeChan: break runLoop case <-s.timer.Chan(): s.timer.SetRead() - // We do all the interesting stuff after the switch statement, so - // nothing to see here. case <-s.sendingScheduled: - // We do all the interesting stuff after the switch statement, so - // nothing to see here. case <-sendQueueAvailable: case <-s.notifyReceivedPacket: wasProcessed, err := s.handlePackets() @@ -580,19 +602,17 @@ runLoop: s.setCloseError(&closeError{err: err}) break runLoop } - // Only reset the timers if this packet was actually processed. - // This avoids modifying any state when handling undecryptable packets, - // which could be injected by an attacker. + // if we processed any undecryptable packets, jump to the resetting of the timers directly if !wasProcessed { continue } } } + // Check for loss detection timeout. + // This could cause packets to be declared lost, and retransmissions to be enqueued. now := time.Now() if timeout := s.sentPacketHandler.GetLossDetectionTimeout(); !timeout.IsZero() && timeout.Before(now) { - // This could cause packets to be retransmitted. - // Check it before trying to send packets. if err := s.sentPacketHandler.OnLossDetectionTimeout(now); err != nil { s.setCloseError(&closeError{err: err}) break runLoop diff --git a/connection_test.go b/connection_test.go index 32ade183..18541fc9 100644 --- a/connection_test.go +++ b/connection_test.go @@ -1399,48 +1399,52 @@ func TestConnection0RTTTransportParameters(t *testing.T) { func TestConnectionReceivePrioritization(t *testing.T) { t.Run("handshake complete", func(t *testing.T) { - counter := testConnectionReceivePrioritization(t, true) - require.Equal(t, 10, counter) + events := testConnectionReceivePrioritization(t, true, 5) + require.Equal(t, []string{"unpack", "unpack", "unpack", "unpack", "unpack", "pack"}, events) }) // before handshake completion, we trigger packing of a new packet every time we receive a packet t.Run("handshake not complete", func(t *testing.T) { - counter := testConnectionReceivePrioritization(t, false) - require.Equal(t, 1, counter) + events := testConnectionReceivePrioritization(t, false, 5) + require.Equal(t, []string{ + "unpack", "pack", + "unpack", "pack", + "unpack", "pack", + "unpack", "pack", + "unpack", "pack", + }, events) }) } -func testConnectionReceivePrioritization(t *testing.T, handshakeComplete bool) int { +func testConnectionReceivePrioritization(t *testing.T, handshakeComplete bool, numPackets int) []string { mockCtrl := gomock.NewController(t) unpacker := NewMockUnpacker(mockCtrl) opts := []testConnectionOpt{connectionOptUnpacker(unpacker)} if handshakeComplete { opts = append(opts, connectionOptHandshakeConfirmed()) } - tc := newServerTestConnection(t, - mockCtrl, - nil, - false, - opts..., - ) + tc := newServerTestConnection(t, mockCtrl, nil, false, opts...) + var events []string var counter int - var packedFirst bool + var testDone bool done := make(chan struct{}) unpacker.EXPECT().UnpackShortHeader(gomock.Any(), gomock.Any()).DoAndReturn( func(rcvTime time.Time, data []byte) (protocol.PacketNumber, protocol.PacketNumberLen, protocol.KeyPhaseBit, []byte, error) { - if !packedFirst { - counter++ + counter++ + if counter == numPackets { + testDone = true } + events = append(events, "unpack") return protocol.PacketNumber(counter), protocol.PacketNumberLen2, protocol.KeyPhaseZero, []byte{0, 1} /* PADDING, PING */, nil }, - ).AnyTimes() + ).Times(numPackets) switch handshakeComplete { case false: tc.packer.EXPECT().PackCoalescedPacket(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( func(b bool, bc protocol.ByteCount, t time.Time, v protocol.Version) (*coalescedPacket, error) { - if !packedFirst { - packedFirst = true + events = append(events, "pack") + if testDone { close(done) } return nil, nil @@ -1449,8 +1453,8 @@ func testConnectionReceivePrioritization(t *testing.T, handshakeComplete bool) i case true: tc.packer.EXPECT().AppendPacket(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( func(b *packetBuffer, bc protocol.ByteCount, t time.Time, v protocol.Version) (shortHeaderPacket, error) { - if !packedFirst { - packedFirst = true + events = append(events, "pack") + if testDone { close(done) } return shortHeaderPacket{}, errNothingToPack @@ -1458,10 +1462,11 @@ func testConnectionReceivePrioritization(t *testing.T, handshakeComplete bool) i ).AnyTimes() } - for i := 0; i < 10; i++ { + for i := range numPackets { tc.conn.handlePacket(getShortHeaderPacket(t, tc.srcConnID, protocol.PacketNumber(i), []byte("foobar"))) } + tc.connRunner.EXPECT().Remove(gomock.Any()).AnyTimes() errChan := make(chan error, 1) go func() { errChan <- tc.conn.run() }() @@ -1480,8 +1485,7 @@ func testConnectionReceivePrioritization(t *testing.T, handshakeComplete bool) i case <-time.After(time.Second): t.Fatal("timeout") } - - return counter + return events } func TestConnectionPacketBuffering(t *testing.T) { diff --git a/connection_timer_test.go b/connection_timer_test.go index d850fc43..189f7b5b 100644 --- a/connection_timer_test.go +++ b/connection_timer_test.go @@ -47,14 +47,3 @@ func TestConnectionTimerReset(t *testing.T) { timer.SetTimer(now.Add(time.Hour), now.Add(time.Minute), time.Time{}, time.Time{}) require.Equal(t, now.Add(time.Hour), timer.Deadline()) } - -func TestConnectionTimerSendImmediately(t *testing.T) { - now := time.Now() - timer := newTimer() - timer.SetTimer(now.Add(time.Hour), now.Add(time.Minute), time.Time{}, time.Time{}) - require.Equal(t, now.Add(time.Minute), timer.Deadline()) - timer.SetRead() - - timer.SetTimer(now.Add(time.Hour), now.Add(time.Minute), time.Time{}, deadlineSendImmediately) - require.Equal(t, deadlineSendImmediately, timer.Deadline()) -}