Merge pull request #897 from lucas-clemente/stricter-flow-control

make flow control stricter when receiving final offsets
This commit is contained in:
Marten Seemann
2017-10-27 17:17:54 +07:00
committed by GitHub
2 changed files with 44 additions and 3 deletions

View File

@@ -12,10 +12,12 @@ import (
type streamFlowController struct {
baseFlowController
connection connectionFlowControllerI
streamID protocol.StreamID
streamID protocol.StreamID
connection connectionFlowControllerI
contributesToConnection bool // does the stream contribute to connection level flow control
receivedFinalOffset bool
}
var _ StreamFlowController = &streamFlowController{}
@@ -50,7 +52,17 @@ func (c *streamFlowController) UpdateHighestReceived(byteOffset protocol.ByteCou
c.mutex.Lock()
defer c.mutex.Unlock()
// TODO(#382): check for StreamDataAfterTermination errors, when receiving an offset after we already received a final offset
// when receiving a final offset, check that this final offset is consistent with a final offset we might have received earlier
if final && c.receivedFinalOffset && byteOffset != c.highestReceived {
return qerr.Error(qerr.StreamDataAfterTermination, fmt.Sprintf("Received inconsistent final offset for stream %d (old: %d, new: %d bytes)", c.streamID, c.highestReceived, byteOffset))
}
// if we already received a final offset, check that the offset in the STREAM frames is below the final offset
if c.receivedFinalOffset && byteOffset > c.highestReceived {
return qerr.StreamDataAfterTermination
}
if final {
c.receivedFinalOffset = true
}
if byteOffset == c.highestReceived {
return nil
}

View File

@@ -110,6 +110,35 @@ var _ = Describe("Stream Flow controller", func() {
err := controller.UpdateHighestReceived(99, true)
Expect(err).To(MatchError(qerr.StreamDataAfterTermination))
})
It("accepts delayed data after receiving a final offset", func() {
err := controller.UpdateHighestReceived(300, true)
Expect(err).ToNot(HaveOccurred())
err = controller.UpdateHighestReceived(250, false)
Expect(err).ToNot(HaveOccurred())
})
It("errors when receiving a higher offset after receiving a final offset", func() {
err := controller.UpdateHighestReceived(200, true)
Expect(err).ToNot(HaveOccurred())
err = controller.UpdateHighestReceived(250, false)
Expect(err).To(MatchError(qerr.StreamDataAfterTermination))
})
It("accepts duplicate final offsets", func() {
err := controller.UpdateHighestReceived(200, true)
Expect(err).ToNot(HaveOccurred())
err = controller.UpdateHighestReceived(200, true)
Expect(err).ToNot(HaveOccurred())
Expect(controller.highestReceived).To(Equal(protocol.ByteCount(200)))
})
It("errors when receiving inconsistent final offsets", func() {
err := controller.UpdateHighestReceived(200, true)
Expect(err).ToNot(HaveOccurred())
err = controller.UpdateHighestReceived(201, true)
Expect(err).To(MatchError("StreamDataAfterTermination: Received inconsistent final offset for stream 10 (old: 200, new: 201 bytes)"))
})
})
Context("registering data read", func() {