From 4cfbb2f1346a74f7a8142fd6dfe40b498409e8e6 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Wed, 4 Sep 2019 20:24:09 +0700 Subject: [PATCH 1/2] use STREAM frames from the buffer for sending data --- internal/wire/pool.go | 2 +- internal/wire/pool_test.go | 4 +- internal/wire/stream_frame.go | 4 +- send_stream.go | 83 +++++++++++++++++++++-------------- send_stream_test.go | 6 +-- 5 files changed, 57 insertions(+), 42 deletions(-) diff --git a/internal/wire/pool.go b/internal/wire/pool.go index 966a4758..334b79b3 100644 --- a/internal/wire/pool.go +++ b/internal/wire/pool.go @@ -17,7 +17,7 @@ func init() { } } -func getStreamFrame() *StreamFrame { +func GetStreamFrame() *StreamFrame { f := pool.Get().(*StreamFrame) return f } diff --git a/internal/wire/pool_test.go b/internal/wire/pool_test.go index ed9593c9..b55e493b 100644 --- a/internal/wire/pool_test.go +++ b/internal/wire/pool_test.go @@ -7,12 +7,12 @@ import ( var _ = Describe("Pool", func() { It("gets and puts STREAM frames", func() { - f := getStreamFrame() + f := GetStreamFrame() putStreamFrame(f) }) It("panics when putting a STREAM frame with a wrong capacity", func() { - f := getStreamFrame() + f := GetStreamFrame() f.Data = []byte("foobar") Expect(func() { putStreamFrame(f) }).To(Panic()) }) diff --git a/internal/wire/stream_frame.go b/internal/wire/stream_frame.go index 41254166..3b1f76c2 100644 --- a/internal/wire/stream_frame.go +++ b/internal/wire/stream_frame.go @@ -59,7 +59,7 @@ func parseStreamFrame(r *bytes.Reader, version protocol.VersionNumber) (*StreamF if dataLen < protocol.MinStreamFrameBufferSize { frame = &StreamFrame{Data: make([]byte, dataLen)} } else { - frame = getStreamFrame() + frame = GetStreamFrame() // The STREAM frame can't be larger than the StreamFrame we obtained from the buffer, // since those StreamFrames have a buffer length of the maximum packet size. if dataLen > uint64(cap(frame.Data)) { @@ -167,7 +167,7 @@ func (f *StreamFrame) MaybeSplitOffFrame(maxSize protocol.ByteCount, version pro return nil, true } - new := getStreamFrame() + new := GetStreamFrame() new.StreamID = f.StreamID new.Offset = f.Offset new.FinBit = false diff --git a/send_stream.go b/send_stream.go index 97503d23..a2a3ad2a 100644 --- a/send_stream.go +++ b/send_stream.go @@ -153,63 +153,78 @@ func (s *sendStream) Write(p []byte) (int, error) { // maxBytes is the maximum length this frame (including frame header) will have. func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount) (*ackhandler.Frame, bool /* has more data to send */) { s.mutex.Lock() - frame, hasMoreData := s.popStreamFrameImpl(maxBytes) - if frame != nil { + f, hasMoreData := s.popNewOrRetransmittedStreamFrame(maxBytes) + if f != nil { s.numOutstandingFrames++ } s.mutex.Unlock() - return frame, hasMoreData + if f == nil { + return nil, hasMoreData + } + return &ackhandler.Frame{Frame: f, OnLost: s.queueRetransmission, OnAcked: s.frameAcked}, hasMoreData } -func (s *sendStream) popStreamFrameImpl(maxBytes protocol.ByteCount) (*ackhandler.Frame, bool /* has more data to send */) { +func (s *sendStream) popNewOrRetransmittedStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool /* has more data to send */) { if len(s.retransmissionQueue) > 0 { - frame, hasMoreRetransmissions := s.maybeGetRetransmission(maxBytes) - if frame != nil || hasMoreRetransmissions { - if frame == nil { + f, hasMoreRetransmissions := s.maybeGetRetransmission(maxBytes) + if f != nil || hasMoreRetransmissions { + if f == nil { return nil, true } // We always claim that we have more data to send. // This might be incorrect, in which case there'll be a spurious call to popStreamFrame in the future. - return &ackhandler.Frame{Frame: frame, OnLost: s.queueRetransmission, OnAcked: s.frameAcked}, true + return f, true } } + f := wire.GetStreamFrame() + f.FinBit = false + f.StreamID = s.streamID + f.Offset = s.writeOffset + f.DataLenPresent = true + f.Data = f.Data[:0] + + hasMoreData := s.popNewStreamFrame(f, maxBytes) + + if len(f.Data) == 0 && !f.FinBit { + f.PutBack() + return nil, hasMoreData + } + return f, hasMoreData +} + +func (s *sendStream) popNewStreamFrame(f *wire.StreamFrame, maxBytes protocol.ByteCount) bool { if s.canceledWrite || s.closeForShutdownErr != nil { - return nil, false + return false } - frame := &wire.StreamFrame{ - StreamID: s.streamID, - Offset: s.writeOffset, - DataLenPresent: true, - } - maxDataLen := frame.MaxDataLen(maxBytes, s.version) + maxDataLen := f.MaxDataLen(maxBytes, s.version) if maxDataLen == 0 { // a STREAM frame must have at least one byte of data - return nil, s.dataForWriting != nil + return s.dataForWriting != nil } - frame.Data, frame.FinBit = s.getDataForWriting(maxDataLen) - if len(frame.Data) == 0 && !frame.FinBit { + s.getDataForWriting(f, maxDataLen) + if len(f.Data) == 0 && !f.FinBit { // this can happen if: // - popStreamFrame is called but there's no data for writing // - there's data for writing, but the stream is stream-level flow control blocked // - there's data for writing, but the stream is connection-level flow control blocked if s.dataForWriting == nil { - return nil, false + return false } if isBlocked, offset := s.flowController.IsNewlyBlocked(); isBlocked { s.sender.queueControlFrame(&wire.StreamDataBlockedFrame{ StreamID: s.streamID, DataLimit: offset, }) - return nil, false + return false } - return nil, true + return true } - if frame.FinBit { + if f.FinBit { s.finSent = true } - return &ackhandler.Frame{Frame: frame, OnLost: s.queueRetransmission, OnAcked: s.frameAcked}, s.dataForWriting != nil + return s.dataForWriting != nil } func (s *sendStream) maybeGetRetransmission(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool /* has more retransmissions */) { @@ -229,30 +244,30 @@ func (s *sendStream) hasData() bool { return hasData } -func (s *sendStream) getDataForWriting(maxBytes protocol.ByteCount) ([]byte, bool /* should send FIN */) { +func (s *sendStream) getDataForWriting(f *wire.StreamFrame, maxBytes protocol.ByteCount) { if s.dataForWriting == nil { - return nil, s.finishedWriting && !s.finSent + f.FinBit = s.finishedWriting && !s.finSent + return } maxBytes = utils.MinByteCount(maxBytes, s.flowController.SendWindowSize()) if maxBytes == 0 { - return nil, false + return } - var ret []byte if protocol.ByteCount(len(s.dataForWriting)) > maxBytes { - ret = make([]byte, int(maxBytes)) - copy(ret, s.dataForWriting[:maxBytes]) + f.Data = f.Data[:maxBytes] + copy(f.Data, s.dataForWriting) s.dataForWriting = s.dataForWriting[maxBytes:] } else { - ret = make([]byte, len(s.dataForWriting)) - copy(ret, s.dataForWriting) + f.Data = f.Data[:len(s.dataForWriting)] + copy(f.Data, s.dataForWriting) s.dataForWriting = nil s.signalWrite() } - s.writeOffset += protocol.ByteCount(len(ret)) - s.flowController.AddBytesSent(protocol.ByteCount(len(ret))) - return ret, s.finishedWriting && s.dataForWriting == nil && !s.finSent + s.writeOffset += f.DataLen() + s.flowController.AddBytesSent(f.DataLen()) + f.FinBit = s.finishedWriting && s.dataForWriting == nil && !s.finSent } func (s *sendStream) frameAcked() { diff --git a/send_stream_test.go b/send_stream_test.go index dadec815..3f47b0de 100644 --- a/send_stream_test.go +++ b/send_stream_test.go @@ -408,8 +408,8 @@ var _ = Describe("Send Stream", func() { mockSender.EXPECT().onHasStreamData(streamID) str.Close() frame, hasMoreData := str.popStreamFrame(1000) + Expect(frame).ToNot(BeNil()) f := frame.Frame.(*wire.StreamFrame) - Expect(f).ToNot(BeNil()) Expect(f.Data).To(BeEmpty()) Expect(f.FinBit).To(BeTrue()) Expect(hasMoreData).To(BeFalse()) @@ -423,8 +423,8 @@ var _ = Describe("Send Stream", func() { str.dataForWriting = []byte("foobar") Expect(str.Close()).To(Succeed()) frame, _ := str.popStreamFrame(3 + frameHeaderLen) + Expect(frame).ToNot(BeNil()) f := frame.Frame.(*wire.StreamFrame) - Expect(f).ToNot(BeNil()) Expect(f.Data).To(Equal([]byte("foo"))) Expect(f.FinBit).To(BeFalse()) frame, _ = str.popStreamFrame(100) @@ -444,8 +444,8 @@ var _ = Describe("Send Stream", func() { mockSender.EXPECT().onHasStreamData(streamID) str.Close() frame, _ := str.popStreamFrame(1000) + Expect(frame).ToNot(BeNil()) f := frame.Frame.(*wire.StreamFrame) - Expect(f).ToNot(BeNil()) Expect(f.Data).To(BeEmpty()) Expect(f.FinBit).To(BeTrue()) frame, hasMoreData := str.popStreamFrame(1000) From 4cb8bf3101f39bde0bbe04fc774a03e6749937aa Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Wed, 4 Sep 2019 20:36:45 +0700 Subject: [PATCH 2/2] put STREAM frames back into the pool when they are acknowledged --- internal/ackhandler/frame.go | 2 +- internal/ackhandler/sent_packet_handler.go | 2 +- internal/ackhandler/sent_packet_handler_test.go | 7 ++++++- send_stream.go | 4 +++- send_stream_test.go | 8 ++++---- 5 files changed, 15 insertions(+), 8 deletions(-) diff --git a/internal/ackhandler/frame.go b/internal/ackhandler/frame.go index 5731c2bc..aed6038d 100644 --- a/internal/ackhandler/frame.go +++ b/internal/ackhandler/frame.go @@ -5,5 +5,5 @@ import "github.com/lucas-clemente/quic-go/internal/wire" type Frame struct { wire.Frame // nil if the frame has already been acknowledged in another packet OnLost func(wire.Frame) - OnAcked func() + OnAcked func(wire.Frame) } diff --git a/internal/ackhandler/sent_packet_handler.go b/internal/ackhandler/sent_packet_handler.go index 4b481563..f542db32 100644 --- a/internal/ackhandler/sent_packet_handler.go +++ b/internal/ackhandler/sent_packet_handler.go @@ -446,7 +446,7 @@ func (h *sentPacketHandler) onPacketAcked(p *Packet, rcvTime time.Time) error { for _, f := range p.Frames { if f.OnAcked != nil { - f.OnAcked() + f.OnAcked(f.Frame) } } if p.includedInBytesInFlight { diff --git a/internal/ackhandler/sent_packet_handler_test.go b/internal/ackhandler/sent_packet_handler_test.go index af1ee4a8..0859a0a7 100644 --- a/internal/ackhandler/sent_packet_handler_test.go +++ b/internal/ackhandler/sent_packet_handler_test.go @@ -202,9 +202,14 @@ var _ = Describe("SentPacketHandler", func() { It("calls the OnAcked callback", func() { var acked bool + ping := &wire.PingFrame{} handler.SentPacket(ackElicitingPacket(&Packet{ PacketNumber: 13, - Frames: []Frame{{Frame: &wire.PingFrame{}, OnAcked: func() { acked = true }}}, + Frames: []Frame{{Frame: ping, OnAcked: func(f wire.Frame) { + Expect(f).To(Equal(ping)) + acked = true + }, + }}, })) ack := &wire.AckFrame{AckRanges: []wire.AckRange{{Smallest: 13, Largest: 13}}} Expect(handler.ReceivedAck(ack, 1, protocol.Encryption1RTT, time.Now())).To(Succeed()) diff --git a/send_stream.go b/send_stream.go index a2a3ad2a..c2e1aa7d 100644 --- a/send_stream.go +++ b/send_stream.go @@ -270,7 +270,9 @@ func (s *sendStream) getDataForWriting(f *wire.StreamFrame, maxBytes protocol.By f.FinBit = s.finishedWriting && s.dataForWriting == nil && !s.finSent } -func (s *sendStream) frameAcked() { +func (s *sendStream) frameAcked(f wire.Frame) { + f.(*wire.StreamFrame).PutBack() + s.mutex.Lock() s.numOutstandingFrames-- if s.numOutstandingFrames < 0 { diff --git a/send_stream_test.go b/send_stream_test.go index 3f47b0de..e15eb5f3 100644 --- a/send_stream_test.go +++ b/send_stream_test.go @@ -786,7 +786,7 @@ var _ = Describe("Send Stream", func() { // Acknowledge all frames. // We don't expect the stream to be completed, since we still need to send the FIN. for _, f := range frames { - f.OnAcked() + f.OnAcked(f.Frame) } // Now close the stream and acknowledge the FIN. @@ -794,7 +794,7 @@ var _ = Describe("Send Stream", func() { frame, _ := str.popStreamFrame(protocol.MaxByteCount) Expect(frame).ToNot(BeNil()) mockSender.EXPECT().onStreamCompleted(streamID) - frame.OnAcked() + frame.OnAcked(frame.Frame) }) It("doesn't say it's completed when there are frames waiting to be retransmitted", func() { @@ -824,7 +824,7 @@ var _ = Describe("Send Stream", func() { // lose the first frame, acknowledge all others for _, f := range frames[1:] { - f.OnAcked() + f.OnAcked(f.Frame) } frames[0].OnLost(frames[0].Frame) @@ -832,7 +832,7 @@ var _ = Describe("Send Stream", func() { ret, _ := str.popStreamFrame(protocol.MaxByteCount) Expect(ret).ToNot(BeNil()) mockSender.EXPECT().onStreamCompleted(streamID) - ret.OnAcked() + ret.OnAcked(ret.Frame) }) }) })