stream code cleanup

This commit is contained in:
Marten Seemann
2017-01-06 12:22:02 +07:00
parent a86f31d789
commit 57e6749955

View File

@@ -27,16 +27,22 @@ type stream struct {
// Once set, the errors must not be changed!
err error
cancelled utils.AtomicBool
// cancelled is set when Cancel() is called
cancelled utils.AtomicBool
// finishedReading is set once we read a frame with a FinBit
finishedReading utils.AtomicBool
// finisedWriting is set once Close() is called
finishedWriting utils.AtomicBool
resetLocally utils.AtomicBool
// resetLocally is set if Reset() is called
resetLocally utils.AtomicBool
// resetRemotely is set if RegisterRemoteError() is called
resetRemotely utils.AtomicBool
frameQueue *streamFrameSorter
newFrameOrErrCond sync.Cond
dataForWriting []byte
finSent bool
finSent utils.AtomicBool
doneWritingOrErrCond sync.Cond
flowControlManager flowcontrol.FlowControlManager
@@ -79,7 +85,7 @@ func (s *stream) Read(p []byte) (int, error) {
var err error
for {
// Stop waiting on errors
if s.err != nil {
if s.resetLocally.Get() || s.cancelled.Get() {
err = s.err
break
}
@@ -91,13 +97,8 @@ func (s *stream) Read(p []byte) (int, error) {
frame = s.frameQueue.Head()
}
s.mutex.Unlock()
// Here, either frame != nil xor err != nil
// fmt.Printf("err: %#v, frame: %#v\n", err, frame)
if frame == nil {
s.finishedReading.Set(true)
// We have an err and no data, return the error
if err != nil {
return bytesRead, err
}
@@ -208,15 +209,13 @@ func (s *stream) Close() error {
func (s *stream) shouldSendFin() bool {
s.mutex.Lock()
res := s.finishedWriting.Get() && !s.finSent && s.err == nil && s.dataForWriting == nil
res := s.finishedWriting.Get() && !s.finSent.Get() && s.err == nil && s.dataForWriting == nil
s.mutex.Unlock()
return res
}
func (s *stream) sentFin() {
s.mutex.Lock()
s.finSent = true
s.mutex.Unlock()
s.finSent.Set(true)
}
// AddStreamFrame adds a new stream frame
@@ -245,8 +244,6 @@ func (s *stream) CloseRemote(offset protocol.ByteCount) {
// Cancel is called by session to indicate that an error occurred
// The stream should will be closed immediately
func (s *stream) Cancel(err error) {
s.finishedReading.Set(true)
s.finishedWriting.Set(true)
s.cancelled.Set(true)
s.mutex.Lock()
@@ -256,15 +253,11 @@ func (s *stream) Cancel(err error) {
s.newFrameOrErrCond.Signal()
s.doneWritingOrErrCond.Signal()
}
// if s.writeErr == nil {
// s.writeErr = err
// }
s.mutex.Unlock()
}
// resets the stream locally
func (s *stream) Reset(err error) {
s.finishedReading.Set(true)
s.resetLocally.Set(true)
s.mutex.Lock()
// errors must not be changed!
@@ -278,7 +271,7 @@ func (s *stream) Reset(err error) {
// resets the stream remotely
func (s *stream) RegisterRemoteError(err error) {
s.finishedWriting.Set(true)
s.resetRemotely.Set(true)
s.mutex.Lock()
// errors must not be changed!
if s.err == nil {
@@ -288,18 +281,12 @@ func (s *stream) RegisterRemoteError(err error) {
s.mutex.Unlock()
}
func (s *stream) finishedRead() bool {
return s.finishedReading.Get()
}
func (s *stream) finishedWriteAndSentFin() bool {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.err != nil || (s.finishedWriting.Get() && s.finSent)
return s.finishedWriting.Get() && s.finSent.Get()
}
func (s *stream) finished() bool {
return s.finishedRead() && s.finishedWriteAndSentFin()
return s.cancelled.Get() || (s.finishedReading.Get() && s.finishedWriteAndSentFin())
}
func (s *stream) StreamID() protocol.StreamID {