set initial flow control window from handshake parameters

fixes #50
This commit is contained in:
Marten Seemann
2016-05-05 11:29:12 +07:00
parent d178a02ad8
commit 77f34a9207
6 changed files with 70 additions and 12 deletions

View File

@@ -1,6 +1,8 @@
package handshake
import (
"bytes"
"encoding/binary"
"errors"
"sync"
)
@@ -19,6 +21,12 @@ func NewConnectionParamatersManager() *ConnectionParametersManager {
cpm := &ConnectionParametersManager{
params: make(map[Tag][]byte),
}
// set default parameters
cpm.mutex.Lock()
cpm.params[TagSFCW] = []byte{0x0, 0x40, 0x0, 0x0} // Stream Flow Control Window
cpm.params[TagCFCW] = []byte{0x0, 0x40, 0x0, 0x0} // Connection Flow Control WindowWindow
cpm.mutex.Unlock()
return cpm
}
@@ -51,3 +59,21 @@ func (h *ConnectionParametersManager) GetSHLOMap() map[Tag][]byte {
TagMSPC: []byte{0x64, 0x00, 0x00, 0x00}, //100
}
}
// GetStreamFlowControlWindow gets the size of the stream-level flow control window
func (h *ConnectionParametersManager) GetStreamFlowControlWindow() (uint32, error) {
rawValue, err := h.GetRawValue(TagSFCW)
if err != nil {
return 0, err
}
var value uint32
buf := bytes.NewBuffer(rawValue)
err = binary.Read(buf, binary.LittleEndian, &value)
if err != nil {
return 0, err
}
return value, nil
}

View File

@@ -42,4 +42,18 @@ var _ = Describe("ConnectionsParameterManager", func() {
Expect(entryMap).To(HaveKey(TagMSPC))
})
Context("flow control", func() {
It("has the correct default flow control window", func() {
val, err := cpm.GetStreamFlowControlWindow()
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(uint32(0x4000)))
})
It("reads the stream-level flowControlWindow", func() {
cpm.params[TagSFCW] = []byte{0xDE, 0xAD, 0xBE, 0xEF}
val, err := cpm.GetStreamFlowControlWindow()
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(uint32(0xEFBEADDE)))
})
})
})

View File

@@ -395,7 +395,12 @@ func (s *Session) QueueStreamFrame(frame *frames.StreamFrame) error {
func (s *Session) NewStream(id protocol.StreamID) (utils.Stream, error) {
s.streamsMutex.Lock()
defer s.streamsMutex.Unlock()
stream := newStream(s, id)
stream, err := newStream(s, s.connectionParametersManager, id)
if err != nil {
return nil, err
}
if s.streams[id] != nil {
return nil, fmt.Errorf("Session: stream with ID %d already exists", id)
}

View File

@@ -44,6 +44,7 @@ var _ = Describe("Session", func() {
conn: conn,
streams: make(map[protocol.StreamID]*stream),
streamCallback: func(*Session, utils.Stream) { callbackCalled = true },
connectionParametersManager: handshake.NewConnectionParamatersManager(),
}
})

View File

@@ -6,6 +6,7 @@ import (
"sync/atomic"
"github.com/lucas-clemente/quic-go/frames"
"github.com/lucas-clemente/quic-go/handshake"
"github.com/lucas-clemente/quic-go/protocol"
"github.com/lucas-clemente/quic-go/utils"
)
@@ -29,19 +30,28 @@ type stream struct {
remoteErr error
currentErr error
connectionParameterManager *handshake.ConnectionParametersManager
flowControlWindow uint64
windowUpdateCond *sync.Cond
}
// newStream creates a new Stream
func newStream(session streamHandler, StreamID protocol.StreamID) *stream {
return &stream{
func newStream(session streamHandler, connectionParameterManager *handshake.ConnectionParametersManager, StreamID protocol.StreamID) (*stream, error) {
s := &stream{
session: session,
streamID: StreamID,
streamFrames: make(chan *frames.StreamFrame, 8), // ToDo: add config option for this number
flowControlWindow: 0x4000, // 16 byte, TODO: read this from the negotiated connection parameters (TagCFCW)
connectionParameterManager: connectionParameterManager,
windowUpdateCond: sync.NewCond(&sync.Mutex{}),
}
flowControlWindow, err := connectionParameterManager.GetStreamFlowControlWindow()
if err != nil {
return nil, err
}
s.flowControlWindow = uint64(flowControlWindow)
return s, nil
}
// Read implements io.Reader

View File

@@ -6,6 +6,7 @@ import (
"time"
"github.com/lucas-clemente/quic-go/frames"
"github.com/lucas-clemente/quic-go/handshake"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
@@ -27,7 +28,8 @@ var _ = Describe("Stream", func() {
BeforeEach(func() {
handler = &mockStreamHandler{}
str = newStream(handler, 1337)
cpm := handshake.NewConnectionParamatersManager()
str, _ = newStream(handler, cpm, 1337)
})
Context("reading", func() {