forked from quic-go/quic-go
delay transmission of small packets to wait for new data
fixes #9, fixes #61
This commit is contained in:
@@ -200,3 +200,7 @@ func (p *packetPacker) composeNextPacket(stopWaitingFrame *frames.StopWaitingFra
|
|||||||
func (p *packetPacker) Empty() bool {
|
func (p *packetPacker) Empty() bool {
|
||||||
return p.streamFrameQueue.Front() == nil
|
return p.streamFrameQueue.Front() == nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *packetPacker) StreamFrameQueueByteLen() protocol.ByteCount {
|
||||||
|
return p.streamFrameQueue.ByteLen()
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
package protocol
|
package protocol
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
// A PacketNumber in QUIC
|
// A PacketNumber in QUIC
|
||||||
type PacketNumber uint64
|
type PacketNumber uint64
|
||||||
|
|
||||||
@@ -55,3 +57,10 @@ const DefaultMaxCongestionWindow PacketNumber = 107
|
|||||||
// MaxUndecryptablePackets limits the number of undecryptable packets that a
|
// MaxUndecryptablePackets limits the number of undecryptable packets that a
|
||||||
// session queues for later until it sends a public reset.
|
// session queues for later until it sends a public reset.
|
||||||
const MaxUndecryptablePackets = 10
|
const MaxUndecryptablePackets = 10
|
||||||
|
|
||||||
|
// SmallPacketPayloadSizeThreshold defines a threshold for small packets
|
||||||
|
// if the packet payload size (i.e. the packet without public header and private header) is below SmallPacketSizeThreshold, sending will be delayed by SmallPacketSendDelay
|
||||||
|
const SmallPacketPayloadSizeThreshold = MaxPacketSize / 2
|
||||||
|
|
||||||
|
// SmallPacketSendDelay is the time delay applied to small packets
|
||||||
|
const SmallPacketSendDelay = 500 * time.Microsecond
|
||||||
|
|||||||
55
session.go
55
session.go
@@ -62,6 +62,8 @@ type Session struct {
|
|||||||
undecryptablePackets []receivedPacket
|
undecryptablePackets []receivedPacket
|
||||||
aeadChanged chan struct{}
|
aeadChanged chan struct{}
|
||||||
|
|
||||||
|
smallPacketDelayedOccurranceTime time.Time
|
||||||
|
|
||||||
connectionParametersManager *handshake.ConnectionParametersManager
|
connectionParametersManager *handshake.ConnectionParametersManager
|
||||||
|
|
||||||
// Used to calculate the next packet number from the truncated wire
|
// Used to calculate the next packet number from the truncated wire
|
||||||
@@ -132,6 +134,12 @@ func (s *Session) Run() {
|
|||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// receive at a nil channel blocks forever
|
||||||
|
var smallPacketSendTimer <-chan time.Time
|
||||||
|
if !s.smallPacketDelayedOccurranceTime.IsZero() {
|
||||||
|
smallPacketSendTimer = time.After(time.Now().Sub(s.smallPacketDelayedOccurranceTime))
|
||||||
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
select {
|
select {
|
||||||
case <-s.closeChan:
|
case <-s.closeChan:
|
||||||
@@ -144,6 +152,8 @@ func (s *Session) Run() {
|
|||||||
}
|
}
|
||||||
s.scheduleSending()
|
s.scheduleSending()
|
||||||
case <-s.sendingScheduled:
|
case <-s.sendingScheduled:
|
||||||
|
err = s.maybeSendPacket()
|
||||||
|
case <-smallPacketSendTimer:
|
||||||
err = s.sendPacket()
|
err = s.sendPacket()
|
||||||
case <-s.aeadChanged:
|
case <-s.aeadChanged:
|
||||||
s.tryDecryptingQueuedPackets()
|
s.tryDecryptingQueuedPackets()
|
||||||
@@ -381,6 +391,49 @@ func (s *Session) closeStreamsWithError(err error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Session) maybeSendPacket() error {
|
||||||
|
if !s.congestionAllowsSending() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// always send out retransmissions immediately. No need to check the size of the packet
|
||||||
|
if s.sentPacketHandler.HasPacketForRetransmission() {
|
||||||
|
return s.sendPacket()
|
||||||
|
}
|
||||||
|
|
||||||
|
var maxPacketSize protocol.ByteCount // the maximum size of a packet we could send out at this moment
|
||||||
|
|
||||||
|
// we only estimate the size of the StopWaitingFrame here
|
||||||
|
stopWaitingFrame := s.stopWaitingManager.GetStopWaitingFrame()
|
||||||
|
if stopWaitingFrame != nil {
|
||||||
|
// The actual size of a StopWaitingFrame depends on the packet number of the packet it is sent with, and it's easier here to neglect the fact the StopWaitingFrame could be 5 bytes smaller than calculated here
|
||||||
|
maxPacketSize += 8
|
||||||
|
}
|
||||||
|
|
||||||
|
ack, err := s.receivedPacketHandler.GetAckFrame(false)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if ack != nil {
|
||||||
|
ackLength, _ := ack.MinLength() // MinLength never errors for an ACK frame
|
||||||
|
maxPacketSize += ackLength
|
||||||
|
}
|
||||||
|
|
||||||
|
// note that maxPacketSize can get (much) larger than protocol.MaxPacketSize if there is a long queue of StreamFrames
|
||||||
|
maxPacketSize += s.packer.StreamFrameQueueByteLen()
|
||||||
|
|
||||||
|
if maxPacketSize > protocol.SmallPacketPayloadSizeThreshold {
|
||||||
|
return s.sendPacket()
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.smallPacketDelayedOccurranceTime.IsZero() {
|
||||||
|
s.smallPacketDelayedOccurranceTime = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Session) sendPacket() error {
|
func (s *Session) sendPacket() error {
|
||||||
if !s.congestionAllowsSending() {
|
if !s.congestionAllowsSending() {
|
||||||
return nil
|
return nil
|
||||||
@@ -419,6 +472,8 @@ func (s *Session) sendPacket() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.smallPacketDelayedOccurranceTime = time.Time{} // zero
|
||||||
|
|
||||||
err = s.sentPacketHandler.SentPacket(&ackhandler.Packet{
|
err = s.sentPacketHandler.SentPacket(&ackhandler.Packet{
|
||||||
PacketNumber: packet.number,
|
PacketNumber: packet.number,
|
||||||
Frames: packet.frames,
|
Frames: packet.frames,
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package quic
|
package quic
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"runtime"
|
"runtime"
|
||||||
@@ -379,6 +380,74 @@ var _ = Describe("Session", func() {
|
|||||||
session.Run()
|
session.Run()
|
||||||
Expect(session.sendingScheduled).To(Receive())
|
Expect(session.sendingScheduled).To(Receive())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
Context("bundling of small packets", func() {
|
||||||
|
It("bundles two small frames into one packet", func() {
|
||||||
|
go session.Run()
|
||||||
|
|
||||||
|
session.QueueStreamFrame(&frames.StreamFrame{
|
||||||
|
StreamID: 5,
|
||||||
|
Data: []byte("foobar1"),
|
||||||
|
})
|
||||||
|
session.QueueStreamFrame(&frames.StreamFrame{
|
||||||
|
StreamID: 5,
|
||||||
|
Data: []byte("foobar2"),
|
||||||
|
})
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
Expect(conn.written).To(HaveLen(1))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("sends out two big frames in two packet", func() {
|
||||||
|
go session.Run()
|
||||||
|
|
||||||
|
session.QueueStreamFrame(&frames.StreamFrame{
|
||||||
|
StreamID: 5,
|
||||||
|
Data: bytes.Repeat([]byte{'e'}, int(protocol.SmallPacketPayloadSizeThreshold+50)),
|
||||||
|
})
|
||||||
|
session.QueueStreamFrame(&frames.StreamFrame{
|
||||||
|
StreamID: 5,
|
||||||
|
Data: bytes.Repeat([]byte{'f'}, int(protocol.SmallPacketPayloadSizeThreshold+50)),
|
||||||
|
})
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
Expect(conn.written).To(HaveLen(2))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("sends out two small frames that are written to long after one another into two packet", func() {
|
||||||
|
go session.Run()
|
||||||
|
|
||||||
|
session.QueueStreamFrame(&frames.StreamFrame{
|
||||||
|
StreamID: 5,
|
||||||
|
Data: []byte("foobar1"),
|
||||||
|
})
|
||||||
|
time.Sleep(10 * protocol.SmallPacketSendDelay)
|
||||||
|
session.QueueStreamFrame(&frames.StreamFrame{
|
||||||
|
StreamID: 5,
|
||||||
|
Data: []byte("foobar2"),
|
||||||
|
})
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
Expect(conn.written).To(HaveLen(2))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("sends a queued ACK frame only once", func() {
|
||||||
|
go session.Run()
|
||||||
|
|
||||||
|
packetNumber := protocol.PacketNumber(0x1337)
|
||||||
|
session.receivedPacketHandler.ReceivedPacket(packetNumber, true)
|
||||||
|
session.QueueStreamFrame(&frames.StreamFrame{
|
||||||
|
StreamID: 5,
|
||||||
|
Data: []byte("foobar1"),
|
||||||
|
})
|
||||||
|
time.Sleep(10 * protocol.SmallPacketSendDelay)
|
||||||
|
session.QueueStreamFrame(&frames.StreamFrame{
|
||||||
|
StreamID: 5,
|
||||||
|
Data: []byte("foobar2"),
|
||||||
|
})
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
Expect(conn.written).To(HaveLen(2))
|
||||||
|
Expect(conn.written[0]).To(ContainSubstring(string([]byte{0x37, 0x13})))
|
||||||
|
Expect(conn.written[1]).ToNot(ContainSubstring(string([]byte{0x37, 0x13})))
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
It("closes when crypto stream errors", func() {
|
It("closes when crypto stream errors", func() {
|
||||||
|
|||||||
Reference in New Issue
Block a user