add a Write method for Streams

This commit is contained in:
Marten Seemann
2016-04-17 16:53:04 +07:00
parent 1097698c4b
commit a90c9ba173
2 changed files with 19 additions and 2 deletions

View File

@@ -160,7 +160,7 @@ func (s *Session) handleStreamFrame(r *bytes.Reader) error {
} else { } else {
stream, ok := s.Streams[frame.StreamID] stream, ok := s.Streams[frame.StreamID]
if !ok { if !ok {
stream = NewStream(frame.StreamID) stream = NewStream(s, frame.StreamID)
s.Streams[frame.StreamID] = stream s.Streams[frame.StreamID] = stream
} }
err := stream.AddStreamFrame(frame) err := stream.AddStreamFrame(frame)

View File

@@ -8,15 +8,18 @@ import (
// A Stream assembles the data from StreamFrames and provides a super-convenient Read-Interface // A Stream assembles the data from StreamFrames and provides a super-convenient Read-Interface
type Stream struct { type Stream struct {
Session *Session
StreamID protocol.StreamID StreamID protocol.StreamID
StreamFrames chan *frames.StreamFrame StreamFrames chan *frames.StreamFrame
CurrentFrame *frames.StreamFrame CurrentFrame *frames.StreamFrame
ReadPosInFrame int ReadPosInFrame int
WriteOffset uint64
} }
// NewStream creates a new Stream // NewStream creates a new Stream
func NewStream(StreamID protocol.StreamID) *Stream { func NewStream(session *Session, StreamID protocol.StreamID) *Stream {
return &Stream{ return &Stream{
Session: session,
StreamID: StreamID, StreamID: StreamID,
StreamFrames: make(chan *frames.StreamFrame, 8), // ToDo: add config option for this number StreamFrames: make(chan *frames.StreamFrame, 8), // ToDo: add config option for this number
} }
@@ -50,6 +53,20 @@ func (s *Stream) Read(p []byte) (int, error) {
return bytesRead, nil return bytesRead, nil
} }
func (s *Stream) Write(p []byte) (int, error) {
frame := &frames.StreamFrame{
StreamID: s.StreamID,
Offset: s.WriteOffset,
Data: p,
}
err := s.Session.SendFrames([]frames.Frame{frame})
if err != nil {
return 0, err
}
s.WriteOffset += uint64(len(p))
return len(p), nil
}
// AddStreamFrame adds a new stream frame // AddStreamFrame adds a new stream frame
func (s *Stream) AddStreamFrame(frame *frames.StreamFrame) error { func (s *Stream) AddStreamFrame(frame *frames.StreamFrame) error {
s.StreamFrames <- frame s.StreamFrames <- frame