implement basic FlowControlManager

This commit is contained in:
Marten Seemann
2016-06-15 18:53:54 +07:00
parent 37e57c6f05
commit 800320ec39
5 changed files with 250 additions and 5 deletions

View File

@@ -0,0 +1,128 @@
package flowcontrol
import (
"errors"
"sync"
"github.com/lucas-clemente/quic-go/handshake"
"github.com/lucas-clemente/quic-go/protocol"
)
type flowControlManager struct {
connectionParametersManager *handshake.ConnectionParametersManager
streamFlowController map[protocol.StreamID]*flowController
contributesToConnectionFlowControl map[protocol.StreamID]bool
mutex sync.Mutex
}
var (
// ErrStreamFlowControlViolation is a stream flow control violation
ErrStreamFlowControlViolation = errors.New("Stream level flow control violation")
// ErrConnectionFlowControlViolation is a connection level flow control violation
ErrConnectionFlowControlViolation = errors.New("Connection level flow control violation")
)
var errMapAccess = errors.New("Error accessing the flowController map.")
// NewFlowControlManager creates a new flow control manager
func NewFlowControlManager(connectionParametersManager *handshake.ConnectionParametersManager) FlowControlManager {
fcm := flowControlManager{
connectionParametersManager: connectionParametersManager,
streamFlowController: make(map[protocol.StreamID]*flowController),
contributesToConnectionFlowControl: make(map[protocol.StreamID]bool),
}
// initialize connection level flow controller
fcm.streamFlowController[0] = NewFlowController(0, connectionParametersManager)
fcm.contributesToConnectionFlowControl[0] = false
return &fcm
}
// NewStream creates new flow controllers for a stream
func (f *flowControlManager) NewStream(streamID protocol.StreamID, contributesToConnectionFlow bool) {
f.mutex.Lock()
defer f.mutex.Unlock()
if _, ok := f.streamFlowController[streamID]; ok {
return
}
f.streamFlowController[streamID] = NewFlowController(streamID, f.connectionParametersManager)
f.contributesToConnectionFlowControl[streamID] = contributesToConnectionFlow
}
// UpdateHighestReceived updates the highest received byte offset for a stream
// it adds the number of additional bytes to connection level flow control
func (f *flowControlManager) UpdateHighestReceived(streamID protocol.StreamID, byteOffset protocol.ByteCount) error {
if streamID == 0 {
return errors.New("UpdateHightestReceived requires an actual StreamID.")
}
f.mutex.Lock()
defer f.mutex.Unlock()
streamFlowController, err := f.getFlowController(streamID)
if err != nil {
return err
}
increment := streamFlowController.UpdateHighestReceived(byteOffset)
if streamFlowController.CheckFlowControlViolation() {
return ErrStreamFlowControlViolation
}
if f.contributesToConnectionFlowControl[streamID] {
connectionFlowController := f.streamFlowController[0]
connectionFlowController.IncrementHighestReceived(increment)
if connectionFlowController.CheckFlowControlViolation() {
return ErrConnectionFlowControlViolation
}
}
return nil
}
func (f *flowControlManager) AddBytesRead(streamID protocol.StreamID, n protocol.ByteCount) error {
f.mutex.Lock()
defer f.mutex.Unlock()
streamFlowController, err := f.getFlowController(streamID)
if err != nil {
return err
}
streamFlowController.AddBytesRead(n)
if f.contributesToConnectionFlowControl[streamID] {
f.streamFlowController[0].AddBytesRead(n)
}
return nil
}
func (f *flowControlManager) MaybeTriggerStreamWindowUpdate(streamID protocol.StreamID) (bool, protocol.ByteCount, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
streamFlowController, err := f.getFlowController(streamID)
if err != nil {
return false, 0, err
}
doIt, offset := streamFlowController.MaybeTriggerWindowUpdate()
return doIt, offset, nil
}
func (f *flowControlManager) MaybeTriggerConnectionWindowUpdate() (bool, protocol.ByteCount) {
f.mutex.Lock()
defer f.mutex.Unlock()
return f.streamFlowController[0].MaybeTriggerWindowUpdate()
}
func (f *flowControlManager) getFlowController(streamID protocol.StreamID) (FlowController, error) {
streamFlowController, ok := f.streamFlowController[streamID]
if !ok {
return nil, errMapAccess
}
return streamFlowController, nil
}

