From ba799cef0167fa73c0784e43e43b1745ba20ae49 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Fri, 13 May 2016 17:06:30 +0700 Subject: [PATCH] delay transmission of small packets to wait for new data fixes #9, fixes #61 --- packet_packer.go | 4 +++ protocol/protocol.go | 9 ++++++ session.go | 55 +++++++++++++++++++++++++++++++++++ session_test.go | 69 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 137 insertions(+) diff --git a/packet_packer.go b/packet_packer.go index a4fa41db9..8bacbeaa6 100644 --- a/packet_packer.go +++ b/packet_packer.go @@ -200,3 +200,7 @@ func (p *packetPacker) composeNextPacket(stopWaitingFrame *frames.StopWaitingFra func (p *packetPacker) Empty() bool { return p.streamFrameQueue.Front() == nil } + +func (p *packetPacker) StreamFrameQueueByteLen() protocol.ByteCount { + return p.streamFrameQueue.ByteLen() +} diff --git a/protocol/protocol.go b/protocol/protocol.go index e20ac99ad..94aceedbe 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -1,5 +1,7 @@ package protocol +import "time" + // A PacketNumber in QUIC type PacketNumber uint64 @@ -55,3 +57,10 @@ const DefaultMaxCongestionWindow PacketNumber = 107 // MaxUndecryptablePackets limits the number of undecryptable packets that a // session queues for later until it sends a public reset. const MaxUndecryptablePackets = 10 + +// SmallPacketPayloadSizeThreshold defines a threshold for small packets +// if the packet payload size (i.e. the packet without public header and private header) is below SmallPacketSizeThreshold, sending will be delayed by SmallPacketSendDelay +const SmallPacketPayloadSizeThreshold = MaxPacketSize / 2 + +// SmallPacketSendDelay is the time delay applied to small packets +const SmallPacketSendDelay = 500 * time.Microsecond diff --git a/session.go b/session.go index 8948df491..98e8bd984 100644 --- a/session.go +++ b/session.go @@ -62,6 +62,8 @@ type Session struct { undecryptablePackets []receivedPacket aeadChanged chan struct{} + smallPacketDelayedOccurranceTime time.Time + connectionParametersManager *handshake.ConnectionParametersManager // Used to calculate the next packet number from the truncated wire @@ -132,6 +134,12 @@ func (s *Session) Run() { default: } + // receive at a nil channel blocks forever + var smallPacketSendTimer <-chan time.Time + if !s.smallPacketDelayedOccurranceTime.IsZero() { + smallPacketSendTimer = time.After(time.Now().Sub(s.smallPacketDelayedOccurranceTime)) + } + var err error select { case <-s.closeChan: @@ -144,6 +152,8 @@ func (s *Session) Run() { } s.scheduleSending() case <-s.sendingScheduled: + err = s.maybeSendPacket() + case <-smallPacketSendTimer: err = s.sendPacket() case <-s.aeadChanged: s.tryDecryptingQueuedPackets() @@ -381,6 +391,49 @@ func (s *Session) closeStreamsWithError(err error) { } } +func (s *Session) maybeSendPacket() error { + if !s.congestionAllowsSending() { + return nil + } + + // always send out retransmissions immediately. No need to check the size of the packet + if s.sentPacketHandler.HasPacketForRetransmission() { + return s.sendPacket() + } + + var maxPacketSize protocol.ByteCount // the maximum size of a packet we could send out at this moment + + // we only estimate the size of the StopWaitingFrame here + stopWaitingFrame := s.stopWaitingManager.GetStopWaitingFrame() + if stopWaitingFrame != nil { + // The actual size of a StopWaitingFrame depends on the packet number of the packet it is sent with, and it's easier here to neglect the fact the StopWaitingFrame could be 5 bytes smaller than calculated here + maxPacketSize += 8 + } + + ack, err := s.receivedPacketHandler.GetAckFrame(false) + if err != nil { + return err + } + + if ack != nil { + ackLength, _ := ack.MinLength() // MinLength never errors for an ACK frame + maxPacketSize += ackLength + } + + // note that maxPacketSize can get (much) larger than protocol.MaxPacketSize if there is a long queue of StreamFrames + maxPacketSize += s.packer.StreamFrameQueueByteLen() + + if maxPacketSize > protocol.SmallPacketPayloadSizeThreshold { + return s.sendPacket() + } + + if s.smallPacketDelayedOccurranceTime.IsZero() { + s.smallPacketDelayedOccurranceTime = time.Now() + } + + return nil +} + func (s *Session) sendPacket() error { if !s.congestionAllowsSending() { return nil @@ -419,6 +472,8 @@ func (s *Session) sendPacket() error { return nil } + s.smallPacketDelayedOccurranceTime = time.Time{} // zero + err = s.sentPacketHandler.SentPacket(&ackhandler.Packet{ PacketNumber: packet.number, Frames: packet.frames, diff --git a/session_test.go b/session_test.go index 8c1099c10..29ed9facd 100644 --- a/session_test.go +++ b/session_test.go @@ -1,6 +1,7 @@ package quic import ( + "bytes" "errors" "io" "runtime" @@ -379,6 +380,74 @@ var _ = Describe("Session", func() { session.Run() Expect(session.sendingScheduled).To(Receive()) }) + + Context("bundling of small packets", func() { + It("bundles two small frames into one packet", func() { + go session.Run() + + session.QueueStreamFrame(&frames.StreamFrame{ + StreamID: 5, + Data: []byte("foobar1"), + }) + session.QueueStreamFrame(&frames.StreamFrame{ + StreamID: 5, + Data: []byte("foobar2"), + }) + time.Sleep(10 * time.Millisecond) + Expect(conn.written).To(HaveLen(1)) + }) + + It("sends out two big frames in two packet", func() { + go session.Run() + + session.QueueStreamFrame(&frames.StreamFrame{ + StreamID: 5, + Data: bytes.Repeat([]byte{'e'}, int(protocol.SmallPacketPayloadSizeThreshold+50)), + }) + session.QueueStreamFrame(&frames.StreamFrame{ + StreamID: 5, + Data: bytes.Repeat([]byte{'f'}, int(protocol.SmallPacketPayloadSizeThreshold+50)), + }) + time.Sleep(10 * time.Millisecond) + Expect(conn.written).To(HaveLen(2)) + }) + + It("sends out two small frames that are written to long after one another into two packet", func() { + go session.Run() + + session.QueueStreamFrame(&frames.StreamFrame{ + StreamID: 5, + Data: []byte("foobar1"), + }) + time.Sleep(10 * protocol.SmallPacketSendDelay) + session.QueueStreamFrame(&frames.StreamFrame{ + StreamID: 5, + Data: []byte("foobar2"), + }) + time.Sleep(10 * time.Millisecond) + Expect(conn.written).To(HaveLen(2)) + }) + + It("sends a queued ACK frame only once", func() { + go session.Run() + + packetNumber := protocol.PacketNumber(0x1337) + session.receivedPacketHandler.ReceivedPacket(packetNumber, true) + session.QueueStreamFrame(&frames.StreamFrame{ + StreamID: 5, + Data: []byte("foobar1"), + }) + time.Sleep(10 * protocol.SmallPacketSendDelay) + session.QueueStreamFrame(&frames.StreamFrame{ + StreamID: 5, + Data: []byte("foobar2"), + }) + time.Sleep(10 * time.Millisecond) + Expect(conn.written).To(HaveLen(2)) + Expect(conn.written[0]).To(ContainSubstring(string([]byte{0x37, 0x13}))) + Expect(conn.written[1]).ToNot(ContainSubstring(string([]byte{0x37, 0x13}))) + }) + }) }) It("closes when crypto stream errors", func() {