From 758334f6aa67c7180891b17f9cea920d388cd620 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 26 May 2016 11:14:38 +0700 Subject: [PATCH] move splitting of StreamFrames to StreamFrameQueue work towards #146 --- frames/stream_frame.go | 21 ------ frames/stream_frame_test.go | 32 --------- packet_packer.go | 26 ++----- stream_frame_queue.go | 81 ++++++++++++++++------ stream_frame_queue_test.go | 135 +++++++++++++++++++++++++++--------- 5 files changed, 170 insertions(+), 125 deletions(-) diff --git a/frames/stream_frame.go b/frames/stream_frame.go index 92511497..da999178 100644 --- a/frames/stream_frame.go +++ b/frames/stream_frame.go @@ -199,24 +199,3 @@ func (f *StreamFrame) MinLength() (protocol.ByteCount, error) { return length + 1, nil } - -// MaybeSplitOffFrame removes the first n bytes and returns them as a separate frame. If n >= len(n), nil is returned and nothing is modified. -func (f *StreamFrame) MaybeSplitOffFrame(n protocol.ByteCount) *StreamFrame { - minLength, _ := f.MinLength() // StreamFrame.MinLength *never* errors - if n >= minLength-1+protocol.ByteCount(len(f.Data)) { - return nil - } - n -= minLength - 1 - - defer func() { - f.Data = f.Data[n:] - f.Offset += n - }() - - return &StreamFrame{ - FinBit: false, - StreamID: f.StreamID, - Offset: f.Offset, - Data: f.Data[:n], - } -} diff --git a/frames/stream_frame_test.go b/frames/stream_frame_test.go index f50969a3..ff5d5608 100644 --- a/frames/stream_frame_test.go +++ b/frames/stream_frame_test.go @@ -371,36 +371,4 @@ var _ = Describe("StreamFrame", func() { Expect(f.getOffsetLength()).To(Equal(protocol.ByteCount(8))) }) }) - - Context("splitting off earlier stream frames", func() { - It("splits off nothing", func() { - f := &StreamFrame{ - StreamID: 1, - Data: []byte("bar"), - Offset: 3, - } - Expect(f.MaybeSplitOffFrame(1000)).To(BeNil()) - Expect(f.Offset).To(Equal(protocol.ByteCount(3))) - }) - - It("splits off initial frame", func() { - f := &StreamFrame{ - StreamID: 1, - Data: []byte("foobar"), - Offset: 3, - FinBit: true, - } - minLength, _ := f.MinLength() - previous := f.MaybeSplitOffFrame(minLength - 1 + 3) - Expect(previous).ToNot(BeNil()) - Expect(previous.StreamID).To(Equal(protocol.StreamID(1))) - Expect(previous.Data).To(Equal([]byte("foo"))) - Expect(previous.Offset).To(Equal(protocol.ByteCount(3))) - Expect(previous.FinBit).To(BeFalse()) - Expect(f.StreamID).To(Equal(protocol.StreamID(1))) - Expect(f.Data).To(Equal([]byte("bar"))) - Expect(f.Offset).To(Equal(protocol.ByteCount(6))) - Expect(f.FinBit).To(BeTrue()) - }) - }) }) diff --git a/packet_packer.go b/packet_packer.go index d39d6ef3..81ec669b 100644 --- a/packet_packer.go +++ b/packet_packer.go @@ -197,34 +197,23 @@ func (p *packetPacker) composeNextPacket(stopWaitingFrame *frames.StopWaitingFra maxFrameSize += 2 for p.streamFrameQueue.Len() > 0 { - frame := p.streamFrameQueue.Front() - frame.DataLenPresent = true // set the dataLen by default. Remove them later if applicable - if payloadLength > maxFrameSize { return nil, errors.New("PacketPacker BUG: packet payload too large") } - // Does the frame fit into the remaining space? - frameMinLength, _ := frame.MinLength() // StreamFrame.MinLength *never* returns an error - if payloadLength+frameMinLength > maxFrameSize { + frame := p.streamFrameQueue.Pop(maxFrameSize - payloadLength) + if frame == nil { break } + frame.DataLenPresent = true // set the dataLen by default. Remove them later if applicable - // Split stream frames if necessary - previousFrame := frame.MaybeSplitOffFrame(maxFrameSize - payloadLength) - if previousFrame != nil { - // Don't pop the queue, leave the modified frame in - frame = previousFrame - payloadLength += protocol.ByteCount(len(previousFrame.Data)) - 1 - } else { - p.streamFrameQueue.Pop() - payloadLength += protocol.ByteCount(len(frame.Data)) - 1 - } + frameMinLength, _ := frame.MinLength() // StreamFrame.MinLength *never* returns an error + payloadLength += frameMinLength - 1 + protocol.ByteCount(len(frame.Data)) blockedFrame := p.blockedManager.GetBlockedFrame(frame.StreamID, frame.Offset+protocol.ByteCount(len(frame.Data))) if blockedFrame != nil { blockedLength, _ := blockedFrame.MinLength() // BlockedFrame.MinLength *never* returns an error - if payloadLength+frameMinLength+blockedLength <= maxFrameSize { + if payloadLength+blockedLength <= maxFrameSize { payloadFrames = append(payloadFrames, blockedFrame) payloadLength += blockedLength } else { @@ -232,7 +221,6 @@ func (p *packetPacker) composeNextPacket(stopWaitingFrame *frames.StopWaitingFra } } - payloadLength += frameMinLength payloadFrames = append(payloadFrames, frame) hasStreamFrames = true } @@ -252,7 +240,7 @@ func (p *packetPacker) composeNextPacket(stopWaitingFrame *frames.StopWaitingFra // Empty returns true if no frames are queued func (p *packetPacker) Empty() bool { - return p.streamFrameQueue.Front() == nil + return p.streamFrameQueue.ByteLen() == 0 } func (p *packetPacker) StreamFrameQueueByteLen() protocol.ByteCount { diff --git a/stream_frame_queue.go b/stream_frame_queue.go index 9998f989..18f58e8b 100644 --- a/stream_frame_queue.go +++ b/stream_frame_queue.go @@ -11,7 +11,7 @@ import ( type streamFrameQueue struct { prioFrames []*frames.StreamFrame frames []*frames.StreamFrame - mutex sync.Mutex + mutex sync.RWMutex } // Push adds a new StreamFrame to the queue @@ -19,6 +19,8 @@ func (q *streamFrameQueue) Push(frame *frames.StreamFrame, prio bool) { q.mutex.Lock() defer q.mutex.Unlock() + frame.DataLenPresent = true + if prio { q.prioFrames = append(q.prioFrames, frame) } else { @@ -28,16 +30,16 @@ func (q *streamFrameQueue) Push(frame *frames.StreamFrame, prio bool) { // Len returns the total number of queued StreamFrames func (q *streamFrameQueue) Len() int { - q.mutex.Lock() - defer q.mutex.Unlock() + q.mutex.RLock() + defer q.mutex.RUnlock() return len(q.prioFrames) + len(q.frames) } // ByteLen returns the total number of bytes queued func (q *streamFrameQueue) ByteLen() protocol.ByteCount { - q.mutex.Lock() - defer q.mutex.Unlock() + q.mutex.RLock() + defer q.mutex.RUnlock() // TODO: improve performance // This is a very unperformant implementation. However, the obvious solution of keeping track of the length on Push() and Pop() doesn't work, since the front frame can be split by the PacketPacker @@ -53,33 +55,68 @@ func (q *streamFrameQueue) ByteLen() protocol.ByteCount { } // Pop returns the next element and deletes it from the queue -func (q *streamFrameQueue) Pop() *frames.StreamFrame { +func (q *streamFrameQueue) Pop(maxLength protocol.ByteCount) *frames.StreamFrame { q.mutex.Lock() defer q.mutex.Unlock() - if len(q.prioFrames) > 0 { - frame := q.prioFrames[0] + frame, isPrioFrame := q.front() + if frame == nil { + return nil + } + + // Does the frame fit into the remaining space? + frameMinLength, _ := frame.MinLength() // StreamFrame.MinLength *never* returns an error + if frameMinLength > maxLength { + return nil + } + + splitFrame := q.maybeSplitOffFrame(frame, maxLength) + + if splitFrame != nil { // StreamFrame was split + return splitFrame + } + + // StreamFrame was not split. Remove it from the appropriate queue + if isPrioFrame { q.prioFrames = q.prioFrames[1:] - return frame - } - if len(q.frames) > 0 { - frame := q.frames[0] + } else { q.frames = q.frames[1:] - return frame } - return nil + + return frame } -// Front returns the next element without modifying the queue -func (q *streamFrameQueue) Front() *frames.StreamFrame { - q.mutex.Lock() - defer q.mutex.Unlock() - +// front returns the next element without modifying the queue +// has to be called from a function that has already acquired the mutex +func (q *streamFrameQueue) front() (*frames.StreamFrame, bool) { if len(q.prioFrames) > 0 { - return q.prioFrames[0] + return q.prioFrames[0], true } if len(q.frames) > 0 { - return q.frames[0] + return q.frames[0], false + } + return nil, false +} + +// maybeSplitOffFrame removes the first n bytes and returns them as a separate frame. If n >= len(n), nil is returned and nothing is modified. +// has to be called from a function that has already acquired the mutex +func (q *streamFrameQueue) maybeSplitOffFrame(frame *frames.StreamFrame, n protocol.ByteCount) *frames.StreamFrame { + minLength, _ := frame.MinLength() // StreamFrame.MinLength *never* errors + if n >= minLength-1+protocol.ByteCount(len(frame.Data)) { + return nil + } + n -= minLength - 1 + + defer func() { + frame.Data = frame.Data[n:] + frame.Offset += n + }() + + return &frames.StreamFrame{ + FinBit: false, + StreamID: frame.StreamID, + Offset: frame.Offset, + Data: frame.Data[:n], + DataLenPresent: frame.DataLenPresent, } - return nil } diff --git a/stream_frame_queue_test.go b/stream_frame_queue_test.go index 5c6f9dd0..17151cf1 100644 --- a/stream_frame_queue_test.go +++ b/stream_frame_queue_test.go @@ -24,7 +24,7 @@ var _ = Describe("streamFrameQueue", func() { } frame1 = &frames.StreamFrame{ StreamID: 10, - Data: []byte{0xCA, 0xFE, 0x13}, + Data: []byte{0xDE, 0xAD, 0xBE, 0xEF, 0xCA, 0xFE, 0x13, 0x37}, } frame2 = &frames.StreamFrame{ StreamID: 11, @@ -51,19 +51,19 @@ var _ = Describe("streamFrameQueue", func() { queue.Push(frame1, false) queue.Push(frame2, false) Expect(queue.Len()).To(Equal(4)) - queue.Pop() + queue.Pop(1000) Expect(queue.Len()).To(Equal(3)) - queue.Pop() - queue.Pop() - queue.Pop() + queue.Pop(1000) + queue.Pop(1000) + queue.Pop(1000) Expect(queue.Len()).To(Equal(0)) }) - It("does not change the length when using Front()", func() { + It("does not change the length when using front()", func() { queue.Push(prioFrame1, true) queue.Push(frame1, false) Expect(queue.Len()).To(Equal(2)) - queue.Front() + queue.front() Expect(queue.Len()).To(Equal(2)) }) }) @@ -78,48 +78,45 @@ var _ = Describe("streamFrameQueue", func() { Expect(queue.ByteLen()).To(Equal(protocol.ByteCount(2))) queue.Push(frame1, false) queue.Push(frame2, false) - Expect(queue.ByteLen()).To(Equal(protocol.ByteCount(2 + 3 + 5))) + Expect(queue.ByteLen()).To(Equal(protocol.ByteCount(len(prioFrame1.Data) + len(frame1.Data) + len(frame2.Data)))) }) It("returns the correct byte length when popping", func() { queue.Push(prioFrame1, true) - queue.Push(prioFrame2, true) queue.Push(frame1, false) - queue.Push(frame2, false) - Expect(queue.ByteLen()).To(Equal(protocol.ByteCount(2 + 4 + 3 + 5))) - queue.Pop() - Expect(queue.ByteLen()).To(Equal(protocol.ByteCount(4 + 3 + 5))) - queue.Pop() - queue.Pop() - queue.Pop() + Expect(queue.ByteLen()).To(Equal(protocol.ByteCount(len(prioFrame1.Data) + len(frame1.Data)))) + queue.Pop(1000) + Expect(queue.ByteLen()).To(Equal(protocol.ByteCount(len(frame1.Data)))) + queue.Pop(1000) Expect(queue.ByteLen()).To(Equal(protocol.ByteCount(0))) }) It("does not change the byte length when using Front()", func() { queue.Push(prioFrame1, true) queue.Push(frame1, false) - Expect(queue.ByteLen()).To(Equal(protocol.ByteCount(2 + 3))) - queue.Front() - Expect(queue.ByteLen()).To(Equal(protocol.ByteCount(2 + 3))) + length := protocol.ByteCount(len(prioFrame1.Data) + len(frame1.Data)) + Expect(queue.ByteLen()).To(Equal(length)) + queue.front() + Expect(queue.ByteLen()).To(Equal(length)) }) }) Context("Popping", func() { It("returns nil when popping an empty queue", func() { - Expect(queue.Pop()).To(BeNil()) + Expect(queue.Pop(1000)).To(BeNil()) }) It("deletes elements once they are popped", func() { queue.Push(frame1, false) - Expect(queue.Pop()).To(Equal(frame1)) - Expect(queue.Pop()).To(BeNil()) + Expect(queue.Pop(1000)).To(Equal(frame1)) + Expect(queue.Pop(1000)).To(BeNil()) }) It("returns normal frames if no prio frames are available", func() { queue.Push(frame1, false) queue.Push(frame2, false) - Expect(queue.Pop()).To(Equal(frame1)) - Expect(queue.Pop()).To(Equal(frame2)) + Expect(queue.Pop(1000)).To(Equal(frame1)) + Expect(queue.Pop(1000)).To(Equal(frame2)) }) It("returns prio frames first", func() { @@ -127,22 +124,96 @@ var _ = Describe("streamFrameQueue", func() { queue.Push(frame1, false) queue.Push(frame2, false) queue.Push(prioFrame2, true) - Expect(queue.Pop()).To(Equal(prioFrame1)) - Expect(queue.Pop()).To(Equal(prioFrame2)) - Expect(queue.Pop()).To(Equal(frame1)) + frame := queue.Pop(1000) + Expect(frame).To(Equal(prioFrame1)) + frame = queue.Pop(1000) + Expect(frame).To(Equal(prioFrame2)) + frame = queue.Pop(1000) + Expect(frame).To(Equal(frame1)) + }) + + Context("splitting of frames", func() { + It("splits off nothing", func() { + f := &frames.StreamFrame{ + StreamID: 1, + Data: []byte("bar"), + Offset: 3, + } + Expect(queue.maybeSplitOffFrame(f, 1000)).To(BeNil()) + Expect(f.Offset).To(Equal(protocol.ByteCount(3))) + }) + + It("splits off initial frame", func() { + f := &frames.StreamFrame{ + StreamID: 1, + Data: []byte("foobar"), + Offset: 3, + FinBit: true, + } + minLength, _ := f.MinLength() + previous := queue.maybeSplitOffFrame(f, minLength-1+3) + Expect(previous).ToNot(BeNil()) + Expect(previous.StreamID).To(Equal(protocol.StreamID(1))) + Expect(previous.Data).To(Equal([]byte("foo"))) + Expect(previous.Offset).To(Equal(protocol.ByteCount(3))) + Expect(previous.FinBit).To(BeFalse()) + Expect(f.StreamID).To(Equal(protocol.StreamID(1))) + Expect(f.Data).To(Equal([]byte("bar"))) + Expect(f.Offset).To(Equal(protocol.ByteCount(6))) + Expect(f.FinBit).To(BeTrue()) + }) + + It("splits a frame", func() { + queue.Push(frame1, false) + origlen := len(frame1.Data) + frame := queue.Pop(6) + minLength, _ := frame.MinLength() + Expect(int(minLength) - 1 + len(frame.Data)).To(Equal(6)) + Expect(queue.frames[0].Data).To(HaveLen(origlen - len(frame.Data))) + Expect(queue.frames[0].Offset).To(Equal(protocol.ByteCount(len(frame.Data)))) + }) + + It("only removes a frame from the queue after return all split parts", func() { + queue.Push(frame1, false) + Expect(queue.Len()).To(Equal(1)) + frame := queue.Pop(6) + Expect(frame).ToNot(BeNil()) + Expect(queue.Len()).To(Equal(1)) + frame = queue.Pop(100) + Expect(frame).ToNot(BeNil()) + Expect(queue.Len()).To(BeZero()) + }) + + It("gets the whole data of a frame, when it was split", func() { + length := len(frame1.Data) + origdata := make([]byte, length) + copy(origdata, frame1.Data) + queue.Push(frame1, false) + frame := queue.Pop(6) + nextframe := queue.Pop(1000) + Expect(len(frame.Data) + len(nextframe.Data)).To(Equal(length)) + data := make([]byte, length) + copy(data, frame.Data) + copy(data[len(frame.Data):], nextframe.Data) + Expect(data).To(Equal(origdata)) + }) }) }) Context("Front", func() { It("returns nil for an empty queue", func() { - Expect(queue.Front()).To(BeNil()) + Expect(queue.front()).To(BeNil()) }) It("returns normal frames if no prio frames are available", func() { queue.Push(frame1, false) queue.Push(frame2, false) - Expect(queue.Front()).To(Equal(frame1)) - Expect(queue.Front()).To(Equal(frame1)) + frame, isPrioFrame := queue.front() + Expect(isPrioFrame).To(BeFalse()) + Expect(frame).To(Equal(frame1)) + frame, isPrioFrame = queue.front() + Expect(isPrioFrame).To(BeFalse()) + Expect(frame).To(Equal(frame1)) }) It("returns prio frames first", func() { @@ -150,7 +221,9 @@ var _ = Describe("streamFrameQueue", func() { queue.Push(frame1, false) queue.Push(frame2, false) queue.Push(prioFrame2, true) - Expect(queue.Front()).To(Equal(prioFrame1)) + frame, isPrioFrame := queue.front() + Expect(isPrioFrame).To(BeTrue()) + Expect(frame).To(Equal(prioFrame1)) }) }) })