diff --git a/internal/flowcontrol/stream_flow_controller.go b/internal/flowcontrol/stream_flow_controller.go index 8cad9e662..96e13dc3b 100644 --- a/internal/flowcontrol/stream_flow_controller.go +++ b/internal/flowcontrol/stream_flow_controller.go @@ -12,10 +12,12 @@ import ( type streamFlowController struct { baseFlowController - connection connectionFlowControllerI + streamID protocol.StreamID - streamID protocol.StreamID + connection connectionFlowControllerI contributesToConnection bool // does the stream contribute to connection level flow control + + receivedFinalOffset bool } var _ StreamFlowController = &streamFlowController{} @@ -50,7 +52,17 @@ func (c *streamFlowController) UpdateHighestReceived(byteOffset protocol.ByteCou c.mutex.Lock() defer c.mutex.Unlock() - // TODO(#382): check for StreamDataAfterTermination errors, when receiving an offset after we already received a final offset + // when receiving a final offset, check that this final offset is consistent with a final offset we might have received earlier + if final && c.receivedFinalOffset && byteOffset != c.highestReceived { + return qerr.Error(qerr.StreamDataAfterTermination, fmt.Sprintf("Received inconsistent final offset for stream %d (old: %d, new: %d bytes)", c.streamID, c.highestReceived, byteOffset)) + } + // if we already received a final offset, check that the offset in the STREAM frames is below the final offset + if c.receivedFinalOffset && byteOffset > c.highestReceived { + return qerr.StreamDataAfterTermination + } + if final { + c.receivedFinalOffset = true + } if byteOffset == c.highestReceived { return nil } diff --git a/internal/flowcontrol/stream_flow_controller_test.go b/internal/flowcontrol/stream_flow_controller_test.go index 0273ad545..0718c6a06 100644 --- a/internal/flowcontrol/stream_flow_controller_test.go +++ b/internal/flowcontrol/stream_flow_controller_test.go @@ -110,6 +110,35 @@ var _ = Describe("Stream Flow controller", func() { err := controller.UpdateHighestReceived(99, true) Expect(err).To(MatchError(qerr.StreamDataAfterTermination)) }) + + It("accepts delayed data after receiving a final offset", func() { + err := controller.UpdateHighestReceived(300, true) + Expect(err).ToNot(HaveOccurred()) + err = controller.UpdateHighestReceived(250, false) + Expect(err).ToNot(HaveOccurred()) + }) + + It("errors when receiving a higher offset after receiving a final offset", func() { + err := controller.UpdateHighestReceived(200, true) + Expect(err).ToNot(HaveOccurred()) + err = controller.UpdateHighestReceived(250, false) + Expect(err).To(MatchError(qerr.StreamDataAfterTermination)) + }) + + It("accepts duplicate final offsets", func() { + err := controller.UpdateHighestReceived(200, true) + Expect(err).ToNot(HaveOccurred()) + err = controller.UpdateHighestReceived(200, true) + Expect(err).ToNot(HaveOccurred()) + Expect(controller.highestReceived).To(Equal(protocol.ByteCount(200))) + }) + + It("errors when receiving inconsistent final offsets", func() { + err := controller.UpdateHighestReceived(200, true) + Expect(err).ToNot(HaveOccurred()) + err = controller.UpdateHighestReceived(201, true) + Expect(err).To(MatchError("StreamDataAfterTermination: Received inconsistent final offset for stream 10 (old: 200, new: 201 bytes)")) + }) }) Context("registering data read", func() {