send Blocked frames for flow control blocked streams

fixes #100
This commit is contained in:
Marten Seemann
2016-05-17 19:00:11 +07:00
parent 078b563526
commit 42f3091e1b
4 changed files with 61 additions and 0 deletions

View File

@@ -50,6 +50,7 @@ type Session struct {
receivedPacketHandler ackhandler.ReceivedPacketHandler
stopWaitingManager ackhandler.StopWaitingManager
windowUpdateManager *windowUpdateManager
blockedFrameQueue []*frames.BlockedFrame
unpacker *packetUnpacker
packer *packetPacker
@@ -454,6 +455,11 @@ func (s *Session) sendPacket() error {
controlFrames = append(controlFrames, wuf)
}
for _, bf := range s.blockedFrameQueue {
controlFrames = append(controlFrames, bf)
}
s.blockedFrameQueue = s.blockedFrameQueue[:0]
ack, err := s.receivedPacketHandler.GetAckFrame(true)
if err != nil {
return err
@@ -541,6 +547,10 @@ func (s *Session) updateReceiveFlowControlWindow(streamID protocol.StreamID, byt
return nil
}
func (s *Session) streamBlocked(streamID protocol.StreamID) {
s.blockedFrameQueue = append(s.blockedFrameQueue, &frames.BlockedFrame{StreamID: streamID})
}
// OpenStream creates a new stream open for reading and writing
func (s *Session) OpenStream(id protocol.StreamID) (utils.Stream, error) {
s.streamsMutex.Lock()

View File

@@ -410,6 +410,30 @@ var _ = Describe("Session", func() {
Expect(conn.written).To(HaveLen(int(protocol.WindowUpdateNumRepitions))) // no packet was sent
})
It("sends queued Blocked frames", func() {
bf1 := frames.BlockedFrame{StreamID: 0x1337}
bf2 := frames.BlockedFrame{StreamID: 0xDECAFBAD}
session.blockedFrameQueue = append(session.blockedFrameQueue, &bf1)
session.blockedFrameQueue = append(session.blockedFrameQueue, &bf2)
err := session.sendPacket()
Expect(err).NotTo(HaveOccurred())
Expect(conn.written).To(HaveLen(1))
Expect(conn.written[0]).To(ContainSubstring(string([]byte{0x05, 0x37, 0x13, 0, 0})))
Expect(conn.written[0]).To(ContainSubstring(string([]byte{0x05, 0xAD, 0xFB, 0xCA, 0xDE})))
})
It("only sends every queued Blocked frame once", func() {
bf := frames.BlockedFrame{StreamID: 0x1337}
session.blockedFrameQueue = append(session.blockedFrameQueue, &bf)
err := session.sendPacket()
Expect(err).NotTo(HaveOccurred())
session.queueStreamFrame(&frames.StreamFrame{StreamID: 5, Data: []byte("foobar")}) // queue something, so that a packet can actually be sent
err = session.sendPacket()
Expect(err).NotTo(HaveOccurred())
Expect(conn.written).To(HaveLen(2))
Expect(conn.written[1]).ToNot(ContainSubstring(string([]byte{0x05, 0x37, 0x13, 0, 0})))
})
It("sends public reset", func() {
err := session.sendPublicReset(1)
Expect(err).NotTo(HaveOccurred())

View File

@@ -15,6 +15,7 @@ import (
type streamHandler interface {
queueStreamFrame(*frames.StreamFrame) error
updateReceiveFlowControlWindow(streamID protocol.StreamID, byteOffset protocol.ByteCount) error
streamBlocked(streamID protocol.StreamID)
}
var errFlowControlViolation = qerr.FlowControlReceivedTooMuchData
@@ -224,6 +225,14 @@ func (s *stream) maybeTriggerWindowUpdate() {
}
}
func (s *stream) maybeTriggerBlocked() {
doIt := s.flowController.MaybeTriggerBlocked()
if doIt {
s.session.streamBlocked(s.streamID)
}
}
// RegisterError is called by session to indicate that an error occurred and the
// stream should be closed.
func (s *stream) RegisterError(err error) {

View File

@@ -16,6 +16,9 @@ import (
type mockStreamHandler struct {
frames []frames.Frame
receivedBlockedCalled bool
receivedBlockedForStream protocol.StreamID
receiveFlowControlWindowCalled bool
receiveFlowControlWindowCalledForStream protocol.StreamID
}
@@ -25,6 +28,11 @@ func (m *mockStreamHandler) queueStreamFrame(f *frames.StreamFrame) error {
return nil
}
func (m *mockStreamHandler) streamBlocked(streamID protocol.StreamID) {
m.receivedBlockedCalled = true
m.receivedBlockedForStream = streamID
}
func (m *mockStreamHandler) updateReceiveFlowControlWindow(streamID protocol.StreamID, byteOffset protocol.ByteCount) error {
m.receiveFlowControlWindowCalled = true
m.receiveFlowControlWindowCalledForStream = streamID
@@ -358,6 +366,16 @@ var _ = Describe("Stream", func() {
})
})
Context("Blocked streams", func() {
It("notifies the session when a stream is flow control blocked", func() {
str.flowController.sendFlowControlWindow = 1337
str.flowController.bytesSent = 1337
str.maybeTriggerBlocked()
Expect(handler.receivedBlockedCalled).To(BeTrue())
Expect(handler.receivedBlockedForStream).To(Equal(str.streamID))
})
})
Context("flow control window updating, for receiving", func() {
var receiveFlowControlWindow protocol.ByteCount = 1337
var receiveWindowUpdateThreshold protocol.ByteCount = 1000