From d9a3980efc541fb7879ca4284bd5f18ed0c6c5af Mon Sep 17 00:00:00 2001 From: Lucas Clemente Date: Mon, 25 Jul 2016 17:27:39 +0200 Subject: [PATCH] remove streamFramer.HasData() ref #217 --- session.go | 152 ++++++++++++++++++++---------------------- stream_framer.go | 22 ------ stream_framer_test.go | 32 +-------- 3 files changed, 76 insertions(+), 130 deletions(-) diff --git a/session.go b/session.go index 17c1214f..55720f8b 100644 --- a/session.go +++ b/session.go @@ -447,7 +447,6 @@ func (s *Session) closeStreamWithError(str *stream, err error) { str.RegisterError(err) } -// TODO: try sending more than one packet func (s *Session) maybeSendPacket() error { if !s.smallPacketDelayedOccurranceTime.IsZero() && time.Now().Sub(s.smallPacketDelayedOccurranceTime) > protocol.SmallPacketSendDelay { return s.sendPacket() @@ -503,87 +502,84 @@ func (s *Session) maybeSendPacket() error { func (s *Session) sendPacket() error { s.smallPacketDelayedOccurranceTime = time.Time{} // zero - err := s.sentPacketHandler.CheckForError() - if err != nil { - return err - } - - if !s.sentPacketHandler.CongestionAllowsSending() { - return nil - } - - var controlFrames []frames.Frame - - // check for retransmissions first + // Repeatedly try sending until we don't have any more data, or run out of the congestion window for { - retransmitPacket := s.sentPacketHandler.DequeuePacketForRetransmission() - if retransmitPacket == nil { - break + err := s.sentPacketHandler.CheckForError() + if err != nil { + return err } - utils.Debugf("\tDequeueing retransmission for packet 0x%x", retransmitPacket.PacketNumber) - s.stopWaitingManager.RegisterPacketForRetransmission(retransmitPacket) - // resend the frames that were in the packet - controlFrames = append(controlFrames, retransmitPacket.GetControlFramesForRetransmission()...) - for _, streamFrame := range retransmitPacket.GetStreamFramesForRetransmission() { - s.streamFramer.AddFrameForRetransmission(streamFrame) + + if !s.sentPacketHandler.CongestionAllowsSending() { + return nil } + + var controlFrames []frames.Frame + + // check for retransmissions first + for { + retransmitPacket := s.sentPacketHandler.DequeuePacketForRetransmission() + if retransmitPacket == nil { + break + } + utils.Debugf("\tDequeueing retransmission for packet 0x%x", retransmitPacket.PacketNumber) + s.stopWaitingManager.RegisterPacketForRetransmission(retransmitPacket) + // resend the frames that were in the packet + controlFrames = append(controlFrames, retransmitPacket.GetControlFramesForRetransmission()...) + for _, streamFrame := range retransmitPacket.GetStreamFramesForRetransmission() { + s.streamFramer.AddFrameForRetransmission(streamFrame) + } + } + + windowUpdateFrames, err := s.getWindowUpdateFrames() + if err != nil { + return err + } + + for _, wuf := range windowUpdateFrames { + controlFrames = append(controlFrames, wuf) + } + + ack, err := s.receivedPacketHandler.GetAckFrame(true) + if err != nil { + return err + } + if ack != nil { + controlFrames = append(controlFrames, ack) + } + + stopWaitingFrame := s.stopWaitingManager.GetStopWaitingFrame() + packet, err := s.packer.PackPacket(stopWaitingFrame, controlFrames, s.sentPacketHandler.GetLargestAcked()) + if err != nil { + return err + } + if packet == nil { + return nil + } + + for _, f := range windowUpdateFrames { + s.packer.QueueControlFrameForNextPacket(f) + } + + err = s.sentPacketHandler.SentPacket(&ackhandlerlegacy.Packet{ + PacketNumber: packet.number, + Frames: packet.frames, + EntropyBit: packet.entropyBit, + Length: protocol.ByteCount(len(packet.raw)), + }) + if err != nil { + return err + } + + s.stopWaitingManager.SentStopWaitingWithPacket(packet.number) + + s.logPacket(packet) + + err = s.conn.write(packet.raw) + if err != nil { + return err + } + } - - windowUpdateFrames, err := s.getWindowUpdateFrames() - if err != nil { - return err - } - - for _, wuf := range windowUpdateFrames { - controlFrames = append(controlFrames, wuf) - } - - ack, err := s.receivedPacketHandler.GetAckFrame(true) - if err != nil { - return err - } - if ack != nil { - controlFrames = append(controlFrames, ack) - } - - stopWaitingFrame := s.stopWaitingManager.GetStopWaitingFrame() - packet, err := s.packer.PackPacket(stopWaitingFrame, controlFrames, s.sentPacketHandler.GetLargestAcked()) - - if err != nil { - return err - } - if packet == nil { - return nil - } - - for _, f := range windowUpdateFrames { - s.packer.QueueControlFrameForNextPacket(f) - } - - err = s.sentPacketHandler.SentPacket(&ackhandlerlegacy.Packet{ - PacketNumber: packet.number, - Frames: packet.frames, - EntropyBit: packet.entropyBit, - Length: protocol.ByteCount(len(packet.raw)), - }) - if err != nil { - return err - } - - s.stopWaitingManager.SentStopWaitingWithPacket(packet.number) - - s.logPacket(packet) - - err = s.conn.write(packet.raw) - if err != nil { - return err - } - - if s.streamFramer.HasData() { - s.scheduleSending() - } - - return nil } func (s *Session) sendConnectionClose(quicErr *qerr.QuicError) error { diff --git a/stream_framer.go b/stream_framer.go index b6db27df..5b537765 100644 --- a/stream_framer.go +++ b/stream_framer.go @@ -28,28 +28,6 @@ func newStreamFramer(streams *map[protocol.StreamID]*stream, streamsMutex *sync. } } -func (f *streamFramer) HasData() bool { - if len(f.retransmissionQueue) > 0 { - return true - } - f.streamsMutex.RLock() - defer f.streamsMutex.RUnlock() - for _, s := range *f.streams { - if s == nil { - continue - } - // An error should never happen, and needlessly complicates the return values - fcLimit, _ := f.getFCAllowanceForStream(s) - if fcLimit == 0 { - continue - } - if s.lenOfDataForWriting() > 0 || s.shouldSendFin() { - return true - } - } - return false -} - func (f *streamFramer) AddFrameForRetransmission(frame *frames.StreamFrame) { f.retransmissionQueue = append(f.retransmissionQueue, frame) } diff --git a/stream_framer_test.go b/stream_framer_test.go index f7cd212e..d9c9e95a 100644 --- a/stream_framer_test.go +++ b/stream_framer_test.go @@ -59,34 +59,6 @@ var _ = Describe("Stream Framer", func() { Expect(fs[0].DataLenPresent).To(BeTrue()) }) - Context("HasData", func() { - It("has no data initially", func() { - Expect(framer.HasData()).To(BeFalse()) - }) - - It("has data with retransmitted frames", func() { - framer.AddFrameForRetransmission(retransmittedFrame1) - Expect(framer.HasData()).To(BeTrue()) - }) - - It("has data with normal frames", func() { - stream1.dataForWriting = []byte("foobar") - Expect(framer.HasData()).To(BeTrue()) - }) - - It("has data with FIN frames", func() { - stream1.closed = 1 - Expect(framer.HasData()).To(BeTrue()) - }) - - It("has no data when FC blocked", func() { - stream1.dataForWriting = []byte("foobar") - Expect(framer.HasData()).To(BeTrue()) - fcm.sendWindowSizes[stream1.StreamID()] = 0 - Expect(framer.HasData()).To(BeFalse()) - }) - }) - Context("Framer estimated data length", func() { It("returns the correct length for an empty framer", func() { Expect(framer.EstimatedDataLen()).To(BeZero()) @@ -227,10 +199,10 @@ var _ = Describe("Stream Framer", func() { framer.AddFrameForRetransmission(retransmittedFrame2) fs := framer.PopStreamFrames(6) Expect(fs).To(HaveLen(1)) - Expect(framer.HasData()).To(BeTrue()) + Expect(framer.retransmissionQueue).ToNot(BeEmpty()) fs = framer.PopStreamFrames(1000) Expect(fs).To(HaveLen(1)) - Expect(framer.HasData()).To(BeFalse()) + Expect(framer.retransmissionQueue).To(BeEmpty()) }) It("gets the whole data of a frame if it was split", func() {