From ac54ac66c7b973d75ecbf159da9d8d3f151c8244 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 16 Jun 2016 10:55:34 +0700 Subject: [PATCH] add methods for sending data to FlowControlManager --- flowcontrol/flow_control_manager.go | 56 +++++++++++++++++-- flowcontrol/flow_control_manager_test.go | 68 ++++++++++++++++++++++++ flowcontrol/flow_controller.go | 8 +++ flowcontrol/flow_controller_test.go | 7 +++ flowcontrol/interface.go | 7 +++ stream_test.go | 13 +++++ 6 files changed, 155 insertions(+), 4 deletions(-) diff --git a/flowcontrol/flow_control_manager.go b/flowcontrol/flow_control_manager.go index 91ccd345..2c209bb0 100644 --- a/flowcontrol/flow_control_manager.go +++ b/flowcontrol/flow_control_manager.go @@ -52,11 +52,8 @@ func (f *flowControlManager) NewStream(streamID protocol.StreamID, contributesTo // UpdateHighestReceived updates the highest received byte offset for a stream // it adds the number of additional bytes to connection level flow control +// streamID must not be 0 here func (f *flowControlManager) UpdateHighestReceived(streamID protocol.StreamID, byteOffset protocol.ByteCount) error { - if streamID == 0 { - return errors.New("UpdateHightestReceived requires an actual StreamID.") - } - f.mutex.Lock() defer f.mutex.Unlock() @@ -81,6 +78,7 @@ func (f *flowControlManager) UpdateHighestReceived(streamID protocol.StreamID, b return nil } +// streamID must not be 0 here func (f *flowControlManager) AddBytesRead(streamID protocol.StreamID, n protocol.ByteCount) error { f.mutex.Lock() defer f.mutex.Unlock() @@ -99,6 +97,7 @@ func (f *flowControlManager) AddBytesRead(streamID protocol.StreamID, n protocol return nil } +// streamID must not be 0 here func (f *flowControlManager) MaybeTriggerStreamWindowUpdate(streamID protocol.StreamID) (bool, protocol.ByteCount, error) { f.mutex.Lock() defer f.mutex.Unlock() @@ -119,6 +118,55 @@ func (f *flowControlManager) MaybeTriggerConnectionWindowUpdate() (bool, protoco return f.streamFlowController[0].MaybeTriggerWindowUpdate() } +// streamID must not be 0 here +func (f *flowControlManager) AddBytesSent(streamID protocol.StreamID, n protocol.ByteCount) error { + f.mutex.Lock() + defer f.mutex.Unlock() + + streamFlowController, err := f.getFlowController(streamID) + if err != nil { + return err + } + + streamFlowController.AddBytesSent(n) + + if f.contributesToConnectionFlowControl[streamID] { + f.streamFlowController[0].AddBytesSent(n) + } + + return nil +} + +// must not be called with StreamID 0 +func (f *flowControlManager) SendWindowSize(streamID protocol.StreamID) (protocol.ByteCount, error) { + f.mutex.Lock() + defer f.mutex.Unlock() + + streamFlowController, err := f.getFlowController(streamID) + if err != nil { + return 0, err + } + + return streamFlowController.SendWindowOffset(), nil +} + +func (f *flowControlManager) RemainingConnectionWindowSize() protocol.ByteCount { + return f.streamFlowController[0].SendWindowSize() +} + +// streamID may be 0 here +func (f *flowControlManager) UpdateWindow(streamID protocol.StreamID, offset protocol.ByteCount) (bool, error) { + f.mutex.Lock() + defer f.mutex.Unlock() + + streamFlowController, err := f.getFlowController(streamID) + if err != nil { + return false, err + } + + return streamFlowController.UpdateSendWindow(offset), nil +} + func (f *flowControlManager) getFlowController(streamID protocol.StreamID) (FlowController, error) { streamFlowController, ok := f.streamFlowController[streamID] if !ok { diff --git a/flowcontrol/flow_control_manager_test.go b/flowcontrol/flow_control_manager_test.go index c86adc7c..4c36eff3 100644 --- a/flowcontrol/flow_control_manager_test.go +++ b/flowcontrol/flow_control_manager_test.go @@ -63,6 +63,11 @@ var _ = Describe("Flow Control Manager", func() { Expect(fcm.streamFlowController[1].highestReceived).To(Equal(protocol.ByteCount(0x100))) }) + It("returns an error when called with an unknown stream", func() { + err := fcm.UpdateHighestReceived(1337, 0x1337) + Expect(err).To(MatchError(errMapAccess)) + }) + Context("flow control violations", func() { It("errors when encountering a stream level flow control violation", func() { err := fcm.UpdateHighestReceived(4, 0x101) @@ -104,4 +109,67 @@ var _ = Describe("Flow Control Manager", func() { }) }) }) + + Context("sending data", func() { + It("adds bytes sent for all stream contributing to connection level flow control", func() { + fcm.NewStream(1, false) + fcm.NewStream(3, true) + fcm.NewStream(5, true) + err := fcm.AddBytesSent(1, 0x100) + Expect(err).ToNot(HaveOccurred()) + err = fcm.AddBytesSent(3, 0x200) + Expect(err).ToNot(HaveOccurred()) + err = fcm.AddBytesSent(5, 0x500) + Expect(err).ToNot(HaveOccurred()) + Expect(fcm.streamFlowController[0].bytesSent).To(Equal(protocol.ByteCount(0x200 + 0x500))) + }) + + Context("window updates", func() { + It("updates the window for a normal stream", func() { + fcm.NewStream(5, true) + updated, err := fcm.UpdateWindow(5, 0x1000) + Expect(err).ToNot(HaveOccurred()) + Expect(updated).To(BeTrue()) + }) + + It("updates the connection level window", func() { + updated, err := fcm.UpdateWindow(0, 0x1000) + Expect(err).ToNot(HaveOccurred()) + Expect(updated).To(BeTrue()) + }) + }) + + Context("window sizes", func() { + It("gets the window size of a stream", func() { + fcm.NewStream(5, true) + updated, err := fcm.UpdateWindow(5, 0x1000) + Expect(err).ToNot(HaveOccurred()) + Expect(updated).To(BeTrue()) + fcm.AddBytesSent(5, 0x500) // WindowSize should return the same value no matter how much was sent + size, err := fcm.SendWindowSize(5) + Expect(err).ToNot(HaveOccurred()) + Expect(size).To(Equal(protocol.ByteCount(0x1000))) + }) + + It("gets the window size of a stream", func() { + fcm.NewStream(5, true) + updated, err := fcm.UpdateWindow(0, 0x1000) + Expect(err).ToNot(HaveOccurred()) + Expect(updated).To(BeTrue()) + fcm.AddBytesSent(5, 0x456) // WindowSize should return the same value no matter how much was sent + size := fcm.RemainingConnectionWindowSize() + Expect(size).To(Equal(protocol.ByteCount(0x1000 - 0x456))) + }) + + It("does not reduce the size of the connection level window, if the stream does not contribute", func() { + fcm.NewStream(3, false) + updated, err := fcm.UpdateWindow(0, 0x1000) + Expect(err).ToNot(HaveOccurred()) + Expect(updated).To(BeTrue()) + fcm.AddBytesSent(3, 0x456) // WindowSize should return the same value no matter how much was sent + size := fcm.RemainingConnectionWindowSize() + Expect(size).To(Equal(protocol.ByteCount(0x1000))) + }) + }) + }) }) diff --git a/flowcontrol/flow_controller.go b/flowcontrol/flow_controller.go index d5de2b4f..36169cc4 100644 --- a/flowcontrol/flow_controller.go +++ b/flowcontrol/flow_controller.go @@ -73,6 +73,7 @@ func (c *flowController) UpdateSendWindow(newOffset protocol.ByteCount) bool { return false } +// TODO: remove once the Stream doesn't use it anymore func (c *flowController) SendWindowSize() protocol.ByteCount { c.mutex.RLock() defer c.mutex.RUnlock() @@ -85,6 +86,13 @@ func (c *flowController) SendWindowSize() protocol.ByteCount { return sendFlowControlWindow - c.bytesSent } +func (c *flowController) SendWindowOffset() protocol.ByteCount { + c.mutex.RLock() + defer c.mutex.RUnlock() + + return c.getSendFlowControlWindow() +} + // UpdateHighestReceived updates the highestReceived value, if the byteOffset is higher // Should **only** be used for the stream-level FlowController func (c *flowController) UpdateHighestReceived(byteOffset protocol.ByteCount) protocol.ByteCount { diff --git a/flowcontrol/flow_controller_test.go b/flowcontrol/flow_controller_test.go index f99b3022..a3ffec47 100644 --- a/flowcontrol/flow_controller_test.go +++ b/flowcontrol/flow_controller_test.go @@ -79,10 +79,17 @@ var _ = Describe("Flow controller", func() { Expect(controller.SendWindowSize()).To(Equal(protocol.ByteCount(12 - 5))) }) + It("gets the offset of the flow control window", func() { + controller.bytesSent = 5 + controller.sendFlowControlWindow = 12 + Expect(controller.SendWindowOffset()).To(Equal(protocol.ByteCount(12))) + }) + It("updates the size of the flow control window", func() { controller.bytesSent = 5 updateSuccessful := controller.UpdateSendWindow(15) Expect(updateSuccessful).To(BeTrue()) + Expect(controller.SendWindowOffset()).To(Equal(protocol.ByteCount(15))) Expect(controller.SendWindowSize()).To(Equal(protocol.ByteCount(15 - 5))) }) diff --git a/flowcontrol/interface.go b/flowcontrol/interface.go index 2591e008..3cdf9681 100644 --- a/flowcontrol/interface.go +++ b/flowcontrol/interface.go @@ -7,6 +7,7 @@ type FlowController interface { AddBytesSent(n protocol.ByteCount) UpdateSendWindow(newOffset protocol.ByteCount) bool SendWindowSize() protocol.ByteCount + SendWindowOffset() protocol.ByteCount UpdateHighestReceived(byteOffset protocol.ByteCount) protocol.ByteCount IncrementHighestReceived(increment protocol.ByteCount) AddBytesRead(n protocol.ByteCount) @@ -18,8 +19,14 @@ type FlowController interface { // A FlowControlManager manages the flow control type FlowControlManager interface { NewStream(streamID protocol.StreamID, contributesToConnectionFlow bool) + // methods needed for receiving data UpdateHighestReceived(streamID protocol.StreamID, byteOffset protocol.ByteCount) error AddBytesRead(streamID protocol.StreamID, n protocol.ByteCount) error MaybeTriggerStreamWindowUpdate(streamID protocol.StreamID) (bool, protocol.ByteCount, error) MaybeTriggerConnectionWindowUpdate() (bool, protocol.ByteCount) + // methods needed for sending data + AddBytesSent(streamID protocol.StreamID, n protocol.ByteCount) error + SendWindowSize(streamID protocol.StreamID) (protocol.ByteCount, error) + RemainingConnectionWindowSize() protocol.ByteCount + UpdateWindow(streamID protocol.StreamID, offset protocol.ByteCount) (bool, error) } diff --git a/stream_test.go b/stream_test.go index 55a3675a..6d7a9fa0 100644 --- a/stream_test.go +++ b/stream_test.go @@ -74,6 +74,19 @@ func (m *mockFlowControlHandler) UpdateHighestReceived(streamID protocol.StreamI return nil } +func (m *mockFlowControlHandler) AddBytesSent(streamID protocol.StreamID, n protocol.ByteCount) error { + panic("not implemented") +} +func (m *mockFlowControlHandler) SendWindowSize(streamID protocol.StreamID) (protocol.ByteCount, error) { + panic("not implemented") +} +func (m *mockFlowControlHandler) RemainingConnectionWindowSize() protocol.ByteCount { + panic("not implemented") +} +func (m *mockFlowControlHandler) UpdateWindow(streamID protocol.StreamID, offset protocol.ByteCount) (bool, error) { + panic("not implemented") +} + var _ = Describe("Stream", func() { var ( str *stream