add methods needed for connection-level FlowController

ref #39
This commit is contained in:
Marten Seemann
2016-05-18 12:28:00 +07:00
parent e0801262bc
commit 9ecbfa65ac
3 changed files with 40 additions and 10 deletions

View File

@@ -23,13 +23,21 @@ type flowController struct {
mutex sync.RWMutex mutex sync.RWMutex
} }
func newFlowController(connectionParametersManager *handshake.ConnectionParametersManager) *flowController { func newFlowController(streamID protocol.StreamID, connectionParametersManager *handshake.ConnectionParametersManager) *flowController {
return &flowController{ fc := flowController{
sendFlowControlWindow: connectionParametersManager.GetSendStreamFlowControlWindow(),
receiveFlowControlWindow: connectionParametersManager.GetReceiveStreamFlowControlWindow(),
receiveWindowUpdateThreshold: protocol.WindowUpdateThreshold, receiveWindowUpdateThreshold: protocol.WindowUpdateThreshold,
receiveFlowControlWindowIncrement: protocol.ReceiveStreamFlowControlWindowIncrement, receiveFlowControlWindowIncrement: protocol.ReceiveStreamFlowControlWindowIncrement,
} }
if streamID == 0 {
fc.sendFlowControlWindow = connectionParametersManager.GetSendConnectionFlowControlWindow()
fc.receiveFlowControlWindow = connectionParametersManager.GetReceiveConnectionFlowControlWindow()
} else {
fc.sendFlowControlWindow = connectionParametersManager.GetSendStreamFlowControlWindow()
fc.receiveFlowControlWindow = connectionParametersManager.GetReceiveStreamFlowControlWindow()
}
return &fc
} }
func (c *flowController) AddBytesSent(n protocol.ByteCount) { func (c *flowController) AddBytesSent(n protocol.ByteCount) {
@@ -62,13 +70,27 @@ func (c *flowController) SendWindowSize() protocol.ByteCount {
return c.sendFlowControlWindow - c.bytesSent return c.sendFlowControlWindow - c.bytesSent
} }
func (c *flowController) UpdateHighestReceived(n protocol.ByteCount) { // UpdateHighestReceived updates the highestReceived value, if the byteOffset is higher
// Should **only** be used for the stream-level FlowController
func (c *flowController) UpdateHighestReceived(byteOffset protocol.ByteCount) protocol.ByteCount {
c.mutex.Lock() c.mutex.Lock()
defer c.mutex.Unlock() defer c.mutex.Unlock()
if n > c.highestReceived { if byteOffset > c.highestReceived {
c.highestReceived = n increment := byteOffset - c.highestReceived
c.highestReceived = byteOffset
return increment
} }
return 0
}
// IncrementHighestReceived adds an increment to the highestReceived value
// Should **only** be used for the connection-level FlowController
func (c *flowController) IncrementHighestReceived(increment protocol.ByteCount) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.highestReceived += increment
} }
func (c *flowController) AddBytesRead(n protocol.ByteCount) { func (c *flowController) AddBytesRead(n protocol.ByteCount) {

View File

@@ -105,16 +105,24 @@ var _ = Describe("Flow controller", func() {
It("updates the highestReceived", func() { It("updates the highestReceived", func() {
controller.highestReceived = 1337 controller.highestReceived = 1337
controller.UpdateHighestReceived(1338) increment := controller.UpdateHighestReceived(1338)
Expect(increment).To(Equal(protocol.ByteCount(1338 - 1337)))
Expect(controller.highestReceived).To(Equal(protocol.ByteCount(1338))) Expect(controller.highestReceived).To(Equal(protocol.ByteCount(1338)))
}) })
It("does not decrease the highestReceived", func() { It("does not decrease the highestReceived", func() {
controller.highestReceived = 1337 controller.highestReceived = 1337
controller.UpdateHighestReceived(1000) increment := controller.UpdateHighestReceived(1000)
Expect(increment).To(Equal(protocol.ByteCount(0)))
Expect(controller.highestReceived).To(Equal(protocol.ByteCount(1337))) Expect(controller.highestReceived).To(Equal(protocol.ByteCount(1337)))
}) })
It("increases the highestReceived by a given increment", func() {
controller.highestReceived = 1337
controller.IncrementHighestReceived(123)
Expect(controller.highestReceived).To(Equal(protocol.ByteCount(1337 + 123)))
})
It("detects a flow control violation", func() { It("detects a flow control violation", func() {
controller.UpdateHighestReceived(receiveFlowControlWindow + 1) controller.UpdateHighestReceived(receiveFlowControlWindow + 1)
Expect(controller.CheckFlowControlViolation()).To(BeTrue()) Expect(controller.CheckFlowControlViolation()).To(BeTrue())

View File

@@ -51,7 +51,7 @@ func newStream(session streamHandler, connectionParameterManager *handshake.Conn
s := &stream{ s := &stream{
session: session, session: session,
streamID: StreamID, streamID: StreamID,
flowController: newFlowController(connectionParameterManager), flowController: newFlowController(StreamID, connectionParameterManager),
} }
s.newFrameOrErrCond.L = &s.mutex s.newFrameOrErrCond.L = &s.mutex