forked from quic-go/quic-go
handle WINDOW_UPDATEs for streams and connection separately
In IETF QUIC, stream 0 is a valid stream ID, and is not used to encode WINDOW_UPDATEs for the connection any more.
This commit is contained in:
@@ -228,15 +228,8 @@ func (f *flowControlManager) RemainingConnectionWindowSize() protocol.ByteCount
|
||||
return f.connFlowController.SendWindowSize()
|
||||
}
|
||||
|
||||
// streamID may be 0 here
|
||||
func (f *flowControlManager) UpdateWindow(streamID protocol.StreamID, offset protocol.ByteCount) (bool, error) {
|
||||
f.mutex.Lock()
|
||||
defer f.mutex.Unlock()
|
||||
|
||||
if streamID == 0 {
|
||||
return f.connFlowController.UpdateSendWindow(offset), nil
|
||||
}
|
||||
|
||||
// streamID must not be 0 here
|
||||
func (f *flowControlManager) UpdateStreamWindow(streamID protocol.StreamID, offset protocol.ByteCount) (bool, error) {
|
||||
fc, err := f.getFlowController(streamID)
|
||||
if err != nil {
|
||||
return false, err
|
||||
@@ -244,6 +237,10 @@ func (f *flowControlManager) UpdateWindow(streamID protocol.StreamID, offset pro
|
||||
return fc.UpdateSendWindow(offset), nil
|
||||
}
|
||||
|
||||
func (f *flowControlManager) UpdateConnectionWindow(offset protocol.ByteCount) bool {
|
||||
return f.connFlowController.UpdateSendWindow(offset)
|
||||
}
|
||||
|
||||
func (f *flowControlManager) getFlowController(streamID protocol.StreamID) (*streamFlowController, error) {
|
||||
streamFlowController, ok := f.streamFlowController[streamID]
|
||||
if !ok {
|
||||
|
||||
@@ -302,19 +302,18 @@ var _ = Describe("Flow Control Manager", func() {
|
||||
Context("window updates", func() {
|
||||
It("updates the window for a normal stream", func() {
|
||||
fcm.NewStream(5, true)
|
||||
updated, err := fcm.UpdateWindow(5, 1000)
|
||||
updated, err := fcm.UpdateStreamWindow(5, 1000)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(updated).To(BeTrue())
|
||||
})
|
||||
|
||||
It("updates the connection level window", func() {
|
||||
updated, err := fcm.UpdateWindow(0, 1000)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
updated := fcm.UpdateConnectionWindow(1000)
|
||||
Expect(updated).To(BeTrue())
|
||||
})
|
||||
|
||||
It("errors when called for a stream that doesn't exist", func() {
|
||||
_, err := fcm.UpdateWindow(17, 1000)
|
||||
_, err := fcm.UpdateStreamWindow(17, 1000)
|
||||
Expect(err).To(MatchError(errMapAccess))
|
||||
})
|
||||
})
|
||||
@@ -322,7 +321,7 @@ var _ = Describe("Flow Control Manager", func() {
|
||||
Context("window sizes", func() {
|
||||
It("gets the window size of a stream", func() {
|
||||
fcm.NewStream(5, false)
|
||||
updated, err := fcm.UpdateWindow(5, 1000)
|
||||
updated, err := fcm.UpdateStreamWindow(5, 1000)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(updated).To(BeTrue())
|
||||
fcm.AddBytesSent(5, 500)
|
||||
@@ -333,8 +332,7 @@ var _ = Describe("Flow Control Manager", func() {
|
||||
|
||||
It("gets the connection window size", func() {
|
||||
fcm.NewStream(5, true)
|
||||
updated, err := fcm.UpdateWindow(0, 1000)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
updated := fcm.UpdateConnectionWindow(1000)
|
||||
Expect(updated).To(BeTrue())
|
||||
fcm.AddBytesSent(5, 500)
|
||||
size := fcm.RemainingConnectionWindowSize()
|
||||
@@ -348,10 +346,9 @@ var _ = Describe("Flow Control Manager", func() {
|
||||
|
||||
It("limits the stream window size by the connection window size", func() {
|
||||
fcm.NewStream(5, true)
|
||||
updated, err := fcm.UpdateWindow(0, 500)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
updated := fcm.UpdateConnectionWindow(500)
|
||||
Expect(updated).To(BeTrue())
|
||||
updated, err = fcm.UpdateWindow(5, 1000)
|
||||
updated, err := fcm.UpdateStreamWindow(5, 1000)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(updated).To(BeTrue())
|
||||
size, err := fcm.SendWindowSize(5)
|
||||
@@ -361,8 +358,7 @@ var _ = Describe("Flow Control Manager", func() {
|
||||
|
||||
It("does not reduce the size of the connection level window, if the stream does not contribute", func() {
|
||||
fcm.NewStream(3, false)
|
||||
updated, err := fcm.UpdateWindow(0, 1000)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
updated := fcm.UpdateConnectionWindow(1000)
|
||||
Expect(updated).To(BeTrue())
|
||||
fcm.AddBytesSent(3, 456) // WindowSize should return the same value no matter how much was sent
|
||||
size := fcm.RemainingConnectionWindowSize()
|
||||
|
||||
@@ -24,5 +24,6 @@ type FlowControlManager interface {
|
||||
AddBytesSent(streamID protocol.StreamID, n protocol.ByteCount) error
|
||||
SendWindowSize(streamID protocol.StreamID) (protocol.ByteCount, error)
|
||||
RemainingConnectionWindowSize() protocol.ByteCount
|
||||
UpdateWindow(streamID protocol.StreamID, offset protocol.ByteCount) (bool, error)
|
||||
UpdateStreamWindow(streamID protocol.StreamID, offset protocol.ByteCount) (bool, error)
|
||||
UpdateConnectionWindow(offset protocol.ByteCount) bool
|
||||
}
|
||||
|
||||
@@ -163,15 +163,27 @@ func (_mr *MockFlowControlManagerMockRecorder) RemainingConnectionWindowSize() *
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "RemainingConnectionWindowSize", reflect.TypeOf((*MockFlowControlManager)(nil).RemainingConnectionWindowSize))
|
||||
}
|
||||
|
||||
// UpdateWindow mocks base method
|
||||
func (_m *MockFlowControlManager) UpdateWindow(streamID protocol.StreamID, offset protocol.ByteCount) (bool, error) {
|
||||
ret := _m.ctrl.Call(_m, "UpdateWindow", streamID, offset)
|
||||
// UpdateStreamWindow mocks base method
|
||||
func (_m *MockFlowControlManager) UpdateStreamWindow(streamID protocol.StreamID, offset protocol.ByteCount) (bool, error) {
|
||||
ret := _m.ctrl.Call(_m, "UpdateStreamWindow", streamID, offset)
|
||||
ret0, _ := ret[0].(bool)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// UpdateWindow indicates an expected call of UpdateWindow
|
||||
func (_mr *MockFlowControlManagerMockRecorder) UpdateWindow(arg0, arg1 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "UpdateWindow", reflect.TypeOf((*MockFlowControlManager)(nil).UpdateWindow), arg0, arg1)
|
||||
// UpdateStreamWindow indicates an expected call of UpdateStreamWindow
|
||||
func (_mr *MockFlowControlManagerMockRecorder) UpdateStreamWindow(arg0, arg1 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "UpdateStreamWindow", reflect.TypeOf((*MockFlowControlManager)(nil).UpdateStreamWindow), arg0, arg1)
|
||||
}
|
||||
|
||||
// UpdateConnectionWindow mocks base method
|
||||
func (_m *MockFlowControlManager) UpdateConnectionWindow(offset protocol.ByteCount) bool {
|
||||
ret := _m.ctrl.Call(_m, "UpdateConnectionWindow", offset)
|
||||
ret0, _ := ret[0].(bool)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// UpdateConnectionWindow indicates an expected call of UpdateConnectionWindow
|
||||
func (_mr *MockFlowControlManagerMockRecorder) UpdateConnectionWindow(arg0 interface{}) *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "UpdateConnectionWindow", reflect.TypeOf((*MockFlowControlManager)(nil).UpdateConnectionWindow), arg0)
|
||||
}
|
||||
|
||||
21
session.go
21
session.go
@@ -537,16 +537,19 @@ func (s *session) handleStreamFrame(frame *wire.StreamFrame) error {
|
||||
}
|
||||
|
||||
func (s *session) handleWindowUpdateFrame(frame *wire.WindowUpdateFrame) error {
|
||||
if frame.StreamID != 0 {
|
||||
str, err := s.streamsMap.GetOrOpenStream(frame.StreamID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if str == nil {
|
||||
return errWindowUpdateOnClosedStream
|
||||
}
|
||||
if frame.StreamID == 0 {
|
||||
s.flowControlManager.UpdateConnectionWindow(frame.ByteOffset)
|
||||
return nil
|
||||
}
|
||||
_, err := s.flowControlManager.UpdateWindow(frame.StreamID, frame.ByteOffset)
|
||||
|
||||
str, err := s.streamsMap.GetOrOpenStream(frame.StreamID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if str == nil {
|
||||
return errWindowUpdateOnClosedStream
|
||||
}
|
||||
_, err = s.flowControlManager.UpdateStreamWindow(frame.StreamID, frame.ByteOffset)
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -11,6 +11,8 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
@@ -514,33 +516,42 @@ var _ = Describe("Session", func() {
|
||||
})
|
||||
|
||||
Context("handling WINDOW_UPDATE frames", func() {
|
||||
var fcm *mocks.MockFlowControlManager
|
||||
|
||||
BeforeEach(func() {
|
||||
sess.flowControlManager.UpdateTransportParameters(&handshake.TransportParameters{ConnectionFlowControlWindow: 0x1000})
|
||||
fcm = mocks.NewMockFlowControlManager(mockCtrl)
|
||||
sess.flowControlManager = fcm
|
||||
fcm.EXPECT().NewStream(gomock.Any(), gomock.Any()).AnyTimes()
|
||||
})
|
||||
|
||||
It("updates the Flow Control Window of a stream", func() {
|
||||
offset := protocol.ByteCount(0x1234)
|
||||
fcm.EXPECT().UpdateStreamWindow(protocol.StreamID(5), offset)
|
||||
_, err := sess.GetOrOpenStream(5)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = sess.handleWindowUpdateFrame(&wire.WindowUpdateFrame{
|
||||
StreamID: 5,
|
||||
ByteOffset: 100,
|
||||
ByteOffset: offset,
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(sess.flowControlManager.SendWindowSize(5)).To(Equal(protocol.ByteCount(100)))
|
||||
})
|
||||
|
||||
It("updates the Flow Control Window of the connection", func() {
|
||||
offset := protocol.ByteCount(0x800000)
|
||||
fcm.EXPECT().UpdateConnectionWindow(offset)
|
||||
err := sess.handleWindowUpdateFrame(&wire.WindowUpdateFrame{
|
||||
StreamID: 0,
|
||||
ByteOffset: 0x800000,
|
||||
ByteOffset: offset,
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
})
|
||||
|
||||
It("opens a new stream when receiving a WINDOW_UPDATE for an unknown stream", func() {
|
||||
offset := protocol.ByteCount(0x1337)
|
||||
fcm.EXPECT().UpdateStreamWindow(protocol.StreamID(5), offset)
|
||||
err := sess.handleWindowUpdateFrame(&wire.WindowUpdateFrame{
|
||||
StreamID: 5,
|
||||
ByteOffset: 1337,
|
||||
ByteOffset: offset,
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
str, err := sess.streamsMap.GetOrOpenStream(5)
|
||||
|
||||
Reference in New Issue
Block a user