forked from quic-go/quic-go
By introducing a callback on the stream, which the stream calls as soon as it is completed, we no longer need to check every single open stream to see whether it has completed.
179 lines
5.2 KiB
Go
179 lines
5.2 KiB
Go
package quic
|
|
|
|
import (
|
|
"net"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/lucas-clemente/quic-go/internal/flowcontrol"
|
|
"github.com/lucas-clemente/quic-go/internal/protocol"
|
|
"github.com/lucas-clemente/quic-go/internal/wire"
|
|
)
|
|
|
|
const (
|
|
errorCodeStopping protocol.ApplicationErrorCode = 0
|
|
errorCodeStoppingGQUIC protocol.ApplicationErrorCode = 7
|
|
)
|
|
|
|
// The streamSender is notified by the stream about various events.
|
|
type streamSender interface {
|
|
queueControlFrame(wire.Frame)
|
|
onHasWindowUpdate(protocol.StreamID)
|
|
onHasStreamData(protocol.StreamID)
|
|
onStreamCompleted(protocol.StreamID)
|
|
}
|
|
|
|
// Each of the both stream halves gets its own uniStreamSender.
|
|
// This is necessary in order to keep track when both halves have been completed.
|
|
type uniStreamSender struct {
|
|
streamSender
|
|
onStreamCompletedImpl func()
|
|
}
|
|
|
|
func (s *uniStreamSender) queueControlFrame(f wire.Frame) {
|
|
s.streamSender.queueControlFrame(f)
|
|
}
|
|
|
|
func (s *uniStreamSender) onHasWindowUpdate(id protocol.StreamID) {
|
|
s.streamSender.onHasWindowUpdate(id)
|
|
}
|
|
|
|
func (s *uniStreamSender) onHasStreamData(id protocol.StreamID) {
|
|
s.streamSender.onHasStreamData(id)
|
|
}
|
|
|
|
func (s *uniStreamSender) onStreamCompleted(protocol.StreamID) {
|
|
s.onStreamCompletedImpl()
|
|
}
|
|
|
|
var _ streamSender = &uniStreamSender{}
|
|
|
|
type streamI interface {
|
|
Stream
|
|
|
|
handleStreamFrame(*wire.StreamFrame) error
|
|
handleRstStreamFrame(*wire.RstStreamFrame) error
|
|
handleStopSendingFrame(*wire.StopSendingFrame)
|
|
popStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool)
|
|
closeForShutdown(error)
|
|
// methods needed for flow control
|
|
getWindowUpdate() protocol.ByteCount
|
|
handleMaxStreamDataFrame(*wire.MaxStreamDataFrame)
|
|
}
|
|
|
|
// A Stream assembles the data from StreamFrames and provides a super-convenient Read-Interface
|
|
//
|
|
// Read() and Write() may be called concurrently, but multiple calls to Read() or Write() individually must be synchronized manually.
|
|
type stream struct {
|
|
receiveStream
|
|
sendStream
|
|
|
|
completedMutex sync.Mutex
|
|
sender streamSender
|
|
receiveStreamCompleted bool
|
|
sendStreamCompleted bool
|
|
|
|
version protocol.VersionNumber
|
|
}
|
|
|
|
var _ Stream = &stream{}
|
|
var _ streamI = &stream{}
|
|
|
|
// deadlineError is the error type behind errDeadline. It reports itself as
// both temporary and a timeout, so it satisfies net.Error.
type deadlineError struct{}

// Error returns the fixed error message.
func (deadlineError) Error() string {
	return "deadline exceeded"
}

// Temporary always reports true.
func (deadlineError) Temporary() bool {
	return true
}

// Timeout always reports true.
func (deadlineError) Timeout() bool {
	return true
}

// errDeadline is the shared deadlineError instance, typed as net.Error.
var errDeadline net.Error = &deadlineError{}
|
|
|
|
type streamCanceledError struct {
|
|
error
|
|
errorCode protocol.ApplicationErrorCode
|
|
}
|
|
|
|
func (streamCanceledError) Canceled() bool { return true }
|
|
func (e streamCanceledError) ErrorCode() protocol.ApplicationErrorCode { return e.errorCode }
|
|
|
|
var _ StreamError = &streamCanceledError{}
|
|
|
|
// newStream creates a new Stream
|
|
func newStream(streamID protocol.StreamID,
|
|
sender streamSender,
|
|
flowController flowcontrol.StreamFlowController,
|
|
version protocol.VersionNumber,
|
|
) *stream {
|
|
s := &stream{sender: sender}
|
|
senderForSendStream := &uniStreamSender{
|
|
streamSender: sender,
|
|
onStreamCompletedImpl: func() {
|
|
s.completedMutex.Lock()
|
|
s.sendStreamCompleted = true
|
|
s.checkIfCompleted()
|
|
s.completedMutex.Unlock()
|
|
},
|
|
}
|
|
s.sendStream = *newSendStream(streamID, senderForSendStream, flowController, version)
|
|
senderForReceiveStream := &uniStreamSender{
|
|
streamSender: sender,
|
|
onStreamCompletedImpl: func() {
|
|
s.completedMutex.Lock()
|
|
s.receiveStreamCompleted = true
|
|
s.checkIfCompleted()
|
|
s.completedMutex.Unlock()
|
|
},
|
|
}
|
|
s.receiveStream = *newReceiveStream(streamID, senderForReceiveStream, flowController)
|
|
return s
|
|
}
|
|
|
|
// need to define StreamID() here, since both receiveStream and readStream have a StreamID()
|
|
func (s *stream) StreamID() protocol.StreamID {
|
|
// the result is same for receiveStream and sendStream
|
|
return s.sendStream.StreamID()
|
|
}
|
|
|
|
func (s *stream) Close() error {
|
|
if err := s.sendStream.Close(); err != nil {
|
|
return err
|
|
}
|
|
// in gQUIC, we need to send a RST_STREAM with the final offset if CancelRead() was called
|
|
s.receiveStream.onClose(s.sendStream.getWriteOffset())
|
|
return nil
|
|
}
|
|
|
|
func (s *stream) SetDeadline(t time.Time) error {
|
|
_ = s.SetReadDeadline(t) // SetReadDeadline never errors
|
|
_ = s.SetWriteDeadline(t) // SetWriteDeadline never errors
|
|
return nil
|
|
}
|
|
|
|
// CloseForShutdown closes a stream abruptly.
|
|
// It makes Read and Write unblock (and return the error) immediately.
|
|
// The peer will NOT be informed about this: the stream is closed without sending a FIN or RST.
|
|
func (s *stream) closeForShutdown(err error) {
|
|
s.sendStream.closeForShutdown(err)
|
|
s.receiveStream.closeForShutdown(err)
|
|
}
|
|
|
|
func (s *stream) handleRstStreamFrame(frame *wire.RstStreamFrame) error {
|
|
if err := s.receiveStream.handleRstStreamFrame(frame); err != nil {
|
|
return err
|
|
}
|
|
if !s.version.UsesIETFFrameFormat() {
|
|
s.handleStopSendingFrame(&wire.StopSendingFrame{
|
|
StreamID: s.StreamID(),
|
|
ErrorCode: frame.ErrorCode,
|
|
})
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// checkIfCompleted is called from the uniStreamSender, when one of the stream halves is completed.
|
|
// It makes sure that the onStreamCompleted callback is only called if both receive and send side have completed.
|
|
func (s *stream) checkIfCompleted() {
|
|
if s.sendStreamCompleted && s.receiveStreamCompleted {
|
|
s.sender.onStreamCompleted(s.StreamID())
|
|
}
|
|
}
|