improve tests and test coverage of flow control manager

This commit is contained in:
Marten Seemann
2017-02-08 10:47:34 +07:00
parent 1aaf049a11
commit 84bda1a9f4
2 changed files with 98 additions and 64 deletions

View File

@@ -40,6 +40,7 @@ func NewFlowControlManager(connectionParameters handshake.ConnectionParametersMa
}
// NewStream creates new flow controllers for a stream
// it does nothing if the stream already exists
func (f *flowControlManager) NewStream(streamID protocol.StreamID, contributesToConnectionFlow bool) {
f.mutex.Lock()
defer f.mutex.Unlock()

View File

@@ -15,8 +15,8 @@ var _ = Describe("Flow Control Manager", func() {
BeforeEach(func() {
cpm = &mockConnectionParametersManager{
receiveStreamFlowControlWindow: 0x100,
receiveConnectionFlowControlWindow: 0x200,
receiveStreamFlowControlWindow: 100,
receiveConnectionFlowControlWindow: 200,
}
fcm = NewFlowControlManager(cpm, &congestion.RTTStats{}).(*flowControlManager)
})
@@ -28,11 +28,19 @@ var _ = Describe("Flow Control Manager", func() {
Context("creating new streams", func() {
It("creates a new stream", func() {
fcm.NewStream(5, true)
fcm.NewStream(5, false)
Expect(fcm.streamFlowController).To(HaveKey(protocol.StreamID(5)))
Expect(fcm.streamFlowController[5]).ToNot(BeNil())
Expect(fcm.contributesToConnectionFlowControl).To(HaveKey(protocol.StreamID(5)))
Expect(fcm.contributesToConnectionFlowControl[5]).To(BeTrue())
Expect(fcm.contributesToConnectionFlowControl).To(HaveKeyWithValue(protocol.StreamID(5), false))
})
It("doesn't create a new flow controller if called for an existing stream", func() {
fcm.NewStream(5, true)
Expect(fcm.streamFlowController).To(HaveKey(protocol.StreamID(5)))
fcm.streamFlowController[5].bytesRead = 0x1337
fcm.NewStream(5, false)
Expect(fcm.streamFlowController[5].bytesRead).To(BeEquivalentTo(0x1337))
Expect(fcm.contributesToConnectionFlowControl).To(HaveKeyWithValue(protocol.StreamID(5), true))
})
})
@@ -52,26 +60,26 @@ var _ = Describe("Flow Control Manager", func() {
})
It("updates the connection level flow controller if the stream contributes", func() {
err := fcm.UpdateHighestReceived(4, 0x100)
err := fcm.UpdateHighestReceived(4, 100)
Expect(err).ToNot(HaveOccurred())
Expect(fcm.streamFlowController[0].highestReceived).To(Equal(protocol.ByteCount(0x100)))
Expect(fcm.streamFlowController[4].highestReceived).To(Equal(protocol.ByteCount(0x100)))
Expect(fcm.streamFlowController[0].highestReceived).To(Equal(protocol.ByteCount(100)))
Expect(fcm.streamFlowController[4].highestReceived).To(Equal(protocol.ByteCount(100)))
})
It("adds the offsets of multiple streams for the connection flow control window", func() {
err := fcm.UpdateHighestReceived(4, 0x100)
err := fcm.UpdateHighestReceived(4, 100)
Expect(err).ToNot(HaveOccurred())
err = fcm.UpdateHighestReceived(6, 0x50)
err = fcm.UpdateHighestReceived(6, 50)
Expect(err).ToNot(HaveOccurred())
Expect(fcm.streamFlowController[0].highestReceived).To(Equal(protocol.ByteCount(0x100 + 0x50)))
Expect(fcm.streamFlowController[0].highestReceived).To(Equal(protocol.ByteCount(100 + 50)))
})
It("does not update the connection level flow controller if the stream does not contribute", func() {
err := fcm.UpdateHighestReceived(1, 0x100)
err := fcm.UpdateHighestReceived(1, 100)
// fcm.streamFlowController[4].receiveWindow = 0x1000
Expect(err).ToNot(HaveOccurred())
Expect(fcm.streamFlowController[0].highestReceived).To(BeZero())
Expect(fcm.streamFlowController[1].highestReceived).To(Equal(protocol.ByteCount(0x100)))
Expect(fcm.streamFlowController[1].highestReceived).To(Equal(protocol.ByteCount(100)))
})
It("returns an error when called with an unknown stream", func() {
@@ -79,49 +87,59 @@ var _ = Describe("Flow Control Manager", func() {
Expect(err).To(MatchError(errMapAccess))
})
It("gets the offset of the receive window", func() {
offset, err := fcm.GetReceiveWindow(4)
Expect(err).ToNot(HaveOccurred())
Expect(offset).To(Equal(protocol.ByteCount(100)))
})
It("errors when asked for the receive window of a stream that doesn't exist", func() {
_, err := fcm.GetReceiveWindow(17)
Expect(err).To(MatchError(errMapAccess))
})
Context("flow control violations", func() {
It("errors when encountering a stream level flow control violation", func() {
err := fcm.UpdateHighestReceived(4, 0x101)
Expect(err).To(MatchError(qerr.Error(qerr.FlowControlReceivedTooMuchData, "Received 257 bytes on stream 4, allowed 256 bytes"))) // 0x100 = 256, 0x101 = 257
err := fcm.UpdateHighestReceived(4, 101)
Expect(err).To(MatchError(qerr.Error(qerr.FlowControlReceivedTooMuchData, "Received 101 bytes on stream 4, allowed 100 bytes")))
})
It("errors when encountering a connection-level flow control violation", func() {
fcm.streamFlowController[4].receiveWindow = 0x300
err := fcm.UpdateHighestReceived(4, 0x201)
Expect(err).To(MatchError(qerr.Error(qerr.FlowControlReceivedTooMuchData, "Received 513 bytes for the connection, allowed 512 bytes"))) // 0x200 = 512, 0x201 = 513
fcm.streamFlowController[4].receiveWindow = 300
err := fcm.UpdateHighestReceived(4, 201)
Expect(err).To(MatchError(qerr.Error(qerr.FlowControlReceivedTooMuchData, "Received 201 bytes for the connection, allowed 200 bytes")))
})
})
Context("window updates", func() {
It("gets stream level window updates", func() {
err := fcm.UpdateHighestReceived(4, 0x100)
err := fcm.UpdateHighestReceived(4, 100)
Expect(err).ToNot(HaveOccurred())
err = fcm.AddBytesRead(4, 0x100-0x10)
err = fcm.AddBytesRead(4, 90)
Expect(err).ToNot(HaveOccurred())
updates := fcm.GetWindowUpdates()
Expect(updates).To(HaveLen(1))
Expect(updates[0].StreamID).To(Equal(protocol.StreamID(4)))
Expect(updates[0].Offset).To(Equal(protocol.ByteCount(0x1f0)))
Expect(updates[0].Offset).To(Equal(protocol.ByteCount(190)))
})
It("gets connection level window updates", func() {
err := fcm.UpdateHighestReceived(4, 0x100)
err := fcm.UpdateHighestReceived(4, 100)
Expect(err).ToNot(HaveOccurred())
err = fcm.UpdateHighestReceived(6, 0x100)
err = fcm.UpdateHighestReceived(6, 100)
Expect(err).ToNot(HaveOccurred())
err = fcm.AddBytesRead(4, 0x100-0x10)
err = fcm.AddBytesRead(4, 90)
Expect(err).ToNot(HaveOccurred())
err = fcm.AddBytesRead(6, 0x100-0x10)
err = fcm.AddBytesRead(6, 90)
Expect(err).ToNot(HaveOccurred())
updates := fcm.GetWindowUpdates()
Expect(updates).To(HaveLen(3))
if updates[0].StreamID == 0 {
Expect(updates[0].Offset).ToNot(Equal(protocol.ByteCount(0x200)))
} else if updates[1].StreamID == 0 {
Expect(updates[1].Offset).ToNot(Equal(protocol.ByteCount(0x200)))
} else {
Expect(updates[2].Offset).ToNot(Equal(protocol.ByteCount(0x200)))
}
Expect(updates).ToNot(ContainElement(WindowUpdate{StreamID: 0, Offset: 200}))
})
It("errors when AddBytesRead is called for a stream doesn't exist", func() {
err := fcm.AddBytesRead(17, 1000)
Expect(err).To(MatchError(errMapAccess))
})
})
})
@@ -131,28 +149,28 @@ var _ = Describe("Flow Control Manager", func() {
fcm.NewStream(1, false)
fcm.NewStream(4, true)
fcm.NewStream(6, true)
fcm.streamFlowController[1].bytesSent = 0x41
fcm.streamFlowController[4].bytesSent = 0x42
fcm.streamFlowController[1].bytesSent = 41
fcm.streamFlowController[4].bytesSent = 42
})
It("updates the connection level flow controller if the stream contributes", func() {
err := fcm.ResetStream(4, 0x100)
err := fcm.ResetStream(4, 100)
Expect(err).ToNot(HaveOccurred())
Expect(fcm.streamFlowController[0].highestReceived).To(Equal(protocol.ByteCount(0x100)))
Expect(fcm.streamFlowController[4].highestReceived).To(Equal(protocol.ByteCount(0x100)))
Expect(fcm.streamFlowController[0].highestReceived).To(Equal(protocol.ByteCount(100)))
Expect(fcm.streamFlowController[4].highestReceived).To(Equal(protocol.ByteCount(100)))
})
It("does not update the connection level flow controller if the stream does not contribute", func() {
err := fcm.ResetStream(1, 0x100)
err := fcm.ResetStream(1, 100)
Expect(err).ToNot(HaveOccurred())
Expect(fcm.streamFlowController[0].highestReceived).To(BeZero())
Expect(fcm.streamFlowController[1].highestReceived).To(Equal(protocol.ByteCount(0x100)))
Expect(fcm.streamFlowController[1].highestReceived).To(Equal(protocol.ByteCount(100)))
})
It("errors if the byteOffset is smaller than a byteOffset that set earlier", func() {
err := fcm.UpdateHighestReceived(4, 0x100)
err := fcm.UpdateHighestReceived(4, 100)
Expect(err).ToNot(HaveOccurred())
err = fcm.ResetStream(4, 0x50)
err = fcm.ResetStream(4, 50)
Expect(err).To(MatchError(qerr.StreamDataAfterTermination))
})
@@ -163,14 +181,14 @@ var _ = Describe("Flow Control Manager", func() {
Context("flow control violations", func() {
It("errors when encountering a stream level flow control violation", func() {
err := fcm.ResetStream(4, 0x101)
Expect(err).To(MatchError(qerr.Error(qerr.FlowControlReceivedTooMuchData, "Received 257 bytes on stream 4, allowed 256 bytes"))) // 0x100 = 256, 0x101 = 257
err := fcm.ResetStream(4, 101)
Expect(err).To(MatchError(qerr.Error(qerr.FlowControlReceivedTooMuchData, "Received 101 bytes on stream 4, allowed 100 bytes")))
})
It("errors when encountering a connection-level flow control violation", func() {
fcm.streamFlowController[4].receiveWindow = 0x300
err := fcm.ResetStream(4, 0x201)
Expect(err).To(MatchError(qerr.Error(qerr.FlowControlReceivedTooMuchData, "Received 513 bytes for the connection, allowed 512 bytes"))) // 0x200 = 512, 0x201 = 513
fcm.streamFlowController[4].receiveWindow = 300
err := fcm.ResetStream(4, 201)
Expect(err).To(MatchError(qerr.Error(qerr.FlowControlReceivedTooMuchData, "Received 201 bytes for the connection, allowed 200 bytes")))
})
})
})
@@ -180,73 +198,88 @@ var _ = Describe("Flow Control Manager", func() {
fcm.NewStream(1, false)
fcm.NewStream(3, true)
fcm.NewStream(5, true)
err := fcm.AddBytesSent(1, 0x100)
err := fcm.AddBytesSent(1, 100)
Expect(err).ToNot(HaveOccurred())
err = fcm.AddBytesSent(3, 0x200)
err = fcm.AddBytesSent(3, 200)
Expect(err).ToNot(HaveOccurred())
err = fcm.AddBytesSent(5, 0x500)
err = fcm.AddBytesSent(5, 500)
Expect(err).ToNot(HaveOccurred())
Expect(fcm.streamFlowController[0].bytesSent).To(Equal(protocol.ByteCount(0x200 + 0x500)))
Expect(fcm.streamFlowController[0].bytesSent).To(Equal(protocol.ByteCount(200 + 500)))
})
It("errors when called for a stream doesn't exist", func() {
err := fcm.AddBytesSent(17, 1000)
Expect(err).To(MatchError(errMapAccess))
})
Context("window updates", func() {
It("updates the window for a normal stream", func() {
fcm.NewStream(5, true)
updated, err := fcm.UpdateWindow(5, 0x1000)
updated, err := fcm.UpdateWindow(5, 1000)
Expect(err).ToNot(HaveOccurred())
Expect(updated).To(BeTrue())
})
It("updates the connection level window", func() {
updated, err := fcm.UpdateWindow(0, 0x1000)
updated, err := fcm.UpdateWindow(0, 1000)
Expect(err).ToNot(HaveOccurred())
Expect(updated).To(BeTrue())
})
It("errors when called for a stream that doesn't exist", func() {
_, err := fcm.UpdateWindow(17, 1000)
Expect(err).To(MatchError(errMapAccess))
})
})
Context("window sizes", func() {
It("gets the window size of a stream", func() {
fcm.NewStream(5, false)
updated, err := fcm.UpdateWindow(5, 0x1000)
updated, err := fcm.UpdateWindow(5, 1000)
Expect(err).ToNot(HaveOccurred())
Expect(updated).To(BeTrue())
fcm.AddBytesSent(5, 0x500)
fcm.AddBytesSent(5, 500)
size, err := fcm.SendWindowSize(5)
Expect(err).ToNot(HaveOccurred())
Expect(size).To(Equal(protocol.ByteCount(0x1000 - 0x500)))
Expect(size).To(Equal(protocol.ByteCount(1000 - 500)))
})
It("gets the connection window size", func() {
fcm.NewStream(5, true)
updated, err := fcm.UpdateWindow(0, 0x1000)
updated, err := fcm.UpdateWindow(0, 1000)
Expect(err).ToNot(HaveOccurred())
Expect(updated).To(BeTrue())
fcm.AddBytesSent(5, 0x500)
fcm.AddBytesSent(5, 500)
size := fcm.RemainingConnectionWindowSize()
Expect(size).To(Equal(protocol.ByteCount(0x1000 - 0x500)))
Expect(size).To(Equal(protocol.ByteCount(1000 - 500)))
})
It("erros when asked for the send window size of a stream that doesn't exist", func() {
_, err := fcm.SendWindowSize(17)
Expect(err).To(MatchError(errMapAccess))
})
It("limits the stream window size by the connection window size", func() {
fcm.NewStream(5, true)
updated, err := fcm.UpdateWindow(0, 0x500)
updated, err := fcm.UpdateWindow(0, 500)
Expect(err).ToNot(HaveOccurred())
Expect(updated).To(BeTrue())
updated, err = fcm.UpdateWindow(5, 0x1000)
updated, err = fcm.UpdateWindow(5, 1000)
Expect(err).ToNot(HaveOccurred())
Expect(updated).To(BeTrue())
size, err := fcm.SendWindowSize(5)
Expect(err).NotTo(HaveOccurred())
Expect(size).To(Equal(protocol.ByteCount(0x500)))
Expect(size).To(Equal(protocol.ByteCount(500)))
})
It("does not reduce the size of the connection level window, if the stream does not contribute", func() {
fcm.NewStream(3, false)
updated, err := fcm.UpdateWindow(0, 0x1000)
updated, err := fcm.UpdateWindow(0, 1000)
Expect(err).ToNot(HaveOccurred())
Expect(updated).To(BeTrue())
fcm.AddBytesSent(3, 0x456) // WindowSize should return the same value no matter how much was sent
fcm.AddBytesSent(3, 456) // WindowSize should return the same value no matter how much was sent
size := fcm.RemainingConnectionWindowSize()
Expect(size).To(Equal(protocol.ByteCount(0x1000)))
Expect(size).To(Equal(protocol.ByteCount(1000)))
})
})
})