From ac59e284ddf0bba675733c9e749c51e0bcfe4069 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Tue, 21 Aug 2018 14:41:47 +0700 Subject: [PATCH] remove the Head method from the streamFrameSorter We now store the STREAM frame that is currently being read in the receiveStream, and pop it from the streamFrameSorter directly. --- receive_stream.go | 39 +++++++++++++++++++------------- stream_frame_sorter.go | 17 +++++--------- stream_frame_sorter_test.go | 45 ++++++++++++++----------------------- 3 files changed, 46 insertions(+), 55 deletions(-) diff --git a/receive_stream.go b/receive_stream.go index 6534fae5a..f0ee4355c 100644 --- a/receive_stream.go +++ b/receive_stream.go @@ -8,7 +8,6 @@ import ( "github.com/lucas-clemente/quic-go/internal/flowcontrol" "github.com/lucas-clemente/quic-go/internal/protocol" - "github.com/lucas-clemente/quic-go/internal/utils" "github.com/lucas-clemente/quic-go/internal/wire" ) @@ -28,9 +27,11 @@ type receiveStream struct { sender streamSender - frameQueue *streamFrameSorter + frameQueue *streamFrameSorter + readOffset protocol.ByteCount + + currentFrame *wire.StreamFrame readPosInFrame int - readOffset protocol.ByteCount closeForShutdownErr error cancelReadErr error @@ -99,8 +100,10 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err bytesRead := 0 for bytesRead < len(p) { - frame := s.frameQueue.Head() - if frame == nil && bytesRead > 0 { + if s.currentFrame == nil || s.readPosInFrame >= int(s.currentFrame.DataLen()) { + s.dequeueNextFrame() + } + if s.currentFrame == nil && bytesRead > 0 { return false, bytesRead, s.closeForShutdownErr } @@ -121,8 +124,7 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err return false, bytesRead, errDeadline } - if frame != nil { - s.readPosInFrame = int(s.readOffset - frame.Offset) + if s.currentFrame != nil { break } @@ -136,20 +138,21 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err } } s.mutex.Lock() - frame = s.frameQueue.Head() + if s.currentFrame == nil { + s.dequeueNextFrame() + } } if bytesRead > len(p) { return false, bytesRead, fmt.Errorf("BUG: bytesRead (%d) > len(p) (%d) in stream.Read", bytesRead, len(p)) } - if s.readPosInFrame > int(frame.DataLen()) { - return false, bytesRead, fmt.Errorf("BUG: readPosInFrame (%d) > frame.DataLen (%d) in stream.Read", s.readPosInFrame, frame.DataLen()) + if s.readPosInFrame > int(s.currentFrame.DataLen()) { + return false, bytesRead, fmt.Errorf("BUG: readPosInFrame (%d) > frame.DataLen (%d) in stream.Read", s.readPosInFrame, s.currentFrame.DataLen()) } s.mutex.Unlock() - copy(p[bytesRead:], frame.Data[s.readPosInFrame:]) - m := utils.Min(len(p)-bytesRead, int(frame.DataLen())-s.readPosInFrame) + m := copy(p[bytesRead:], s.currentFrame.Data[s.readPosInFrame:]) s.readPosInFrame += m bytesRead += m s.readOffset += protocol.ByteCount(m) @@ -162,10 +165,9 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err // increase the flow control window, if necessary s.flowController.MaybeQueueWindowUpdate() - if s.readPosInFrame >= int(frame.DataLen()) { - s.frameQueue.Pop() - s.finRead = frame.FinBit - if frame.FinBit { + if s.readPosInFrame >= int(s.currentFrame.DataLen()) { + if s.currentFrame.FinBit { + s.finRead = true return true, bytesRead, io.EOF } } @@ -173,6 +175,11 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err return false, bytesRead, nil } +func (s *receiveStream) dequeueNextFrame() { + s.currentFrame = s.frameQueue.Pop() + s.readPosInFrame = 0 +} + func (s *receiveStream) CancelRead(errorCode protocol.ApplicationErrorCode) error { s.mutex.Lock() defer s.mutex.Unlock() diff --git a/stream_frame_sorter.go b/stream_frame_sorter.go index c9a168ae9..aeb110721 100644 --- a/stream_frame_sorter.go +++ b/stream_frame_sorter.go @@ -141,17 +141,12 @@ func (s *streamFrameSorter) Push(frame *wire.StreamFrame) error { return nil } -func (s *streamFrameSorter) Pop() { - if frame := s.Head(); frame != nil { - s.readPosition += frame.DataLen() - delete(s.queuedFrames, frame.Offset) - } -} - -func (s *streamFrameSorter) Head() *wire.StreamFrame { +func (s *streamFrameSorter) Pop() *wire.StreamFrame { frame, ok := s.queuedFrames[s.readPosition] - if ok { - return frame + if !ok { + return nil } - return nil + s.readPosition += frame.DataLen() + delete(s.queuedFrames, frame.Offset) + return frame } diff --git a/stream_frame_sorter_test.go b/stream_frame_sorter_test.go index 22e7eb86a..57a5ee059 100644 --- a/stream_frame_sorter_test.go +++ b/stream_frame_sorter_test.go @@ -27,7 +27,7 @@ var _ = Describe("STREAM frame sorter", func() { }) It("head returns nil when empty", func() { - Expect(s.Head()).To(BeNil()) + Expect(s.Pop()).To(BeNil()) }) Context("Push", func() { @@ -36,11 +36,9 @@ var _ = Describe("STREAM frame sorter", func() { Offset: 0, Data: []byte("foobar"), } - err := s.Push(f) - Expect(err).ToNot(HaveOccurred()) - Expect(s.Head()).To(Equal(f)) - s.Pop() - Expect(s.Head()).To(BeNil()) + Expect(s.Push(f)).To(Succeed()) + Expect(s.Pop()).To(Equal(f)) + Expect(s.Pop()).To(BeNil()) }) It("inserts and pops two consecutive frame", func() { @@ -52,22 +50,17 @@ var _ = Describe("STREAM frame sorter", func() { Offset: 6, Data: []byte("foobar2"), } - err := s.Push(f1) - Expect(err).ToNot(HaveOccurred()) - err = s.Push(f2) - Expect(err).ToNot(HaveOccurred()) - Expect(s.Head()).To(Equal(f1)) - s.Pop() - Expect(s.Head()).To(Equal(f2)) - s.Pop() - Expect(s.Head()).To(BeNil()) + Expect(s.Push(f1)).To(Succeed()) + Expect(s.Push(f2)).To(Succeed()) + Expect(s.Pop()).To(Equal(f1)) + Expect(s.Pop()).To(Equal(f2)) + Expect(s.Pop()).To(BeNil()) }) It("ignores empty frames", func() { f := &wire.StreamFrame{} - err := s.Push(f) - Expect(err).ToNot(HaveOccurred()) - Expect(s.Head()).To(BeNil()) + Expect(s.Push(f)).To(Succeed()) + Expect(s.Pop()).To(BeNil()) }) Context("FinBit handling", func() { @@ -76,9 +69,8 @@ var _ = Describe("STREAM frame sorter", func() { Offset: 0, FinBit: true, } - err := s.Push(f) - Expect(err).ToNot(HaveOccurred()) - Expect(s.Head()).To(Equal(f)) + Expect(s.Push(f)).To(Succeed()) + Expect(s.Pop()).To(Equal(f)) }) It("sets the FinBit if a stream is closed after receiving some data", func() { @@ -86,17 +78,14 @@ var _ = Describe("STREAM frame sorter", func() { Offset: 0, Data: []byte("foobar"), } - err := s.Push(f1) - Expect(err).ToNot(HaveOccurred()) + Expect(s.Push(f1)).To(Succeed()) f2 := &wire.StreamFrame{ Offset: 6, FinBit: true, } - err = s.Push(f2) - Expect(err).ToNot(HaveOccurred()) - Expect(s.Head()).To(Equal(f1)) - s.Pop() - Expect(s.Head()).To(Equal(f2)) + Expect(s.Push(f2)).To(Succeed()) + Expect(s.Pop()).To(Equal(f1)) + Expect(s.Pop()).To(Equal(f2)) }) })