diff --git a/errors.go b/errors.go index 4a69a7f19..b65ef83f8 100644 --- a/errors.go +++ b/errors.go @@ -41,8 +41,8 @@ const ( NoViablePathError = qerr.NoViablePathError ) -// A StreamError is used for Stream.CancelRead and Stream.CancelWrite. -// It is also returned from Stream.Read and Stream.Write if the peer canceled reading or writing. +// A StreamError is used to signal stream cancellations. +// It is returned from the Read and Write methods of the [ReceiveStream], [SendStream] and [Stream]. type StreamError struct { StreamID StreamID ErrorCode StreamErrorCode diff --git a/receive_stream.go b/receive_stream.go index 246bdac4d..3b2d618ce 100644 --- a/receive_stream.go +++ b/receive_stream.go @@ -49,7 +49,10 @@ type ReceiveStream struct { flowController flowcontrol.StreamFlowController } -var _ streamControlFrameGetter = &ReceiveStream{} +var ( + _ streamControlFrameGetter = &ReceiveStream{} + _ receiveStreamFrameHandler = &ReceiveStream{} +) func newReceiveStream( streamID protocol.StreamID, @@ -73,8 +76,8 @@ func (s *ReceiveStream) StreamID() protocol.StreamID { } // Read reads data from the stream. -// Read can be made to time out using SetDeadline and SetReadDeadline. -// If the stream was canceled, the error is a StreamError. +// Read can be made to time out using [ReceiveStream.SetReadDeadline]. +// If the stream was canceled, the error is a [StreamError]. func (s *ReceiveStream) Read(p []byte) (int, error) { // Concurrent use of Read is not permitted (and doesn't make any sense), // but sometimes people do it anyway. @@ -233,7 +236,7 @@ func (s *ReceiveStream) dequeueNextFrame() { } // CancelRead aborts receiving on this stream. -// It will ask the peer to stop transmitting stream data. +// It instructs the peer to stop transmitting stream data. // Read will unblock immediately, and future Read calls will fail. // When called multiple times or after reading the io.EOF it is a no-op. func (s *ReceiveStream) CancelRead(errorCode StreamErrorCode) { diff --git a/send_stream.go b/send_stream.go index b7424cbaf..0c76ee5e3 100644 --- a/send_stream.go +++ b/send_stream.go @@ -54,7 +54,11 @@ type SendStream struct { flowController flowcontrol.StreamFlowController } -var _ streamControlFrameGetter = &SendStream{} +var ( + _ streamControlFrameGetter = &SendStream{} + _ outgoingStream = &SendStream{} + _ sendStreamFrameHandler = &SendStream{} +) func newSendStream( ctx context.Context, @@ -79,8 +83,8 @@ func (s *SendStream) StreamID() StreamID { } // Write writes data to the stream. -// Write can be made to time out using SetDeadline and SetWriteDeadline. -// If the stream was canceled, the error is a StreamError. +// Write can be made to time out using [SendStream.SetWriteDeadline]. +// If the stream was canceled, the error is a [StreamError]. func (s *SendStream) Write(p []byte) (int, error) { // Concurrent use of Write is not permitted (and doesn't make any sense), // but sometimes people do it anyway. @@ -414,8 +418,8 @@ func (s *SendStream) Close() error { // Data already written, but not yet delivered to the peer is not guaranteed to be delivered reliably. // Write will unblock immediately, and future calls to Write will fail. // When called multiple times it is a no-op. -// When called after Close, it aborts delivery. Note that there is no guarantee if -// the peer will receive the FIN or the reset first. +// When called after Close, it aborts reliable delivery of outstanding stream data. +// Note that there is no guarantee if the peer will receive the FIN or the cancellation error first. func (s *SendStream) CancelWrite(errorCode StreamErrorCode) { s.cancelWrite(errorCode, false) } diff --git a/stream.go b/stream.go index 9b18b9305..9c5a40e2a 100644 --- a/stream.go +++ b/stream.go @@ -10,6 +10,7 @@ import ( "github.com/quic-go/quic-go/internal/ackhandler" "github.com/quic-go/quic-go/internal/flowcontrol" "github.com/quic-go/quic-go/internal/protocol" + "github.com/quic-go/quic-go/internal/wire" ) type deadlineError struct{} @@ -49,8 +50,8 @@ func (s *uniStreamSender) onHasStreamControlFrame(id protocol.StreamID, str stre var _ streamSender = &uniStreamSender{} type Stream struct { - *ReceiveStream - *SendStream + receiveStr *ReceiveStream + sendStr *SendStream completedMutex sync.Mutex sender streamSender @@ -58,6 +59,12 @@ type Stream struct { sendStreamCompleted bool } +var ( + _ outgoingStream = &Stream{} + _ sendStreamFrameHandler = &Stream{} + _ receiveStreamFrameHandler = &Stream{} +) + // newStream creates a new Stream func newStream( ctx context.Context, @@ -78,7 +85,7 @@ func newStream( sender.onHasStreamControlFrame(streamID, s) }, } - s.SendStream = newSendStream(ctx, streamID, senderForSendStream, flowController) + s.sendStr = newSendStream(ctx, streamID, senderForSendStream, flowController) senderForReceiveStream := &uniStreamSender{ streamSender: sender, onStreamCompletedImpl: func() { @@ -91,31 +98,99 @@ func newStream( sender.onHasStreamControlFrame(streamID, s) }, } - s.ReceiveStream = newReceiveStream(streamID, senderForReceiveStream, flowController) + s.receiveStr = newReceiveStream(streamID, senderForReceiveStream, flowController) return s } -// need to define StreamID() here, since both receiveStream and readStream have a StreamID() +// StreamID returns the stream ID. func (s *Stream) StreamID() protocol.StreamID { // the result is same for receiveStream and sendStream - return s.SendStream.StreamID() + return s.sendStr.StreamID() } +// Read reads data from the stream. +// Read can be made to time out using [Stream.SetReadDeadline] and [Stream.SetDeadline]. +// If the stream was canceled, the error is a [StreamError]. +func (s *Stream) Read(p []byte) (int, error) { + return s.receiveStr.Read(p) +} + +// Write writes data to the stream. +// Write can be made to time out using [Stream.SetWriteDeadline] or [Stream.SetDeadline]. +// If the stream was canceled, the error is a [StreamError]. +func (s *Stream) Write(p []byte) (int, error) { + return s.sendStr.Write(p) +} + +// CancelWrite aborts sending on this stream. +// See [SendStream.CancelWrite] for more details. +func (s *Stream) CancelWrite(errorCode StreamErrorCode) { + s.sendStr.CancelWrite(errorCode) +} + +// CancelRead aborts receiving on this stream. +// See [ReceiveStream.CancelRead] for more details. +func (s *Stream) CancelRead(errorCode StreamErrorCode) { + s.receiveStr.CancelRead(errorCode) +} + +// The Context is canceled as soon as the write-side of the stream is closed. +// See [SendStream.Context] for more details. +func (s *Stream) Context() context.Context { + return s.sendStr.Context() +} + +// Close closes the send-direction of the stream. +// It does not close the receive-direction of the stream. func (s *Stream) Close() error { - return s.SendStream.Close() + return s.sendStr.Close() +} + +func (s *Stream) handleResetStreamFrame(frame *wire.ResetStreamFrame, rcvTime time.Time) error { + return s.receiveStr.handleResetStreamFrame(frame, rcvTime) +} + +func (s *Stream) handleStreamFrame(frame *wire.StreamFrame, rcvTime time.Time) error { + return s.receiveStr.handleStreamFrame(frame, rcvTime) +} + +func (s *Stream) handleStopSendingFrame(frame *wire.StopSendingFrame) { + s.sendStr.handleStopSendingFrame(frame) +} + +func (s *Stream) updateSendWindow(limit protocol.ByteCount) { + s.sendStr.updateSendWindow(limit) +} + +func (s *Stream) popStreamFrame(maxBytes protocol.ByteCount, v protocol.Version) (_ ackhandler.StreamFrame, _ *wire.StreamDataBlockedFrame, hasMore bool) { + return s.sendStr.popStreamFrame(maxBytes, v) } func (s *Stream) getControlFrame(now time.Time) (_ ackhandler.Frame, ok, hasMore bool) { - f, ok, _ := s.SendStream.getControlFrame(now) + f, ok, _ := s.sendStr.getControlFrame(now) if ok { return f, true, true } - return s.ReceiveStream.getControlFrame(now) + return s.receiveStr.getControlFrame(now) } +// SetReadDeadline sets the deadline for future Read calls. +// See [ReceiveStream.SetReadDeadline] for more details. +func (s *Stream) SetReadDeadline(t time.Time) error { + return s.receiveStr.SetReadDeadline(t) +} + +// SetWriteDeadline sets the deadline for future Write calls. +// See [SendStream.SetWriteDeadline] for more details. +func (s *Stream) SetWriteDeadline(t time.Time) error { + return s.sendStr.SetWriteDeadline(t) +} + +// SetDeadline sets the read and write deadlines associated with the stream. +// It is equivalent to calling both SetReadDeadline and SetWriteDeadline. func (s *Stream) SetDeadline(t time.Time) error { - _ = s.SetReadDeadline(t) // SetReadDeadline never errors - _ = s.SetWriteDeadline(t) // SetWriteDeadline never errors + _ = s.receiveStr.SetReadDeadline(t) // SetReadDeadline never errors + _ = s.sendStr.SetWriteDeadline(t) // SetWriteDeadline never errors return nil } @@ -123,8 +198,8 @@ func (s *Stream) SetDeadline(t time.Time) error { // It makes Read and Write unblock (and return the error) immediately. // The peer will NOT be informed about this: the stream is closed without sending a FIN or RST. func (s *Stream) closeForShutdown(err error) { - s.SendStream.closeForShutdown(err) - s.ReceiveStream.closeForShutdown(err) + s.sendStr.closeForShutdown(err) + s.receiveStr.closeForShutdown(err) } // checkIfCompleted is called from the uniStreamSender, when one of the stream halves is completed.