use FlowControlManager in StreamFrameQueue

This commit is contained in:
Marten Seemann
2016-06-16 12:14:50 +07:00
committed by Lucas Clemente
parent e8e6c8922d
commit ef9baf67fc
6 changed files with 78 additions and 96 deletions

View File

@@ -50,16 +50,18 @@ var _ = Describe("Packet packer", func() {
)
BeforeEach(func() {
fcm := newMockFlowControlHandler()
fcm.sendWindowSizes[3] = protocol.MaxByteCount
fcm.sendWindowSizes[5] = protocol.MaxByteCount
fcm.sendWindowSizes[7] = protocol.MaxByteCount
packer = &packetPacker{
cryptoSetup: &handshake.CryptoSetup{},
connectionParametersManager: handshake.NewConnectionParamatersManager(),
sentPacketHandler: newMockSentPacketHandler(),
blockedManager: newBlockedManager(),
streamFrameQueue: newStreamFrameQueue(),
streamFrameQueue: newStreamFrameQueue(fcm),
}
packer.streamFrameQueue.UpdateWindow(3, protocol.MaxByteCount)
packer.streamFrameQueue.UpdateWindow(5, protocol.MaxByteCount)
packer.streamFrameQueue.UpdateWindow(7, protocol.MaxByteCount)
publicHeaderLen = 1 + 8 + 1 // 1 flag byte, 8 connection ID, 1 packet number
packer.version = protocol.Version34
})

View File

@@ -96,6 +96,7 @@ type Session struct {
func newSession(conn connection, v protocol.VersionNumber, connectionID protocol.ConnectionID, sCfg *handshake.ServerConfig, streamCallback StreamCallback, closeCallback closeCallback) (packetHandler, error) {
stopWaitingManager := ackhandlerlegacy.NewStopWaitingManager()
connectionParametersManager := handshake.NewConnectionParamatersManager()
flowControlManager := flowcontrol.NewFlowControlManager(connectionParametersManager)
session := &Session{
connectionID: connectionID,
@@ -107,11 +108,11 @@ func newSession(conn connection, v protocol.VersionNumber, connectionID protocol
sentPacketHandler: ackhandlerlegacy.NewSentPacketHandler(stopWaitingManager),
receivedPacketHandler: ackhandlerlegacy.NewReceivedPacketHandler(),
stopWaitingManager: stopWaitingManager,
flowControlManager: flowcontrol.NewFlowControlManager(connectionParametersManager),
flowController: flowcontrol.NewFlowController(0, connectionParametersManager),
flowControlManager: flowControlManager,
windowUpdateManager: newWindowUpdateManager(),
blockedManager: newBlockedManager(),
streamFrameQueue: newStreamFrameQueue(),
streamFrameQueue: newStreamFrameQueue(flowControlManager),
receivedPackets: make(chan receivedPacket, protocol.MaxSessionUnprocessedPackets),
closeChan: make(chan struct{}, 1),
sendingScheduled: make(chan struct{}, 1),
@@ -374,6 +375,12 @@ func (s *Session) handleWindowUpdateFrame(frame *frames.WindowUpdateFrame) error
}
}
// TODO: only use this once the other flowController is removed
_, err := s.flowControlManager.UpdateWindow(frame.StreamID, frame.ByteOffset)
if err != nil {
return err
}
return nil
}

View File

@@ -73,9 +73,6 @@ var _ = Describe("Session", func() {
Expect(err).NotTo(HaveOccurred())
session = pSession.(*Session)
Expect(session.streams).To(HaveLen(1)) // Crypto stream
// TODO: remove this once the streamFrameQueue is properly initialized
session.streamFrameQueue.UpdateWindow(5, protocol.MaxByteCount)
})
Context("when handling stream frames", func() {
@@ -340,7 +337,6 @@ var _ = Describe("Session", func() {
ByteOffset: 0x800000,
})
Expect(err).ToNot(HaveOccurred())
Expect(session.flowController.SendWindowSize()).To(Equal(protocol.ByteCount(0x800000)))
})
It("errors when the stream is not known", func() {
@@ -461,6 +457,7 @@ var _ = Describe("Session", func() {
})
It("sends queued stream frames", func() {
session.OpenStream(5)
session.queueStreamFrame(&frames.StreamFrame{
StreamID: 5,
Data: []byte("foobar"),
@@ -613,65 +610,77 @@ var _ = Describe("Session", func() {
Context("bundling of small packets", func() {
It("bundles two small frames into one packet", func() {
session.OpenStream(5)
go session.run()
session.queueStreamFrame(&frames.StreamFrame{
err := session.queueStreamFrame(&frames.StreamFrame{
StreamID: 5,
Data: []byte("foobar1"),
})
session.queueStreamFrame(&frames.StreamFrame{
Expect(err).ToNot(HaveOccurred())
err = session.queueStreamFrame(&frames.StreamFrame{
StreamID: 5,
Data: []byte("foobar2"),
})
Expect(err).ToNot(HaveOccurred())
time.Sleep(10 * time.Millisecond)
Expect(conn.written).To(HaveLen(1))
})
It("sends out two big frames in two packet", func() {
session.OpenStream(5)
go session.run()
session.queueStreamFrame(&frames.StreamFrame{
err := session.queueStreamFrame(&frames.StreamFrame{
StreamID: 5,
Data: bytes.Repeat([]byte{'e'}, int(protocol.SmallPacketPayloadSizeThreshold+50)),
})
session.queueStreamFrame(&frames.StreamFrame{
Expect(err).ToNot(HaveOccurred())
err = session.queueStreamFrame(&frames.StreamFrame{
StreamID: 5,
Data: bytes.Repeat([]byte{'f'}, int(protocol.SmallPacketPayloadSizeThreshold+50)),
})
Expect(err).ToNot(HaveOccurred())
time.Sleep(10 * time.Millisecond)
Expect(conn.written).To(HaveLen(2))
})
It("sends out two small frames that are written to long after one another into two packet", func() {
session.OpenStream(5)
go session.run()
session.queueStreamFrame(&frames.StreamFrame{
err := session.queueStreamFrame(&frames.StreamFrame{
StreamID: 5,
Data: []byte("foobar1"),
})
Expect(err).ToNot(HaveOccurred())
time.Sleep(20 * protocol.SmallPacketSendDelay)
session.queueStreamFrame(&frames.StreamFrame{
err = session.queueStreamFrame(&frames.StreamFrame{
StreamID: 5,
Data: []byte("foobar2"),
})
Expect(err).ToNot(HaveOccurred())
time.Sleep(10 * time.Millisecond)
Expect(conn.written).To(HaveLen(2))
})
It("sends a queued ACK frame only once", func() {
session.OpenStream(5)
go session.run()
packetNumber := protocol.PacketNumber(0x1337)
session.receivedPacketHandler.ReceivedPacket(packetNumber, true)
session.queueStreamFrame(&frames.StreamFrame{
err := session.queueStreamFrame(&frames.StreamFrame{
StreamID: 5,
Data: []byte("foobar1"),
})
Expect(err).ToNot(HaveOccurred())
time.Sleep(20 * protocol.SmallPacketSendDelay)
session.queueStreamFrame(&frames.StreamFrame{
err = session.queueStreamFrame(&frames.StreamFrame{
StreamID: 5,
Data: []byte("foobar2"),
})
Expect(err).ToNot(HaveOccurred())
time.Sleep(10 * time.Millisecond)
Expect(conn.written).To(HaveLen(2))
Expect(conn.written[0]).To(ContainSubstring(string([]byte{0x37, 0x13})))

View File

@@ -4,6 +4,7 @@ import (
"errors"
"sync"
"github.com/lucas-clemente/quic-go/flowcontrol"
"github.com/lucas-clemente/quic-go/frames"
"github.com/lucas-clemente/quic-go/protocol"
"github.com/lucas-clemente/quic-go/qerr"
@@ -20,9 +21,7 @@ type streamFrameQueue struct {
frameMap map[protocol.StreamID][]*frames.StreamFrame
frameQueueMutex sync.RWMutex
flowControlWindows map[protocol.StreamID]protocol.ByteCount
bytesSent protocol.ByteCount
windowMutex sync.Mutex
flowControlManager flowcontrol.FlowControlManager
activeStreams []protocol.StreamID
activeStreamsPosition int
@@ -31,10 +30,10 @@ type streamFrameQueue struct {
byteLen protocol.ByteCount
}
func newStreamFrameQueue() *streamFrameQueue {
func newStreamFrameQueue(flowControlManager flowcontrol.FlowControlManager) *streamFrameQueue {
return &streamFrameQueue{
frameMap: make(map[protocol.StreamID][]*frames.StreamFrame),
flowControlWindows: make(map[protocol.StreamID]protocol.ByteCount),
flowControlManager: flowControlManager,
}
}
@@ -75,24 +74,10 @@ func (q *streamFrameQueue) ByteLen() protocol.ByteCount {
return q.byteLen
}
// UpdateWindow updates the flow control window
// streamID may be 0, for the connection level flow control window
func (q *streamFrameQueue) UpdateWindow(streamID protocol.StreamID, offset protocol.ByteCount) {
q.windowMutex.Lock()
defer q.windowMutex.Unlock()
windowSize, ok := q.flowControlWindows[streamID]
if !ok || offset > windowSize {
q.flowControlWindows[streamID] = offset
}
}
// Pop returns the next element and deletes it from the queue
func (q *streamFrameQueue) Pop(maxLength protocol.ByteCount) (*frames.StreamFrame, error) {
q.frameQueueMutex.Lock()
q.windowMutex.Lock()
defer q.frameQueueMutex.Unlock()
defer q.windowMutex.Unlock()
var isPrioFrame bool
var frame *frames.StreamFrame
@@ -127,6 +112,9 @@ func (q *streamFrameQueue) Pop(maxLength protocol.ByteCount) (*frames.StreamFram
}
frame = q.frameMap[streamID][0]
maxFrameDataSize, err = q.getMaximumFrameDataSize(frame)
if err != nil {
return nil, err
}
if maxFrameDataSize > 0 {
foundFrame = true
}
@@ -161,9 +149,7 @@ func (q *streamFrameQueue) Pop(maxLength protocol.ByteCount) (*frames.StreamFram
q.byteLen -= frame.DataLen()
// TODO: find a better solution for identifying streams that don't contribute to connection level flow control
if frame.StreamID != 1 && frame.StreamID != 3 {
q.bytesSent += frame.DataLen()
}
q.flowControlManager.AddBytesSent(streamID, frame.DataLen())
q.len--
return frame, nil
@@ -193,8 +179,6 @@ func (q *streamFrameQueue) RemoveStream(streamID protocol.StreamID) {
delete(q.frameMap, streamID)
}
delete(q.flowControlWindows, streamID)
for i, s := range q.activeStreams {
if s == streamID {
q.activeStreams[i] = 0
@@ -273,9 +257,9 @@ func (q *streamFrameQueue) maybeSplitOffFrame(frame *frames.StreamFrame, n proto
}
func (q *streamFrameQueue) getMaximumFrameDataSize(frame *frames.StreamFrame) (protocol.ByteCount, error) {
highestAllowedStreamOffset, ok := q.flowControlWindows[frame.StreamID]
if !ok {
return 0, errMapAccess
highestAllowedStreamOffset, err := q.flowControlManager.SendWindowSize(frame.StreamID)
if err != nil {
return 0, err
}
if frame.Offset > highestAllowedStreamOffset { // stream level flow control blocked
return 0, errStreamFlowControlBlocked

View File

@@ -13,7 +13,6 @@ var _ = Describe("streamFrameQueue", func() {
var queue *streamFrameQueue
BeforeEach(func() {
queue = newStreamFrameQueue()
prioFrame1 = &frames.StreamFrame{
StreamID: 5,
Data: []byte{0x13, 0x37},
@@ -35,11 +34,13 @@ var _ = Describe("streamFrameQueue", func() {
Data: []byte{0xBE, 0xEF},
}
queue.UpdateWindow(prioFrame1.StreamID, 1000)
queue.UpdateWindow(prioFrame2.StreamID, 1000)
queue.UpdateWindow(frame1.StreamID, 1000)
queue.UpdateWindow(frame2.StreamID, 1000)
queue.UpdateWindow(frame3.StreamID, 1000)
fcm := newMockFlowControlHandler()
fcm.sendWindowSizes[frame1.StreamID] = protocol.MaxByteCount
fcm.sendWindowSizes[frame2.StreamID] = protocol.MaxByteCount
fcm.sendWindowSizes[frame3.StreamID] = protocol.MaxByteCount
fcm.sendWindowSizes[prioFrame1.StreamID] = protocol.MaxByteCount
fcm.sendWindowSizes[prioFrame2.StreamID] = protocol.MaxByteCount
queue = newStreamFrameQueue(fcm)
})
It("sets the DataLenPresent on all StreamFrames", func() {
@@ -389,7 +390,7 @@ var _ = Describe("streamFrameQueue", func() {
Context("flow control", func() {
It("returns the whole frame if it fits", func() {
frame1.Offset = 10
queue.flowControlWindows[frame1.StreamID] = 10 + frame1.DataLen()
queue.flowControlManager.(*mockFlowControlHandler).sendWindowSizes[frame1.StreamID] = 10 + frame1.DataLen()
queue.Push(frame1, false)
frame, err := queue.Pop(1000)
Expect(err).ToNot(HaveOccurred())
@@ -398,23 +399,24 @@ var _ = Describe("streamFrameQueue", func() {
It("returns a split frame if the whole frame doesn't fit", func() {
queue.Push(frame1, false)
queue.flowControlWindows[frame1.StreamID] = 3
len := frame1.DataLen() - 1
queue.flowControlManager.(*mockFlowControlHandler).sendWindowSizes[frame1.StreamID] = len
frame, err := queue.Pop(1000)
Expect(err).ToNot(HaveOccurred())
Expect(frame.DataLen()).To(Equal(protocol.ByteCount(3)))
Expect(frame.DataLen()).To(Equal(protocol.ByteCount(len)))
})
It("returns a split frame if the whole frame doesn't fit, for non-zero StreamFrame offset", func() {
frame1.Offset = 2
queue.Push(frame1, false)
queue.flowControlWindows[frame1.StreamID] = 4
queue.flowControlManager.(*mockFlowControlHandler).sendWindowSizes[frame1.StreamID] = 4
frame, err := queue.Pop(1000)
Expect(err).ToNot(HaveOccurred())
Expect(frame.DataLen()).To(Equal(protocol.ByteCount(2)))
})
It("skips a frame if the stream is flow control blocked", func() {
queue.flowControlWindows[frame1.StreamID] = 0
queue.flowControlManager.(*mockFlowControlHandler).sendWindowSizes[frame1.StreamID] = 0
queue.Push(frame1, false)
queue.Push(frame2, false)
frame, err := queue.Pop(1000)
@@ -423,8 +425,8 @@ var _ = Describe("streamFrameQueue", func() {
})
It("returns nil if no stream is not flow control blocked", func() {
queue.flowControlWindows[frame1.StreamID] = 0
queue.flowControlWindows[frame2.StreamID] = 0
queue.flowControlManager.(*mockFlowControlHandler).sendWindowSizes[frame1.StreamID] = 0
queue.flowControlManager.(*mockFlowControlHandler).sendWindowSizes[frame2.StreamID] = 0
queue.Push(frame1, false)
queue.Push(frame2, false)
frame, err := queue.Pop(1000)
@@ -486,13 +488,6 @@ var _ = Describe("streamFrameQueue", func() {
Expect(frame).To(Equal(frame2))
})
It("deletes the entries from the flowControlWindows map", func() {
queue.UpdateWindow(1337, 0x4000)
Expect(queue.flowControlWindows).To(HaveKey(protocol.StreamID(1337)))
queue.RemoveStream(1337)
Expect(queue.flowControlWindows).ToNot(HaveKey(protocol.StreamID(1337)))
})
Context("garbage collection of activeStreams", func() {
It("adjusts the activeStreams slice", func() {
queue.activeStreams = []protocol.StreamID{5, 6, 10, 2, 3}
@@ -529,29 +524,4 @@ var _ = Describe("streamFrameQueue", func() {
})
})
})
Context("flow control", func() {
Context("updating the window", func() {
It("creates an entry for a new stream", func() {
queue.UpdateWindow(1337, 0x1337)
Expect(queue.flowControlWindows).To(HaveKey(protocol.StreamID(1337)))
Expect(queue.flowControlWindows[1337]).To(Equal(protocol.ByteCount(0x1337)))
})
It("updates the window for an existing stream", func() {
queue.UpdateWindow(1337, 0x1000)
Expect(queue.flowControlWindows[1337]).To(Equal(protocol.ByteCount(0x1000)))
queue.UpdateWindow(1337, 0x2000)
Expect(queue.flowControlWindows[1337]).To(Equal(protocol.ByteCount(0x2000)))
})
It("does not decrease the window size", func() {
queue.UpdateWindow(1337, 0x1000)
Expect(queue.flowControlWindows[1337]).To(Equal(protocol.ByteCount(0x1000)))
queue.UpdateWindow(1337, 0x500)
Expect(queue.flowControlWindows[1337]).To(Equal(protocol.ByteCount(0x1000)))
})
})
})
})

View File

@@ -40,6 +40,7 @@ func (m *mockStreamHandler) queueStreamFrame(f *frames.StreamFrame) error {
}
type mockFlowControlHandler struct {
sendWindowSizes map[protocol.StreamID]protocol.ByteCount
bytesReadForStream protocol.StreamID
bytesRead protocol.ByteCount
@@ -50,6 +51,12 @@ type mockFlowControlHandler struct {
triggerConnectionWindowUpdate bool
}
func newMockFlowControlHandler() *mockFlowControlHandler {
return &mockFlowControlHandler{
sendWindowSizes: make(map[protocol.StreamID]protocol.ByteCount),
}
}
func (m *mockFlowControlHandler) NewStream(streamID protocol.StreamID, contributesToConnectionFlow bool) {
panic("not implemented")
}
@@ -75,10 +82,10 @@ func (m *mockFlowControlHandler) UpdateHighestReceived(streamID protocol.StreamI
}
func (m *mockFlowControlHandler) AddBytesSent(streamID protocol.StreamID, n protocol.ByteCount) error {
panic("not implemented")
return nil
}
func (m *mockFlowControlHandler) SendWindowSize(streamID protocol.StreamID) (protocol.ByteCount, error) {
panic("not implemented")
return m.sendWindowSizes[streamID], nil
}
func (m *mockFlowControlHandler) RemainingConnectionWindowSize() protocol.ByteCount {
panic("not implemented")
@@ -510,16 +517,18 @@ var _ = Describe("Stream", func() {
PContext("Blocked streams", func() {
It("notifies the session when a stream is flow control blocked", func() {
updated := str.flowController.UpdateSendWindow(1337)
updated, err := str.flowControlManager.UpdateWindow(str.streamID, 1337)
Expect(err).ToNot(HaveOccurred())
Expect(updated).To(BeTrue())
str.flowController.AddBytesSent(1337)
str.flowControlManager.AddBytesSent(str.streamID, 1337)
str.maybeTriggerBlocked()
Expect(handler.receivedBlockedCalled).To(BeTrue())
Expect(handler.receivedBlockedForStream).To(Equal(str.streamID))
})
It("notifies the session as soon as a stream is reaching the end of the window", func() {
updated := str.flowController.UpdateSendWindow(4)
updated, err := str.flowControlManager.UpdateWindow(str.streamID, 4)
Expect(err).ToNot(HaveOccurred())
Expect(updated).To(BeTrue())
str.Write([]byte{0xDE, 0xCA, 0xFB, 0xAD})
Expect(handler.receivedBlockedCalled).To(BeTrue())
@@ -527,7 +536,8 @@ var _ = Describe("Stream", func() {
})
It("notifies the session as soon as a stream is flow control blocked", func() {
updated := str.flowController.UpdateSendWindow(2)
updated, err := str.flowControlManager.UpdateWindow(str.streamID, 2)
Expect(err).ToNot(HaveOccurred())
Expect(updated).To(BeTrue())
go func() {
str.Write([]byte{0xDE, 0xCA, 0xFB, 0xAD})