From 57e6749955b02b73b4e59ca759bdffa990103a7b Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Fri, 6 Jan 2017 12:22:02 +0700 Subject: [PATCH] stream code cleanup --- stream.go | 45 ++++++++++++++++----------------------------- 1 file changed, 16 insertions(+), 29 deletions(-) diff --git a/stream.go b/stream.go index deae8ab1..affadb09 100644 --- a/stream.go +++ b/stream.go @@ -27,16 +27,22 @@ type stream struct { // Once set, the errors must not be changed! err error - cancelled utils.AtomicBool + // cancelled is set when Cancel() is called + cancelled utils.AtomicBool + // finishedReading is set once we read a frame with a FinBit finishedReading utils.AtomicBool + // finisedWriting is set once Close() is called finishedWriting utils.AtomicBool - resetLocally utils.AtomicBool + // resetLocally is set if Reset() is called + resetLocally utils.AtomicBool + // resetRemotely is set if RegisterRemoteError() is called + resetRemotely utils.AtomicBool frameQueue *streamFrameSorter newFrameOrErrCond sync.Cond dataForWriting []byte - finSent bool + finSent utils.AtomicBool doneWritingOrErrCond sync.Cond flowControlManager flowcontrol.FlowControlManager @@ -79,7 +85,7 @@ func (s *stream) Read(p []byte) (int, error) { var err error for { // Stop waiting on errors - if s.err != nil { + if s.resetLocally.Get() || s.cancelled.Get() { err = s.err break } @@ -91,13 +97,8 @@ func (s *stream) Read(p []byte) (int, error) { frame = s.frameQueue.Head() } s.mutex.Unlock() - // Here, either frame != nil xor err != nil - // fmt.Printf("err: %#v, frame: %#v\n", err, frame) - - if frame == nil { - s.finishedReading.Set(true) - // We have an err and no data, return the error + if err != nil { return bytesRead, err } @@ -208,15 +209,13 @@ func (s *stream) Close() error { func (s *stream) shouldSendFin() bool { s.mutex.Lock() - res := s.finishedWriting.Get() && !s.finSent && s.err == nil && s.dataForWriting == nil + res := s.finishedWriting.Get() && !s.finSent.Get() && s.err == nil && s.dataForWriting == nil s.mutex.Unlock() return res } func (s *stream) sentFin() { - s.mutex.Lock() - s.finSent = true - s.mutex.Unlock() + s.finSent.Set(true) } // AddStreamFrame adds a new stream frame @@ -245,8 +244,6 @@ func (s *stream) CloseRemote(offset protocol.ByteCount) { // Cancel is called by session to indicate that an error occurred // The stream should will be closed immediately func (s *stream) Cancel(err error) { - s.finishedReading.Set(true) - s.finishedWriting.Set(true) s.cancelled.Set(true) s.mutex.Lock() @@ -256,15 +253,11 @@ func (s *stream) Cancel(err error) { s.newFrameOrErrCond.Signal() s.doneWritingOrErrCond.Signal() } - // if s.writeErr == nil { - // s.writeErr = err - // } s.mutex.Unlock() } // resets the stream locally func (s *stream) Reset(err error) { - s.finishedReading.Set(true) s.resetLocally.Set(true) s.mutex.Lock() // errors must not be changed! @@ -278,7 +271,7 @@ func (s *stream) Reset(err error) { // resets the stream remotely func (s *stream) RegisterRemoteError(err error) { - s.finishedWriting.Set(true) + s.resetRemotely.Set(true) s.mutex.Lock() // errors must not be changed! if s.err == nil { @@ -288,18 +281,12 @@ func (s *stream) RegisterRemoteError(err error) { s.mutex.Unlock() } -func (s *stream) finishedRead() bool { - return s.finishedReading.Get() -} - func (s *stream) finishedWriteAndSentFin() bool { - s.mutex.Lock() - defer s.mutex.Unlock() - return s.err != nil || (s.finishedWriting.Get() && s.finSent) + return s.finishedWriting.Get() && s.finSent.Get() } func (s *stream) finished() bool { - return s.finishedRead() && s.finishedWriteAndSentFin() + return s.cancelled.Get() || (s.finishedReading.Get() && s.finishedWriteAndSentFin()) } func (s *stream) StreamID() protocol.StreamID {