From 9ecbfa65ac02761efd2a6c4fcd158132b4a2b2b0 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Wed, 18 May 2016 12:28:00 +0700 Subject: [PATCH] add methods needed for connection-level FlowController ref #39 --- flow_controller.go | 36 +++++++++++++++++++++++++++++------- flow_controller_test.go | 12 ++++++++++-- stream.go | 2 +- 3 files changed, 40 insertions(+), 10 deletions(-) diff --git a/flow_controller.go b/flow_controller.go index d23b3ace3..5b8aacfeb 100644 --- a/flow_controller.go +++ b/flow_controller.go @@ -23,13 +23,21 @@ type flowController struct { mutex sync.RWMutex } -func newFlowController(connectionParametersManager *handshake.ConnectionParametersManager) *flowController { - return &flowController{ - sendFlowControlWindow: connectionParametersManager.GetSendStreamFlowControlWindow(), - receiveFlowControlWindow: connectionParametersManager.GetReceiveStreamFlowControlWindow(), +func newFlowController(streamID protocol.StreamID, connectionParametersManager *handshake.ConnectionParametersManager) *flowController { + fc := flowController{ receiveWindowUpdateThreshold: protocol.WindowUpdateThreshold, receiveFlowControlWindowIncrement: protocol.ReceiveStreamFlowControlWindowIncrement, } + + if streamID == 0 { + fc.sendFlowControlWindow = connectionParametersManager.GetSendConnectionFlowControlWindow() + fc.receiveFlowControlWindow = connectionParametersManager.GetReceiveConnectionFlowControlWindow() + } else { + fc.sendFlowControlWindow = connectionParametersManager.GetSendStreamFlowControlWindow() + fc.receiveFlowControlWindow = connectionParametersManager.GetReceiveStreamFlowControlWindow() + } + + return &fc } func (c *flowController) AddBytesSent(n protocol.ByteCount) { @@ -62,13 +70,27 @@ func (c *flowController) SendWindowSize() protocol.ByteCount { return c.sendFlowControlWindow - c.bytesSent } -func (c *flowController) UpdateHighestReceived(n protocol.ByteCount) { +// UpdateHighestReceived updates the highestReceived value, if the byteOffset is higher +// Should **only** be used for the stream-level FlowController +func (c *flowController) UpdateHighestReceived(byteOffset protocol.ByteCount) protocol.ByteCount { c.mutex.Lock() defer c.mutex.Unlock() - if n > c.highestReceived { - c.highestReceived = n + if byteOffset > c.highestReceived { + increment := byteOffset - c.highestReceived + c.highestReceived = byteOffset + return increment } + return 0 +} + +// IncrementHighestReceived adds an increment to the highestReceived value +// Should **only** be used for the connection-level FlowController +func (c *flowController) IncrementHighestReceived(increment protocol.ByteCount) { + c.mutex.Lock() + defer c.mutex.Unlock() + + c.highestReceived += increment } func (c *flowController) AddBytesRead(n protocol.ByteCount) { diff --git a/flow_controller_test.go b/flow_controller_test.go index 651d8b00f..17c61bbf8 100644 --- a/flow_controller_test.go +++ b/flow_controller_test.go @@ -105,16 +105,24 @@ var _ = Describe("Flow controller", func() { It("updates the highestReceived", func() { controller.highestReceived = 1337 - controller.UpdateHighestReceived(1338) + increment := controller.UpdateHighestReceived(1338) + Expect(increment).To(Equal(protocol.ByteCount(1338 - 1337))) Expect(controller.highestReceived).To(Equal(protocol.ByteCount(1338))) }) It("does not decrease the highestReceived", func() { controller.highestReceived = 1337 - controller.UpdateHighestReceived(1000) + increment := controller.UpdateHighestReceived(1000) + Expect(increment).To(Equal(protocol.ByteCount(0))) Expect(controller.highestReceived).To(Equal(protocol.ByteCount(1337))) }) + It("increases the highestReceived by a given increment", func() { + controller.highestReceived = 1337 + controller.IncrementHighestReceived(123) + Expect(controller.highestReceived).To(Equal(protocol.ByteCount(1337 + 123))) + }) + It("detects a flow control violation", func() { controller.UpdateHighestReceived(receiveFlowControlWindow + 1) Expect(controller.CheckFlowControlViolation()).To(BeTrue()) diff --git a/stream.go b/stream.go index a20c35f83..1ba7c9c18 100644 --- a/stream.go +++ b/stream.go @@ -51,7 +51,7 @@ func newStream(session streamHandler, connectionParameterManager *handshake.Conn s := &stream{ session: session, streamID: StreamID, - flowController: newFlowController(connectionParameterManager), + flowController: newFlowController(StreamID, connectionParameterManager), } s.newFrameOrErrCond.L = &s.mutex