View File

@@ -0,0 +1,107 @@
package flowcontrol
import (
"github.com/lucas-clemente/quic-go/handshake"
"github.com/lucas-clemente/quic-go/protocol"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = Describe("Flow Control Manager", func() {
var fcm *flowControlManager
var cpm *handshake.ConnectionParametersManager
BeforeEach(func() {
cpm = &handshake.ConnectionParametersManager{}
setConnectionParametersManagerWindow(cpm, "receiveStreamFlowControlWindow", 0x100)
setConnectionParametersManagerWindow(cpm, "receiveConnectionFlowControlWindow", 0x200)
fcm = NewFlowControlManager(cpm).(*flowControlManager)
})
It("creates a connection level flow controller", func() {
Expect(fcm.streamFlowController).To(HaveKey(protocol.StreamID(0)))
Expect(fcm.contributesToConnectionFlowControl).To(HaveKey(protocol.StreamID(0)))
})
Context("creating new streams", func() {
It("creates a new stream", func() {
fcm.NewStream(5, true)
Expect(fcm.streamFlowController).To(HaveKey(protocol.StreamID(5)))
Expect(fcm.streamFlowController[5]).ToNot(BeNil())
Expect(fcm.contributesToConnectionFlowControl).To(HaveKey(protocol.StreamID(5)))
Expect(fcm.contributesToConnectionFlowControl[5]).To(BeTrue())
})
})
Context("receiving data", func() {
BeforeEach(func() {
fcm.NewStream(1, false)
fcm.NewStream(4, true)
fcm.NewStream(6, true)
})
It("updates the connection level flow controller if the stream does not contribute", func() {
err := fcm.UpdateHighestReceived(4, 0x100)
Expect(err).ToNot(HaveOccurred())
Expect(fcm.streamFlowController[0].highestReceived).To(Equal(protocol.ByteCount(0x100)))
Expect(fcm.streamFlowController[4].highestReceived).To(Equal(protocol.ByteCount(0x100)))
})
It("adds the offsets of multiple streams for the connection flow control window", func() {
err := fcm.UpdateHighestReceived(4, 0x100)
Expect(err).ToNot(HaveOccurred())
err = fcm.UpdateHighestReceived(6, 0x50)
Expect(err).ToNot(HaveOccurred())
Expect(fcm.streamFlowController[0].highestReceived).To(Equal(protocol.ByteCount(0x100 + 0x50)))
})
It("does not update the connection level flow controller if the stream does not contribute", func() {
err := fcm.UpdateHighestReceived(1, 0x100)
// fcm.streamFlowController[4].receiveFlowControlWindow = 0x1000
Expect(err).ToNot(HaveOccurred())
Expect(fcm.streamFlowController[0].highestReceived).To(Equal(protocol.ByteCount(0)))
Expect(fcm.streamFlowController[1].highestReceived).To(Equal(protocol.ByteCount(0x100)))
})
Context("flow control violations", func() {
It("errors when encountering a stream level flow control violation", func() {
err := fcm.UpdateHighestReceived(4, 0x101)
Expect(err).To(MatchError(ErrStreamFlowControlViolation))
})
It("errors when encountering a connection-level flow control violation", func() {
fcm.streamFlowController[4].receiveFlowControlWindow = 0x300
err := fcm.UpdateHighestReceived(4, 0x201)
Expect(err).To(MatchError(ErrConnectionFlowControlViolation))
})
})
Context("window updates", func() {
It("gets stream level window updates", func() {
err := fcm.UpdateHighestReceived(4, 0x100)
Expect(err).ToNot(HaveOccurred())
err = fcm.AddBytesRead(4, 0x100-0x10)
Expect(err).ToNot(HaveOccurred())
doIt, offset, err := fcm.MaybeTriggerStreamWindowUpdate(4)
Expect(err).ToNot(HaveOccurred())
Expect(doIt).To(BeTrue())
Expect(offset).ToNot(Equal(protocol.ByteCount(0x100)))
})
It("gets connection level window updates", func() {
err := fcm.UpdateHighestReceived(4, 0x100)
Expect(err).ToNot(HaveOccurred())
err = fcm.UpdateHighestReceived(6, 0x100)
Expect(err).ToNot(HaveOccurred())
err = fcm.AddBytesRead(4, 0x100-0x10)
Expect(err).ToNot(HaveOccurred())
err = fcm.AddBytesRead(6, 0x100-0x10)
Expect(err).ToNot(HaveOccurred())
doIt, offset := fcm.MaybeTriggerConnectionWindowUpdate()
Expect(err).ToNot(HaveOccurred())
Expect(doIt).To(BeTrue())
Expect(offset).ToNot(Equal(protocol.ByteCount(0x200)))
})
})
})
})

View File

@@ -25,7 +25,8 @@ type flowController struct {
} }
// NewFlowController gets a new flow controller // NewFlowController gets a new flow controller
func NewFlowController(streamID protocol.StreamID, connectionParametersManager *handshake.ConnectionParametersManager) FlowController { // TODO: make private
func NewFlowController(streamID protocol.StreamID, connectionParametersManager *handshake.ConnectionParametersManager) *flowController {
fc := flowController{ fc := flowController{
streamID: streamID, streamID: streamID,
connectionParametersManager: connectionParametersManager, connectionParametersManager: connectionParametersManager,

View File

@@ -35,24 +35,24 @@ var _ = Describe("Flow controller", func() {
}) })
It("reads the stream send and receive windows when acting as stream-level flow controller", func() { It("reads the stream send and receive windows when acting as stream-level flow controller", func() {
fc := NewFlowController(5, cpm).(*flowController) fc := NewFlowController(5, cpm)
Expect(fc.streamID).To(Equal(protocol.StreamID(5))) Expect(fc.streamID).To(Equal(protocol.StreamID(5)))
Expect(fc.receiveFlowControlWindow).To(Equal(protocol.ByteCount(2000))) Expect(fc.receiveFlowControlWindow).To(Equal(protocol.ByteCount(2000)))
}) })
It("reads the stream send and receive windows when acting as stream-level flow controller", func() { It("reads the stream send and receive windows when acting as stream-level flow controller", func() {
fc := NewFlowController(0, cpm).(*flowController) fc := NewFlowController(0, cpm)
Expect(fc.streamID).To(Equal(protocol.StreamID(0))) Expect(fc.streamID).To(Equal(protocol.StreamID(0)))
Expect(fc.receiveFlowControlWindow).To(Equal(protocol.ByteCount(4000))) Expect(fc.receiveFlowControlWindow).To(Equal(protocol.ByteCount(4000)))
}) })
It("does not set the stream flow control windows for sending", func() { It("does not set the stream flow control windows for sending", func() {
fc := NewFlowController(5, cpm).(*flowController) fc := NewFlowController(5, cpm)
Expect(fc.sendFlowControlWindow).To(BeZero()) Expect(fc.sendFlowControlWindow).To(BeZero())
}) })
It("does not set the connection flow control windows for sending", func() { It("does not set the connection flow control windows for sending", func() {
fc := NewFlowController(0, cpm).(*flowController) fc := NewFlowController(0, cpm)
Expect(fc.sendFlowControlWindow).To(BeZero()) Expect(fc.sendFlowControlWindow).To(BeZero())
}) })
}) })

View File

@@ -15,3 +15,12 @@ type FlowController interface {
CheckFlowControlViolation() bool CheckFlowControlViolation() bool
GetHighestReceived() protocol.ByteCount GetHighestReceived() protocol.ByteCount
} }
// A FlowControlManager manages the flow control
type FlowControlManager interface {
NewStream(streamID protocol.StreamID, contributesToConnectionFlow bool)
UpdateHighestReceived(streamID protocol.StreamID, byteOffset protocol.ByteCount) error
AddBytesRead(streamID protocol.StreamID, n protocol.ByteCount) error
MaybeTriggerStreamWindowUpdate(streamID protocol.StreamID) (bool, protocol.ByteCount, error)
MaybeTriggerConnectionWindowUpdate() (bool, protocol.ByteCount)
}