diff --git a/flowcontrol/flow_control_manager.go b/flowcontrol/flow_control_manager.go index 0ab63634..0af52e9c 100644 --- a/flowcontrol/flow_control_manager.go +++ b/flowcontrol/flow_control_manager.go @@ -4,13 +4,16 @@ import ( "errors" "sync" + "github.com/lucas-clemente/quic-go/congestion" "github.com/lucas-clemente/quic-go/handshake" "github.com/lucas-clemente/quic-go/protocol" "github.com/lucas-clemente/quic-go/utils" ) type flowControlManager struct { - connectionParametersManager *handshake.ConnectionParametersManager + connectionParametersManager *handshake.ConnectionParametersManager + rttStats *congestion.RTTStats + streamFlowController map[protocol.StreamID]*flowController contributesToConnectionFlowControl map[protocol.StreamID]bool mutex sync.RWMutex @@ -26,14 +29,15 @@ var ( var errMapAccess = errors.New("Error accessing the flowController map.") // NewFlowControlManager creates a new flow control manager -func NewFlowControlManager(connectionParametersManager *handshake.ConnectionParametersManager) FlowControlManager { +func NewFlowControlManager(connectionParametersManager *handshake.ConnectionParametersManager, rttStats *congestion.RTTStats) FlowControlManager { fcm := flowControlManager{ connectionParametersManager: connectionParametersManager, + rttStats: rttStats, streamFlowController: make(map[protocol.StreamID]*flowController), contributesToConnectionFlowControl: make(map[protocol.StreamID]bool), } // initialize connection level flow controller - fcm.streamFlowController[0] = newFlowController(0, connectionParametersManager) + fcm.streamFlowController[0] = newFlowController(0, connectionParametersManager, rttStats) fcm.contributesToConnectionFlowControl[0] = false return &fcm } @@ -47,7 +51,7 @@ func (f *flowControlManager) NewStream(streamID protocol.StreamID, contributesTo return } - f.streamFlowController[streamID] = newFlowController(streamID, f.connectionParametersManager) + f.streamFlowController[streamID] = newFlowController(streamID, f.connectionParametersManager, f.rttStats) f.contributesToConnectionFlowControl[streamID] = contributesToConnectionFlow } diff --git a/flowcontrol/flow_control_manager_test.go b/flowcontrol/flow_control_manager_test.go index aaebfbcf..362e5849 100644 --- a/flowcontrol/flow_control_manager_test.go +++ b/flowcontrol/flow_control_manager_test.go @@ -1,6 +1,7 @@ package flowcontrol import ( + "github.com/lucas-clemente/quic-go/congestion" "github.com/lucas-clemente/quic-go/handshake" "github.com/lucas-clemente/quic-go/protocol" . "github.com/onsi/ginkgo" @@ -15,7 +16,7 @@ var _ = Describe("Flow Control Manager", func() { cpm = &handshake.ConnectionParametersManager{} setConnectionParametersManagerWindow(cpm, "receiveStreamFlowControlWindow", 0x100) setConnectionParametersManagerWindow(cpm, "receiveConnectionFlowControlWindow", 0x200) - fcm = NewFlowControlManager(cpm).(*flowControlManager) + fcm = NewFlowControlManager(cpm, &congestion.RTTStats{}).(*flowControlManager) }) It("creates a connection level flow controller", func() { diff --git a/flowcontrol/flow_controller.go b/flowcontrol/flow_controller.go index 21020ad9..562369aa 100644 --- a/flowcontrol/flow_controller.go +++ b/flowcontrol/flow_controller.go @@ -1,6 +1,7 @@ package flowcontrol import ( + "github.com/lucas-clemente/quic-go/congestion" "github.com/lucas-clemente/quic-go/handshake" "github.com/lucas-clemente/quic-go/protocol" ) @@ -9,6 +10,7 @@ type flowController struct { streamID protocol.StreamID connectionParametersManager *handshake.ConnectionParametersManager + rttStats *congestion.RTTStats bytesSent protocol.ByteCount sendFlowControlWindow protocol.ByteCount @@ -20,10 +22,11 @@ type flowController struct { } // newFlowController gets a new flow controller -func newFlowController(streamID protocol.StreamID, connectionParametersManager *handshake.ConnectionParametersManager) *flowController { +func newFlowController(streamID protocol.StreamID, connectionParametersManager *handshake.ConnectionParametersManager, rttStats *congestion.RTTStats) *flowController { fc := flowController{ streamID: streamID, connectionParametersManager: connectionParametersManager, + rttStats: rttStats, } if streamID == 0 { diff --git a/flowcontrol/flow_controller_test.go b/flowcontrol/flow_controller_test.go index e2489788..383fc578 100644 --- a/flowcontrol/flow_controller_test.go +++ b/flowcontrol/flow_controller_test.go @@ -4,6 +4,7 @@ import ( "reflect" "unsafe" + "github.com/lucas-clemente/quic-go/congestion" "github.com/lucas-clemente/quic-go/handshake" "github.com/lucas-clemente/quic-go/protocol" . "github.com/onsi/ginkgo" @@ -25,9 +26,11 @@ var _ = Describe("Flow controller", func() { Context("Constructor", func() { var cpm *handshake.ConnectionParametersManager + var rttStats *congestion.RTTStats BeforeEach(func() { cpm = &handshake.ConnectionParametersManager{} + rttStats = &congestion.RTTStats{} setConnectionParametersManagerWindow(cpm, "sendStreamFlowControlWindow", 1000) setConnectionParametersManagerWindow(cpm, "receiveStreamFlowControlWindow", 2000) setConnectionParametersManagerWindow(cpm, "sendConnectionFlowControlWindow", 3000) @@ -35,24 +38,24 @@ var _ = Describe("Flow controller", func() { }) It("reads the stream send and receive windows when acting as stream-level flow controller", func() { - fc := newFlowController(5, cpm) + fc := newFlowController(5, cpm, rttStats) Expect(fc.streamID).To(Equal(protocol.StreamID(5))) Expect(fc.receiveFlowControlWindow).To(Equal(protocol.ByteCount(2000))) }) It("reads the stream send and receive windows when acting as stream-level flow controller", func() { - fc := newFlowController(0, cpm) + fc := newFlowController(0, cpm, rttStats) Expect(fc.streamID).To(Equal(protocol.StreamID(0))) Expect(fc.receiveFlowControlWindow).To(Equal(protocol.ByteCount(4000))) }) It("does not set the stream flow control windows for sending", func() { - fc := newFlowController(5, cpm) + fc := newFlowController(5, cpm, rttStats) Expect(fc.sendFlowControlWindow).To(BeZero()) }) It("does not set the connection flow control windows for sending", func() { - fc := newFlowController(0, cpm) + fc := newFlowController(0, cpm, rttStats) Expect(fc.sendFlowControlWindow).To(BeZero()) }) }) diff --git a/session.go b/session.go index 376538f5..3d13025d 100644 --- a/session.go +++ b/session.go @@ -95,7 +95,6 @@ type Session struct { // newSession makes a new session func newSession(conn connection, v protocol.VersionNumber, connectionID protocol.ConnectionID, sCfg *handshake.ServerConfig, streamCallback StreamCallback, closeCallback closeCallback) (packetHandler, error) { connectionParametersManager := handshake.NewConnectionParamatersManager() - flowControlManager := flowcontrol.NewFlowControlManager(connectionParametersManager) var sentPacketHandler ackhandler.SentPacketHandler var receivedPacketHandler ackhandler.ReceivedPacketHandler @@ -104,6 +103,7 @@ func newSession(conn connection, v protocol.VersionNumber, connectionID protocol sentPacketHandler = ackhandler.NewSentPacketHandler(rttStats) receivedPacketHandler = ackhandler.NewReceivedPacketHandler() + flowControlManager := flowcontrol.NewFlowControlManager(connectionParametersManager, rttStats) now := time.Now() session := &Session{ diff --git a/stream_test.go b/stream_test.go index ce73b231..1eaa5479 100644 --- a/stream_test.go +++ b/stream_test.go @@ -5,6 +5,7 @@ import ( "io" "time" + "github.com/lucas-clemente/quic-go/congestion" "github.com/lucas-clemente/quic-go/flowcontrol" "github.com/lucas-clemente/quic-go/frames" "github.com/lucas-clemente/quic-go/handshake" @@ -110,7 +111,7 @@ var _ = Describe("Stream", func() { onDataCalled = false var streamID protocol.StreamID = 1337 cpm := handshake.NewConnectionParamatersManager() - flowControlManager := flowcontrol.NewFlowControlManager(cpm) + flowControlManager := flowcontrol.NewFlowControlManager(cpm, &congestion.RTTStats{}) flowControlManager.NewStream(streamID, true) str, _ = newStream(streamID, onData, flowControlManager) })