forked from quic-go/quic-go
allow receiving of multiple packets before sending a packet
By draining the channel holding received packets, we reduce the number of spurious ACKs we send. We also make sure that sending and receiving packets is balanced, as we allow sending of a large number of packets (if we have cwnd available and the pacing frequency is high enough).
This commit is contained in:
37
session.go
37
session.go
@@ -583,20 +583,41 @@ runLoop:
|
||||
case <-s.sendingScheduled:
|
||||
// We do all the interesting stuff after the switch statement, so
|
||||
// nothing to see here.
|
||||
case p := <-s.receivedPackets:
|
||||
s.sentPacketHandler.ReceivedBytes(p.Size())
|
||||
// Only reset the timers if this packet was actually processed.
|
||||
// This avoids modifying any state when handling undecryptable packets,
|
||||
// which could be injected by an attacker.
|
||||
if wasProcessed := s.handlePacketImpl(p); !wasProcessed {
|
||||
continue
|
||||
}
|
||||
case firstPacket := <-s.receivedPackets:
|
||||
s.sentPacketHandler.ReceivedBytes(firstPacket.Size())
|
||||
wasProcessed := s.handlePacketImpl(firstPacket)
|
||||
// Don't set timers and send packets if the packet made us close the session.
|
||||
select {
|
||||
case closeErr = <-s.closeChan:
|
||||
break runLoop
|
||||
default:
|
||||
}
|
||||
// Now process all packets in the receivedPackets channel.
|
||||
// Limit the number of packets to the length of the receivedPackets channel,
|
||||
// so we eventually get a chance to send out an ACK when receiving a lot of packets.
|
||||
numPackets := len(s.receivedPackets)
|
||||
receiveLoop:
|
||||
for i := 0; i < numPackets; i++ {
|
||||
select {
|
||||
case p := <-s.receivedPackets:
|
||||
if processed := s.handlePacketImpl(p); processed {
|
||||
wasProcessed = true
|
||||
}
|
||||
select {
|
||||
case closeErr = <-s.closeChan:
|
||||
break runLoop
|
||||
default:
|
||||
}
|
||||
default:
|
||||
break receiveLoop
|
||||
}
|
||||
}
|
||||
// Only reset the timers if this packet was actually processed.
|
||||
// This avoids modifying any state when handling undecryptable packets,
|
||||
// which could be injected by an attacker.
|
||||
if !wasProcessed {
|
||||
continue
|
||||
}
|
||||
case <-s.handshakeCompleteChan:
|
||||
s.handleHandshakeComplete()
|
||||
}
|
||||
|
||||
@@ -671,8 +671,9 @@ var _ = Describe("Session", func() {
|
||||
buf := &bytes.Buffer{}
|
||||
Expect(extHdr.Write(buf, sess.version)).To(Succeed())
|
||||
return &receivedPacket{
|
||||
data: append(buf.Bytes(), data...),
|
||||
buffer: getPacketBuffer(),
|
||||
data: append(buf.Bytes(), data...),
|
||||
buffer: getPacketBuffer(),
|
||||
rcvTime: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -857,6 +858,51 @@ var _ = Describe("Session", func() {
|
||||
Eventually(sess.Context().Done()).Should(BeClosed())
|
||||
})
|
||||
|
||||
It("processes multiple received packets before sending one", func() {
|
||||
sess.sessionCreationTime = time.Now()
|
||||
var pn protocol.PacketNumber
|
||||
unpacker.EXPECT().Unpack(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(hdr *wire.Header, rcvTime time.Time, data []byte) (*unpackedPacket, error) {
|
||||
pn++
|
||||
return &unpackedPacket{
|
||||
data: []byte{0}, // PADDING frame
|
||||
encryptionLevel: protocol.Encryption1RTT,
|
||||
packetNumber: pn,
|
||||
hdr: &wire.ExtendedHeader{Header: *hdr},
|
||||
}, nil
|
||||
}).Times(3)
|
||||
tracer.EXPECT().StartedConnection(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any())
|
||||
tracer.EXPECT().ReceivedPacket(gomock.Any(), gomock.Any(), gomock.Any()).Do(func(hdr *wire.ExtendedHeader, _ protocol.ByteCount, _ []logging.Frame) {
|
||||
}).Times(3)
|
||||
packer.EXPECT().PackCoalescedPacket() // only expect a single call
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
sess.handlePacket(getPacket(&wire.ExtendedHeader{
|
||||
Header: wire.Header{DestConnectionID: srcConnID},
|
||||
PacketNumber: 0x1337,
|
||||
PacketNumberLen: protocol.PacketNumberLen2,
|
||||
}, []byte("foobar")))
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
cryptoSetup.EXPECT().RunHandshake().MaxTimes(1)
|
||||
sess.run()
|
||||
}()
|
||||
Consistently(sess.Context().Done()).ShouldNot(BeClosed())
|
||||
|
||||
// make the go routine return
|
||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||
cryptoSetup.EXPECT().Close()
|
||||
packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&coalescedPacket{buffer: getPacketBuffer()}, nil)
|
||||
expectReplaceWithClosed()
|
||||
tracer.EXPECT().ClosedConnection(gomock.Any())
|
||||
tracer.EXPECT().Close()
|
||||
mconn.EXPECT().Write(gomock.Any())
|
||||
fmt.Println("close")
|
||||
sess.closeLocal(errors.New("close"))
|
||||
Eventually(sess.Context().Done()).Should(BeClosed())
|
||||
})
|
||||
|
||||
It("closes the session when unpacking fails because the reserved bits were incorrect", func() {
|
||||
unpacker.EXPECT().Unpack(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, wire.ErrInvalidReservedBits)
|
||||
streamManager.EXPECT().CloseWithError(gomock.Any())
|
||||
|
||||
Reference in New Issue
Block a user