Merge pull request #2213 from lucas-clemente/stream-completed-fixes

only call the onStreamCompleted callback once in the receiveStream
This commit is contained in:
Marten Seemann
2019-11-09 17:17:10 +07:00
committed by GitHub
2 changed files with 46 additions and 18 deletions

View File

@@ -84,7 +84,7 @@ func (s *receiveStream) Read(p []byte) (int, error) {
s.mutex.Unlock()
if completed {
s.streamCompleted()
s.sender.onStreamCompleted(s.streamID)
}
return n, err
}
@@ -201,7 +201,8 @@ func (s *receiveStream) CancelRead(errorCode protocol.ApplicationErrorCode) {
s.mutex.Unlock()
if completed {
s.streamCompleted()
s.flowController.Abandon()
s.sender.onStreamCompleted(s.streamID)
}
}
@@ -226,7 +227,8 @@ func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame) error {
s.mutex.Unlock()
if completed {
s.streamCompleted()
s.flowController.Abandon()
s.sender.onStreamCompleted(s.streamID)
}
return err
}
@@ -236,11 +238,13 @@ func (s *receiveStream) handleStreamFrameImpl(frame *wire.StreamFrame) (bool /*
if err := s.flowController.UpdateHighestReceived(maxOffset, frame.FinBit); err != nil {
return false, err
}
var newlyRcvdFinalOffset bool
if frame.FinBit {
newlyRcvdFinalOffset = s.finalOffset == protocol.MaxByteCount
s.finalOffset = maxOffset
}
if s.canceledRead {
return frame.FinBit, nil
return newlyRcvdFinalOffset, nil
}
if err := s.frameQueue.Push(frame.Data, frame.Offset, frame.PutBack); err != nil {
return false, err
@@ -255,7 +259,8 @@ func (s *receiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame) err
s.mutex.Unlock()
if completed {
s.streamCompleted()
s.flowController.Abandon()
s.sender.onStreamCompleted(s.streamID)
}
return err
}
@@ -267,6 +272,7 @@ func (s *receiveStream) handleResetStreamFrameImpl(frame *wire.ResetStreamFrame)
if err := s.flowController.UpdateHighestReceived(frame.ByteOffset, true); err != nil {
return false, err
}
newlyRcvdFinalOffset := s.finalOffset == protocol.MaxByteCount
s.finalOffset = frame.ByteOffset
// ignore duplicate RESET_STREAM frames for this stream (after checking their final offset)
@@ -279,7 +285,7 @@ func (s *receiveStream) handleResetStreamFrameImpl(frame *wire.ResetStreamFrame)
error: fmt.Errorf("stream %d was reset with error code %d", s.streamID, frame.ErrorCode),
}
s.signalRead()
return true, nil
return newlyRcvdFinalOffset, nil
}
func (s *receiveStream) CloseRemote(offset protocol.ByteCount) {
@@ -309,17 +315,6 @@ func (s *receiveStream) getWindowUpdate() protocol.ByteCount {
return s.flowController.GetWindowUpdate()
}
func (s *receiveStream) streamCompleted() {
s.mutex.Lock()
finRead := s.finRead
s.mutex.Unlock()
if !finRead {
s.flowController.Abandon()
}
s.sender.onStreamCompleted(s.streamID)
}
// signalRead performs a non-blocking send on the readChan
func (s *receiveStream) signalRead() {
select {

View File

@@ -538,6 +538,25 @@ var _ = Describe("Receive Stream", func() {
FinBit: true,
})).To(Succeed())
})
It("handles duplicate FinBits after the stream was canceled", func() {
mockSender.EXPECT().queueControlFrame(gomock.Any())
str.CancelRead(1234)
gomock.InOrder(
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(1000), true),
mockFC.EXPECT().Abandon(),
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(1000), true),
)
mockSender.EXPECT().onStreamCompleted(streamID)
Expect(str.handleStreamFrame(&wire.StreamFrame{
Offset: 1000,
FinBit: true,
})).To(Succeed())
Expect(str.handleStreamFrame(&wire.StreamFrame{
Offset: 1000,
FinBit: true,
})).To(Succeed())
})
})
Context("receiving RESET_STREAM frames", func() {
@@ -564,7 +583,7 @@ var _ = Describe("Receive Stream", func() {
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true),
mockFC.EXPECT().Abandon(),
)
str.handleResetStreamFrame(rst)
Expect(str.handleResetStreamFrame(rst)).To(Succeed())
Eventually(done).Should(BeClosed())
})
@@ -597,6 +616,20 @@ var _ = Describe("Receive Stream", func() {
Expect(str.handleResetStreamFrame(rst)).To(Succeed())
})
It("doesn't call onStreamCompleted again when the final offset was already received via FinBit", func() {
mockSender.EXPECT().queueControlFrame(gomock.Any())
str.CancelRead(1234)
mockSender.EXPECT().onStreamCompleted(streamID)
mockFC.EXPECT().Abandon()
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true).Times(2)
Expect(str.handleStreamFrame(&wire.StreamFrame{
StreamID: streamID,
Offset: rst.ByteOffset,
FinBit: true,
})).To(Succeed())
Expect(str.handleResetStreamFrame(rst)).To(Succeed())
})
It("doesn't do anyting when it was closed for shutdown", func() {
str.closeForShutdown(nil)
err := str.handleResetStreamFrame(rst)