From 43621c9c25c0b41e5bc5a66573268a99e8618ead Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 14 May 2016 15:25:51 +0700 Subject: [PATCH] keep separate flow control windows for sending and receiving in ConnectionParametersManager work towards #19, #20, #39 --- handshake/connection_parameters_manager.go | 86 ++++++++++++++----- .../connection_parameters_manager_test.go | 83 +++++++++++++++--- protocol/protocol.go | 4 + protocol/server_parameters.go | 8 ++ stream.go | 6 +- 5 files changed, 146 insertions(+), 41 deletions(-) diff --git a/handshake/connection_parameters_manager.go b/handshake/connection_parameters_manager.go index 429b1bba..2d153f25 100644 --- a/handshake/connection_parameters_manager.go +++ b/handshake/connection_parameters_manager.go @@ -8,15 +8,20 @@ import ( "time" "github.com/lucas-clemente/quic-go/protocol" + "github.com/lucas-clemente/quic-go/utils" ) // ConnectionParametersManager stores the connection parameters // Warning: Writes may only be done from the crypto stream, see the comment // in GetSHLOMap(). -// TODO: Separate our SFCW from the client's type ConnectionParametersManager struct { params map[Tag][]byte mutex sync.RWMutex + + sendStreamFlowControlWindow protocol.ByteCount + sendConnectionFlowControlWindow protocol.ByteCount + receiveStreamFlowControlWindow protocol.ByteCount + receiveConnectionFlowControlWindow protocol.ByteCount } // ErrTagNotInConnectionParameterMap is returned when a tag is not present in the connection parameters @@ -26,29 +31,45 @@ var ErrTagNotInConnectionParameterMap = errors.New("Tag not found in Connections func NewConnectionParamatersManager() *ConnectionParametersManager { return &ConnectionParametersManager{ params: map[Tag][]byte{ - TagSFCW: {0x0, 0x40, 0x0, 0x0}, // Stream Flow Control Window - TagCFCW: {0x0, 0x40, 0x0, 0x0}, // Connection Flow Control Window TagICSL: {0x1e, 0x00, 0x00, 0x00}, // idle connection state lifetime = 30s TagMSPC: {0x64, 0x00, 0x00, 0x00}, // Max streams per connection = 100 }, + sendStreamFlowControlWindow: protocol.InitialStreamFlowControlWindow, // can only be changed by the client + sendConnectionFlowControlWindow: protocol.InitialConnectionFlowControlWindow, // can only be changed by the client + receiveStreamFlowControlWindow: protocol.ReceiveStreamFlowControlWindow, + receiveConnectionFlowControlWindow: protocol.ReceiveConnectionFlowControlWindow, } } // SetFromMap reads all params func (h *ConnectionParametersManager) SetFromMap(params map[Tag][]byte) error { h.mutex.Lock() + defer h.mutex.Unlock() + for key, value := range params { switch key { - case TagSFCW, TagCFCW, TagICSL, TagMSPC, TagTCID: + case TagICSL, TagMSPC, TagTCID: h.params[key] = value + case TagSFCW: + sendStreamFlowControlWindow, err := utils.ReadUint32(bytes.NewBuffer(value)) + if err != nil { + return err + } + h.sendStreamFlowControlWindow = protocol.ByteCount(sendStreamFlowControlWindow) + case TagCFCW: + sendConnectionFlowControlWindow, err := utils.ReadUint32(bytes.NewBuffer(value)) + if err != nil { + return err + } + h.sendConnectionFlowControlWindow = protocol.ByteCount(sendConnectionFlowControlWindow) } } - h.mutex.Unlock() + return nil } -// GetRawValue gets the byte-slice for a tag -func (h *ConnectionParametersManager) GetRawValue(tag Tag) ([]byte, error) { +// getRawValue gets the byte-slice for a tag +func (h *ConnectionParametersManager) getRawValue(tag Tag) ([]byte, error) { h.mutex.RLock() rawValue, ok := h.params[tag] h.mutex.RUnlock() @@ -61,33 +82,54 @@ func (h *ConnectionParametersManager) GetRawValue(tag Tag) ([]byte, error) { // GetSHLOMap gets all values (except crypto values) needed for the SHLO func (h *ConnectionParametersManager) GetSHLOMap() map[Tag][]byte { + sfcw := bytes.NewBuffer([]byte{}) + cfcw := bytes.NewBuffer([]byte{}) + utils.WriteUint32(sfcw, uint32(h.GetReceiveStreamFlowControlWindow())) + utils.WriteUint32(cfcw, uint32(h.GetReceiveConnectionFlowControlWindow())) + return map[Tag][]byte{ TagICSL: []byte{0x1e, 0x00, 0x00, 0x00}, //30 TagMSPC: []byte{0x64, 0x00, 0x00, 0x00}, //100 + TagCFCW: cfcw.Bytes(), + TagSFCW: sfcw.Bytes(), } } -// GetStreamFlowControlWindow gets the size of the stream-level flow control window -func (h *ConnectionParametersManager) GetStreamFlowControlWindow() (protocol.ByteCount, error) { - rawValue, err := h.GetRawValue(TagSFCW) +// GetSendStreamFlowControlWindow gets the size of the stream-level flow control window for sending data +func (h *ConnectionParametersManager) GetSendStreamFlowControlWindow() protocol.ByteCount { + h.mutex.RLock() + defer h.mutex.RUnlock() - if err != nil { - return 0, err - } + return h.sendStreamFlowControlWindow +} - var value uint32 - buf := bytes.NewBuffer(rawValue) - err = binary.Read(buf, binary.LittleEndian, &value) - if err != nil { - return 0, err - } +// GetSendConnectionFlowControlWindow gets the size of the stream-level flow control window for sending data +func (h *ConnectionParametersManager) GetSendConnectionFlowControlWindow() protocol.ByteCount { + h.mutex.RLock() + defer h.mutex.RUnlock() - return protocol.ByteCount(value), nil + return h.sendConnectionFlowControlWindow +} + +// GetReceiveStreamFlowControlWindow gets the size of the stream-level flow control window for receiving data +func (h *ConnectionParametersManager) GetReceiveStreamFlowControlWindow() protocol.ByteCount { + h.mutex.RLock() + defer h.mutex.RUnlock() + + return h.receiveStreamFlowControlWindow +} + +// GetReceiveConnectionFlowControlWindow gets the size of the stream-level flow control window for receiving data +func (h *ConnectionParametersManager) GetReceiveConnectionFlowControlWindow() protocol.ByteCount { + h.mutex.RLock() + defer h.mutex.RUnlock() + + return h.receiveConnectionFlowControlWindow } // GetIdleConnectionStateLifetime gets the idle timeout func (h *ConnectionParametersManager) GetIdleConnectionStateLifetime() time.Duration { - rawValue, err := h.GetRawValue(TagICSL) + rawValue, err := h.getRawValue(TagICSL) if err != nil { panic("ConnectionParameters: Could not find ICSL") } @@ -99,7 +141,7 @@ func (h *ConnectionParametersManager) GetIdleConnectionStateLifetime() time.Dura // TruncateConnectionID determines if the client requests truncated ConnectionIDs func (h *ConnectionParametersManager) TruncateConnectionID() bool { - rawValue, err := h.GetRawValue(TagTCID) + rawValue, err := h.getRawValue(TagTCID) if err != nil { return false } diff --git a/handshake/connection_parameters_manager_test.go b/handshake/connection_parameters_manager_test.go index 8508df83..02d0a5b6 100644 --- a/handshake/connection_parameters_manager_test.go +++ b/handshake/connection_parameters_manager_test.go @@ -22,21 +22,37 @@ var _ = Describe("ConnectionsParameterManager", func() { cpm.SetFromMap(values) - val, err := cpm.GetRawValue(TagICSL) + val, err := cpm.getRawValue(TagICSL) Expect(err).ToNot(HaveOccurred()) Expect(val).To(Equal(icsl)) }) It("returns an error for a tag that is not set", func() { - _, err := cpm.GetRawValue(TagKEXS) + _, err := cpm.getRawValue(TagKEXS) Expect(err).To(HaveOccurred()) Expect(err).To(Equal(ErrTagNotInConnectionParameterMap)) }) - It("returns all parameters necessary for the SHLO", func() { - entryMap := cpm.GetSHLOMap() - Expect(entryMap).To(HaveKey(TagICSL)) - Expect(entryMap).To(HaveKey(TagMSPC)) + Context("SHLO", func() { + It("returns all parameters necessary for the SHLO", func() { + entryMap := cpm.GetSHLOMap() + Expect(entryMap).To(HaveKey(TagICSL)) + Expect(entryMap).To(HaveKey(TagMSPC)) + }) + + It("returns stream-level flow control windows in SHLO", func() { + cpm.receiveStreamFlowControlWindow = 0xDEADBEEF + entryMap := cpm.GetSHLOMap() + Expect(entryMap).To(HaveKey(TagSFCW)) + Expect(entryMap[TagSFCW]).To(Equal([]byte{0xEF, 0xBE, 0xAD, 0xDE})) + }) + + It("returns connection-level flow control windows in SHLO", func() { + cpm.receiveConnectionFlowControlWindow = 0xDECAFBAD + entryMap := cpm.GetSHLOMap() + Expect(entryMap).To(HaveKey(TagCFCW)) + Expect(entryMap[TagCFCW]).To(Equal([]byte{0xAD, 0xFB, 0xCA, 0xDE})) + }) }) Context("Truncated connection IDs", func() { @@ -54,17 +70,56 @@ var _ = Describe("ConnectionsParameterManager", func() { }) Context("flow control", func() { - It("has the correct default flow control window", func() { - val, err := cpm.GetStreamFlowControlWindow() - Expect(err).ToNot(HaveOccurred()) - Expect(val).To(Equal(protocol.ByteCount(0x4000))) + It("has the correct default stream-level flow control window for sending", func() { + Expect(cpm.GetSendStreamFlowControlWindow()).To(Equal(protocol.InitialStreamFlowControlWindow)) }) - It("reads the stream-level flowControlWindow", func() { - cpm.params[TagSFCW] = []byte{0xDE, 0xAD, 0xBE, 0xEF} - val, err := cpm.GetStreamFlowControlWindow() + It("has the correct default connection-level flow control window for sending", func() { + Expect(cpm.GetSendConnectionFlowControlWindow()).To(Equal(protocol.InitialConnectionFlowControlWindow)) + }) + + It("has the correct default stream-level flow control window for receiving", func() { + Expect(cpm.GetReceiveStreamFlowControlWindow()).To(Equal(protocol.ReceiveStreamFlowControlWindow)) + }) + + It("has the correct default connection-level flow control window for receiving", func() { + Expect(cpm.GetReceiveConnectionFlowControlWindow()).To(Equal(protocol.ReceiveConnectionFlowControlWindow)) + }) + + It("sets a new stream-level flow control window for sending", func() { + values := map[Tag][]byte{ + TagSFCW: []byte{0xDE, 0xAD, 0xBE, 0xEF}, + } + err := cpm.SetFromMap(values) Expect(err).ToNot(HaveOccurred()) - Expect(val).To(Equal(protocol.ByteCount(0xEFBEADDE))) + Expect(cpm.GetSendStreamFlowControlWindow()).To(Equal(protocol.ByteCount(0xEFBEADDE))) + }) + + It("does not change the stream-level flow control window when given an invalid value", func() { + values := map[Tag][]byte{ + TagSFCW: []byte{0xDE, 0xAD, 0xBE}, // 1 byte too short + } + err := cpm.SetFromMap(values) + Expect(err).To(HaveOccurred()) + Expect(cpm.GetSendStreamFlowControlWindow()).To(Equal(protocol.InitialStreamFlowControlWindow)) + }) + + It("sets a new connection-level flow control window for sending", func() { + values := map[Tag][]byte{ + TagCFCW: []byte{0xDE, 0xAD, 0xBE, 0xEF}, + } + err := cpm.SetFromMap(values) + Expect(err).ToNot(HaveOccurred()) + Expect(cpm.GetSendConnectionFlowControlWindow()).To(Equal(protocol.ByteCount(0xEFBEADDE))) + }) + + It("does not change the connection-level flow control window when given an invalid value", func() { + values := map[Tag][]byte{ + TagSFCW: []byte{0xDE, 0xAD, 0xBE}, // 1 byte too short + } + err := cpm.SetFromMap(values) + Expect(err).To(HaveOccurred()) + Expect(cpm.GetSendStreamFlowControlWindow()).To(Equal(protocol.InitialConnectionFlowControlWindow)) }) }) diff --git a/protocol/protocol.go b/protocol/protocol.go index 619f57c0..012307ad 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -41,4 +41,8 @@ const MaxFrameAndPublicHeaderSize = MaxPacketSize - 1 /*private header*/ - 12 /* // Used in QUIC for congestion window computations in bytes. const DefaultTCPMSS ByteCount = 1460 +// InitialStreamFlowControlWindow is the initial stream-level flow control window for sending +const InitialStreamFlowControlWindow ByteCount = (1 << 14) // 16 kB +// InitialConnectionFlowControlWindow is the initial connection-level flow control window for sending +const InitialConnectionFlowControlWindow ByteCount = (1 << 14) // 16 kB diff --git a/protocol/server_parameters.go b/protocol/server_parameters.go index 475a9135..c0dccf5b 100644 --- a/protocol/server_parameters.go +++ b/protocol/server_parameters.go @@ -23,3 +23,11 @@ const SmallPacketPayloadSizeThreshold = MaxPacketSize / 2 // SmallPacketSendDelay is the time delay applied to small packets const SmallPacketSendDelay = 500 * time.Microsecond + +// ReceiveStreamFlowControlWindow is the stream-level flow control window for receiving data +// TODO: set a reasonable value here +const ReceiveStreamFlowControlWindow ByteCount = (1 << 20) // 1 MB + +// ReceiveConnectionFlowControlWindow is the stream-level flow control window for receiving data +// TODO: set a reasonable value here +const ReceiveConnectionFlowControlWindow ByteCount = (1 << 20) // 1 MB diff --git a/stream.go b/stream.go index 27a53db4..66c8b432 100644 --- a/stream.go +++ b/stream.go @@ -47,11 +47,7 @@ func newStream(session streamHandler, connectionParameterManager *handshake.Conn s.newFrameOrErrCond.L = &s.mutex s.windowUpdateOrErrCond.L = &s.mutex - var err error - s.flowControlWindow, err = connectionParameterManager.GetStreamFlowControlWindow() - if err != nil { - return nil, err - } + s.flowControlWindow = connectionParameterManager.GetSendStreamFlowControlWindow() return s, nil }