forked from quic-go/quic-go
make stream private
This commit is contained in:
104
stream.go
104
stream.go
@@ -15,66 +15,66 @@ type streamHandler interface {
|
||||
}
|
||||
|
||||
// A Stream assembles the data from StreamFrames and provides a super-convenient Read-Interface
|
||||
type Stream struct {
|
||||
Session streamHandler
|
||||
StreamID protocol.StreamID
|
||||
StreamFrames chan *frames.StreamFrame
|
||||
CurrentFrame *frames.StreamFrame
|
||||
ReadPosInFrame int
|
||||
WriteOffset uint64
|
||||
ReadOffset uint64
|
||||
type stream struct {
|
||||
session streamHandler
|
||||
streamID protocol.StreamID
|
||||
streamFrames chan *frames.StreamFrame
|
||||
currentFrame *frames.StreamFrame
|
||||
readPosInFrame int
|
||||
writeOffset uint64
|
||||
readOffset uint64
|
||||
frameQueue []*frames.StreamFrame // TODO: replace with heap
|
||||
currentErr error
|
||||
}
|
||||
|
||||
// NewStream creates a new Stream
|
||||
func NewStream(session streamHandler, StreamID protocol.StreamID) *Stream {
|
||||
return &Stream{
|
||||
Session: session,
|
||||
StreamID: StreamID,
|
||||
StreamFrames: make(chan *frames.StreamFrame, 8), // ToDo: add config option for this number
|
||||
// newStream creates a new Stream
|
||||
func newStream(session streamHandler, StreamID protocol.StreamID) *stream {
|
||||
return &stream{
|
||||
session: session,
|
||||
streamID: StreamID,
|
||||
streamFrames: make(chan *frames.StreamFrame, 8), // ToDo: add config option for this number
|
||||
}
|
||||
}
|
||||
|
||||
// Read reads data
|
||||
func (s *Stream) Read(p []byte) (int, error) {
|
||||
func (s *stream) Read(p []byte) (int, error) {
|
||||
bytesRead := 0
|
||||
for bytesRead < len(p) {
|
||||
if s.CurrentFrame == nil {
|
||||
if s.currentFrame == nil {
|
||||
var err error
|
||||
s.CurrentFrame, err = s.getNextFrameInOrder(bytesRead == 0)
|
||||
s.currentFrame, err = s.getNextFrameInOrder(bytesRead == 0)
|
||||
if err != nil {
|
||||
return bytesRead, err
|
||||
}
|
||||
if s.CurrentFrame == nil {
|
||||
if s.currentFrame == nil {
|
||||
return bytesRead, nil
|
||||
}
|
||||
s.ReadPosInFrame = 0
|
||||
s.readPosInFrame = 0
|
||||
}
|
||||
m := utils.Min(len(p)-bytesRead, len(s.CurrentFrame.Data)-s.ReadPosInFrame)
|
||||
copy(p[bytesRead:], s.CurrentFrame.Data[s.ReadPosInFrame:])
|
||||
s.ReadPosInFrame += m
|
||||
m := utils.Min(len(p)-bytesRead, len(s.currentFrame.Data)-s.readPosInFrame)
|
||||
copy(p[bytesRead:], s.currentFrame.Data[s.readPosInFrame:])
|
||||
s.readPosInFrame += m
|
||||
bytesRead += m
|
||||
s.ReadOffset += uint64(m)
|
||||
if s.ReadPosInFrame >= len(s.CurrentFrame.Data) {
|
||||
if s.CurrentFrame.FinBit {
|
||||
s.readOffset += uint64(m)
|
||||
if s.readPosInFrame >= len(s.currentFrame.Data) {
|
||||
if s.currentFrame.FinBit {
|
||||
s.currentErr = io.EOF
|
||||
close(s.StreamFrames)
|
||||
s.CurrentFrame = nil
|
||||
s.Session.closeStream(s.StreamID)
|
||||
close(s.streamFrames)
|
||||
s.currentFrame = nil
|
||||
s.session.closeStream(s.streamID)
|
||||
return bytesRead, io.EOF
|
||||
}
|
||||
s.CurrentFrame = nil
|
||||
s.currentFrame = nil
|
||||
}
|
||||
}
|
||||
|
||||
return bytesRead, nil
|
||||
}
|
||||
|
||||
func (s *Stream) getNextFrameInOrder(wait bool) (*frames.StreamFrame, error) {
|
||||
func (s *stream) getNextFrameInOrder(wait bool) (*frames.StreamFrame, error) {
|
||||
// First, check the queue
|
||||
for i, f := range s.frameQueue {
|
||||
if f.Offset == s.ReadOffset {
|
||||
if f.Offset == s.readOffset {
|
||||
// Move last element into position i
|
||||
s.frameQueue[i] = s.frameQueue[len(s.frameQueue)-1]
|
||||
s.frameQueue = s.frameQueue[:len(s.frameQueue)-1]
|
||||
@@ -91,12 +91,12 @@ func (s *Stream) getNextFrameInOrder(wait bool) (*frames.StreamFrame, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if nextFrameFromChannel.Offset == s.ReadOffset {
|
||||
if nextFrameFromChannel.Offset == s.readOffset {
|
||||
return nextFrameFromChannel, nil
|
||||
}
|
||||
|
||||
// Discard if we already know it
|
||||
if nextFrameFromChannel.Offset < s.ReadOffset {
|
||||
if nextFrameFromChannel.Offset < s.readOffset {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -105,15 +105,15 @@ func (s *Stream) getNextFrameInOrder(wait bool) (*frames.StreamFrame, error) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Stream) nextFrameInChan(blocking bool) (f *frames.StreamFrame, err error) {
|
||||
func (s *stream) nextFrameInChan(blocking bool) (f *frames.StreamFrame, err error) {
|
||||
var ok bool
|
||||
if blocking {
|
||||
select {
|
||||
case f, ok = <-s.StreamFrames:
|
||||
case f, ok = <-s.streamFrames:
|
||||
}
|
||||
} else {
|
||||
select {
|
||||
case f, ok = <-s.StreamFrames:
|
||||
case f, ok = <-s.streamFrames:
|
||||
default:
|
||||
}
|
||||
}
|
||||
@@ -124,48 +124,48 @@ func (s *Stream) nextFrameInChan(blocking bool) (f *frames.StreamFrame, err erro
|
||||
}
|
||||
|
||||
// ReadByte implements io.ByteReader
|
||||
func (s *Stream) ReadByte() (byte, error) {
|
||||
func (s *stream) ReadByte() (byte, error) {
|
||||
// TODO: Optimize
|
||||
p := make([]byte, 1)
|
||||
_, err := io.ReadFull(s, p)
|
||||
return p[0], err
|
||||
}
|
||||
|
||||
func (s *Stream) Write(p []byte) (int, error) {
|
||||
func (s *stream) Write(p []byte) (int, error) {
|
||||
data := make([]byte, len(p))
|
||||
copy(data, p)
|
||||
err := s.Session.QueueFrame(&frames.StreamFrame{
|
||||
StreamID: s.StreamID,
|
||||
Offset: s.WriteOffset,
|
||||
err := s.session.QueueFrame(&frames.StreamFrame{
|
||||
StreamID: s.streamID,
|
||||
Offset: s.writeOffset,
|
||||
Data: data,
|
||||
})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
s.WriteOffset += uint64(len(p))
|
||||
s.writeOffset += uint64(len(p))
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
// Close implements io.Closer
|
||||
func (s *Stream) Close() error {
|
||||
fmt.Printf("Closing stream %d\n", s.StreamID)
|
||||
return s.Session.QueueFrame(&frames.StreamFrame{
|
||||
StreamID: s.StreamID,
|
||||
Offset: s.WriteOffset,
|
||||
func (s *stream) Close() error {
|
||||
fmt.Printf("Closing stream %d\n", s.streamID)
|
||||
return s.session.QueueFrame(&frames.StreamFrame{
|
||||
StreamID: s.streamID,
|
||||
Offset: s.writeOffset,
|
||||
FinBit: true,
|
||||
})
|
||||
}
|
||||
|
||||
// AddStreamFrame adds a new stream frame
|
||||
func (s *Stream) AddStreamFrame(frame *frames.StreamFrame) error {
|
||||
s.StreamFrames <- frame
|
||||
func (s *stream) AddStreamFrame(frame *frames.StreamFrame) error {
|
||||
s.streamFrames <- frame
|
||||
return nil
|
||||
}
|
||||
|
||||
// RegisterError is called by session to indicate that an error occured and the
|
||||
// stream should be closed.
|
||||
func (s *Stream) RegisterError(err error) {
|
||||
func (s *stream) RegisterError(err error) {
|
||||
s.currentErr = err
|
||||
s.Session.closeStream(s.StreamID)
|
||||
close(s.StreamFrames)
|
||||
s.session.closeStream(s.streamID)
|
||||
close(s.streamFrames)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user