diff --git a/session.go b/session.go index 40a8b1e9..9ac0c782 100644 --- a/session.go +++ b/session.go @@ -50,6 +50,7 @@ type Session struct { receivedPacketHandler ackhandler.ReceivedPacketHandler stopWaitingManager ackhandler.StopWaitingManager windowUpdateManager *windowUpdateManager + blockedFrameQueue []*frames.BlockedFrame unpacker *packetUnpacker packer *packetPacker @@ -454,6 +455,11 @@ func (s *Session) sendPacket() error { controlFrames = append(controlFrames, wuf) } + for _, bf := range s.blockedFrameQueue { + controlFrames = append(controlFrames, bf) + } + s.blockedFrameQueue = s.blockedFrameQueue[:0] + ack, err := s.receivedPacketHandler.GetAckFrame(true) if err != nil { return err @@ -541,6 +547,10 @@ func (s *Session) updateReceiveFlowControlWindow(streamID protocol.StreamID, byt return nil } +func (s *Session) streamBlocked(streamID protocol.StreamID) { + s.blockedFrameQueue = append(s.blockedFrameQueue, &frames.BlockedFrame{StreamID: streamID}) +} + // OpenStream creates a new stream open for reading and writing func (s *Session) OpenStream(id protocol.StreamID) (utils.Stream, error) { s.streamsMutex.Lock() diff --git a/session_test.go b/session_test.go index b567945d..d39f4064 100644 --- a/session_test.go +++ b/session_test.go @@ -410,6 +410,30 @@ var _ = Describe("Session", func() { Expect(conn.written).To(HaveLen(int(protocol.WindowUpdateNumRepitions))) // no packet was sent }) + It("sends queued Blocked frames", func() { + bf1 := frames.BlockedFrame{StreamID: 0x1337} + bf2 := frames.BlockedFrame{StreamID: 0xDECAFBAD} + session.blockedFrameQueue = append(session.blockedFrameQueue, &bf1) + session.blockedFrameQueue = append(session.blockedFrameQueue, &bf2) + err := session.sendPacket() + Expect(err).NotTo(HaveOccurred()) + Expect(conn.written).To(HaveLen(1)) + Expect(conn.written[0]).To(ContainSubstring(string([]byte{0x05, 0x37, 0x13, 0, 0}))) + Expect(conn.written[0]).To(ContainSubstring(string([]byte{0x05, 0xAD, 0xFB, 0xCA, 0xDE}))) + }) + + It("only sends every queued Blocked frame once", func() { + bf := frames.BlockedFrame{StreamID: 0x1337} + session.blockedFrameQueue = append(session.blockedFrameQueue, &bf) + err := session.sendPacket() + Expect(err).NotTo(HaveOccurred()) + session.queueStreamFrame(&frames.StreamFrame{StreamID: 5, Data: []byte("foobar")}) // queue something, so that a packet can actually be sent + err = session.sendPacket() + Expect(err).NotTo(HaveOccurred()) + Expect(conn.written).To(HaveLen(2)) + Expect(conn.written[1]).ToNot(ContainSubstring(string([]byte{0x05, 0x37, 0x13, 0, 0}))) + }) + It("sends public reset", func() { err := session.sendPublicReset(1) Expect(err).NotTo(HaveOccurred()) diff --git a/stream.go b/stream.go index 24fe179c..5459251c 100644 --- a/stream.go +++ b/stream.go @@ -15,6 +15,7 @@ import ( type streamHandler interface { queueStreamFrame(*frames.StreamFrame) error updateReceiveFlowControlWindow(streamID protocol.StreamID, byteOffset protocol.ByteCount) error + streamBlocked(streamID protocol.StreamID) } var errFlowControlViolation = qerr.FlowControlReceivedTooMuchData @@ -224,6 +225,14 @@ func (s *stream) maybeTriggerWindowUpdate() { } } +func (s *stream) maybeTriggerBlocked() { + doIt := s.flowController.MaybeTriggerBlocked() + + if doIt { + s.session.streamBlocked(s.streamID) + } +} + // RegisterError is called by session to indicate that an error occurred and the // stream should be closed. func (s *stream) RegisterError(err error) { diff --git a/stream_test.go b/stream_test.go index d34f5832..2258c651 100644 --- a/stream_test.go +++ b/stream_test.go @@ -16,6 +16,9 @@ import ( type mockStreamHandler struct { frames []frames.Frame + receivedBlockedCalled bool + receivedBlockedForStream protocol.StreamID + receiveFlowControlWindowCalled bool receiveFlowControlWindowCalledForStream protocol.StreamID } @@ -25,6 +28,11 @@ func (m *mockStreamHandler) queueStreamFrame(f *frames.StreamFrame) error { return nil } +func (m *mockStreamHandler) streamBlocked(streamID protocol.StreamID) { + m.receivedBlockedCalled = true + m.receivedBlockedForStream = streamID +} + func (m *mockStreamHandler) updateReceiveFlowControlWindow(streamID protocol.StreamID, byteOffset protocol.ByteCount) error { m.receiveFlowControlWindowCalled = true m.receiveFlowControlWindowCalledForStream = streamID @@ -358,6 +366,16 @@ var _ = Describe("Stream", func() { }) }) + Context("Blocked streams", func() { + It("notifies the session when a stream is flow control blocked", func() { + str.flowController.sendFlowControlWindow = 1337 + str.flowController.bytesSent = 1337 + str.maybeTriggerBlocked() + Expect(handler.receivedBlockedCalled).To(BeTrue()) + Expect(handler.receivedBlockedForStream).To(Equal(str.streamID)) + }) + }) + Context("flow control window updating, for receiving", func() { var receiveFlowControlWindow protocol.ByteCount = 1337 var receiveWindowUpdateThreshold protocol.ByteCount = 1000