forked from quic-go/quic-go
replace streamFrameQueue with just-in-time framing of written data
This commits replaces the stream frame queue with a framer which requests data from the streams just when a frame is needed by the packet packer. This simplifies a lot of things and allows some other refactorings, see issue #83. There are a few pending tests which will be fixed soon.
This commit is contained in:
155
stream.go
155
stream.go
@@ -14,9 +14,8 @@ import (
|
||||
)
|
||||
|
||||
type streamHandler interface {
|
||||
queueStreamFrame(*frames.StreamFrame) error
|
||||
updateReceiveFlowControlWindow(streamID protocol.StreamID, byteOffset protocol.ByteCount) error
|
||||
streamBlocked(streamID protocol.StreamID, byteOffset protocol.ByteCount)
|
||||
scheduleSending()
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -47,24 +46,22 @@ type stream struct {
|
||||
frameQueue *streamFrameSorter
|
||||
newFrameOrErrCond sync.Cond
|
||||
|
||||
flowControlManager flowcontrol.FlowControlManager
|
||||
// TODO: remove those
|
||||
flowController flowcontrol.FlowController
|
||||
connectionFlowController flowcontrol.FlowController
|
||||
contributesToConnectionFlowControl bool
|
||||
dataForWriting []byte
|
||||
finSent bool
|
||||
doneWritingOrErrCond sync.Cond
|
||||
|
||||
windowUpdateOrErrCond sync.Cond
|
||||
flowControlManager flowcontrol.FlowControlManager
|
||||
// TODO: remove this
|
||||
contributesToConnectionFlowControl bool
|
||||
}
|
||||
|
||||
// newStream creates a new Stream
|
||||
func newStream(session streamHandler, connectionParameterManager *handshake.ConnectionParametersManager, connectionFlowController flowcontrol.FlowController, flowControlManager flowcontrol.FlowControlManager, StreamID protocol.StreamID) (*stream, error) {
|
||||
func newStream(session streamHandler, connectionParameterManager *handshake.ConnectionParametersManager, flowControlManager flowcontrol.FlowControlManager, StreamID protocol.StreamID) (*stream, error) {
|
||||
s := &stream{
|
||||
session: session,
|
||||
streamID: StreamID,
|
||||
flowControlManager: flowControlManager,
|
||||
connectionFlowController: connectionFlowController,
|
||||
contributesToConnectionFlowControl: true,
|
||||
flowController: flowcontrol.NewFlowController(StreamID, connectionParameterManager),
|
||||
frameQueue: newStreamFrameSorter(),
|
||||
}
|
||||
|
||||
@@ -75,7 +72,7 @@ func newStream(session streamHandler, connectionParameterManager *handshake.Conn
|
||||
}
|
||||
|
||||
s.newFrameOrErrCond.L = &s.mutex
|
||||
s.windowUpdateOrErrCond.L = &s.mutex
|
||||
s.doneWritingOrErrCond.L = &s.mutex
|
||||
|
||||
return s, nil
|
||||
}
|
||||
@@ -159,83 +156,66 @@ func (s *stream) ReadByte() (byte, error) {
|
||||
return p[0], err
|
||||
}
|
||||
|
||||
func (s *stream) ConnectionFlowControlWindowUpdated() {
|
||||
s.windowUpdateOrErrCond.Broadcast()
|
||||
}
|
||||
|
||||
func (s *stream) UpdateSendFlowControlWindow(n protocol.ByteCount) bool {
|
||||
if s.flowController.UpdateSendWindow(n) {
|
||||
s.windowUpdateOrErrCond.Broadcast()
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *stream) Write(p []byte) (int, error) {
|
||||
s.mutex.Lock()
|
||||
err := s.err
|
||||
s.mutex.Unlock()
|
||||
defer s.mutex.Unlock()
|
||||
|
||||
if err != nil {
|
||||
return 0, err
|
||||
s.dataForWriting = p
|
||||
|
||||
s.session.scheduleSending()
|
||||
|
||||
for s.dataForWriting != nil && s.err == nil {
|
||||
s.doneWritingOrErrCond.Wait()
|
||||
}
|
||||
|
||||
dataWritten := 0
|
||||
|
||||
for dataWritten < len(p) {
|
||||
s.mutex.Lock()
|
||||
remainingBytesInWindow := utils.MinByteCount(s.flowController.SendWindowSize(), protocol.ByteCount(len(p)-dataWritten))
|
||||
if s.contributesToConnectionFlowControl {
|
||||
remainingBytesInWindow = utils.MinByteCount(remainingBytesInWindow, s.connectionFlowController.SendWindowSize())
|
||||
}
|
||||
for remainingBytesInWindow == 0 && s.err == nil {
|
||||
s.windowUpdateOrErrCond.Wait()
|
||||
remainingBytesInWindow = utils.MinByteCount(s.flowController.SendWindowSize(), protocol.ByteCount(len(p)-dataWritten))
|
||||
if s.contributesToConnectionFlowControl {
|
||||
remainingBytesInWindow = utils.MinByteCount(remainingBytesInWindow, s.connectionFlowController.SendWindowSize())
|
||||
}
|
||||
}
|
||||
s.mutex.Unlock()
|
||||
|
||||
if remainingBytesInWindow == 0 {
|
||||
// We must have had an error
|
||||
return 0, s.err
|
||||
}
|
||||
|
||||
dataLen := utils.MinByteCount(protocol.ByteCount(len(p)), remainingBytesInWindow)
|
||||
data := make([]byte, dataLen)
|
||||
copy(data, p[dataWritten:])
|
||||
err := s.session.queueStreamFrame(&frames.StreamFrame{
|
||||
StreamID: s.streamID,
|
||||
Offset: s.writeOffset,
|
||||
Data: data,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
dataWritten += int(dataLen) // We cannot have written more than the int range
|
||||
s.flowController.AddBytesSent(protocol.ByteCount(dataLen))
|
||||
if s.contributesToConnectionFlowControl {
|
||||
s.connectionFlowController.AddBytesSent(protocol.ByteCount(dataLen))
|
||||
}
|
||||
s.writeOffset += protocol.ByteCount(dataLen)
|
||||
|
||||
s.maybeTriggerBlocked()
|
||||
if s.err != nil {
|
||||
return 0, s.err
|
||||
}
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
return dataWritten, nil
|
||||
func (s *stream) lenOfDataForWriting() protocol.ByteCount {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
return protocol.ByteCount(len(s.dataForWriting))
|
||||
}
|
||||
|
||||
func (s *stream) getDataForWriting(maxBytes protocol.ByteCount) []byte {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
if s.dataForWriting == nil {
|
||||
return nil
|
||||
}
|
||||
var ret []byte
|
||||
if protocol.ByteCount(len(s.dataForWriting)) > maxBytes {
|
||||
ret = s.dataForWriting[:maxBytes]
|
||||
s.dataForWriting = s.dataForWriting[maxBytes:]
|
||||
} else {
|
||||
ret = s.dataForWriting
|
||||
s.dataForWriting = nil
|
||||
s.doneWritingOrErrCond.Signal()
|
||||
}
|
||||
s.writeOffset += protocol.ByteCount(len(ret))
|
||||
return ret
|
||||
}
|
||||
|
||||
// Close implements io.Closer
|
||||
func (s *stream) Close() error {
|
||||
atomic.StoreInt32(&s.closed, 1)
|
||||
return s.session.queueStreamFrame(&frames.StreamFrame{
|
||||
StreamID: s.streamID,
|
||||
Offset: s.writeOffset,
|
||||
FinBit: true,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *stream) shouldSendFin() bool {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
closed := atomic.LoadInt32(&s.closed) != 0
|
||||
return closed && !s.finSent && s.err == nil && s.dataForWriting == nil
|
||||
}
|
||||
|
||||
func (s *stream) sentFin() {
|
||||
s.mutex.Lock()
|
||||
s.finSent = true
|
||||
s.mutex.Unlock()
|
||||
}
|
||||
|
||||
// AddStreamFrame adds a new stream frame
|
||||
@@ -287,24 +267,6 @@ func (s *stream) maybeTriggerWindowUpdate() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *stream) maybeTriggerBlocked() {
|
||||
streamBlocked := s.flowController.MaybeTriggerBlocked()
|
||||
|
||||
if streamBlocked {
|
||||
s.session.streamBlocked(s.streamID, s.writeOffset)
|
||||
}
|
||||
|
||||
if s.contributesToConnectionFlowControl {
|
||||
connectionBlocked := s.connectionFlowController.MaybeTriggerBlocked()
|
||||
|
||||
if connectionBlocked {
|
||||
// TODO: send out connection-level BlockedFrames at the right time
|
||||
// see https://github.com/lucas-clemente/quic-go/issues/113
|
||||
s.session.streamBlocked(0, 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// RegisterError is called by session to indicate that an error occurred and the
|
||||
// stream should be closed.
|
||||
func (s *stream) RegisterError(err error) {
|
||||
@@ -315,7 +277,7 @@ func (s *stream) RegisterError(err error) {
|
||||
return
|
||||
}
|
||||
s.err = err
|
||||
s.windowUpdateOrErrCond.Signal()
|
||||
s.doneWritingOrErrCond.Signal()
|
||||
s.newFrameOrErrCond.Signal()
|
||||
}
|
||||
|
||||
@@ -324,6 +286,7 @@ func (s *stream) finishedReading() bool {
|
||||
}
|
||||
|
||||
func (s *stream) finishedWriting() bool {
|
||||
// TODO: sentFIN
|
||||
return atomic.LoadInt32(&s.closed) != 0
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user