pass RTTStats to the FlowControllers

ref #106
This commit is contained in:
Marten Seemann
2016-10-30 17:36:36 +07:00
parent c8b7246159
commit 32d89eee02
6 changed files with 24 additions and 12 deletions

View File

@@ -4,13 +4,16 @@ import (
"errors"
"sync"
"github.com/lucas-clemente/quic-go/congestion"
"github.com/lucas-clemente/quic-go/handshake"
"github.com/lucas-clemente/quic-go/protocol"
"github.com/lucas-clemente/quic-go/utils"
)
type flowControlManager struct {
connectionParametersManager *handshake.ConnectionParametersManager
connectionParametersManager *handshake.ConnectionParametersManager
rttStats *congestion.RTTStats
streamFlowController map[protocol.StreamID]*flowController
contributesToConnectionFlowControl map[protocol.StreamID]bool
mutex sync.RWMutex
@@ -26,14 +29,15 @@ var (
var errMapAccess = errors.New("Error accessing the flowController map.")
// NewFlowControlManager creates a new flow control manager
func NewFlowControlManager(connectionParametersManager *handshake.ConnectionParametersManager) FlowControlManager {
func NewFlowControlManager(connectionParametersManager *handshake.ConnectionParametersManager, rttStats *congestion.RTTStats) FlowControlManager {
fcm := flowControlManager{
connectionParametersManager: connectionParametersManager,
rttStats: rttStats,
streamFlowController: make(map[protocol.StreamID]*flowController),
contributesToConnectionFlowControl: make(map[protocol.StreamID]bool),
}
// initialize connection level flow controller
fcm.streamFlowController[0] = newFlowController(0, connectionParametersManager)
fcm.streamFlowController[0] = newFlowController(0, connectionParametersManager, rttStats)
fcm.contributesToConnectionFlowControl[0] = false
return &fcm
}
@@ -47,7 +51,7 @@ func (f *flowControlManager) NewStream(streamID protocol.StreamID, contributesTo
return
}
f.streamFlowController[streamID] = newFlowController(streamID, f.connectionParametersManager)
f.streamFlowController[streamID] = newFlowController(streamID, f.connectionParametersManager, f.rttStats)
f.contributesToConnectionFlowControl[streamID] = contributesToConnectionFlow
}

View File

@@ -1,6 +1,7 @@
package flowcontrol
import (
"github.com/lucas-clemente/quic-go/congestion"
"github.com/lucas-clemente/quic-go/handshake"
"github.com/lucas-clemente/quic-go/protocol"
. "github.com/onsi/ginkgo"
@@ -15,7 +16,7 @@ var _ = Describe("Flow Control Manager", func() {
cpm = &handshake.ConnectionParametersManager{}
setConnectionParametersManagerWindow(cpm, "receiveStreamFlowControlWindow", 0x100)
setConnectionParametersManagerWindow(cpm, "receiveConnectionFlowControlWindow", 0x200)
fcm = NewFlowControlManager(cpm).(*flowControlManager)
fcm = NewFlowControlManager(cpm, &congestion.RTTStats{}).(*flowControlManager)
})
It("creates a connection level flow controller", func() {

View File

@@ -1,6 +1,7 @@
package flowcontrol
import (
"github.com/lucas-clemente/quic-go/congestion"
"github.com/lucas-clemente/quic-go/handshake"
"github.com/lucas-clemente/quic-go/protocol"
)
@@ -9,6 +10,7 @@ type flowController struct {
streamID protocol.StreamID
connectionParametersManager *handshake.ConnectionParametersManager
rttStats *congestion.RTTStats
bytesSent protocol.ByteCount
sendFlowControlWindow protocol.ByteCount
@@ -20,10 +22,11 @@ type flowController struct {
}
// newFlowController gets a new flow controller
func newFlowController(streamID protocol.StreamID, connectionParametersManager *handshake.ConnectionParametersManager) *flowController {
func newFlowController(streamID protocol.StreamID, connectionParametersManager *handshake.ConnectionParametersManager, rttStats *congestion.RTTStats) *flowController {
fc := flowController{
streamID: streamID,
connectionParametersManager: connectionParametersManager,
rttStats: rttStats,
}
if streamID == 0 {

View File

@@ -4,6 +4,7 @@ import (
"reflect"
"unsafe"
"github.com/lucas-clemente/quic-go/congestion"
"github.com/lucas-clemente/quic-go/handshake"
"github.com/lucas-clemente/quic-go/protocol"
. "github.com/onsi/ginkgo"
@@ -25,9 +26,11 @@ var _ = Describe("Flow controller", func() {
Context("Constructor", func() {
var cpm *handshake.ConnectionParametersManager
var rttStats *congestion.RTTStats
BeforeEach(func() {
cpm = &handshake.ConnectionParametersManager{}
rttStats = &congestion.RTTStats{}
setConnectionParametersManagerWindow(cpm, "sendStreamFlowControlWindow", 1000)
setConnectionParametersManagerWindow(cpm, "receiveStreamFlowControlWindow", 2000)
setConnectionParametersManagerWindow(cpm, "sendConnectionFlowControlWindow", 3000)
@@ -35,24 +38,24 @@ var _ = Describe("Flow controller", func() {
})
It("reads the stream send and receive windows when acting as stream-level flow controller", func() {
fc := newFlowController(5, cpm)
fc := newFlowController(5, cpm, rttStats)
Expect(fc.streamID).To(Equal(protocol.StreamID(5)))
Expect(fc.receiveFlowControlWindow).To(Equal(protocol.ByteCount(2000)))
})
It("reads the stream send and receive windows when acting as stream-level flow controller", func() {
fc := newFlowController(0, cpm)
fc := newFlowController(0, cpm, rttStats)
Expect(fc.streamID).To(Equal(protocol.StreamID(0)))
Expect(fc.receiveFlowControlWindow).To(Equal(protocol.ByteCount(4000)))
})
It("does not set the stream flow control windows for sending", func() {
fc := newFlowController(5, cpm)
fc := newFlowController(5, cpm, rttStats)
Expect(fc.sendFlowControlWindow).To(BeZero())
})
It("does not set the connection flow control windows for sending", func() {
fc := newFlowController(0, cpm)
fc := newFlowController(0, cpm, rttStats)
Expect(fc.sendFlowControlWindow).To(BeZero())
})
})