forked from quic-go/quic-go
use FlowControlManager in Stream for Reading data
This commit is contained in:
37
stream.go
37
stream.go
@@ -47,6 +47,8 @@ type stream struct {
|
||||
frameQueue *streamFrameSorter
|
||||
newFrameOrErrCond sync.Cond
|
||||
|
||||
flowControlManager flowcontrol.FlowControlManager
|
||||
// TODO: remove those
|
||||
flowController flowcontrol.FlowController
|
||||
connectionFlowController flowcontrol.FlowController
|
||||
contributesToConnectionFlowControl bool
|
||||
@@ -55,10 +57,11 @@ type stream struct {
|
||||
}
|
||||
|
||||
// newStream creates a new Stream
|
||||
func newStream(session streamHandler, connectionParameterManager *handshake.ConnectionParametersManager, connectionFlowController flowcontrol.FlowController, StreamID protocol.StreamID) (*stream, error) {
|
||||
func newStream(session streamHandler, connectionParameterManager *handshake.ConnectionParametersManager, connectionFlowController flowcontrol.FlowController, flowControlManager flowcontrol.FlowControlManager, StreamID protocol.StreamID) (*stream, error) {
|
||||
s := &stream{
|
||||
session: session,
|
||||
streamID: StreamID,
|
||||
flowControlManager: flowControlManager,
|
||||
connectionFlowController: connectionFlowController,
|
||||
contributesToConnectionFlowControl: true,
|
||||
flowController: flowcontrol.NewFlowController(StreamID, connectionParameterManager),
|
||||
@@ -131,11 +134,7 @@ func (s *stream) Read(p []byte) (int, error) {
|
||||
bytesRead += m
|
||||
s.readOffset += protocol.ByteCount(m)
|
||||
|
||||
s.flowController.AddBytesRead(protocol.ByteCount(m))
|
||||
if s.contributesToConnectionFlowControl {
|
||||
s.connectionFlowController.AddBytesRead(protocol.ByteCount(m))
|
||||
}
|
||||
|
||||
s.flowControlManager.AddBytesRead(s.streamID, protocol.ByteCount(m))
|
||||
s.maybeTriggerWindowUpdate()
|
||||
|
||||
if s.readPosInFrame >= int(frame.DataLen()) {
|
||||
@@ -242,20 +241,21 @@ func (s *stream) Close() error {
|
||||
// AddStreamFrame adds a new stream frame
|
||||
func (s *stream) AddStreamFrame(frame *frames.StreamFrame) error {
|
||||
maxOffset := frame.Offset + frame.DataLen()
|
||||
increment := s.flowController.UpdateHighestReceived(maxOffset)
|
||||
if s.contributesToConnectionFlowControl {
|
||||
s.connectionFlowController.IncrementHighestReceived(increment)
|
||||
}
|
||||
if s.flowController.CheckFlowControlViolation() {
|
||||
err := s.flowControlManager.UpdateHighestReceived(s.streamID, maxOffset)
|
||||
|
||||
if err == flowcontrol.ErrStreamFlowControlViolation {
|
||||
return errFlowControlViolation
|
||||
}
|
||||
if s.connectionFlowController.CheckFlowControlViolation() {
|
||||
if err == flowcontrol.ErrConnectionFlowControlViolation {
|
||||
return errConnectionFlowControlViolation
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
err := s.frameQueue.Push(frame)
|
||||
err = s.frameQueue.Push(frame)
|
||||
if err != nil && err != errDuplicateStreamData {
|
||||
return err
|
||||
}
|
||||
@@ -268,18 +268,23 @@ func (s *stream) CloseRemote(offset protocol.ByteCount) {
|
||||
s.AddStreamFrame(&frames.StreamFrame{FinBit: true, Offset: offset})
|
||||
}
|
||||
|
||||
func (s *stream) maybeTriggerWindowUpdate() {
|
||||
func (s *stream) maybeTriggerWindowUpdate() error {
|
||||
// check for stream level window updates
|
||||
doUpdate, byteOffset := s.flowController.MaybeTriggerWindowUpdate()
|
||||
doUpdate, byteOffset, err := s.flowControlManager.MaybeTriggerStreamWindowUpdate(s.streamID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if doUpdate {
|
||||
s.session.updateReceiveFlowControlWindow(s.streamID, byteOffset)
|
||||
}
|
||||
|
||||
// check for connection level window updates
|
||||
doUpdate, byteOffset = s.connectionFlowController.MaybeTriggerWindowUpdate()
|
||||
doUpdate, byteOffset = s.flowControlManager.MaybeTriggerConnectionWindowUpdate()
|
||||
if doUpdate {
|
||||
s.session.updateReceiveFlowControlWindow(0, byteOffset)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *stream) maybeTriggerBlocked() {
|
||||
|
||||
Reference in New Issue
Block a user