reliably queue MAX_DATA frames (#4844)

This commit is contained in:
Marten Seemann
2025-01-08 09:53:23 +08:00
committed by GitHub
parent 076db77a26
commit 5a6187c870
12 changed files with 154 additions and 52 deletions

View File

@@ -91,16 +91,19 @@ func (s *receiveStream) Read(p []byte) (int, error) {
defer func() { <-s.readOnce }()
s.mutex.Lock()
queuedNewControlFrame, n, err := s.readImpl(p)
queuedStreamWindowUpdate, queuedConnWindowUpdate, n, err := s.readImpl(p)
completed := s.isNewlyCompleted()
s.mutex.Unlock()
if completed {
s.sender.onStreamCompleted(s.streamID)
}
if queuedNewControlFrame {
if queuedStreamWindowUpdate {
s.sender.onHasStreamControlFrame(s.streamID, s)
}
if queuedConnWindowUpdate {
s.sender.onHasConnectionData()
}
return n, err
}
@@ -125,20 +128,19 @@ func (s *receiveStream) isNewlyCompleted() bool {
return false
}
func (s *receiveStream) readImpl(p []byte) (bool, int, error) {
func (s *receiveStream) readImpl(p []byte) (hasStreamWindowUpdate bool, hasConnWindowUpdate bool, _ int, _ error) {
if s.currentFrameIsLast && s.currentFrame == nil {
s.errorRead = true
return false, 0, io.EOF
return false, false, 0, io.EOF
}
if s.cancelledRemotely || s.cancelledLocally {
s.errorRead = true
return false, 0, s.cancelErr
return false, false, 0, s.cancelErr
}
if s.closeForShutdownErr != nil {
return false, 0, s.closeForShutdownErr
return false, false, 0, s.closeForShutdownErr
}
var queuedNewControlFrame bool
var bytesRead int
var deadlineTimer *utils.Timer
for bytesRead < len(p) {
@@ -146,23 +148,23 @@ func (s *receiveStream) readImpl(p []byte) (bool, int, error) {
s.dequeueNextFrame()
}
if s.currentFrame == nil && bytesRead > 0 {
return queuedNewControlFrame, bytesRead, s.closeForShutdownErr
return hasStreamWindowUpdate, hasConnWindowUpdate, bytesRead, s.closeForShutdownErr
}
for {
// Stop waiting on errors
if s.closeForShutdownErr != nil {
return queuedNewControlFrame, bytesRead, s.closeForShutdownErr
return hasStreamWindowUpdate, hasConnWindowUpdate, bytesRead, s.closeForShutdownErr
}
if s.cancelledRemotely || s.cancelledLocally {
s.errorRead = true
return queuedNewControlFrame, 0, s.cancelErr
return hasStreamWindowUpdate, hasConnWindowUpdate, 0, s.cancelErr
}
deadline := s.deadline
if !deadline.IsZero() {
if !time.Now().Before(deadline) {
return queuedNewControlFrame, bytesRead, errDeadline
return hasStreamWindowUpdate, hasConnWindowUpdate, bytesRead, errDeadline
}
if deadlineTimer == nil {
deadlineTimer = utils.NewTimer()
@@ -192,10 +194,10 @@ func (s *receiveStream) readImpl(p []byte) (bool, int, error) {
}
if bytesRead > len(p) {
return queuedNewControlFrame, bytesRead, fmt.Errorf("BUG: bytesRead (%d) > len(p) (%d) in stream.Read", bytesRead, len(p))
return hasStreamWindowUpdate, hasConnWindowUpdate, bytesRead, fmt.Errorf("BUG: bytesRead (%d) > len(p) (%d) in stream.Read", bytesRead, len(p))
}
if s.readPosInFrame > len(s.currentFrame) {
return queuedNewControlFrame, bytesRead, fmt.Errorf("BUG: readPosInFrame (%d) > frame.DataLen (%d) in stream.Read", s.readPosInFrame, len(s.currentFrame))
return hasStreamWindowUpdate, hasConnWindowUpdate, bytesRead, fmt.Errorf("BUG: readPosInFrame (%d) > frame.DataLen (%d) in stream.Read", s.readPosInFrame, len(s.currentFrame))
}
m := copy(p[bytesRead:], s.currentFrame[s.readPosInFrame:])
@@ -205,9 +207,13 @@ func (s *receiveStream) readImpl(p []byte) (bool, int, error) {
// when a RESET_STREAM was received, the flow controller was already
// informed about the final byteOffset for this stream
if !s.cancelledRemotely {
if queueMaxStreamData := s.flowController.AddBytesRead(protocol.ByteCount(m)); queueMaxStreamData {
hasStream, hasConn := s.flowController.AddBytesRead(protocol.ByteCount(m))
if hasStream {
s.queuedMaxStreamData = true
queuedNewControlFrame = true
hasStreamWindowUpdate = true
}
if hasConn {
hasConnWindowUpdate = true
}
}
@@ -217,10 +223,10 @@ func (s *receiveStream) readImpl(p []byte) (bool, int, error) {
s.currentFrameDone()
}
s.errorRead = true
return queuedNewControlFrame, bytesRead, io.EOF
return hasStreamWindowUpdate, hasConnWindowUpdate, bytesRead, io.EOF
}
}
return queuedNewControlFrame, bytesRead, nil
return hasStreamWindowUpdate, hasConnWindowUpdate, bytesRead, nil
}
func (s *receiveStream) dequeueNextFrame() {