From 089582dd9f84cdf1d1e4b3d4cdda76c6e7921b43 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 15 May 2016 18:05:15 +0700 Subject: [PATCH] improve sending of WindowUpdate frames fixes #96 --- packet_packer.go | 22 ++-------- packet_packer_test.go | 65 ----------------------------- protocol/server_parameters.go | 3 ++ session.go | 13 +++--- session_test.go | 16 ++++++++ window_update_manager.go | 66 ++++++++++++++++++++++++++++++ window_update_manager_test.go | 77 +++++++++++++++++++++++++++++++++++ 7 files changed, 173 insertions(+), 89 deletions(-) create mode 100644 window_update_manager.go create mode 100644 window_update_manager_test.go diff --git a/packet_packer.go b/packet_packer.go index a2ca327d..3ed80248 100644 --- a/packet_packer.go +++ b/packet_packer.go @@ -27,9 +27,8 @@ type packetPacker struct { sentPacketHandler ackhandler.SentPacketHandler connectionParametersManager *handshake.ConnectionParametersManager - streamFrameQueue StreamFrameQueue - windowUpdateFrames []*frames.WindowUpdateFrame - controlFrames []frames.Frame + streamFrameQueue StreamFrameQueue + controlFrames []frames.Frame lastPacketNumber protocol.PacketNumber } @@ -42,13 +41,9 @@ func (p *packetPacker) AddHighPrioStreamFrame(f frames.StreamFrame) { p.streamFrameQueue.Push(&f, true) } -func (p *packetPacker) AddWindowUpdateFrame(f *frames.WindowUpdateFrame) { - p.windowUpdateFrames = append(p.windowUpdateFrames, f) -} - func (p *packetPacker) PackPacket(stopWaitingFrame *frames.StopWaitingFrame, controlFrames []frames.Frame, includeStreamFrames bool) (*packedPacket, error) { // don't send out packets that only contain a StopWaitingFrame - if len(p.windowUpdateFrames) == 0 && len(controlFrames) == 0 && (p.streamFrameQueue.Len() == 0 || !includeStreamFrames) { + if len(controlFrames) == 0 && (p.streamFrameQueue.Len() == 0 || !includeStreamFrames) { return nil, nil } @@ -132,17 +127,6 @@ func (p *packetPacker) composeNextPacket(stopWaitingFrame *frames.StopWaitingFra maxFrameSize := protocol.MaxFrameAndPublicHeaderSize - publicHeaderLength - for len(p.windowUpdateFrames) > 0 { - frame := p.windowUpdateFrames[0] - minLength, _ := frame.MinLength() // windowUpdateFrames.MinLength() *never* returns an error - if payloadLength+minLength > maxFrameSize { - break - } - payloadLength += minLength - payloadFrames = append(payloadFrames, frame) - p.windowUpdateFrames = p.windowUpdateFrames[1:] - } - if stopWaitingFrame != nil { payloadFrames = append(payloadFrames, stopWaitingFrame) minLength, err := stopWaitingFrame.MinLength() diff --git a/packet_packer_test.go b/packet_packer_test.go index d9bb2264..027545ba 100644 --- a/packet_packer_test.go +++ b/packet_packer_test.go @@ -186,71 +186,6 @@ var _ = Describe("Packet packer", func() { Expect(packer.lastPacketNumber).To(Equal(protocol.PacketNumber(2))) }) - Context("WindowUpdate Frame handling", func() { - It("packs one WindowUpdateFrame", func() { - f := &frames.WindowUpdateFrame{ - StreamID: 0x1337, - ByteOffset: 0xDECAFBAD, - } - packer.AddWindowUpdateFrame(f) - payloadFrames, err := packer.composeNextPacket(nil, publicHeaderLen, true) - Expect(err).ToNot(HaveOccurred()) - Expect(payloadFrames).To(HaveLen(1)) - Expect(payloadFrames[0]).To(Equal(f)) - }) - - It("packs multiple WindowUpdateFrame", func() { - f1 := &frames.WindowUpdateFrame{ - StreamID: 0x1337, - ByteOffset: 0xDECAFBAD, - } - f2 := &frames.WindowUpdateFrame{ - StreamID: 0x7331, - ByteOffset: 0xDABFACED, - } - packer.AddWindowUpdateFrame(f1) - packer.AddWindowUpdateFrame(f2) - payloadFrames, err := packer.composeNextPacket(nil, publicHeaderLen, true) - Expect(err).ToNot(HaveOccurred()) - Expect(payloadFrames).To(HaveLen(2)) - Expect(payloadFrames).To(ContainElement(f1)) - Expect(payloadFrames).To(ContainElement(f2)) - }) - - It("packs a lot of window update frames into 2 packets if they don't fit into one", func() { - windowUpdateFrame := &frames.WindowUpdateFrame{ - StreamID: 0x1337, - } - minLength, _ := windowUpdateFrame.MinLength() - maxFramesPerPacket := int(protocol.MaxFrameAndPublicHeaderSize-publicHeaderLen) / int(minLength) - var controlFrames []frames.Frame - for i := 0; i < maxFramesPerPacket+10; i++ { - controlFrames = append(controlFrames, windowUpdateFrame) - } - packer.controlFrames = controlFrames - payloadFrames, err := packer.composeNextPacket(nil, publicHeaderLen, true) - Expect(err).ToNot(HaveOccurred()) - Expect(payloadFrames).To(HaveLen(maxFramesPerPacket)) - payloadFrames, err = packer.composeNextPacket(nil, publicHeaderLen, true) - Expect(err).ToNot(HaveOccurred()) - Expect(payloadFrames).To(HaveLen(10)) - }) - - It("only packs a WindowUpdateFrame once", func() { - f := &frames.WindowUpdateFrame{ - StreamID: 0x1337, - ByteOffset: 0xDECAFBAD, - } - packer.AddWindowUpdateFrame(f) - payloadFrames, err := packer.composeNextPacket(nil, publicHeaderLen, true) - Expect(err).ToNot(HaveOccurred()) - Expect(payloadFrames).ToNot(BeNil()) - payloadFrames, err = packer.composeNextPacket(nil, publicHeaderLen, true) - Expect(err).ToNot(HaveOccurred()) - Expect(payloadFrames).To(BeNil()) - }) - }) - Context("Stream Frame handling", func() { It("does not splits a stream frame with maximum size", func() { f := frames.StreamFrame{ diff --git a/protocol/server_parameters.go b/protocol/server_parameters.go index 42206bee..e90bc07c 100644 --- a/protocol/server_parameters.go +++ b/protocol/server_parameters.go @@ -46,3 +46,6 @@ const MaxIdleConnectionStateLifetime = 60 * time.Second // WindowUpdateThreshold is the size of the receive flow control window for which we send out a WindowUpdate frame const WindowUpdateThreshold = ReceiveStreamFlowControlWindow / 2 + +// WindowUpdateNumRepitions is the number of times the same WindowUpdate frame will be sent to the client +const WindowUpdateNumRepitions uint8 = 2 diff --git a/session.go b/session.go index cede862d..78602f7a 100644 --- a/session.go +++ b/session.go @@ -50,6 +50,7 @@ type Session struct { sentPacketHandler ackhandler.SentPacketHandler receivedPacketHandler ackhandler.ReceivedPacketHandler stopWaitingManager ackhandler.StopWaitingManager + windowUpdateManager *WindowUpdateManager unpacker *packetUnpacker packer *packetPacker @@ -86,6 +87,7 @@ func NewSession(conn connection, v protocol.VersionNumber, connectionID protocol sentPacketHandler: ackhandler.NewSentPacketHandler(stopWaitingManager), receivedPacketHandler: ackhandler.NewReceivedPacketHandler(), stopWaitingManager: stopWaitingManager, + windowUpdateManager: NewWindowUpdateManager(), receivedPackets: make(chan receivedPacket, 1000), // TODO: What if server receives many packets and connection is already closed?! closeChan: make(chan struct{}, 1), sendingScheduled: make(chan struct{}, 1), @@ -454,6 +456,11 @@ func (s *Session) sendPacket() error { } } + windowUpdateFrames := s.windowUpdateManager.GetWindowUpdateFrames() + + for _, wuf := range windowUpdateFrames { + controlFrames = append(controlFrames, wuf) + } ack, err := s.receivedPacketHandler.GetAckFrame(true) if err != nil { @@ -525,11 +532,7 @@ func (s *Session) QueueStreamFrame(frame *frames.StreamFrame) error { // UpdateReceiveFlowControlWindow updates the flow control window for a stream func (s *Session) UpdateReceiveFlowControlWindow(streamID protocol.StreamID, byteOffset protocol.ByteCount) error { - wuf := frames.WindowUpdateFrame{ - StreamID: streamID, - ByteOffset: byteOffset, - } - s.packer.AddWindowUpdateFrame(&wuf) + s.windowUpdateManager.SetStreamOffset(streamID, byteOffset) return nil } diff --git a/session_test.go b/session_test.go index 747f01c1..1d2ecc13 100644 --- a/session_test.go +++ b/session_test.go @@ -353,6 +353,22 @@ var _ = Describe("Session", func() { Expect(conn.written[0]).To(ContainSubstring(string([]byte{0x04, 0x05, 0, 0, 0}))) }) + It("repeats a WindowUpdate frame in WindowUpdateNumRepitions packets", func() { + _, err := session.NewStream(5) + Expect(err).ToNot(HaveOccurred()) + err = session.UpdateReceiveFlowControlWindow(5, 0xDECAFBAD) + Expect(err).ToNot(HaveOccurred()) + for i := uint8(0); i < protocol.WindowUpdateNumRepitions; i++ { + err = session.sendPacket() + Expect(err).NotTo(HaveOccurred()) + Expect(conn.written[i]).To(ContainSubstring(string([]byte{0x04, 0x05, 0, 0, 0}))) + } + Expect(conn.written).To(HaveLen(int(protocol.WindowUpdateNumRepitions))) + err = session.sendPacket() + Expect(err).NotTo(HaveOccurred()) + Expect(conn.written).To(HaveLen(int(protocol.WindowUpdateNumRepitions))) // no packet was sent + }) + It("sends public reset", func() { err := session.sendPublicReset(1) Expect(err).NotTo(HaveOccurred()) diff --git a/window_update_manager.go b/window_update_manager.go new file mode 100644 index 00000000..bd6627d9 --- /dev/null +++ b/window_update_manager.go @@ -0,0 +1,66 @@ +package quic + +import ( + "sync" + + "github.com/lucas-clemente/quic-go/frames" + "github.com/lucas-clemente/quic-go/protocol" +) + +type windowUpdateItem struct { + Offset protocol.ByteCount + Counter uint8 +} + +// WindowUpdateManager manages window update frames for receiving data +type WindowUpdateManager struct { + streamOffsets map[protocol.StreamID]*windowUpdateItem + mutex sync.RWMutex +} + +// NewWindowUpdateManager returns a new WindowUpdateManager +func NewWindowUpdateManager() *WindowUpdateManager { + return &WindowUpdateManager{ + streamOffsets: make(map[protocol.StreamID]*windowUpdateItem), + } +} + +// SetStreamOffset sets an offset for a stream +func (m *WindowUpdateManager) SetStreamOffset(streamID protocol.StreamID, n protocol.ByteCount) { + m.mutex.Lock() + defer m.mutex.Unlock() + + entry, ok := m.streamOffsets[streamID] + if !ok { + m.streamOffsets[streamID] = &windowUpdateItem{Offset: n} + return + } + + if n > entry.Offset { + entry.Offset = n + entry.Counter = 0 + } +} + +// GetWindowUpdateFrames gets all the WindowUpdate frames that need to be sent +func (m *WindowUpdateManager) GetWindowUpdateFrames() []*frames.WindowUpdateFrame { + m.mutex.RLock() + defer m.mutex.RUnlock() + + var wuf []*frames.WindowUpdateFrame + + for key, value := range m.streamOffsets { + if value.Counter >= protocol.WindowUpdateNumRepitions { + continue + } + + frame := frames.WindowUpdateFrame{ + StreamID: key, + ByteOffset: value.Offset, + } + value.Counter++ + wuf = append(wuf, &frame) + } + + return wuf +} diff --git a/window_update_manager_test.go b/window_update_manager_test.go new file mode 100644 index 00000000..4c707390 --- /dev/null +++ b/window_update_manager_test.go @@ -0,0 +1,77 @@ +package quic + +import ( + "github.com/lucas-clemente/quic-go/frames" + "github.com/lucas-clemente/quic-go/protocol" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("WindowUpdateManager", func() { + var wum *WindowUpdateManager + + BeforeEach(func() { + wum = NewWindowUpdateManager() + }) + + Context("queueing new window updates", func() { + It("queues a window update for a new stream", func() { + wum.SetStreamOffset(5, 0x1000) + Expect(wum.streamOffsets).To(HaveKey(protocol.StreamID(5))) + Expect(wum.streamOffsets[5].Offset).To(Equal(protocol.ByteCount(0x1000))) + }) + + It("updates the offset for an existing stream", func() { + wum.SetStreamOffset(5, 0x1000) + wum.SetStreamOffset(5, 0x2000) + Expect(wum.streamOffsets).To(HaveKey(protocol.StreamID(5))) + Expect(wum.streamOffsets[5].Offset).To(Equal(protocol.ByteCount(0x2000))) + }) + + It("does not decrease the offset for an existing stream", func() { + wum.SetStreamOffset(5, 0x1000) + wum.SetStreamOffset(5, 0x500) + Expect(wum.streamOffsets).To(HaveKey(protocol.StreamID(5))) + Expect(wum.streamOffsets[5].Offset).To(Equal(protocol.ByteCount(0x1000))) + }) + + It("resets the counter after increasing the offset", func() { + wum.streamOffsets[5] = &windowUpdateItem{ + Offset: 0x1000, + Counter: 1, + } + wum.SetStreamOffset(5, 0x2000) + Expect(wum.streamOffsets[5].Offset).To(Equal(protocol.ByteCount(0x2000))) + Expect(wum.streamOffsets[5].Counter).To(Equal(uint8(0))) + }) + }) + + Context("dequeueing window updates", func() { + BeforeEach(func() { + wum.SetStreamOffset(7, 0x1000) + wum.SetStreamOffset(9, 0x500) + }) + + It("gets the window update frames", func() { + f := wum.GetWindowUpdateFrames() + Expect(f).To(HaveLen(2)) + Expect(f).To(ContainElement(&frames.WindowUpdateFrame{StreamID: 7, ByteOffset: 0x1000})) + Expect(f).To(ContainElement(&frames.WindowUpdateFrame{StreamID: 9, ByteOffset: 0x500})) + }) + + It("increases the counter", func() { + _ = wum.GetWindowUpdateFrames() + Expect(wum.streamOffsets[7].Counter).To(Equal(uint8(1))) + Expect(wum.streamOffsets[9].Counter).To(Equal(uint8(1))) + }) + + It("only sends out a window update frame WindowUpdateNumRepitions times", func() { + for i := uint8(0); i < protocol.WindowUpdateNumRepitions; i++ { + frames := wum.GetWindowUpdateFrames() + Expect(frames).To(HaveLen(2)) + } + frames := wum.GetWindowUpdateFrames() + Expect(frames).To(BeEmpty()) + }) + }) +})