From e087ee7e9f972ff10cc226091600cf287e11017a Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 16 Oct 2017 18:21:01 +0700 Subject: [PATCH] handle WINDOW_UPDATEs for streams and connection separately In IETF QUIC, stream 0 is a valid stream ID, and is not used to encode WINDOW_UPDATEs for the connection any more. --- internal/flowcontrol/flow_control_manager.go | 15 +++++------- .../flowcontrol/flow_control_manager_test.go | 20 +++++++--------- internal/flowcontrol/interface.go | 3 ++- internal/mocks/flow_control_manager.go | 24 ++++++++++++++----- session.go | 21 +++++++++------- session_test.go | 21 ++++++++++++---- 6 files changed, 62 insertions(+), 42 deletions(-) diff --git a/internal/flowcontrol/flow_control_manager.go b/internal/flowcontrol/flow_control_manager.go index 148aa9cc9..55be904e7 100644 --- a/internal/flowcontrol/flow_control_manager.go +++ b/internal/flowcontrol/flow_control_manager.go @@ -228,15 +228,8 @@ func (f *flowControlManager) RemainingConnectionWindowSize() protocol.ByteCount return f.connFlowController.SendWindowSize() } -// streamID may be 0 here -func (f *flowControlManager) UpdateWindow(streamID protocol.StreamID, offset protocol.ByteCount) (bool, error) { - f.mutex.Lock() - defer f.mutex.Unlock() - - if streamID == 0 { - return f.connFlowController.UpdateSendWindow(offset), nil - } - +// streamID must not be 0 here +func (f *flowControlManager) UpdateStreamWindow(streamID protocol.StreamID, offset protocol.ByteCount) (bool, error) { fc, err := f.getFlowController(streamID) if err != nil { return false, err @@ -244,6 +237,10 @@ func (f *flowControlManager) UpdateWindow(streamID protocol.StreamID, offset pro return fc.UpdateSendWindow(offset), nil } +func (f *flowControlManager) UpdateConnectionWindow(offset protocol.ByteCount) bool { + return f.connFlowController.UpdateSendWindow(offset) +} + func (f *flowControlManager) getFlowController(streamID protocol.StreamID) (*streamFlowController, error) { streamFlowController, ok := f.streamFlowController[streamID] if !ok { diff --git a/internal/flowcontrol/flow_control_manager_test.go b/internal/flowcontrol/flow_control_manager_test.go index e135a5b72..4e6b9c87f 100644 --- a/internal/flowcontrol/flow_control_manager_test.go +++ b/internal/flowcontrol/flow_control_manager_test.go @@ -302,19 +302,18 @@ var _ = Describe("Flow Control Manager", func() { Context("window updates", func() { It("updates the window for a normal stream", func() { fcm.NewStream(5, true) - updated, err := fcm.UpdateWindow(5, 1000) + updated, err := fcm.UpdateStreamWindow(5, 1000) Expect(err).ToNot(HaveOccurred()) Expect(updated).To(BeTrue()) }) It("updates the connection level window", func() { - updated, err := fcm.UpdateWindow(0, 1000) - Expect(err).ToNot(HaveOccurred()) + updated := fcm.UpdateConnectionWindow(1000) Expect(updated).To(BeTrue()) }) It("errors when called for a stream that doesn't exist", func() { - _, err := fcm.UpdateWindow(17, 1000) + _, err := fcm.UpdateStreamWindow(17, 1000) Expect(err).To(MatchError(errMapAccess)) }) }) @@ -322,7 +321,7 @@ var _ = Describe("Flow Control Manager", func() { Context("window sizes", func() { It("gets the window size of a stream", func() { fcm.NewStream(5, false) - updated, err := fcm.UpdateWindow(5, 1000) + updated, err := fcm.UpdateStreamWindow(5, 1000) Expect(err).ToNot(HaveOccurred()) Expect(updated).To(BeTrue()) fcm.AddBytesSent(5, 500) @@ -333,8 +332,7 @@ var _ = Describe("Flow Control Manager", func() { It("gets the connection window size", func() { fcm.NewStream(5, true) - updated, err := fcm.UpdateWindow(0, 1000) - Expect(err).ToNot(HaveOccurred()) + updated := fcm.UpdateConnectionWindow(1000) Expect(updated).To(BeTrue()) fcm.AddBytesSent(5, 500) size := fcm.RemainingConnectionWindowSize() @@ -348,10 +346,9 @@ var _ = Describe("Flow Control Manager", func() { It("limits the stream window size by the connection window size", func() { fcm.NewStream(5, true) - updated, err := fcm.UpdateWindow(0, 500) - Expect(err).ToNot(HaveOccurred()) + updated := fcm.UpdateConnectionWindow(500) Expect(updated).To(BeTrue()) - updated, err = fcm.UpdateWindow(5, 1000) + updated, err := fcm.UpdateStreamWindow(5, 1000) Expect(err).ToNot(HaveOccurred()) Expect(updated).To(BeTrue()) size, err := fcm.SendWindowSize(5) @@ -361,8 +358,7 @@ var _ = Describe("Flow Control Manager", func() { It("does not reduce the size of the connection level window, if the stream does not contribute", func() { fcm.NewStream(3, false) - updated, err := fcm.UpdateWindow(0, 1000) - Expect(err).ToNot(HaveOccurred()) + updated := fcm.UpdateConnectionWindow(1000) Expect(updated).To(BeTrue()) fcm.AddBytesSent(3, 456) // WindowSize should return the same value no matter how much was sent size := fcm.RemainingConnectionWindowSize() diff --git a/internal/flowcontrol/interface.go b/internal/flowcontrol/interface.go index d62ba79a2..69b84a9c2 100644 --- a/internal/flowcontrol/interface.go +++ b/internal/flowcontrol/interface.go @@ -24,5 +24,6 @@ type FlowControlManager interface { AddBytesSent(streamID protocol.StreamID, n protocol.ByteCount) error SendWindowSize(streamID protocol.StreamID) (protocol.ByteCount, error) RemainingConnectionWindowSize() protocol.ByteCount - UpdateWindow(streamID protocol.StreamID, offset protocol.ByteCount) (bool, error) + UpdateStreamWindow(streamID protocol.StreamID, offset protocol.ByteCount) (bool, error) + UpdateConnectionWindow(offset protocol.ByteCount) bool } diff --git a/internal/mocks/flow_control_manager.go b/internal/mocks/flow_control_manager.go index 86a11337f..0aecc9aee 100644 --- a/internal/mocks/flow_control_manager.go +++ b/internal/mocks/flow_control_manager.go @@ -163,15 +163,27 @@ func (_mr *MockFlowControlManagerMockRecorder) RemainingConnectionWindowSize() * return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "RemainingConnectionWindowSize", reflect.TypeOf((*MockFlowControlManager)(nil).RemainingConnectionWindowSize)) } -// UpdateWindow mocks base method -func (_m *MockFlowControlManager) UpdateWindow(streamID protocol.StreamID, offset protocol.ByteCount) (bool, error) { - ret := _m.ctrl.Call(_m, "UpdateWindow", streamID, offset) +// UpdateStreamWindow mocks base method +func (_m *MockFlowControlManager) UpdateStreamWindow(streamID protocol.StreamID, offset protocol.ByteCount) (bool, error) { + ret := _m.ctrl.Call(_m, "UpdateStreamWindow", streamID, offset) ret0, _ := ret[0].(bool) ret1, _ := ret[1].(error) return ret0, ret1 } -// UpdateWindow indicates an expected call of UpdateWindow -func (_mr *MockFlowControlManagerMockRecorder) UpdateWindow(arg0, arg1 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "UpdateWindow", reflect.TypeOf((*MockFlowControlManager)(nil).UpdateWindow), arg0, arg1) +// UpdateStreamWindow indicates an expected call of UpdateStreamWindow +func (_mr *MockFlowControlManagerMockRecorder) UpdateStreamWindow(arg0, arg1 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "UpdateStreamWindow", reflect.TypeOf((*MockFlowControlManager)(nil).UpdateStreamWindow), arg0, arg1) +} + +// UpdateConnectionWindow mocks base method +func (_m *MockFlowControlManager) UpdateConnectionWindow(offset protocol.ByteCount) bool { + ret := _m.ctrl.Call(_m, "UpdateConnectionWindow", offset) + ret0, _ := ret[0].(bool) + return ret0 +} + +// UpdateConnectionWindow indicates an expected call of UpdateConnectionWindow +func (_mr *MockFlowControlManagerMockRecorder) UpdateConnectionWindow(arg0 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "UpdateConnectionWindow", reflect.TypeOf((*MockFlowControlManager)(nil).UpdateConnectionWindow), arg0) } diff --git a/session.go b/session.go index f9cf1939b..1da6623a4 100644 --- a/session.go +++ b/session.go @@ -537,16 +537,19 @@ func (s *session) handleStreamFrame(frame *wire.StreamFrame) error { } func (s *session) handleWindowUpdateFrame(frame *wire.WindowUpdateFrame) error { - if frame.StreamID != 0 { - str, err := s.streamsMap.GetOrOpenStream(frame.StreamID) - if err != nil { - return err - } - if str == nil { - return errWindowUpdateOnClosedStream - } + if frame.StreamID == 0 { + s.flowControlManager.UpdateConnectionWindow(frame.ByteOffset) + return nil } - _, err := s.flowControlManager.UpdateWindow(frame.StreamID, frame.ByteOffset) + + str, err := s.streamsMap.GetOrOpenStream(frame.StreamID) + if err != nil { + return err + } + if str == nil { + return errWindowUpdateOnClosedStream + } + _, err = s.flowControlManager.UpdateStreamWindow(frame.StreamID, frame.ByteOffset) return err } diff --git a/session_test.go b/session_test.go index 27b24143a..c9d6c1dcf 100644 --- a/session_test.go +++ b/session_test.go @@ -11,6 +11,8 @@ import ( "strings" "time" + "github.com/golang/mock/gomock" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -514,33 +516,42 @@ var _ = Describe("Session", func() { }) Context("handling WINDOW_UPDATE frames", func() { + var fcm *mocks.MockFlowControlManager + BeforeEach(func() { - sess.flowControlManager.UpdateTransportParameters(&handshake.TransportParameters{ConnectionFlowControlWindow: 0x1000}) + fcm = mocks.NewMockFlowControlManager(mockCtrl) + sess.flowControlManager = fcm + fcm.EXPECT().NewStream(gomock.Any(), gomock.Any()).AnyTimes() }) It("updates the Flow Control Window of a stream", func() { + offset := protocol.ByteCount(0x1234) + fcm.EXPECT().UpdateStreamWindow(protocol.StreamID(5), offset) _, err := sess.GetOrOpenStream(5) Expect(err).ToNot(HaveOccurred()) err = sess.handleWindowUpdateFrame(&wire.WindowUpdateFrame{ StreamID: 5, - ByteOffset: 100, + ByteOffset: offset, }) Expect(err).ToNot(HaveOccurred()) - Expect(sess.flowControlManager.SendWindowSize(5)).To(Equal(protocol.ByteCount(100))) }) It("updates the Flow Control Window of the connection", func() { + offset := protocol.ByteCount(0x800000) + fcm.EXPECT().UpdateConnectionWindow(offset) err := sess.handleWindowUpdateFrame(&wire.WindowUpdateFrame{ StreamID: 0, - ByteOffset: 0x800000, + ByteOffset: offset, }) Expect(err).ToNot(HaveOccurred()) }) It("opens a new stream when receiving a WINDOW_UPDATE for an unknown stream", func() { + offset := protocol.ByteCount(0x1337) + fcm.EXPECT().UpdateStreamWindow(protocol.StreamID(5), offset) err := sess.handleWindowUpdateFrame(&wire.WindowUpdateFrame{ StreamID: 5, - ByteOffset: 1337, + ByteOffset: offset, }) Expect(err).ToNot(HaveOccurred()) str, err := sess.streamsMap.GetOrOpenStream(5)