forked from quic-go/quic-go
mark stream completed when CancelWrite is called after receiving STOP_SENDING (#4605)
* garbage collect stream when CancelWrite is called after receiving STOP_SENDING * review comments * Hold lock while checking isNewlyCompleted --------- Co-authored-by: Marco Munizaga <git@marcopolo.io>
This commit is contained in:
@@ -379,7 +379,7 @@ func (s *sendStream) isNewlyCompleted() bool {
|
|||||||
// 1. the application called CancelWrite, or
|
// 1. the application called CancelWrite, or
|
||||||
// 2. we received a STOP_SENDING, and
|
// 2. we received a STOP_SENDING, and
|
||||||
// * the application consumed the error via Write, or
|
// * the application consumed the error via Write, or
|
||||||
// * the application called CLsoe
|
// * the application called Close
|
||||||
if s.cancelWriteErr != nil && (s.cancellationFlagged || s.finishedWriting) {
|
if s.cancelWriteErr != nil && (s.cancellationFlagged || s.finishedWriting) {
|
||||||
s.completed = true
|
s.completed = true
|
||||||
return true
|
return true
|
||||||
@@ -421,6 +421,17 @@ func (s *sendStream) cancelWriteImpl(errorCode qerr.StreamErrorCode, remote bool
|
|||||||
s.mutex.Lock()
|
s.mutex.Lock()
|
||||||
if !remote {
|
if !remote {
|
||||||
s.cancellationFlagged = true
|
s.cancellationFlagged = true
|
||||||
|
if s.cancelWriteErr != nil {
|
||||||
|
completed := s.isNewlyCompleted()
|
||||||
|
s.mutex.Unlock()
|
||||||
|
// The user has called CancelWrite. If the previous cancellation was
|
||||||
|
// because of a STOP_SENDING, we don't need to flag the error to the
|
||||||
|
// user any more.
|
||||||
|
if completed {
|
||||||
|
s.sender.onStreamCompleted(s.streamID)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if s.cancelWriteErr != nil {
|
if s.cancelWriteErr != nil {
|
||||||
s.mutex.Unlock()
|
s.mutex.Unlock()
|
||||||
|
|||||||
@@ -909,6 +909,20 @@ var _ = Describe("Send Stream", func() {
|
|||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("discards the stream when CancelWrite is called after receiving STOP_SENDING", func() {
|
||||||
|
mockSender.EXPECT().queueControlFrame(&wire.ResetStreamFrame{
|
||||||
|
StreamID: streamID,
|
||||||
|
ErrorCode: 101,
|
||||||
|
})
|
||||||
|
str.handleStopSendingFrame(&wire.StopSendingFrame{
|
||||||
|
StreamID: streamID,
|
||||||
|
ErrorCode: 101,
|
||||||
|
})
|
||||||
|
|
||||||
|
mockSender.EXPECT().onStreamCompleted(gomock.Any())
|
||||||
|
str.CancelWrite(101)
|
||||||
|
})
|
||||||
|
|
||||||
It("unblocks Write", func() {
|
It("unblocks Write", func() {
|
||||||
mockSender.EXPECT().onHasStreamData(streamID, str)
|
mockSender.EXPECT().onHasStreamData(streamID, str)
|
||||||
mockSender.EXPECT().queueControlFrame(gomock.Any())
|
mockSender.EXPECT().queueControlFrame(gomock.Any())
|
||||||
|
|||||||
Reference in New Issue
Block a user