forked from quic-go/quic-go
simplify generation of STREAM_DATA_BLOCKED frames (#4608)
The stream always gets blocked at the current write offset. There's no need to return this offset from the flow controller.
This commit is contained in:
@@ -9,7 +9,6 @@ type flowController interface {
|
||||
AddBytesSent(protocol.ByteCount)
|
||||
// for receiving
|
||||
GetWindowUpdate() protocol.ByteCount // returns 0 if no update is necessary
|
||||
IsNewlyBlocked() (bool, protocol.ByteCount)
|
||||
}
|
||||
|
||||
// A StreamFlowController is a flow controller for a QUIC stream.
|
||||
@@ -23,6 +22,7 @@ type StreamFlowController interface {
|
||||
// Abandon is called when reading from the stream is aborted early,
|
||||
// and there won't be any further calls to AddBytesRead.
|
||||
Abandon()
|
||||
IsNewlyBlocked() bool
|
||||
}
|
||||
|
||||
// The ConnectionFlowController is the flow controller for the connection.
|
||||
@@ -30,6 +30,7 @@ type ConnectionFlowController interface {
|
||||
flowController
|
||||
AddBytesRead(protocol.ByteCount)
|
||||
Reset() error
|
||||
IsNewlyBlocked() (bool, protocol.ByteCount)
|
||||
}
|
||||
|
||||
type connectionFlowControllerI interface {
|
||||
|
||||
@@ -121,6 +121,11 @@ func (c *streamFlowController) SendWindowSize() protocol.ByteCount {
|
||||
return min(c.baseFlowController.sendWindowSize(), c.connection.SendWindowSize())
|
||||
}
|
||||
|
||||
func (c *streamFlowController) IsNewlyBlocked() bool {
|
||||
blocked, _ := c.baseFlowController.IsNewlyBlocked()
|
||||
return blocked
|
||||
}
|
||||
|
||||
func (c *streamFlowController) shouldQueueWindowUpdate() bool {
|
||||
return !c.receivedFinalOffset && c.hasWindowUpdate()
|
||||
}
|
||||
|
||||
@@ -188,12 +188,11 @@ func (c *MockStreamFlowControllerGetWindowUpdateCall) DoAndReturn(f func() proto
|
||||
}
|
||||
|
||||
// IsNewlyBlocked mocks base method.
|
||||
func (m *MockStreamFlowController) IsNewlyBlocked() (bool, protocol.ByteCount) {
|
||||
func (m *MockStreamFlowController) IsNewlyBlocked() bool {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "IsNewlyBlocked")
|
||||
ret0, _ := ret[0].(bool)
|
||||
ret1, _ := ret[1].(protocol.ByteCount)
|
||||
return ret0, ret1
|
||||
return ret0
|
||||
}
|
||||
|
||||
// IsNewlyBlocked indicates an expected call of IsNewlyBlocked.
|
||||
@@ -209,19 +208,19 @@ type MockStreamFlowControllerIsNewlyBlockedCall struct {
|
||||
}
|
||||
|
||||
// Return rewrite *gomock.Call.Return
|
||||
func (c *MockStreamFlowControllerIsNewlyBlockedCall) Return(arg0 bool, arg1 protocol.ByteCount) *MockStreamFlowControllerIsNewlyBlockedCall {
|
||||
c.Call = c.Call.Return(arg0, arg1)
|
||||
func (c *MockStreamFlowControllerIsNewlyBlockedCall) Return(arg0 bool) *MockStreamFlowControllerIsNewlyBlockedCall {
|
||||
c.Call = c.Call.Return(arg0)
|
||||
return c
|
||||
}
|
||||
|
||||
// Do rewrite *gomock.Call.Do
|
||||
func (c *MockStreamFlowControllerIsNewlyBlockedCall) Do(f func() (bool, protocol.ByteCount)) *MockStreamFlowControllerIsNewlyBlockedCall {
|
||||
func (c *MockStreamFlowControllerIsNewlyBlockedCall) Do(f func() bool) *MockStreamFlowControllerIsNewlyBlockedCall {
|
||||
c.Call = c.Call.Do(f)
|
||||
return c
|
||||
}
|
||||
|
||||
// DoAndReturn rewrite *gomock.Call.DoAndReturn
|
||||
func (c *MockStreamFlowControllerIsNewlyBlockedCall) DoAndReturn(f func() (bool, protocol.ByteCount)) *MockStreamFlowControllerIsNewlyBlockedCall {
|
||||
func (c *MockStreamFlowControllerIsNewlyBlockedCall) DoAndReturn(f func() bool) *MockStreamFlowControllerIsNewlyBlockedCall {
|
||||
c.Call = c.Call.DoAndReturn(f)
|
||||
return c
|
||||
}
|
||||
|
||||
@@ -262,10 +262,10 @@ func (s *sendStream) popNewOrRetransmittedStreamFrame(maxBytes protocol.ByteCoun
|
||||
|
||||
sendWindow := s.flowController.SendWindowSize()
|
||||
if sendWindow == 0 {
|
||||
if isBlocked, offset := s.flowController.IsNewlyBlocked(); isBlocked {
|
||||
if s.flowController.IsNewlyBlocked() {
|
||||
s.sender.queueControlFrame(&wire.StreamDataBlockedFrame{
|
||||
StreamID: s.streamID,
|
||||
MaximumStreamData: offset,
|
||||
MaximumStreamData: s.writeOffset,
|
||||
})
|
||||
return nil, false
|
||||
}
|
||||
|
||||
@@ -324,56 +324,27 @@ var _ = Describe("Send Stream", func() {
|
||||
|
||||
Context("flow control blocking", func() {
|
||||
It("queues a BLOCKED frame if the stream is flow control blocked", func() {
|
||||
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(0))
|
||||
mockFC.EXPECT().IsNewlyBlocked().Return(true, protocol.ByteCount(12))
|
||||
mockSender.EXPECT().queueControlFrame(&wire.StreamDataBlockedFrame{
|
||||
StreamID: streamID,
|
||||
MaximumStreamData: 12,
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
defer close(done)
|
||||
mockSender.EXPECT().onHasStreamData(streamID, str)
|
||||
_, err := str.Write([]byte("foobar"))
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
}()
|
||||
waitForWrite()
|
||||
_, ok, hasMoreData := str.popStreamFrame(1000, protocol.Version1)
|
||||
Expect(ok).To(BeFalse())
|
||||
Expect(hasMoreData).To(BeFalse())
|
||||
// make the Write go routine return
|
||||
str.closeForShutdown(nil)
|
||||
Eventually(done).Should(BeClosed())
|
||||
})
|
||||
|
||||
It("says that it doesn't have any more data, when it is flow control blocked", func() {
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
defer close(done)
|
||||
mockSender.EXPECT().onHasStreamData(streamID, str)
|
||||
_, err := str.Write([]byte("foobar"))
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
}()
|
||||
waitForWrite()
|
||||
|
||||
// first pop a STREAM frame of the maximum size allowed by flow control
|
||||
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(3))
|
||||
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(3))
|
||||
f, ok, hasMoreData := str.popStreamFrame(expectedFrameHeaderLen(0)+3, protocol.Version1)
|
||||
Expect(ok).To(BeTrue())
|
||||
Expect(f).ToNot(BeNil())
|
||||
Expect(hasMoreData).To(BeTrue())
|
||||
|
||||
// try to pop again, this time noticing that we're blocked
|
||||
mockFC.EXPECT().SendWindowSize()
|
||||
// don't use offset 3 here, to make sure the BLOCKED frame contains the number returned by the flow controller
|
||||
mockFC.EXPECT().IsNewlyBlocked().Return(true, protocol.ByteCount(10))
|
||||
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(0))
|
||||
mockFC.EXPECT().IsNewlyBlocked().Return(true)
|
||||
mockSender.EXPECT().queueControlFrame(&wire.StreamDataBlockedFrame{
|
||||
StreamID: streamID,
|
||||
MaximumStreamData: 10,
|
||||
MaximumStreamData: 3,
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
defer close(done)
|
||||
mockSender.EXPECT().onHasStreamData(streamID, str)
|
||||
_, err := str.Write([]byte("foobar"))
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
}()
|
||||
waitForWrite()
|
||||
f, ok, hasMoreData := str.popStreamFrame(1000, protocol.Version1)
|
||||
Expect(ok).To(BeTrue())
|
||||
Expect(hasMoreData).To(BeTrue())
|
||||
Expect(f.Frame.Data).To(HaveLen(3))
|
||||
_, ok, hasMoreData = str.popStreamFrame(1000, protocol.Version1)
|
||||
Expect(ok).To(BeFalse())
|
||||
Expect(hasMoreData).To(BeFalse())
|
||||
|
||||
Reference in New Issue
Block a user