forked from quic-go/quic-go
surface stream error as stream context cancelation cause (#3970)
* send stream: surface error as stream context cancellation cause * Update send_stream_test.go Co-authored-by: Marten Seemann <martenseemann@gmail.com> --------- Co-authored-by: Marten Seemann <martenseemann@gmail.com>
This commit is contained in:
@@ -122,6 +122,8 @@ type SendStream interface {
|
|||||||
// The Context is canceled as soon as the write-side of the stream is closed.
|
// The Context is canceled as soon as the write-side of the stream is closed.
|
||||||
// This happens when Close() or CancelWrite() is called, or when the peer
|
// This happens when Close() or CancelWrite() is called, or when the peer
|
||||||
// cancels the read-side of their stream.
|
// cancels the read-side of their stream.
|
||||||
|
// The cancellation cause is set to the error that caused the stream to
|
||||||
|
// close, or `context.Canceled` in case the stream is closed without error.
|
||||||
Context() context.Context
|
Context() context.Context
|
||||||
// SetWriteDeadline sets the deadline for future Write calls
|
// SetWriteDeadline sets the deadline for future Write calls
|
||||||
// and any currently-blocked Write call.
|
// and any currently-blocked Write call.
|
||||||
|
|||||||
@@ -30,7 +30,7 @@ type sendStream struct {
|
|||||||
retransmissionQueue []*wire.StreamFrame
|
retransmissionQueue []*wire.StreamFrame
|
||||||
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
ctxCancel context.CancelFunc
|
ctxCancel context.CancelCauseFunc
|
||||||
|
|
||||||
streamID protocol.StreamID
|
streamID protocol.StreamID
|
||||||
sender streamSender
|
sender streamSender
|
||||||
@@ -71,7 +71,7 @@ func newSendStream(
|
|||||||
writeChan: make(chan struct{}, 1),
|
writeChan: make(chan struct{}, 1),
|
||||||
writeOnce: make(chan struct{}, 1), // cap: 1, to protect against concurrent use of Write
|
writeOnce: make(chan struct{}, 1), // cap: 1, to protect against concurrent use of Write
|
||||||
}
|
}
|
||||||
s.ctx, s.ctxCancel = context.WithCancel(context.Background())
|
s.ctx, s.ctxCancel = context.WithCancelCause(context.Background())
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -366,7 +366,7 @@ func (s *sendStream) Close() error {
|
|||||||
s.mutex.Unlock()
|
s.mutex.Unlock()
|
||||||
return fmt.Errorf("close called for canceled stream %d", s.streamID)
|
return fmt.Errorf("close called for canceled stream %d", s.streamID)
|
||||||
}
|
}
|
||||||
s.ctxCancel()
|
s.ctxCancel(nil)
|
||||||
s.finishedWriting = true
|
s.finishedWriting = true
|
||||||
s.mutex.Unlock()
|
s.mutex.Unlock()
|
||||||
|
|
||||||
@@ -385,8 +385,8 @@ func (s *sendStream) cancelWriteImpl(errorCode qerr.StreamErrorCode, remote bool
|
|||||||
s.mutex.Unlock()
|
s.mutex.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
s.ctxCancel()
|
|
||||||
s.cancelWriteErr = &StreamError{StreamID: s.streamID, ErrorCode: errorCode, Remote: remote}
|
s.cancelWriteErr = &StreamError{StreamID: s.streamID, ErrorCode: errorCode, Remote: remote}
|
||||||
|
s.ctxCancel(s.cancelWriteErr)
|
||||||
s.numOutstandingFrames = 0
|
s.numOutstandingFrames = 0
|
||||||
s.retransmissionQueue = nil
|
s.retransmissionQueue = nil
|
||||||
newlyCompleted := s.isNewlyCompleted()
|
newlyCompleted := s.isNewlyCompleted()
|
||||||
@@ -435,7 +435,7 @@ func (s *sendStream) SetWriteDeadline(t time.Time) error {
|
|||||||
// The peer will NOT be informed about this: the stream is closed without sending a FIN or RST.
|
// The peer will NOT be informed about this: the stream is closed without sending a FIN or RST.
|
||||||
func (s *sendStream) closeForShutdown(err error) {
|
func (s *sendStream) closeForShutdown(err error) {
|
||||||
s.mutex.Lock()
|
s.mutex.Lock()
|
||||||
s.ctxCancel()
|
s.ctxCancel(err)
|
||||||
s.closeForShutdownErr = err
|
s.closeForShutdownErr = err
|
||||||
s.mutex.Unlock()
|
s.mutex.Unlock()
|
||||||
s.signalWrite()
|
s.signalWrite()
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package quic
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
mrand "math/rand"
|
mrand "math/rand"
|
||||||
@@ -318,6 +319,7 @@ var _ = Describe("Send Stream", func() {
|
|||||||
Expect(str.Context().Done()).ToNot(BeClosed())
|
Expect(str.Context().Done()).ToNot(BeClosed())
|
||||||
Expect(str.Close()).To(Succeed())
|
Expect(str.Close()).To(Succeed())
|
||||||
Expect(str.Context().Done()).To(BeClosed())
|
Expect(str.Context().Done()).To(BeClosed())
|
||||||
|
Expect(context.Cause(str.Context())).To(MatchError(context.Canceled))
|
||||||
})
|
})
|
||||||
|
|
||||||
Context("flow control blocking", func() {
|
Context("flow control blocking", func() {
|
||||||
@@ -668,6 +670,7 @@ var _ = Describe("Send Stream", func() {
|
|||||||
Expect(str.Context().Done()).ToNot(BeClosed())
|
Expect(str.Context().Done()).ToNot(BeClosed())
|
||||||
str.closeForShutdown(testErr)
|
str.closeForShutdown(testErr)
|
||||||
Expect(str.Context().Done()).To(BeClosed())
|
Expect(str.Context().Done()).To(BeClosed())
|
||||||
|
Expect(context.Cause(str.Context())).To(MatchError(testErr))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
@@ -846,6 +849,8 @@ var _ = Describe("Send Stream", func() {
|
|||||||
Expect(str.Context().Done()).ToNot(BeClosed())
|
Expect(str.Context().Done()).ToNot(BeClosed())
|
||||||
str.CancelWrite(1234)
|
str.CancelWrite(1234)
|
||||||
Expect(str.Context().Done()).To(BeClosed())
|
Expect(str.Context().Done()).To(BeClosed())
|
||||||
|
Expect(context.Cause(str.Context())).To(BeAssignableToTypeOf(&StreamError{}))
|
||||||
|
Expect(context.Cause(str.Context()).(*StreamError).ErrorCode).To(Equal(StreamErrorCode(1234)))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("doesn't allow further calls to Write", func() {
|
It("doesn't allow further calls to Write", func() {
|
||||||
|
|||||||
Reference in New Issue
Block a user