diff --git a/receive_stream.go b/receive_stream.go index 491cd85bb..9f06f21fa 100644 --- a/receive_stream.go +++ b/receive_stream.go @@ -84,7 +84,7 @@ func (s *receiveStream) Read(p []byte) (int, error) { s.mutex.Unlock() if completed { - s.streamCompleted() + s.sender.onStreamCompleted(s.streamID) } return n, err } @@ -201,7 +201,8 @@ func (s *receiveStream) CancelRead(errorCode protocol.ApplicationErrorCode) { s.mutex.Unlock() if completed { - s.streamCompleted() + s.flowController.Abandon() + s.sender.onStreamCompleted(s.streamID) } } @@ -226,7 +227,8 @@ func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame) error { s.mutex.Unlock() if completed { - s.streamCompleted() + s.flowController.Abandon() + s.sender.onStreamCompleted(s.streamID) } return err } @@ -236,11 +238,13 @@ func (s *receiveStream) handleStreamFrameImpl(frame *wire.StreamFrame) (bool /* if err := s.flowController.UpdateHighestReceived(maxOffset, frame.FinBit); err != nil { return false, err } + var newlyRcvdFinalOffset bool if frame.FinBit { + newlyRcvdFinalOffset = s.finalOffset == protocol.MaxByteCount s.finalOffset = maxOffset } if s.canceledRead { - return frame.FinBit, nil + return newlyRcvdFinalOffset, nil } if err := s.frameQueue.Push(frame.Data, frame.Offset, frame.PutBack); err != nil { return false, err @@ -255,7 +259,8 @@ func (s *receiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame) err s.mutex.Unlock() if completed { - s.streamCompleted() + s.flowController.Abandon() + s.sender.onStreamCompleted(s.streamID) } return err } @@ -267,6 +272,7 @@ func (s *receiveStream) handleResetStreamFrameImpl(frame *wire.ResetStreamFrame) if err := s.flowController.UpdateHighestReceived(frame.ByteOffset, true); err != nil { return false, err } + newlyRcvdFinalOffset := s.finalOffset == protocol.MaxByteCount s.finalOffset = frame.ByteOffset // ignore duplicate RESET_STREAM frames for this stream (after checking their final offset) @@ -279,7 +285,7 @@ func (s *receiveStream) handleResetStreamFrameImpl(frame *wire.ResetStreamFrame) error: fmt.Errorf("stream %d was reset with error code %d", s.streamID, frame.ErrorCode), } s.signalRead() - return true, nil + return newlyRcvdFinalOffset, nil } func (s *receiveStream) CloseRemote(offset protocol.ByteCount) { @@ -309,17 +315,6 @@ func (s *receiveStream) getWindowUpdate() protocol.ByteCount { return s.flowController.GetWindowUpdate() } -func (s *receiveStream) streamCompleted() { - s.mutex.Lock() - finRead := s.finRead - s.mutex.Unlock() - - if !finRead { - s.flowController.Abandon() - } - s.sender.onStreamCompleted(s.streamID) -} - // signalRead performs a non-blocking send on the readChan func (s *receiveStream) signalRead() { select { diff --git a/receive_stream_test.go b/receive_stream_test.go index 2ca9d9ede..479b600fd 100644 --- a/receive_stream_test.go +++ b/receive_stream_test.go @@ -538,6 +538,25 @@ var _ = Describe("Receive Stream", func() { FinBit: true, })).To(Succeed()) }) + + It("handles duplicate FinBits after the stream was canceled", func() { + mockSender.EXPECT().queueControlFrame(gomock.Any()) + str.CancelRead(1234) + gomock.InOrder( + mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(1000), true), + mockFC.EXPECT().Abandon(), + mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(1000), true), + ) + mockSender.EXPECT().onStreamCompleted(streamID) + Expect(str.handleStreamFrame(&wire.StreamFrame{ + Offset: 1000, + FinBit: true, + })).To(Succeed()) + Expect(str.handleStreamFrame(&wire.StreamFrame{ + Offset: 1000, + FinBit: true, + })).To(Succeed()) + }) }) Context("receiving RESET_STREAM frames", func() { @@ -564,7 +583,7 @@ var _ = Describe("Receive Stream", func() { mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true), mockFC.EXPECT().Abandon(), ) - str.handleResetStreamFrame(rst) + Expect(str.handleResetStreamFrame(rst)).To(Succeed()) Eventually(done).Should(BeClosed()) }) @@ -597,6 +616,20 @@ var _ = Describe("Receive Stream", func() { Expect(str.handleResetStreamFrame(rst)).To(Succeed()) }) + It("doesn't call onStreamCompleted again when the final offset was already received via FinBit", func() { + mockSender.EXPECT().queueControlFrame(gomock.Any()) + str.CancelRead(1234) + mockSender.EXPECT().onStreamCompleted(streamID) + mockFC.EXPECT().Abandon() + mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true).Times(2) + Expect(str.handleStreamFrame(&wire.StreamFrame{ + StreamID: streamID, + Offset: rst.ByteOffset, + FinBit: true, + })).To(Succeed()) + Expect(str.handleResetStreamFrame(rst)).To(Succeed()) + }) + It("doesn't do anyting when it was closed for shutdown", func() { str.closeForShutdown(nil) err := str.handleResetStreamFrame(rst)