From 5fbd52158fe49ade53edcf4f8f079e2a12293261 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 15 Apr 2017 12:13:12 +0700 Subject: [PATCH] replace the sync.Cond for stream.Read() by a channel --- stream.go | 27 ++++++++++++++++++--------- stream_test.go | 8 +++----- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/stream.go b/stream.go index 183db9cb..3e2a7303 100644 --- a/stream.go +++ b/stream.go @@ -40,8 +40,8 @@ type stream struct { // resetRemotely is set if RegisterRemoteError() is called resetRemotely utils.AtomicBool - frameQueue *streamFrameSorter - newFrameOrErrCond sync.Cond + frameQueue *streamFrameSorter + readChan chan struct{} dataForWriting []byte finSent utils.AtomicBool @@ -62,8 +62,8 @@ func newStream(StreamID protocol.StreamID, streamID: StreamID, flowControlManager: flowControlManager, frameQueue: newStreamFrameSorter(), + readChan: make(chan struct{}, 1), } - s.newFrameOrErrCond.L = &s.mutex s.doneWritingOrErrCond.L = &s.mutex return s } @@ -84,9 +84,9 @@ func (s *stream) Read(p []byte) (int, error) { for bytesRead < len(p) { s.mutex.Lock() frame := s.frameQueue.Head() + s.mutex.Unlock() if frame == nil && bytesRead > 0 { - s.mutex.Unlock() return bytesRead, s.err } @@ -101,10 +101,11 @@ func (s *stream) Read(p []byte) (int, error) { s.readPosInFrame = int(s.readOffset - frame.Offset) break } - s.newFrameOrErrCond.Wait() + <-s.readChan + s.mutex.Lock() frame = s.frameQueue.Head() + s.mutex.Unlock() } - s.mutex.Unlock() if err != nil { return bytesRead, err @@ -250,10 +251,18 @@ func (s *stream) AddStreamFrame(frame *frames.StreamFrame) error { if err != nil && err != errDuplicateStreamData { return err } - s.newFrameOrErrCond.Signal() + s.signalRead() return nil } +// signalRead performs a non-blocking send on the readChan +func (s *stream) signalRead() { + select { + case s.readChan <- struct{}{}: + default: + } +} + // CloseRemote makes the stream receive a "virtual" FIN stream frame at a given offset func (s *stream) CloseRemote(offset protocol.ByteCount) { s.AddStreamFrame(&frames.StreamFrame{FinBit: true, Offset: offset}) @@ -267,7 +276,7 @@ func (s *stream) Cancel(err error) { // errors must not be changed! if s.err == nil { s.err = err - s.newFrameOrErrCond.Signal() + s.signalRead() s.doneWritingOrErrCond.Signal() } s.mutex.Unlock() @@ -283,7 +292,7 @@ func (s *stream) Reset(err error) { // errors must not be changed! if s.err == nil { s.err = err - s.newFrameOrErrCond.Signal() + s.signalRead() s.doneWritingOrErrCond.Signal() } if s.shouldSendReset() { diff --git a/stream_test.go b/stream_test.go index ad066f9c..4f4f273a 100644 --- a/stream_test.go +++ b/stream_test.go @@ -135,11 +135,9 @@ var _ = Describe("Stream", func() { mockFcm.EXPECT().UpdateHighestReceived(streamID, protocol.ByteCount(2)) mockFcm.EXPECT().AddBytesRead(streamID, protocol.ByteCount(2)) go func() { - frame := frames.StreamFrame{ - Offset: 0, - Data: []byte{0xDE, 0xAD}, - } - time.Sleep(time.Millisecond) + defer GinkgoRecover() + frame := frames.StreamFrame{Data: []byte{0xDE, 0xAD}} + time.Sleep(10 * time.Millisecond) err := str.AddStreamFrame(&frame) Expect(err).ToNot(HaveOccurred()) }()