From bdbf8d3ebec70034698c7ec58dbfe36cbc75a773 Mon Sep 17 00:00:00 2001 From: Lucas Clemente Date: Sat, 9 Jul 2016 14:29:56 +0200 Subject: [PATCH] add BLOCKED frame management to streamFramer ref #211 --- stream_framer.go | 22 +++++++++++ stream_framer_test.go | 90 ++++++++++++++++++++++++++----------------- stream_test.go | 6 +++ 3 files changed, 83 insertions(+), 35 deletions(-) diff --git a/stream_framer.go b/stream_framer.go index da64d9d6..178c0be6 100644 --- a/stream_framer.go +++ b/stream_framer.go @@ -17,6 +17,7 @@ type streamFramer struct { flowControlManager flowcontrol.FlowControlManager retransmissionQueue []*frames.StreamFrame + blockedFrameQueue []*frames.BlockedFrame } func newStreamFramer(streams *map[protocol.StreamID]*stream, streamsMutex *sync.RWMutex, flowControlManager flowcontrol.FlowControlManager) *streamFramer { @@ -95,6 +96,15 @@ func (f *streamFramer) PopStreamFrame(maxLen protocol.ByteCount) (*frames.Stream return f.maybePopNormalFrame(maxLen) } +func (f *streamFramer) PopBlockedFrame() *frames.BlockedFrame { + if len(f.blockedFrameQueue) == 0 { + return nil + } + frame := f.blockedFrameQueue[0] + f.blockedFrameQueue = f.blockedFrameQueue[1:] + return frame +} + func (f *streamFramer) maybePopFrameForRetransmission(maxLen protocol.ByteCount) *frames.StreamFrame { if len(f.retransmissionQueue) == 0 { return nil @@ -159,6 +169,18 @@ func (f *streamFramer) maybePopNormalFrame(maxBytes protocol.ByteCount) (*frames if err := f.flowControlManager.AddBytesSent(s.streamID, protocol.ByteCount(len(data))); err != nil { return nil, err } + + // Finally, check if we are now FC blocked and should queue a BLOCKED frame + individualFcOffset, _ := f.flowControlManager.SendWindowSize(s.streamID) // can never error + if s.writeOffset == individualFcOffset { + // We are now stream-level FC blocked + f.blockedFrameQueue = append(f.blockedFrameQueue, &frames.BlockedFrame{StreamID: s.StreamID()}) + } + if f.flowControlManager.RemainingConnectionWindowSize() == 0 { + // We are now connection-level FC blocked + f.blockedFrameQueue = append(f.blockedFrameQueue, &frames.BlockedFrame{StreamID: 0}) + } + return frame, nil } return nil, nil diff --git a/stream_framer_test.go b/stream_framer_test.go index 8306e183..2c34dc3d 100644 --- a/stream_framer_test.go +++ b/stream_framer_test.go @@ -395,39 +395,59 @@ var _ = Describe("Stream Framer", func() { Expect(frame).To(BeNil()) }) }) + + Context("BLOCKED frames", func() { + BeforeEach(func() { + fcm.remainingConnectionWindowSize = protocol.MaxByteCount + }) + + It("Pop returns nil if no frame is queued", func() { + Expect(framer.PopBlockedFrame()).To(BeNil()) + }) + + It("queues and pops BLOCKED frames for individually blocked streams", func() { + fcm.sendWindowSizes[stream1.StreamID()] = 3 + stream1.dataForWriting = []byte("foo") + _, err := framer.PopStreamFrame(1000) + Expect(err).NotTo(HaveOccurred()) + blockedFrame := framer.PopBlockedFrame() + Expect(blockedFrame).ToNot(BeNil()) + Expect(blockedFrame.StreamID).To(Equal(stream1.StreamID())) + Expect(framer.PopBlockedFrame()).To(BeNil()) + }) + + It("queues and pops BLOCKED frames for connection blocked streams", func() { + fcm.remainingConnectionWindowSize = 3 + fcm.streamsContributing = []protocol.StreamID{stream1.StreamID()} + stream1.dataForWriting = []byte("foo") + _, err := framer.PopStreamFrame(1000) + Expect(err).NotTo(HaveOccurred()) + blockedFrame := framer.PopBlockedFrame() + Expect(blockedFrame).ToNot(BeNil()) + Expect(blockedFrame.StreamID).To(BeZero()) + Expect(framer.PopBlockedFrame()).To(BeNil()) + }) + + It("does not queue BLOCKED frames for non-contributing streams", func() { + fcm.remainingConnectionWindowSize = 3 + stream1.dataForWriting = []byte("foo") + _, err := framer.PopStreamFrame(1000) + Expect(err).NotTo(HaveOccurred()) + Expect(framer.PopBlockedFrame()).To(BeNil()) + }) + + It("does not queue BLOCKED frames twice", func() { + fcm.sendWindowSizes[stream1.StreamID()] = 3 + stream1.dataForWriting = []byte("foobar") + _, err := framer.PopStreamFrame(1000) + Expect(err).NotTo(HaveOccurred()) + frame, err := framer.PopStreamFrame(1000) + Expect(err).NotTo(HaveOccurred()) + Expect(frame).To(BeNil()) + blockedFrame := framer.PopBlockedFrame() + Expect(blockedFrame).ToNot(BeNil()) + Expect(blockedFrame.StreamID).To(Equal(stream1.StreamID())) + Expect(framer.PopBlockedFrame()).To(BeNil()) + }) + }) }) - -// Old stream tests - -// PContext("Blocked streams", func() { -// It("notifies the session when a stream is flow control blocked", func() { -// updated, err := str.flowControlManager.UpdateWindow(str.streamID, 1337) -// Expect(err).ToNot(HaveOccurred()) -// Expect(updated).To(BeTrue()) -// str.flowControlManager.AddBytesSent(str.streamID, 1337) -// str.maybeTriggerBlocked() -// Expect(handler.receivedBlockedCalled).To(BeTrue()) -// Expect(handler.receivedBlockedForStream).To(Equal(str.streamID)) -// }) -// -// It("notifies the session as soon as a stream is reaching the end of the window", func() { -// updated, err := str.flowControlManager.UpdateWindow(str.streamID, 4) -// Expect(err).ToNot(HaveOccurred()) -// Expect(updated).To(BeTrue()) -// str.Write([]byte{0xDE, 0xCA, 0xFB, 0xAD}) -// Expect(handler.receivedBlockedCalled).To(BeTrue()) -// Expect(handler.receivedBlockedForStream).To(Equal(str.streamID)) -// }) -// -// It("notifies the session as soon as a stream is flow control blocked", func() { -// updated, err := str.flowControlManager.UpdateWindow(str.streamID, 2) -// Expect(err).ToNot(HaveOccurred()) -// Expect(updated).To(BeTrue()) -// go func() { -// str.Write([]byte{0xDE, 0xCA, 0xFB, 0xAD}) -// }() -// time.Sleep(time.Millisecond) -// Expect(handler.receivedBlockedCalled).To(BeTrue()) -// Expect(handler.receivedBlockedForStream).To(Equal(str.streamID)) -// }) -// }) diff --git a/stream_test.go b/stream_test.go index bd47226d..fdcaef62 100644 --- a/stream_test.go +++ b/stream_test.go @@ -75,6 +75,12 @@ func (m *mockFlowControlHandler) UpdateHighestReceived(streamID protocol.StreamI func (m *mockFlowControlHandler) AddBytesSent(streamID protocol.StreamID, n protocol.ByteCount) error { m.bytesSent += n + for _, s := range m.streamsContributing { + if s == streamID { + m.remainingConnectionWindowSize -= n + return nil + } + } return nil }