convert ReceiveStream interface to a struct (#5173)

This commit is contained in:
Marten Seemann
2025-06-01 12:08:18 +08:00
committed by GitHub
parent 78e77bcfdb
commit 1169d97e81
19 changed files with 99 additions and 420 deletions

View File

@@ -14,15 +14,8 @@ import (
"github.com/quic-go/quic-go/internal/wire"
)
type receiveStreamI interface {
ReceiveStream
handleStreamFrame(*wire.StreamFrame, time.Time) error
handleResetStreamFrame(*wire.ResetStreamFrame, time.Time) error
closeForShutdown(error)
}
type receiveStream struct {
// A ReceiveStream is a unidirectional Receive Stream.
type ReceiveStream struct {
mutex sync.Mutex
streamID protocol.StreamID
@@ -56,18 +49,14 @@ type receiveStream struct {
flowController flowcontrol.StreamFlowController
}
var (
_ ReceiveStream = &receiveStream{}
_ receiveStreamI = &receiveStream{}
_ streamControlFrameGetter = &receiveStream{}
)
var _ streamControlFrameGetter = &ReceiveStream{}
func newReceiveStream(
streamID protocol.StreamID,
sender streamSender,
flowController flowcontrol.StreamFlowController,
) *receiveStream {
return &receiveStream{
) *ReceiveStream {
return &ReceiveStream{
streamID: streamID,
sender: sender,
flowController: flowController,
@@ -78,12 +67,15 @@ func newReceiveStream(
}
}
func (s *receiveStream) StreamID() protocol.StreamID {
// StreamID returns the stream ID.
func (s *ReceiveStream) StreamID() protocol.StreamID {
return s.streamID
}
// Read implements io.Reader. It is not thread safe!
func (s *receiveStream) Read(p []byte) (int, error) {
// Read reads data from the stream.
// Read can be made to time out using SetDeadline and SetReadDeadline.
// If the stream was canceled, the error is a StreamError.
func (s *ReceiveStream) Read(p []byte) (int, error) {
// Concurrent use of Read is not permitted (and doesn't make any sense),
// but sometimes people do it anyway.
// Make sure that we only execute one call at any given time to avoid hard to debug failures.
@@ -107,7 +99,7 @@ func (s *receiveStream) Read(p []byte) (int, error) {
return n, err
}
func (s *receiveStream) isNewlyCompleted() bool {
func (s *ReceiveStream) isNewlyCompleted() bool {
if s.completed {
return false
}
@@ -128,7 +120,7 @@ func (s *receiveStream) isNewlyCompleted() bool {
return false
}
func (s *receiveStream) readImpl(p []byte) (hasStreamWindowUpdate bool, hasConnWindowUpdate bool, _ int, _ error) {
func (s *ReceiveStream) readImpl(p []byte) (hasStreamWindowUpdate bool, hasConnWindowUpdate bool, _ int, _ error) {
if s.currentFrameIsLast && s.currentFrame == nil {
s.errorRead = true
return false, false, 0, io.EOF
@@ -229,7 +221,7 @@ func (s *receiveStream) readImpl(p []byte) (hasStreamWindowUpdate bool, hasConnW
return hasStreamWindowUpdate, hasConnWindowUpdate, bytesRead, nil
}
func (s *receiveStream) dequeueNextFrame() {
func (s *ReceiveStream) dequeueNextFrame() {
var offset protocol.ByteCount
// We're done with the last frame. Release the buffer.
if s.currentFrameDone != nil {
@@ -240,7 +232,11 @@ func (s *receiveStream) dequeueNextFrame() {
s.readPosInFrame = 0
}
func (s *receiveStream) CancelRead(errorCode StreamErrorCode) {
// CancelRead aborts receiving on this stream.
// It will ask the peer to stop transmitting stream data.
// Read will unblock immediately, and future Read calls will fail.
// When called multiple times or after reading the io.EOF it is a no-op.
func (s *ReceiveStream) CancelRead(errorCode StreamErrorCode) {
s.mutex.Lock()
queuedNewControlFrame := s.cancelReadImpl(errorCode)
completed := s.isNewlyCompleted()
@@ -255,7 +251,7 @@ func (s *receiveStream) CancelRead(errorCode StreamErrorCode) {
}
}
func (s *receiveStream) cancelReadImpl(errorCode qerr.StreamErrorCode) (queuedNewControlFrame bool) {
func (s *ReceiveStream) cancelReadImpl(errorCode qerr.StreamErrorCode) (queuedNewControlFrame bool) {
if s.cancelledLocally { // duplicate call to CancelRead
return false
}
@@ -272,7 +268,7 @@ func (s *receiveStream) cancelReadImpl(errorCode qerr.StreamErrorCode) (queuedNe
return true
}
func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame, now time.Time) error {
func (s *ReceiveStream) handleStreamFrame(frame *wire.StreamFrame, now time.Time) error {
s.mutex.Lock()
err := s.handleStreamFrameImpl(frame, now)
completed := s.isNewlyCompleted()
@@ -285,7 +281,7 @@ func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame, now time.Time
return err
}
func (s *receiveStream) handleStreamFrameImpl(frame *wire.StreamFrame, now time.Time) error {
func (s *ReceiveStream) handleStreamFrameImpl(frame *wire.StreamFrame, now time.Time) error {
maxOffset := frame.Offset + frame.DataLen()
if err := s.flowController.UpdateHighestReceived(maxOffset, frame.Fin, now); err != nil {
return err
@@ -303,7 +299,7 @@ func (s *receiveStream) handleStreamFrameImpl(frame *wire.StreamFrame, now time.
return nil
}
func (s *receiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame, now time.Time) error {
func (s *ReceiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame, now time.Time) error {
s.mutex.Lock()
err := s.handleResetStreamFrameImpl(frame, now)
completed := s.isNewlyCompleted()
@@ -315,7 +311,7 @@ func (s *receiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame, now
return err
}
func (s *receiveStream) handleResetStreamFrameImpl(frame *wire.ResetStreamFrame, now time.Time) error {
func (s *ReceiveStream) handleResetStreamFrameImpl(frame *wire.ResetStreamFrame, now time.Time) error {
if s.closeForShutdownErr != nil {
return nil
}
@@ -339,7 +335,7 @@ func (s *receiveStream) handleResetStreamFrameImpl(frame *wire.ResetStreamFrame,
return nil
}
func (s *receiveStream) getControlFrame(now time.Time) (_ ackhandler.Frame, ok, hasMore bool) {
func (s *ReceiveStream) getControlFrame(now time.Time) (_ ackhandler.Frame, ok, hasMore bool) {
s.mutex.Lock()
defer s.mutex.Unlock()
@@ -362,7 +358,10 @@ func (s *receiveStream) getControlFrame(now time.Time) (_ ackhandler.Frame, ok,
}, true, false
}
func (s *receiveStream) SetReadDeadline(t time.Time) error {
// SetReadDeadline sets the deadline for future Read calls and
// any currently-blocked Read call.
// A zero value for t means Read will not time out.
func (s *ReceiveStream) SetReadDeadline(t time.Time) error {
s.mutex.Lock()
s.deadline = t
s.mutex.Unlock()
@@ -373,7 +372,7 @@ func (s *receiveStream) SetReadDeadline(t time.Time) error {
// CloseForShutdown closes a stream abruptly.
// It makes Read unblock (and return the error) immediately.
// The peer will NOT be informed about this: the stream is closed without sending a FIN or RESET.
func (s *receiveStream) closeForShutdown(err error) {
func (s *ReceiveStream) closeForShutdown(err error) {
s.mutex.Lock()
s.closeForShutdownErr = err
s.mutex.Unlock()
@@ -381,7 +380,7 @@ func (s *receiveStream) closeForShutdown(err error) {
}
// signalRead performs a non-blocking send on the readChan
func (s *receiveStream) signalRead() {
func (s *ReceiveStream) signalRead() {
select {
case s.readChan <- struct{}{}:
default: