diff --git a/session.go b/session.go index eaa4d3ad..1a7bd8d4 100644 --- a/session.go +++ b/session.go @@ -160,7 +160,7 @@ func (s *Session) handleStreamFrame(r *bytes.Reader) error { } else { stream, ok := s.Streams[frame.StreamID] if !ok { - stream = NewStream(frame.StreamID) + stream = NewStream(s, frame.StreamID) s.Streams[frame.StreamID] = stream } err := stream.AddStreamFrame(frame) diff --git a/stream.go b/stream.go index 4dac00b9..c2103934 100644 --- a/stream.go +++ b/stream.go @@ -8,15 +8,18 @@ import ( // A Stream assembles the data from StreamFrames and provides a super-convenient Read-Interface type Stream struct { + Session *Session StreamID protocol.StreamID StreamFrames chan *frames.StreamFrame CurrentFrame *frames.StreamFrame ReadPosInFrame int + WriteOffset uint64 } // NewStream creates a new Stream -func NewStream(StreamID protocol.StreamID) *Stream { +func NewStream(session *Session, StreamID protocol.StreamID) *Stream { return &Stream{ + Session: session, StreamID: StreamID, StreamFrames: make(chan *frames.StreamFrame, 8), // ToDo: add config option for this number } @@ -50,6 +53,20 @@ func (s *Stream) Read(p []byte) (int, error) { return bytesRead, nil } +func (s *Stream) Write(p []byte) (int, error) { + frame := &frames.StreamFrame{ + StreamID: s.StreamID, + Offset: s.WriteOffset, + Data: p, + } + err := s.Session.SendFrames([]frames.Frame{frame}) + if err != nil { + return 0, err + } + s.WriteOffset += uint64(len(p)) + return len(p), nil +} + // AddStreamFrame adds a new stream frame func (s *Stream) AddStreamFrame(frame *frames.StreamFrame) error { s.StreamFrames <- frame