add method to reset a stream at a byte offset to flow controller

ref #377
This commit is contained in:
Marten Seemann
2017-01-03 11:13:43 +07:00
parent fe9da30cb2
commit fb3f753c94
6 changed files with 115 additions and 9 deletions

View File

@@ -7,6 +7,7 @@ import (
"github.com/lucas-clemente/quic-go/congestion"
"github.com/lucas-clemente/quic-go/handshake"
"github.com/lucas-clemente/quic-go/protocol"
"github.com/lucas-clemente/quic-go/qerr"
"github.com/lucas-clemente/quic-go/utils"
)
@@ -63,6 +64,37 @@ func (f *flowControlManager) RemoveStream(streamID protocol.StreamID) {
f.mutex.Unlock()
}
// ResetStream should be called when receiving a RstStreamFrame
// it updates the byte offset to the value in the RstStreamFrame
// streamID must not be 0 here
func (f *flowControlManager) ResetStream(streamID protocol.StreamID, byteOffset protocol.ByteCount) error {
f.mutex.Lock()
defer f.mutex.Unlock()
streamFlowController, err := f.getFlowController(streamID)
if err != nil {
return err
}
increment, err := streamFlowController.UpdateHighestReceived(byteOffset)
if err != nil {
return qerr.StreamDataAfterTermination
}
if streamFlowController.CheckFlowControlViolation() {
return ErrStreamFlowControlViolation
}
if f.contributesToConnectionFlowControl[streamID] {
connectionFlowController := f.streamFlowController[0]
connectionFlowController.IncrementHighestReceived(increment)
if connectionFlowController.CheckFlowControlViolation() {
return ErrConnectionFlowControlViolation
}
}
return nil
}
// UpdateHighestReceived updates the highest received byte offset for a stream
// it adds the number of additional bytes to connection level flow control
// streamID must not be 0 here
@@ -74,7 +106,9 @@ func (f *flowControlManager) UpdateHighestReceived(streamID protocol.StreamID, b
if err != nil {
return err
}
increment := streamFlowController.UpdateHighestReceived(byteOffset)
// UpdateHighestReceived returns an ErrReceivedSmallerByteOffset when StreamFrames got reordered
// this error can be ignored here
increment, _ := streamFlowController.UpdateHighestReceived(byteOffset)
if streamFlowController.CheckFlowControlViolation() {
return ErrStreamFlowControlViolation

View File

@@ -4,6 +4,7 @@ import (
"github.com/lucas-clemente/quic-go/congestion"
"github.com/lucas-clemente/quic-go/handshake"
"github.com/lucas-clemente/quic-go/protocol"
"github.com/lucas-clemente/quic-go/qerr"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
@@ -50,7 +51,7 @@ var _ = Describe("Flow Control Manager", func() {
fcm.NewStream(6, true)
})
It("updates the connection level flow controller if the stream does not contribute", func() {
It("updates the connection level flow controller if the stream contributes", func() {
err := fcm.UpdateHighestReceived(4, 0x100)
Expect(err).ToNot(HaveOccurred())
Expect(fcm.streamFlowController[0].highestReceived).To(Equal(protocol.ByteCount(0x100)))
@@ -69,7 +70,7 @@ var _ = Describe("Flow Control Manager", func() {
err := fcm.UpdateHighestReceived(1, 0x100)
// fcm.streamFlowController[4].receiveFlowControlWindow = 0x1000
Expect(err).ToNot(HaveOccurred())
Expect(fcm.streamFlowController[0].highestReceived).To(Equal(protocol.ByteCount(0)))
Expect(fcm.streamFlowController[0].highestReceived).To(BeZero())
Expect(fcm.streamFlowController[1].highestReceived).To(Equal(protocol.ByteCount(0x100)))
})
@@ -125,6 +126,53 @@ var _ = Describe("Flow Control Manager", func() {
})
})
Context("resetting a stream", func() {
BeforeEach(func() {
fcm.NewStream(1, false)
fcm.NewStream(4, true)
fcm.NewStream(6, true)
})
It("updates the connection level flow controller if the stream contributes", func() {
err := fcm.ResetStream(4, 0x100)
Expect(err).ToNot(HaveOccurred())
Expect(fcm.streamFlowController[0].highestReceived).To(Equal(protocol.ByteCount(0x100)))
Expect(fcm.streamFlowController[4].highestReceived).To(Equal(protocol.ByteCount(0x100)))
})
It("does not update the connection level flow controller if the stream does not contribute", func() {
err := fcm.ResetStream(1, 0x100)
Expect(err).ToNot(HaveOccurred())
Expect(fcm.streamFlowController[0].highestReceived).To(BeZero())
Expect(fcm.streamFlowController[1].highestReceived).To(Equal(protocol.ByteCount(0x100)))
})
It("errors if the byteOffset is smaller than a byteOffset that set earlier", func() {
err := fcm.UpdateHighestReceived(4, 0x100)
Expect(err).ToNot(HaveOccurred())
err = fcm.ResetStream(4, 0x50)
Expect(err).To(MatchError(qerr.StreamDataAfterTermination))
})
It("returns an error when called with an unknown stream", func() {
err := fcm.ResetStream(1337, 0x1337)
Expect(err).To(MatchError(errMapAccess))
})
Context("flow control violations", func() {
It("errors when encountering a stream level flow control violation", func() {
err := fcm.ResetStream(4, 0x101)
Expect(err).To(MatchError(ErrStreamFlowControlViolation))
})
It("errors when encountering a connection-level flow control violation", func() {
fcm.streamFlowController[4].receiveFlowControlWindow = 0x300
err := fcm.ResetStream(4, 0x201)
Expect(err).To(MatchError(ErrConnectionFlowControlViolation))
})
})
})
Context("sending data", func() {
It("adds bytes sent for all stream contributing to connection level flow control", func() {
fcm.NewStream(1, false)

View File

@@ -1,6 +1,7 @@
package flowcontrol
import (
"errors"
"time"
"github.com/lucas-clemente/quic-go/congestion"
@@ -27,6 +28,9 @@ type flowController struct {
maxReceiveFlowControlWindowIncrement protocol.ByteCount
}
// ErrReceivedSmallerByteOffset occurs if the ByteOffset received is smaller than a ByteOffset that was set previously
var ErrReceivedSmallerByteOffset = errors.New("Received a smaller byte offset")
// newFlowController gets a new flow controller
func newFlowController(streamID protocol.StreamID, connectionParameters handshake.ConnectionParametersManager, rttStats *congestion.RTTStats) *flowController {
fc := flowController{
@@ -87,13 +91,19 @@ func (c *flowController) SendWindowOffset() protocol.ByteCount {
// UpdateHighestReceived updates the highestReceived value, if the byteOffset is higher
// Should **only** be used for the stream-level FlowController
func (c *flowController) UpdateHighestReceived(byteOffset protocol.ByteCount) protocol.ByteCount {
// it returns an ErrReceivedSmallerByteOffset if the received byteOffset is smaller than any byteOffset received before
// This error occurs every time StreamFrames get reordered and has to be ignored in that case
// It should only be treated as an error when resetting a stream
func (c *flowController) UpdateHighestReceived(byteOffset protocol.ByteCount) (protocol.ByteCount, error) {
if byteOffset == c.highestReceived {
return 0, nil
}
if byteOffset > c.highestReceived {
increment := byteOffset - c.highestReceived
c.highestReceived = byteOffset
return increment
return increment, nil
}
return 0
return 0, ErrReceivedSmallerByteOffset
}
// IncrementHighestReceived adds an increment to the highestReceived value

View File

@@ -206,18 +206,27 @@ var _ = Describe("Flow controller", func() {
It("updates the highestReceived", func() {
controller.highestReceived = 1337
increment := controller.UpdateHighestReceived(1338)
increment, err := controller.UpdateHighestReceived(1338)
Expect(err).ToNot(HaveOccurred())
Expect(increment).To(Equal(protocol.ByteCount(1338 - 1337)))
Expect(controller.highestReceived).To(Equal(protocol.ByteCount(1338)))
})
It("does not decrease the highestReceived", func() {
controller.highestReceived = 1337
increment := controller.UpdateHighestReceived(1000)
Expect(increment).To(Equal(protocol.ByteCount(0)))
increment, err := controller.UpdateHighestReceived(1000)
Expect(err).To(MatchError(ErrReceivedSmallerByteOffset))
Expect(increment).To(BeZero())
Expect(controller.highestReceived).To(Equal(protocol.ByteCount(1337)))
})
It("does not error when setting the same byte offset", func() {
controller.highestReceived = 1337
increment, err := controller.UpdateHighestReceived(1337)
Expect(err).ToNot(HaveOccurred())
Expect(increment).To(BeZero())
})
It("increases the highestReceived by a given increment", func() {
controller.highestReceived = 1337
controller.IncrementHighestReceived(123)

View File

@@ -13,6 +13,7 @@ type FlowControlManager interface {
NewStream(streamID protocol.StreamID, contributesToConnectionFlow bool)
RemoveStream(streamID protocol.StreamID)
// methods needed for receiving data
ResetStream(streamID protocol.StreamID, byteOffset protocol.ByteCount) error
UpdateHighestReceived(streamID protocol.StreamID, byteOffset protocol.ByteCount) error
AddBytesRead(streamID protocol.StreamID, n protocol.ByteCount) error
GetWindowUpdates() []WindowUpdate