forked from quic-go/quic-go
keep separate flow control windows for sending and receiving in ConnectionParametersManager
work towards #19, #20, #39
This commit is contained in:
@@ -8,15 +8,20 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/lucas-clemente/quic-go/protocol"
|
||||
"github.com/lucas-clemente/quic-go/utils"
|
||||
)
|
||||
|
||||
// ConnectionParametersManager stores the connection parameters
|
||||
// Warning: Writes may only be done from the crypto stream, see the comment
|
||||
// in GetSHLOMap().
|
||||
// TODO: Separate our SFCW from the client's
|
||||
type ConnectionParametersManager struct {
|
||||
params map[Tag][]byte
|
||||
mutex sync.RWMutex
|
||||
|
||||
sendStreamFlowControlWindow protocol.ByteCount
|
||||
sendConnectionFlowControlWindow protocol.ByteCount
|
||||
receiveStreamFlowControlWindow protocol.ByteCount
|
||||
receiveConnectionFlowControlWindow protocol.ByteCount
|
||||
}
|
||||
|
||||
// ErrTagNotInConnectionParameterMap is returned when a tag is not present in the connection parameters
|
||||
@@ -26,29 +31,45 @@ var ErrTagNotInConnectionParameterMap = errors.New("Tag not found in Connections
|
||||
func NewConnectionParamatersManager() *ConnectionParametersManager {
|
||||
return &ConnectionParametersManager{
|
||||
params: map[Tag][]byte{
|
||||
TagSFCW: {0x0, 0x40, 0x0, 0x0}, // Stream Flow Control Window
|
||||
TagCFCW: {0x0, 0x40, 0x0, 0x0}, // Connection Flow Control Window
|
||||
TagICSL: {0x1e, 0x00, 0x00, 0x00}, // idle connection state lifetime = 30s
|
||||
TagMSPC: {0x64, 0x00, 0x00, 0x00}, // Max streams per connection = 100
|
||||
},
|
||||
sendStreamFlowControlWindow: protocol.InitialStreamFlowControlWindow, // can only be changed by the client
|
||||
sendConnectionFlowControlWindow: protocol.InitialConnectionFlowControlWindow, // can only be changed by the client
|
||||
receiveStreamFlowControlWindow: protocol.ReceiveStreamFlowControlWindow,
|
||||
receiveConnectionFlowControlWindow: protocol.ReceiveConnectionFlowControlWindow,
|
||||
}
|
||||
}
|
||||
|
||||
// SetFromMap reads all params
|
||||
func (h *ConnectionParametersManager) SetFromMap(params map[Tag][]byte) error {
|
||||
h.mutex.Lock()
|
||||
defer h.mutex.Unlock()
|
||||
|
||||
for key, value := range params {
|
||||
switch key {
|
||||
case TagSFCW, TagCFCW, TagICSL, TagMSPC, TagTCID:
|
||||
case TagICSL, TagMSPC, TagTCID:
|
||||
h.params[key] = value
|
||||
case TagSFCW:
|
||||
sendStreamFlowControlWindow, err := utils.ReadUint32(bytes.NewBuffer(value))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
h.sendStreamFlowControlWindow = protocol.ByteCount(sendStreamFlowControlWindow)
|
||||
case TagCFCW:
|
||||
sendConnectionFlowControlWindow, err := utils.ReadUint32(bytes.NewBuffer(value))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
h.sendConnectionFlowControlWindow = protocol.ByteCount(sendConnectionFlowControlWindow)
|
||||
}
|
||||
}
|
||||
h.mutex.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetRawValue gets the byte-slice for a tag
|
||||
func (h *ConnectionParametersManager) GetRawValue(tag Tag) ([]byte, error) {
|
||||
// getRawValue gets the byte-slice for a tag
|
||||
func (h *ConnectionParametersManager) getRawValue(tag Tag) ([]byte, error) {
|
||||
h.mutex.RLock()
|
||||
rawValue, ok := h.params[tag]
|
||||
h.mutex.RUnlock()
|
||||
@@ -61,33 +82,54 @@ func (h *ConnectionParametersManager) GetRawValue(tag Tag) ([]byte, error) {
|
||||
|
||||
// GetSHLOMap gets all values (except crypto values) needed for the SHLO
|
||||
func (h *ConnectionParametersManager) GetSHLOMap() map[Tag][]byte {
|
||||
sfcw := bytes.NewBuffer([]byte{})
|
||||
cfcw := bytes.NewBuffer([]byte{})
|
||||
utils.WriteUint32(sfcw, uint32(h.GetReceiveStreamFlowControlWindow()))
|
||||
utils.WriteUint32(cfcw, uint32(h.GetReceiveConnectionFlowControlWindow()))
|
||||
|
||||
return map[Tag][]byte{
|
||||
TagICSL: []byte{0x1e, 0x00, 0x00, 0x00}, //30
|
||||
TagMSPC: []byte{0x64, 0x00, 0x00, 0x00}, //100
|
||||
TagCFCW: cfcw.Bytes(),
|
||||
TagSFCW: sfcw.Bytes(),
|
||||
}
|
||||
}
|
||||
|
||||
// GetStreamFlowControlWindow gets the size of the stream-level flow control window
|
||||
func (h *ConnectionParametersManager) GetStreamFlowControlWindow() (protocol.ByteCount, error) {
|
||||
rawValue, err := h.GetRawValue(TagSFCW)
|
||||
// GetSendStreamFlowControlWindow gets the size of the stream-level flow control window for sending data
|
||||
func (h *ConnectionParametersManager) GetSendStreamFlowControlWindow() protocol.ByteCount {
|
||||
h.mutex.RLock()
|
||||
defer h.mutex.RUnlock()
|
||||
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return h.sendStreamFlowControlWindow
|
||||
}
|
||||
|
||||
var value uint32
|
||||
buf := bytes.NewBuffer(rawValue)
|
||||
err = binary.Read(buf, binary.LittleEndian, &value)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
// GetSendConnectionFlowControlWindow gets the size of the stream-level flow control window for sending data
|
||||
func (h *ConnectionParametersManager) GetSendConnectionFlowControlWindow() protocol.ByteCount {
|
||||
h.mutex.RLock()
|
||||
defer h.mutex.RUnlock()
|
||||
|
||||
return protocol.ByteCount(value), nil
|
||||
return h.sendConnectionFlowControlWindow
|
||||
}
|
||||
|
||||
// GetReceiveStreamFlowControlWindow gets the size of the stream-level flow control window for receiving data
|
||||
func (h *ConnectionParametersManager) GetReceiveStreamFlowControlWindow() protocol.ByteCount {
|
||||
h.mutex.RLock()
|
||||
defer h.mutex.RUnlock()
|
||||
|
||||
return h.receiveStreamFlowControlWindow
|
||||
}
|
||||
|
||||
// GetReceiveConnectionFlowControlWindow gets the size of the stream-level flow control window for receiving data
|
||||
func (h *ConnectionParametersManager) GetReceiveConnectionFlowControlWindow() protocol.ByteCount {
|
||||
h.mutex.RLock()
|
||||
defer h.mutex.RUnlock()
|
||||
|
||||
return h.receiveConnectionFlowControlWindow
|
||||
}
|
||||
|
||||
// GetIdleConnectionStateLifetime gets the idle timeout
|
||||
func (h *ConnectionParametersManager) GetIdleConnectionStateLifetime() time.Duration {
|
||||
rawValue, err := h.GetRawValue(TagICSL)
|
||||
rawValue, err := h.getRawValue(TagICSL)
|
||||
if err != nil {
|
||||
panic("ConnectionParameters: Could not find ICSL")
|
||||
}
|
||||
@@ -99,7 +141,7 @@ func (h *ConnectionParametersManager) GetIdleConnectionStateLifetime() time.Dura
|
||||
|
||||
// TruncateConnectionID determines if the client requests truncated ConnectionIDs
|
||||
func (h *ConnectionParametersManager) TruncateConnectionID() bool {
|
||||
rawValue, err := h.GetRawValue(TagTCID)
|
||||
rawValue, err := h.getRawValue(TagTCID)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -22,21 +22,37 @@ var _ = Describe("ConnectionsParameterManager", func() {
|
||||
|
||||
cpm.SetFromMap(values)
|
||||
|
||||
val, err := cpm.GetRawValue(TagICSL)
|
||||
val, err := cpm.getRawValue(TagICSL)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(val).To(Equal(icsl))
|
||||
})
|
||||
|
||||
It("returns an error for a tag that is not set", func() {
|
||||
_, err := cpm.GetRawValue(TagKEXS)
|
||||
_, err := cpm.getRawValue(TagKEXS)
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err).To(Equal(ErrTagNotInConnectionParameterMap))
|
||||
})
|
||||
|
||||
It("returns all parameters necessary for the SHLO", func() {
|
||||
entryMap := cpm.GetSHLOMap()
|
||||
Expect(entryMap).To(HaveKey(TagICSL))
|
||||
Expect(entryMap).To(HaveKey(TagMSPC))
|
||||
Context("SHLO", func() {
|
||||
It("returns all parameters necessary for the SHLO", func() {
|
||||
entryMap := cpm.GetSHLOMap()
|
||||
Expect(entryMap).To(HaveKey(TagICSL))
|
||||
Expect(entryMap).To(HaveKey(TagMSPC))
|
||||
})
|
||||
|
||||
It("returns stream-level flow control windows in SHLO", func() {
|
||||
cpm.receiveStreamFlowControlWindow = 0xDEADBEEF
|
||||
entryMap := cpm.GetSHLOMap()
|
||||
Expect(entryMap).To(HaveKey(TagSFCW))
|
||||
Expect(entryMap[TagSFCW]).To(Equal([]byte{0xEF, 0xBE, 0xAD, 0xDE}))
|
||||
})
|
||||
|
||||
It("returns connection-level flow control windows in SHLO", func() {
|
||||
cpm.receiveConnectionFlowControlWindow = 0xDECAFBAD
|
||||
entryMap := cpm.GetSHLOMap()
|
||||
Expect(entryMap).To(HaveKey(TagCFCW))
|
||||
Expect(entryMap[TagCFCW]).To(Equal([]byte{0xAD, 0xFB, 0xCA, 0xDE}))
|
||||
})
|
||||
})
|
||||
|
||||
Context("Truncated connection IDs", func() {
|
||||
@@ -54,17 +70,56 @@ var _ = Describe("ConnectionsParameterManager", func() {
|
||||
})
|
||||
|
||||
Context("flow control", func() {
|
||||
It("has the correct default flow control window", func() {
|
||||
val, err := cpm.GetStreamFlowControlWindow()
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(val).To(Equal(protocol.ByteCount(0x4000)))
|
||||
It("has the correct default stream-level flow control window for sending", func() {
|
||||
Expect(cpm.GetSendStreamFlowControlWindow()).To(Equal(protocol.InitialStreamFlowControlWindow))
|
||||
})
|
||||
|
||||
It("reads the stream-level flowControlWindow", func() {
|
||||
cpm.params[TagSFCW] = []byte{0xDE, 0xAD, 0xBE, 0xEF}
|
||||
val, err := cpm.GetStreamFlowControlWindow()
|
||||
It("has the correct default connection-level flow control window for sending", func() {
|
||||
Expect(cpm.GetSendConnectionFlowControlWindow()).To(Equal(protocol.InitialConnectionFlowControlWindow))
|
||||
})
|
||||
|
||||
It("has the correct default stream-level flow control window for receiving", func() {
|
||||
Expect(cpm.GetReceiveStreamFlowControlWindow()).To(Equal(protocol.ReceiveStreamFlowControlWindow))
|
||||
})
|
||||
|
||||
It("has the correct default connection-level flow control window for receiving", func() {
|
||||
Expect(cpm.GetReceiveConnectionFlowControlWindow()).To(Equal(protocol.ReceiveConnectionFlowControlWindow))
|
||||
})
|
||||
|
||||
It("sets a new stream-level flow control window for sending", func() {
|
||||
values := map[Tag][]byte{
|
||||
TagSFCW: []byte{0xDE, 0xAD, 0xBE, 0xEF},
|
||||
}
|
||||
err := cpm.SetFromMap(values)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(val).To(Equal(protocol.ByteCount(0xEFBEADDE)))
|
||||
Expect(cpm.GetSendStreamFlowControlWindow()).To(Equal(protocol.ByteCount(0xEFBEADDE)))
|
||||
})
|
||||
|
||||
It("does not change the stream-level flow control window when given an invalid value", func() {
|
||||
values := map[Tag][]byte{
|
||||
TagSFCW: []byte{0xDE, 0xAD, 0xBE}, // 1 byte too short
|
||||
}
|
||||
err := cpm.SetFromMap(values)
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(cpm.GetSendStreamFlowControlWindow()).To(Equal(protocol.InitialStreamFlowControlWindow))
|
||||
})
|
||||
|
||||
It("sets a new connection-level flow control window for sending", func() {
|
||||
values := map[Tag][]byte{
|
||||
TagCFCW: []byte{0xDE, 0xAD, 0xBE, 0xEF},
|
||||
}
|
||||
err := cpm.SetFromMap(values)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(cpm.GetSendConnectionFlowControlWindow()).To(Equal(protocol.ByteCount(0xEFBEADDE)))
|
||||
})
|
||||
|
||||
It("does not change the connection-level flow control window when given an invalid value", func() {
|
||||
values := map[Tag][]byte{
|
||||
TagSFCW: []byte{0xDE, 0xAD, 0xBE}, // 1 byte too short
|
||||
}
|
||||
err := cpm.SetFromMap(values)
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(cpm.GetSendStreamFlowControlWindow()).To(Equal(protocol.InitialConnectionFlowControlWindow))
|
||||
})
|
||||
})
|
||||
|
||||
|
||||
@@ -41,4 +41,8 @@ const MaxFrameAndPublicHeaderSize = MaxPacketSize - 1 /*private header*/ - 12 /*
|
||||
// Used in QUIC for congestion window computations in bytes.
|
||||
const DefaultTCPMSS ByteCount = 1460
|
||||
|
||||
// InitialStreamFlowControlWindow is the initial stream-level flow control window for sending
|
||||
const InitialStreamFlowControlWindow ByteCount = (1 << 14) // 16 kB
|
||||
|
||||
// InitialConnectionFlowControlWindow is the initial connection-level flow control window for sending
|
||||
const InitialConnectionFlowControlWindow ByteCount = (1 << 14) // 16 kB
|
||||
|
||||
@@ -23,3 +23,11 @@ const SmallPacketPayloadSizeThreshold = MaxPacketSize / 2
|
||||
|
||||
// SmallPacketSendDelay is the time delay applied to small packets
|
||||
const SmallPacketSendDelay = 500 * time.Microsecond
|
||||
|
||||
// ReceiveStreamFlowControlWindow is the stream-level flow control window for receiving data
|
||||
// TODO: set a reasonable value here
|
||||
const ReceiveStreamFlowControlWindow ByteCount = (1 << 20) // 1 MB
|
||||
|
||||
// ReceiveConnectionFlowControlWindow is the stream-level flow control window for receiving data
|
||||
// TODO: set a reasonable value here
|
||||
const ReceiveConnectionFlowControlWindow ByteCount = (1 << 20) // 1 MB
|
||||
|
||||
@@ -47,11 +47,7 @@ func newStream(session streamHandler, connectionParameterManager *handshake.Conn
|
||||
s.newFrameOrErrCond.L = &s.mutex
|
||||
s.windowUpdateOrErrCond.L = &s.mutex
|
||||
|
||||
var err error
|
||||
s.flowControlWindow, err = connectionParameterManager.GetStreamFlowControlWindow()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.flowControlWindow = connectionParameterManager.GetSendStreamFlowControlWindow()
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user