forked from quic-go/quic-go
71
stream.go
71
stream.go
@@ -18,7 +18,10 @@ type streamHandler interface {
|
||||
streamBlocked(streamID protocol.StreamID)
|
||||
}
|
||||
|
||||
var errFlowControlViolation = qerr.FlowControlReceivedTooMuchData
|
||||
var (
|
||||
errFlowControlViolation = qerr.FlowControlReceivedTooMuchData
|
||||
errConnectionFlowControlViolation = qerr.FlowControlReceivedTooMuchData
|
||||
)
|
||||
|
||||
// A Stream assembles the data from StreamFrames and provides a super-convenient Read-Interface
|
||||
type stream struct {
|
||||
@@ -41,17 +44,27 @@ type stream struct {
|
||||
frameQueue streamFrameSorter
|
||||
newFrameOrErrCond sync.Cond
|
||||
|
||||
flowController *flowController
|
||||
flowController *flowController
|
||||
connectionFlowController *flowController
|
||||
contributesToConnectionFlowControl bool
|
||||
|
||||
windowUpdateOrErrCond sync.Cond
|
||||
}
|
||||
|
||||
// newStream creates a new Stream
|
||||
func newStream(session streamHandler, connectionParameterManager *handshake.ConnectionParametersManager, StreamID protocol.StreamID) (*stream, error) {
|
||||
func newStream(session streamHandler, connectionParameterManager *handshake.ConnectionParametersManager, connectionFlowController *flowController, StreamID protocol.StreamID) (*stream, error) {
|
||||
s := &stream{
|
||||
session: session,
|
||||
streamID: StreamID,
|
||||
flowController: newFlowController(StreamID, connectionParameterManager),
|
||||
session: session,
|
||||
streamID: StreamID,
|
||||
connectionFlowController: connectionFlowController,
|
||||
contributesToConnectionFlowControl: true,
|
||||
flowController: newFlowController(StreamID, connectionParameterManager),
|
||||
}
|
||||
|
||||
// crypto and header stream don't contribute to connection level flow control
|
||||
// TODO: only include the header stream here when using HTTP2
|
||||
if s.streamID == 1 || s.streamID == 3 {
|
||||
s.contributesToConnectionFlowControl = false
|
||||
}
|
||||
|
||||
s.newFrameOrErrCond.L = &s.mutex
|
||||
@@ -113,7 +126,11 @@ func (s *stream) Read(p []byte) (int, error) {
|
||||
s.readPosInFrame += m
|
||||
bytesRead += m
|
||||
s.readOffset += protocol.ByteCount(m)
|
||||
|
||||
s.flowController.AddBytesRead(protocol.ByteCount(m))
|
||||
if s.contributesToConnectionFlowControl {
|
||||
s.connectionFlowController.AddBytesRead(protocol.ByteCount(m))
|
||||
}
|
||||
|
||||
s.maybeTriggerWindowUpdate()
|
||||
|
||||
@@ -139,6 +156,10 @@ func (s *stream) ReadByte() (byte, error) {
|
||||
return p[0], err
|
||||
}
|
||||
|
||||
func (s *stream) ConnectionFlowControlWindowUpdated() {
|
||||
s.windowUpdateOrErrCond.Broadcast()
|
||||
}
|
||||
|
||||
func (s *stream) UpdateSendFlowControlWindow(n protocol.ByteCount) {
|
||||
if s.flowController.UpdateSendWindow(n) {
|
||||
s.windowUpdateOrErrCond.Broadcast()
|
||||
@@ -159,9 +180,15 @@ func (s *stream) Write(p []byte) (int, error) {
|
||||
for dataWritten < len(p) {
|
||||
s.mutex.Lock()
|
||||
remainingBytesInWindow := s.flowController.SendWindowSize()
|
||||
if s.contributesToConnectionFlowControl {
|
||||
remainingBytesInWindow = utils.MinByteCount(remainingBytesInWindow, s.connectionFlowController.SendWindowSize())
|
||||
}
|
||||
for remainingBytesInWindow == 0 && s.err == nil {
|
||||
s.windowUpdateOrErrCond.Wait()
|
||||
remainingBytesInWindow = s.flowController.SendWindowSize()
|
||||
if s.contributesToConnectionFlowControl {
|
||||
remainingBytesInWindow = utils.MinByteCount(remainingBytesInWindow, s.connectionFlowController.SendWindowSize())
|
||||
}
|
||||
}
|
||||
s.mutex.Unlock()
|
||||
|
||||
@@ -178,12 +205,16 @@ func (s *stream) Write(p []byte) (int, error) {
|
||||
Offset: s.writeOffset,
|
||||
Data: data,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
dataWritten += dataLen
|
||||
s.flowController.AddBytesSent(protocol.ByteCount(dataLen))
|
||||
if s.contributesToConnectionFlowControl {
|
||||
s.connectionFlowController.AddBytesSent(protocol.ByteCount(dataLen))
|
||||
}
|
||||
s.writeOffset += protocol.ByteCount(dataLen)
|
||||
|
||||
s.maybeTriggerBlocked()
|
||||
@@ -205,10 +236,16 @@ func (s *stream) Close() error {
|
||||
// AddStreamFrame adds a new stream frame
|
||||
func (s *stream) AddStreamFrame(frame *frames.StreamFrame) error {
|
||||
maxOffset := frame.Offset + protocol.ByteCount(len(frame.Data))
|
||||
s.flowController.UpdateHighestReceived(maxOffset)
|
||||
increment := s.flowController.UpdateHighestReceived(maxOffset)
|
||||
if s.contributesToConnectionFlowControl {
|
||||
s.connectionFlowController.IncrementHighestReceived(increment)
|
||||
}
|
||||
if s.flowController.CheckFlowControlViolation() {
|
||||
return errFlowControlViolation
|
||||
}
|
||||
if s.connectionFlowController.CheckFlowControlViolation() {
|
||||
return errConnectionFlowControlViolation
|
||||
}
|
||||
|
||||
s.mutex.Lock()
|
||||
s.frameQueue.Push(frame)
|
||||
@@ -218,19 +255,33 @@ func (s *stream) AddStreamFrame(frame *frames.StreamFrame) error {
|
||||
}
|
||||
|
||||
func (s *stream) maybeTriggerWindowUpdate() {
|
||||
// check for stream level window updates
|
||||
doUpdate, byteOffset := s.flowController.MaybeTriggerWindowUpdate()
|
||||
|
||||
if doUpdate {
|
||||
s.session.updateReceiveFlowControlWindow(s.streamID, byteOffset)
|
||||
}
|
||||
|
||||
// check for connection level window updates
|
||||
doUpdate, byteOffset = s.connectionFlowController.MaybeTriggerWindowUpdate()
|
||||
if doUpdate {
|
||||
s.session.updateReceiveFlowControlWindow(0, byteOffset)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *stream) maybeTriggerBlocked() {
|
||||
doIt := s.flowController.MaybeTriggerBlocked()
|
||||
streamBlocked := s.flowController.MaybeTriggerBlocked()
|
||||
|
||||
if doIt {
|
||||
if streamBlocked {
|
||||
s.session.streamBlocked(s.streamID)
|
||||
}
|
||||
|
||||
if s.contributesToConnectionFlowControl {
|
||||
connectionBlocked := s.connectionFlowController.MaybeTriggerBlocked()
|
||||
|
||||
if connectionBlocked {
|
||||
s.session.streamBlocked(0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// RegisterError is called by session to indicate that an error occurred and the
|
||||
|
||||
Reference in New Issue
Block a user