From 800320ec39bbcdddf0fda4a687f1eb602bd94bb9 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Wed, 15 Jun 2016 18:53:54 +0700 Subject: [PATCH] implement basic FlowControlManager --- flowcontrol/flow_control_manager.go | 128 +++++++++++++++++++++++ flowcontrol/flow_control_manager_test.go | 107 +++++++++++++++++++ flowcontrol/flow_controller.go | 3 +- flowcontrol/flow_controller_test.go | 8 +- flowcontrol/interface.go | 9 ++ 5 files changed, 250 insertions(+), 5 deletions(-) create mode 100644 flowcontrol/flow_control_manager.go create mode 100644 flowcontrol/flow_control_manager_test.go diff --git a/flowcontrol/flow_control_manager.go b/flowcontrol/flow_control_manager.go new file mode 100644 index 00000000..91ccd345 --- /dev/null +++ b/flowcontrol/flow_control_manager.go @@ -0,0 +1,128 @@ +package flowcontrol + +import ( + "errors" + "sync" + + "github.com/lucas-clemente/quic-go/handshake" + "github.com/lucas-clemente/quic-go/protocol" +) + +type flowControlManager struct { + connectionParametersManager *handshake.ConnectionParametersManager + streamFlowController map[protocol.StreamID]*flowController + contributesToConnectionFlowControl map[protocol.StreamID]bool + mutex sync.Mutex +} + +var ( + // ErrStreamFlowControlViolation is a stream flow control violation + ErrStreamFlowControlViolation = errors.New("Stream level flow control violation") + // ErrConnectionFlowControlViolation is a connection level flow control violation + ErrConnectionFlowControlViolation = errors.New("Connection level flow control violation") +) + +var errMapAccess = errors.New("Error accessing the flowController map.") + +// NewFlowControlManager creates a new flow control manager +func NewFlowControlManager(connectionParametersManager *handshake.ConnectionParametersManager) FlowControlManager { + fcm := flowControlManager{ + connectionParametersManager: connectionParametersManager, + streamFlowController: make(map[protocol.StreamID]*flowController), + contributesToConnectionFlowControl: make(map[protocol.StreamID]bool), + } + // initialize connection level flow controller + fcm.streamFlowController[0] = NewFlowController(0, connectionParametersManager) + fcm.contributesToConnectionFlowControl[0] = false + return &fcm +} + +// NewStream creates new flow controllers for a stream +func (f *flowControlManager) NewStream(streamID protocol.StreamID, contributesToConnectionFlow bool) { + f.mutex.Lock() + defer f.mutex.Unlock() + + if _, ok := f.streamFlowController[streamID]; ok { + return + } + + f.streamFlowController[streamID] = NewFlowController(streamID, f.connectionParametersManager) + f.contributesToConnectionFlowControl[streamID] = contributesToConnectionFlow +} + +// UpdateHighestReceived updates the highest received byte offset for a stream +// it adds the number of additional bytes to connection level flow control +func (f *flowControlManager) UpdateHighestReceived(streamID protocol.StreamID, byteOffset protocol.ByteCount) error { + if streamID == 0 { + return errors.New("UpdateHightestReceived requires an actual StreamID.") + } + + f.mutex.Lock() + defer f.mutex.Unlock() + + streamFlowController, err := f.getFlowController(streamID) + if err != nil { + return err + } + increment := streamFlowController.UpdateHighestReceived(byteOffset) + + if streamFlowController.CheckFlowControlViolation() { + return ErrStreamFlowControlViolation + } + + if f.contributesToConnectionFlowControl[streamID] { + connectionFlowController := f.streamFlowController[0] + connectionFlowController.IncrementHighestReceived(increment) + if connectionFlowController.CheckFlowControlViolation() { + return ErrConnectionFlowControlViolation + } + } + + return nil +} + +func (f *flowControlManager) AddBytesRead(streamID protocol.StreamID, n protocol.ByteCount) error { + f.mutex.Lock() + defer f.mutex.Unlock() + + streamFlowController, err := f.getFlowController(streamID) + if err != nil { + return err + } + + streamFlowController.AddBytesRead(n) + + if f.contributesToConnectionFlowControl[streamID] { + f.streamFlowController[0].AddBytesRead(n) + } + + return nil +} + +func (f *flowControlManager) MaybeTriggerStreamWindowUpdate(streamID protocol.StreamID) (bool, protocol.ByteCount, error) { + f.mutex.Lock() + defer f.mutex.Unlock() + + streamFlowController, err := f.getFlowController(streamID) + if err != nil { + return false, 0, err + } + + doIt, offset := streamFlowController.MaybeTriggerWindowUpdate() + return doIt, offset, nil +} + +func (f *flowControlManager) MaybeTriggerConnectionWindowUpdate() (bool, protocol.ByteCount) { + f.mutex.Lock() + defer f.mutex.Unlock() + + return f.streamFlowController[0].MaybeTriggerWindowUpdate() +} + +func (f *flowControlManager) getFlowController(streamID protocol.StreamID) (FlowController, error) { + streamFlowController, ok := f.streamFlowController[streamID] + if !ok { + return nil, errMapAccess + } + return streamFlowController, nil +} diff --git a/flowcontrol/flow_control_manager_test.go b/flowcontrol/flow_control_manager_test.go new file mode 100644 index 00000000..c86adc7c --- /dev/null +++ b/flowcontrol/flow_control_manager_test.go @@ -0,0 +1,107 @@ +package flowcontrol + +import ( + "github.com/lucas-clemente/quic-go/handshake" + "github.com/lucas-clemente/quic-go/protocol" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Flow Control Manager", func() { + var fcm *flowControlManager + var cpm *handshake.ConnectionParametersManager + + BeforeEach(func() { + cpm = &handshake.ConnectionParametersManager{} + setConnectionParametersManagerWindow(cpm, "receiveStreamFlowControlWindow", 0x100) + setConnectionParametersManagerWindow(cpm, "receiveConnectionFlowControlWindow", 0x200) + fcm = NewFlowControlManager(cpm).(*flowControlManager) + }) + + It("creates a connection level flow controller", func() { + Expect(fcm.streamFlowController).To(HaveKey(protocol.StreamID(0))) + Expect(fcm.contributesToConnectionFlowControl).To(HaveKey(protocol.StreamID(0))) + }) + + Context("creating new streams", func() { + It("creates a new stream", func() { + fcm.NewStream(5, true) + Expect(fcm.streamFlowController).To(HaveKey(protocol.StreamID(5))) + Expect(fcm.streamFlowController[5]).ToNot(BeNil()) + Expect(fcm.contributesToConnectionFlowControl).To(HaveKey(protocol.StreamID(5))) + Expect(fcm.contributesToConnectionFlowControl[5]).To(BeTrue()) + }) + }) + + Context("receiving data", func() { + BeforeEach(func() { + fcm.NewStream(1, false) + fcm.NewStream(4, true) + fcm.NewStream(6, true) + }) + + It("updates the connection level flow controller if the stream does not contribute", func() { + err := fcm.UpdateHighestReceived(4, 0x100) + Expect(err).ToNot(HaveOccurred()) + Expect(fcm.streamFlowController[0].highestReceived).To(Equal(protocol.ByteCount(0x100))) + Expect(fcm.streamFlowController[4].highestReceived).To(Equal(protocol.ByteCount(0x100))) + }) + + It("adds the offsets of multiple streams for the connection flow control window", func() { + err := fcm.UpdateHighestReceived(4, 0x100) + Expect(err).ToNot(HaveOccurred()) + err = fcm.UpdateHighestReceived(6, 0x50) + Expect(err).ToNot(HaveOccurred()) + Expect(fcm.streamFlowController[0].highestReceived).To(Equal(protocol.ByteCount(0x100 + 0x50))) + }) + + It("does not update the connection level flow controller if the stream does not contribute", func() { + err := fcm.UpdateHighestReceived(1, 0x100) + // fcm.streamFlowController[4].receiveFlowControlWindow = 0x1000 + Expect(err).ToNot(HaveOccurred()) + Expect(fcm.streamFlowController[0].highestReceived).To(Equal(protocol.ByteCount(0))) + Expect(fcm.streamFlowController[1].highestReceived).To(Equal(protocol.ByteCount(0x100))) + }) + + Context("flow control violations", func() { + It("errors when encountering a stream level flow control violation", func() { + err := fcm.UpdateHighestReceived(4, 0x101) + Expect(err).To(MatchError(ErrStreamFlowControlViolation)) + }) + + It("errors when encountering a connection-level flow control violation", func() { + fcm.streamFlowController[4].receiveFlowControlWindow = 0x300 + err := fcm.UpdateHighestReceived(4, 0x201) + Expect(err).To(MatchError(ErrConnectionFlowControlViolation)) + }) + }) + + Context("window updates", func() { + It("gets stream level window updates", func() { + err := fcm.UpdateHighestReceived(4, 0x100) + Expect(err).ToNot(HaveOccurred()) + err = fcm.AddBytesRead(4, 0x100-0x10) + Expect(err).ToNot(HaveOccurred()) + doIt, offset, err := fcm.MaybeTriggerStreamWindowUpdate(4) + Expect(err).ToNot(HaveOccurred()) + Expect(doIt).To(BeTrue()) + Expect(offset).ToNot(Equal(protocol.ByteCount(0x100))) + }) + + It("gets connection level window updates", func() { + err := fcm.UpdateHighestReceived(4, 0x100) + Expect(err).ToNot(HaveOccurred()) + err = fcm.UpdateHighestReceived(6, 0x100) + Expect(err).ToNot(HaveOccurred()) + err = fcm.AddBytesRead(4, 0x100-0x10) + Expect(err).ToNot(HaveOccurred()) + err = fcm.AddBytesRead(6, 0x100-0x10) + Expect(err).ToNot(HaveOccurred()) + doIt, offset := fcm.MaybeTriggerConnectionWindowUpdate() + Expect(err).ToNot(HaveOccurred()) + Expect(doIt).To(BeTrue()) + Expect(offset).ToNot(Equal(protocol.ByteCount(0x200))) + }) + }) + }) +}) diff --git a/flowcontrol/flow_controller.go b/flowcontrol/flow_controller.go index b244d609..8c3981b0 100644 --- a/flowcontrol/flow_controller.go +++ b/flowcontrol/flow_controller.go @@ -25,7 +25,8 @@ type flowController struct { } // NewFlowController gets a new flow controller -func NewFlowController(streamID protocol.StreamID, connectionParametersManager *handshake.ConnectionParametersManager) FlowController { +// TODO: make private +func NewFlowController(streamID protocol.StreamID, connectionParametersManager *handshake.ConnectionParametersManager) *flowController { fc := flowController{ streamID: streamID, connectionParametersManager: connectionParametersManager, diff --git a/flowcontrol/flow_controller_test.go b/flowcontrol/flow_controller_test.go index ffc0861c..f99b3022 100644 --- a/flowcontrol/flow_controller_test.go +++ b/flowcontrol/flow_controller_test.go @@ -35,24 +35,24 @@ var _ = Describe("Flow controller", func() { }) It("reads the stream send and receive windows when acting as stream-level flow controller", func() { - fc := NewFlowController(5, cpm).(*flowController) + fc := NewFlowController(5, cpm) Expect(fc.streamID).To(Equal(protocol.StreamID(5))) Expect(fc.receiveFlowControlWindow).To(Equal(protocol.ByteCount(2000))) }) It("reads the stream send and receive windows when acting as stream-level flow controller", func() { - fc := NewFlowController(0, cpm).(*flowController) + fc := NewFlowController(0, cpm) Expect(fc.streamID).To(Equal(protocol.StreamID(0))) Expect(fc.receiveFlowControlWindow).To(Equal(protocol.ByteCount(4000))) }) It("does not set the stream flow control windows for sending", func() { - fc := NewFlowController(5, cpm).(*flowController) + fc := NewFlowController(5, cpm) Expect(fc.sendFlowControlWindow).To(BeZero()) }) It("does not set the connection flow control windows for sending", func() { - fc := NewFlowController(0, cpm).(*flowController) + fc := NewFlowController(0, cpm) Expect(fc.sendFlowControlWindow).To(BeZero()) }) }) diff --git a/flowcontrol/interface.go b/flowcontrol/interface.go index f7cccb54..8aa662c4 100644 --- a/flowcontrol/interface.go +++ b/flowcontrol/interface.go @@ -15,3 +15,12 @@ type FlowController interface { CheckFlowControlViolation() bool GetHighestReceived() protocol.ByteCount } + +// A FlowControlManager manages the flow control +type FlowControlManager interface { + NewStream(streamID protocol.StreamID, contributesToConnectionFlow bool) + UpdateHighestReceived(streamID protocol.StreamID, byteOffset protocol.ByteCount) error + AddBytesRead(streamID protocol.StreamID, n protocol.ByteCount) error + MaybeTriggerStreamWindowUpdate(streamID protocol.StreamID) (bool, protocol.ByteCount, error) + MaybeTriggerConnectionWindowUpdate() (bool, protocol.ByteCount) +}