From 77f34a9207bef6d5389c7c3da1368b900481bda3 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 5 May 2016 11:29:12 +0700 Subject: [PATCH] set initial flow control window from handshake parameters fixes #50 --- handshake/connection_parameters_manager.go | 26 +++++++++++++++++++ .../connection_parameters_manager_test.go | 14 ++++++++++ session.go | 7 ++++- session_test.go | 7 ++--- stream.go | 24 ++++++++++++----- stream_test.go | 4 ++- 6 files changed, 70 insertions(+), 12 deletions(-) diff --git a/handshake/connection_parameters_manager.go b/handshake/connection_parameters_manager.go index 5161c74f..16f5996a 100644 --- a/handshake/connection_parameters_manager.go +++ b/handshake/connection_parameters_manager.go @@ -1,6 +1,8 @@ package handshake import ( + "bytes" + "encoding/binary" "errors" "sync" ) @@ -19,6 +21,12 @@ func NewConnectionParamatersManager() *ConnectionParametersManager { cpm := &ConnectionParametersManager{ params: make(map[Tag][]byte), } + + // set default parameters + cpm.mutex.Lock() + cpm.params[TagSFCW] = []byte{0x0, 0x40, 0x0, 0x0} // Stream Flow Control Window + cpm.params[TagCFCW] = []byte{0x0, 0x40, 0x0, 0x0} // Connection Flow Control WindowWindow + cpm.mutex.Unlock() return cpm } @@ -51,3 +59,21 @@ func (h *ConnectionParametersManager) GetSHLOMap() map[Tag][]byte { TagMSPC: []byte{0x64, 0x00, 0x00, 0x00}, //100 } } + +// GetStreamFlowControlWindow gets the size of the stream-level flow control window +func (h *ConnectionParametersManager) GetStreamFlowControlWindow() (uint32, error) { + rawValue, err := h.GetRawValue(TagSFCW) + + if err != nil { + return 0, err + } + + var value uint32 + buf := bytes.NewBuffer(rawValue) + err = binary.Read(buf, binary.LittleEndian, &value) + if err != nil { + return 0, err + } + + return value, nil +} diff --git a/handshake/connection_parameters_manager_test.go b/handshake/connection_parameters_manager_test.go index 7d71196a..cec6f8b6 100644 --- a/handshake/connection_parameters_manager_test.go +++ b/handshake/connection_parameters_manager_test.go @@ -42,4 +42,18 @@ var _ = Describe("ConnectionsParameterManager", func() { Expect(entryMap).To(HaveKey(TagMSPC)) }) + Context("flow control", func() { + It("has the correct default flow control window", func() { + val, err := cpm.GetStreamFlowControlWindow() + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(uint32(0x4000))) + }) + + It("reads the stream-level flowControlWindow", func() { + cpm.params[TagSFCW] = []byte{0xDE, 0xAD, 0xBE, 0xEF} + val, err := cpm.GetStreamFlowControlWindow() + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(uint32(0xEFBEADDE))) + }) + }) }) diff --git a/session.go b/session.go index df44fcb1..6bd39bc3 100644 --- a/session.go +++ b/session.go @@ -395,7 +395,12 @@ func (s *Session) QueueStreamFrame(frame *frames.StreamFrame) error { func (s *Session) NewStream(id protocol.StreamID) (utils.Stream, error) { s.streamsMutex.Lock() defer s.streamsMutex.Unlock() - stream := newStream(s, id) + stream, err := newStream(s, s.connectionParametersManager, id) + + if err != nil { + return nil, err + } + if s.streams[id] != nil { return nil, fmt.Errorf("Session: stream with ID %d already exists", id) } diff --git a/session_test.go b/session_test.go index 68607290..7a4f90eb 100644 --- a/session_test.go +++ b/session_test.go @@ -41,9 +41,10 @@ var _ = Describe("Session", func() { conn = &mockConnection{} callbackCalled = false session = &Session{ - conn: conn, - streams: make(map[protocol.StreamID]*stream), - streamCallback: func(*Session, utils.Stream) { callbackCalled = true }, + conn: conn, + streams: make(map[protocol.StreamID]*stream), + streamCallback: func(*Session, utils.Stream) { callbackCalled = true }, + connectionParametersManager: handshake.NewConnectionParamatersManager(), } }) diff --git a/stream.go b/stream.go index 57cc60fe..23a3a4ee 100644 --- a/stream.go +++ b/stream.go @@ -6,6 +6,7 @@ import ( "sync/atomic" "github.com/lucas-clemente/quic-go/frames" + "github.com/lucas-clemente/quic-go/handshake" "github.com/lucas-clemente/quic-go/protocol" "github.com/lucas-clemente/quic-go/utils" ) @@ -29,19 +30,28 @@ type stream struct { remoteErr error currentErr error + connectionParameterManager *handshake.ConnectionParametersManager + flowControlWindow uint64 windowUpdateCond *sync.Cond } // newStream creates a new Stream -func newStream(session streamHandler, StreamID protocol.StreamID) *stream { - return &stream{ - session: session, - streamID: StreamID, - streamFrames: make(chan *frames.StreamFrame, 8), // ToDo: add config option for this number - flowControlWindow: 0x4000, // 16 byte, TODO: read this from the negotiated connection parameters (TagCFCW) - windowUpdateCond: sync.NewCond(&sync.Mutex{}), +func newStream(session streamHandler, connectionParameterManager *handshake.ConnectionParametersManager, StreamID protocol.StreamID) (*stream, error) { + s := &stream{ + session: session, + streamID: StreamID, + streamFrames: make(chan *frames.StreamFrame, 8), // ToDo: add config option for this number + connectionParameterManager: connectionParameterManager, + windowUpdateCond: sync.NewCond(&sync.Mutex{}), } + + flowControlWindow, err := connectionParameterManager.GetStreamFlowControlWindow() + if err != nil { + return nil, err + } + s.flowControlWindow = uint64(flowControlWindow) + return s, nil } // Read implements io.Reader diff --git a/stream_test.go b/stream_test.go index 2ac08115..46a229d9 100644 --- a/stream_test.go +++ b/stream_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/lucas-clemente/quic-go/frames" + "github.com/lucas-clemente/quic-go/handshake" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) @@ -27,7 +28,8 @@ var _ = Describe("Stream", func() { BeforeEach(func() { handler = &mockStreamHandler{} - str = newStream(handler, 1337) + cpm := handshake.NewConnectionParamatersManager() + str, _ = newStream(handler, cpm, 1337) }) Context("reading", func() {