From 4cfbb2f1346a74f7a8142fd6dfe40b498409e8e6 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Wed, 4 Sep 2019 20:24:09 +0700 Subject: [PATCH] use STREAM frames from the buffer for sending data --- internal/wire/pool.go | 2 +- internal/wire/pool_test.go | 4 +- internal/wire/stream_frame.go | 4 +- send_stream.go | 83 +++++++++++++++++++++-------------- send_stream_test.go | 6 +-- 5 files changed, 57 insertions(+), 42 deletions(-) diff --git a/internal/wire/pool.go b/internal/wire/pool.go index 966a4758..334b79b3 100644 --- a/internal/wire/pool.go +++ b/internal/wire/pool.go @@ -17,7 +17,7 @@ func init() { } } -func getStreamFrame() *StreamFrame { +func GetStreamFrame() *StreamFrame { f := pool.Get().(*StreamFrame) return f } diff --git a/internal/wire/pool_test.go b/internal/wire/pool_test.go index ed9593c9..b55e493b 100644 --- a/internal/wire/pool_test.go +++ b/internal/wire/pool_test.go @@ -7,12 +7,12 @@ import ( var _ = Describe("Pool", func() { It("gets and puts STREAM frames", func() { - f := getStreamFrame() + f := GetStreamFrame() putStreamFrame(f) }) It("panics when putting a STREAM frame with a wrong capacity", func() { - f := getStreamFrame() + f := GetStreamFrame() f.Data = []byte("foobar") Expect(func() { putStreamFrame(f) }).To(Panic()) }) diff --git a/internal/wire/stream_frame.go b/internal/wire/stream_frame.go index 41254166..3b1f76c2 100644 --- a/internal/wire/stream_frame.go +++ b/internal/wire/stream_frame.go @@ -59,7 +59,7 @@ func parseStreamFrame(r *bytes.Reader, version protocol.VersionNumber) (*StreamF if dataLen < protocol.MinStreamFrameBufferSize { frame = &StreamFrame{Data: make([]byte, dataLen)} } else { - frame = getStreamFrame() + frame = GetStreamFrame() // The STREAM frame can't be larger than the StreamFrame we obtained from the buffer, // since those StreamFrames have a buffer length of the maximum packet size. if dataLen > uint64(cap(frame.Data)) { @@ -167,7 +167,7 @@ func (f *StreamFrame) MaybeSplitOffFrame(maxSize protocol.ByteCount, version pro return nil, true } - new := getStreamFrame() + new := GetStreamFrame() new.StreamID = f.StreamID new.Offset = f.Offset new.FinBit = false diff --git a/send_stream.go b/send_stream.go index 97503d23..a2a3ad2a 100644 --- a/send_stream.go +++ b/send_stream.go @@ -153,63 +153,78 @@ func (s *sendStream) Write(p []byte) (int, error) { // maxBytes is the maximum length this frame (including frame header) will have. func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount) (*ackhandler.Frame, bool /* has more data to send */) { s.mutex.Lock() - frame, hasMoreData := s.popStreamFrameImpl(maxBytes) - if frame != nil { + f, hasMoreData := s.popNewOrRetransmittedStreamFrame(maxBytes) + if f != nil { s.numOutstandingFrames++ } s.mutex.Unlock() - return frame, hasMoreData + if f == nil { + return nil, hasMoreData + } + return &ackhandler.Frame{Frame: f, OnLost: s.queueRetransmission, OnAcked: s.frameAcked}, hasMoreData } -func (s *sendStream) popStreamFrameImpl(maxBytes protocol.ByteCount) (*ackhandler.Frame, bool /* has more data to send */) { +func (s *sendStream) popNewOrRetransmittedStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool /* has more data to send */) { if len(s.retransmissionQueue) > 0 { - frame, hasMoreRetransmissions := s.maybeGetRetransmission(maxBytes) - if frame != nil || hasMoreRetransmissions { - if frame == nil { + f, hasMoreRetransmissions := s.maybeGetRetransmission(maxBytes) + if f != nil || hasMoreRetransmissions { + if f == nil { return nil, true } // We always claim that we have more data to send. // This might be incorrect, in which case there'll be a spurious call to popStreamFrame in the future. - return &ackhandler.Frame{Frame: frame, OnLost: s.queueRetransmission, OnAcked: s.frameAcked}, true + return f, true } } + f := wire.GetStreamFrame() + f.FinBit = false + f.StreamID = s.streamID + f.Offset = s.writeOffset + f.DataLenPresent = true + f.Data = f.Data[:0] + + hasMoreData := s.popNewStreamFrame(f, maxBytes) + + if len(f.Data) == 0 && !f.FinBit { + f.PutBack() + return nil, hasMoreData + } + return f, hasMoreData +} + +func (s *sendStream) popNewStreamFrame(f *wire.StreamFrame, maxBytes protocol.ByteCount) bool { if s.canceledWrite || s.closeForShutdownErr != nil { - return nil, false + return false } - frame := &wire.StreamFrame{ - StreamID: s.streamID, - Offset: s.writeOffset, - DataLenPresent: true, - } - maxDataLen := frame.MaxDataLen(maxBytes, s.version) + maxDataLen := f.MaxDataLen(maxBytes, s.version) if maxDataLen == 0 { // a STREAM frame must have at least one byte of data - return nil, s.dataForWriting != nil + return s.dataForWriting != nil } - frame.Data, frame.FinBit = s.getDataForWriting(maxDataLen) - if len(frame.Data) == 0 && !frame.FinBit { + s.getDataForWriting(f, maxDataLen) + if len(f.Data) == 0 && !f.FinBit { // this can happen if: // - popStreamFrame is called but there's no data for writing // - there's data for writing, but the stream is stream-level flow control blocked // - there's data for writing, but the stream is connection-level flow control blocked if s.dataForWriting == nil { - return nil, false + return false } if isBlocked, offset := s.flowController.IsNewlyBlocked(); isBlocked { s.sender.queueControlFrame(&wire.StreamDataBlockedFrame{ StreamID: s.streamID, DataLimit: offset, }) - return nil, false + return false } - return nil, true + return true } - if frame.FinBit { + if f.FinBit { s.finSent = true } - return &ackhandler.Frame{Frame: frame, OnLost: s.queueRetransmission, OnAcked: s.frameAcked}, s.dataForWriting != nil + return s.dataForWriting != nil } func (s *sendStream) maybeGetRetransmission(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool /* has more retransmissions */) { @@ -229,30 +244,30 @@ func (s *sendStream) hasData() bool { return hasData } -func (s *sendStream) getDataForWriting(maxBytes protocol.ByteCount) ([]byte, bool /* should send FIN */) { +func (s *sendStream) getDataForWriting(f *wire.StreamFrame, maxBytes protocol.ByteCount) { if s.dataForWriting == nil { - return nil, s.finishedWriting && !s.finSent + f.FinBit = s.finishedWriting && !s.finSent + return } maxBytes = utils.MinByteCount(maxBytes, s.flowController.SendWindowSize()) if maxBytes == 0 { - return nil, false + return } - var ret []byte if protocol.ByteCount(len(s.dataForWriting)) > maxBytes { - ret = make([]byte, int(maxBytes)) - copy(ret, s.dataForWriting[:maxBytes]) + f.Data = f.Data[:maxBytes] + copy(f.Data, s.dataForWriting) s.dataForWriting = s.dataForWriting[maxBytes:] } else { - ret = make([]byte, len(s.dataForWriting)) - copy(ret, s.dataForWriting) + f.Data = f.Data[:len(s.dataForWriting)] + copy(f.Data, s.dataForWriting) s.dataForWriting = nil s.signalWrite() } - s.writeOffset += protocol.ByteCount(len(ret)) - s.flowController.AddBytesSent(protocol.ByteCount(len(ret))) - return ret, s.finishedWriting && s.dataForWriting == nil && !s.finSent + s.writeOffset += f.DataLen() + s.flowController.AddBytesSent(f.DataLen()) + f.FinBit = s.finishedWriting && s.dataForWriting == nil && !s.finSent } func (s *sendStream) frameAcked() { diff --git a/send_stream_test.go b/send_stream_test.go index dadec815..3f47b0de 100644 --- a/send_stream_test.go +++ b/send_stream_test.go @@ -408,8 +408,8 @@ var _ = Describe("Send Stream", func() { mockSender.EXPECT().onHasStreamData(streamID) str.Close() frame, hasMoreData := str.popStreamFrame(1000) + Expect(frame).ToNot(BeNil()) f := frame.Frame.(*wire.StreamFrame) - Expect(f).ToNot(BeNil()) Expect(f.Data).To(BeEmpty()) Expect(f.FinBit).To(BeTrue()) Expect(hasMoreData).To(BeFalse()) @@ -423,8 +423,8 @@ var _ = Describe("Send Stream", func() { str.dataForWriting = []byte("foobar") Expect(str.Close()).To(Succeed()) frame, _ := str.popStreamFrame(3 + frameHeaderLen) + Expect(frame).ToNot(BeNil()) f := frame.Frame.(*wire.StreamFrame) - Expect(f).ToNot(BeNil()) Expect(f.Data).To(Equal([]byte("foo"))) Expect(f.FinBit).To(BeFalse()) frame, _ = str.popStreamFrame(100) @@ -444,8 +444,8 @@ var _ = Describe("Send Stream", func() { mockSender.EXPECT().onHasStreamData(streamID) str.Close() frame, _ := str.popStreamFrame(1000) + Expect(frame).ToNot(BeNil()) f := frame.Frame.(*wire.StreamFrame) - Expect(f).ToNot(BeNil()) Expect(f.Data).To(BeEmpty()) Expect(f.FinBit).To(BeTrue()) frame, hasMoreData := str.popStreamFrame(1000)