explictly expose all method on the Stream (#5214)

This hides the fact that a Stream is composed of a ReceiveStream and a
SendStream. This is an implementation detail and should not be exposed.
This commit is contained in:
Marten Seemann
2025-06-09 17:14:58 +08:00
committed by GitHub
parent 1b07674b19
commit eb08018a5c
4 changed files with 106 additions and 24 deletions

View File

@@ -41,8 +41,8 @@ const (
NoViablePathError = qerr.NoViablePathError
)
// A StreamError is used for Stream.CancelRead and Stream.CancelWrite.
// It is also returned from Stream.Read and Stream.Write if the peer canceled reading or writing.
// A StreamError is used to signal stream cancellations.
// It is returned from the Read and Write methods of the [ReceiveStream], [SendStream] and [Stream].
type StreamError struct {
StreamID StreamID
ErrorCode StreamErrorCode

View File

@@ -49,7 +49,10 @@ type ReceiveStream struct {
flowController flowcontrol.StreamFlowController
}
var _ streamControlFrameGetter = &ReceiveStream{}
var (
_ streamControlFrameGetter = &ReceiveStream{}
_ receiveStreamFrameHandler = &ReceiveStream{}
)
func newReceiveStream(
streamID protocol.StreamID,
@@ -73,8 +76,8 @@ func (s *ReceiveStream) StreamID() protocol.StreamID {
}
// Read reads data from the stream.
// Read can be made to time out using SetDeadline and SetReadDeadline.
// If the stream was canceled, the error is a StreamError.
// Read can be made to time out using [ReceiveStream.SetReadDeadline].
// If the stream was canceled, the error is a [StreamError].
func (s *ReceiveStream) Read(p []byte) (int, error) {
// Concurrent use of Read is not permitted (and doesn't make any sense),
// but sometimes people do it anyway.
@@ -233,7 +236,7 @@ func (s *ReceiveStream) dequeueNextFrame() {
}
// CancelRead aborts receiving on this stream.
// It will ask the peer to stop transmitting stream data.
// It instructs the peer to stop transmitting stream data.
// Read will unblock immediately, and future Read calls will fail.
// When called multiple times or after reading the io.EOF it is a no-op.
func (s *ReceiveStream) CancelRead(errorCode StreamErrorCode) {

View File

@@ -54,7 +54,11 @@ type SendStream struct {
flowController flowcontrol.StreamFlowController
}
var _ streamControlFrameGetter = &SendStream{}
var (
_ streamControlFrameGetter = &SendStream{}
_ outgoingStream = &SendStream{}
_ sendStreamFrameHandler = &SendStream{}
)
func newSendStream(
ctx context.Context,
@@ -79,8 +83,8 @@ func (s *SendStream) StreamID() StreamID {
}
// Write writes data to the stream.
// Write can be made to time out using SetDeadline and SetWriteDeadline.
// If the stream was canceled, the error is a StreamError.
// Write can be made to time out using [SendStream.SetWriteDeadline].
// If the stream was canceled, the error is a [StreamError].
func (s *SendStream) Write(p []byte) (int, error) {
// Concurrent use of Write is not permitted (and doesn't make any sense),
// but sometimes people do it anyway.
@@ -414,8 +418,8 @@ func (s *SendStream) Close() error {
// Data already written, but not yet delivered to the peer is not guaranteed to be delivered reliably.
// Write will unblock immediately, and future calls to Write will fail.
// When called multiple times it is a no-op.
// When called after Close, it aborts delivery. Note that there is no guarantee if
// the peer will receive the FIN or the reset first.
// When called after Close, it aborts reliable delivery of outstanding stream data.
// Note that there is no guarantee if the peer will receive the FIN or the cancellation error first.
func (s *SendStream) CancelWrite(errorCode StreamErrorCode) {
s.cancelWrite(errorCode, false)
}

101
stream.go
View File

@@ -10,6 +10,7 @@ import (
"github.com/quic-go/quic-go/internal/ackhandler"
"github.com/quic-go/quic-go/internal/flowcontrol"
"github.com/quic-go/quic-go/internal/protocol"
"github.com/quic-go/quic-go/internal/wire"
)
type deadlineError struct{}
@@ -49,8 +50,8 @@ func (s *uniStreamSender) onHasStreamControlFrame(id protocol.StreamID, str stre
var _ streamSender = &uniStreamSender{}
type Stream struct {
*ReceiveStream
*SendStream
receiveStr *ReceiveStream
sendStr *SendStream
completedMutex sync.Mutex
sender streamSender
@@ -58,6 +59,12 @@ type Stream struct {
sendStreamCompleted bool
}
var (
_ outgoingStream = &Stream{}
_ sendStreamFrameHandler = &Stream{}
_ receiveStreamFrameHandler = &Stream{}
)
// newStream creates a new Stream
func newStream(
ctx context.Context,
@@ -78,7 +85,7 @@ func newStream(
sender.onHasStreamControlFrame(streamID, s)
},
}
s.SendStream = newSendStream(ctx, streamID, senderForSendStream, flowController)
s.sendStr = newSendStream(ctx, streamID, senderForSendStream, flowController)
senderForReceiveStream := &uniStreamSender{
streamSender: sender,
onStreamCompletedImpl: func() {
@@ -91,31 +98,99 @@ func newStream(
sender.onHasStreamControlFrame(streamID, s)
},
}
s.ReceiveStream = newReceiveStream(streamID, senderForReceiveStream, flowController)
s.receiveStr = newReceiveStream(streamID, senderForReceiveStream, flowController)
return s
}
// need to define StreamID() here, since both receiveStream and readStream have a StreamID()
// StreamID returns the stream ID.
func (s *Stream) StreamID() protocol.StreamID {
// the result is same for receiveStream and sendStream
return s.SendStream.StreamID()
return s.sendStr.StreamID()
}
// Read reads data from the stream.
// Read can be made to time out using [Stream.SetReadDeadline] and [Stream.SetDeadline].
// If the stream was canceled, the error is a [StreamError].
func (s *Stream) Read(p []byte) (int, error) {
return s.receiveStr.Read(p)
}
// Write writes data to the stream.
// Write can be made to time out using [Stream.SetWriteDeadline] or [Stream.SetDeadline].
// If the stream was canceled, the error is a [StreamError].
func (s *Stream) Write(p []byte) (int, error) {
return s.sendStr.Write(p)
}
// CancelWrite aborts sending on this stream.
// See [SendStream.CancelWrite] for more details.
func (s *Stream) CancelWrite(errorCode StreamErrorCode) {
s.sendStr.CancelWrite(errorCode)
}
// CancelRead aborts receiving on this stream.
// See [ReceiveStream.CancelRead] for more details.
func (s *Stream) CancelRead(errorCode StreamErrorCode) {
s.receiveStr.CancelRead(errorCode)
}
// The Context is canceled as soon as the write-side of the stream is closed.
// See [SendStream.Context] for more details.
func (s *Stream) Context() context.Context {
return s.sendStr.Context()
}
// Close closes the send-direction of the stream.
// It does not close the receive-direction of the stream.
func (s *Stream) Close() error {
return s.SendStream.Close()
return s.sendStr.Close()
}
func (s *Stream) handleResetStreamFrame(frame *wire.ResetStreamFrame, rcvTime time.Time) error {
return s.receiveStr.handleResetStreamFrame(frame, rcvTime)
}
func (s *Stream) handleStreamFrame(frame *wire.StreamFrame, rcvTime time.Time) error {
return s.receiveStr.handleStreamFrame(frame, rcvTime)
}
func (s *Stream) handleStopSendingFrame(frame *wire.StopSendingFrame) {
s.sendStr.handleStopSendingFrame(frame)
}
func (s *Stream) updateSendWindow(limit protocol.ByteCount) {
s.sendStr.updateSendWindow(limit)
}
func (s *Stream) popStreamFrame(maxBytes protocol.ByteCount, v protocol.Version) (_ ackhandler.StreamFrame, _ *wire.StreamDataBlockedFrame, hasMore bool) {
return s.sendStr.popStreamFrame(maxBytes, v)
}
func (s *Stream) getControlFrame(now time.Time) (_ ackhandler.Frame, ok, hasMore bool) {
f, ok, _ := s.SendStream.getControlFrame(now)
f, ok, _ := s.sendStr.getControlFrame(now)
if ok {
return f, true, true
}
return s.ReceiveStream.getControlFrame(now)
return s.receiveStr.getControlFrame(now)
}
// SetReadDeadline sets the deadline for future Read calls.
// See [ReceiveStream.SetReadDeadline] for more details.
func (s *Stream) SetReadDeadline(t time.Time) error {
return s.receiveStr.SetReadDeadline(t)
}
// SetWriteDeadline sets the deadline for future Write calls.
// See [SendStream.SetWriteDeadline] for more details.
func (s *Stream) SetWriteDeadline(t time.Time) error {
return s.sendStr.SetWriteDeadline(t)
}
// SetDeadline sets the read and write deadlines associated with the stream.
// It is equivalent to calling both SetReadDeadline and SetWriteDeadline.
func (s *Stream) SetDeadline(t time.Time) error {
_ = s.SetReadDeadline(t) // SetReadDeadline never errors
_ = s.SetWriteDeadline(t) // SetWriteDeadline never errors
_ = s.receiveStr.SetReadDeadline(t) // SetReadDeadline never errors
_ = s.sendStr.SetWriteDeadline(t) // SetWriteDeadline never errors
return nil
}
@@ -123,8 +198,8 @@ func (s *Stream) SetDeadline(t time.Time) error {
// It makes Read and Write unblock (and return the error) immediately.
// The peer will NOT be informed about this: the stream is closed without sending a FIN or RST.
func (s *Stream) closeForShutdown(err error) {
s.SendStream.closeForShutdown(err)
s.ReceiveStream.closeForShutdown(err)
s.sendStr.closeForShutdown(err)
s.receiveStr.closeForShutdown(err)
}
// checkIfCompleted is called from the uniStreamSender, when one of the stream halves is completed.