From 46b1d7a1fc34bbbe29d83815526a48722b957b96 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 24 Jan 2019 13:11:20 +0700 Subject: [PATCH 1/2] call the onStreamCompleted callback without holding the stream mutex --- receive_stream.go | 42 ++++++++++++++++++++++++++---------------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/receive_stream.go b/receive_stream.go index 59deb744..f166ee56 100644 --- a/receive_stream.go +++ b/receive_stream.go @@ -191,14 +191,17 @@ func (s *receiveStream) dequeueNextFrame() { } func (s *receiveStream) CancelRead(errorCode protocol.ApplicationErrorCode) { + if completed := s.cancelReadImpl(errorCode); completed { + s.streamCompleted() + } +} + +func (s *receiveStream) cancelReadImpl(errorCode protocol.ApplicationErrorCode) bool /* completed */ { s.mutex.Lock() defer s.mutex.Unlock() if s.finRead || s.canceledRead || s.resetRemotely { - return - } - if s.finalOffset != protocol.MaxByteCount { // final offset was already received - s.streamCompleted() + return false } s.canceledRead = true s.cancelReadErr = fmt.Errorf("Read on stream %d canceled with error code %d", s.streamID, errorCode) @@ -207,34 +210,37 @@ func (s *receiveStream) CancelRead(errorCode protocol.ApplicationErrorCode) { StreamID: s.streamID, ErrorCode: errorCode, }) + // We're done with this stream if the final offset was already received. + return s.finalOffset != protocol.MaxByteCount } func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame) error { - maxOffset := frame.Offset + frame.DataLen() + completed, err := s.handleStreamFrameImpl(frame) + if completed { + s.streamCompleted() + } + return err +} +func (s *receiveStream) handleStreamFrameImpl(frame *wire.StreamFrame) (bool /* completed */, error) { s.mutex.Lock() defer s.mutex.Unlock() + maxOffset := frame.Offset + frame.DataLen() if err := s.flowController.UpdateHighestReceived(maxOffset, frame.FinBit); err != nil { - return err + return false, err } if frame.FinBit { s.finalOffset = maxOffset } if s.canceledRead { - if frame.FinBit { - s.streamCompleted() - } - return nil + return frame.FinBit, nil } if err := s.frameQueue.Push(frame.Data, frame.Offset); err != nil { - return err - } - if frame.FinBit { - s.finalOffset = maxOffset + return false, err } s.signalRead() - return nil + return false, nil } func (s *receiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame) error { @@ -298,7 +304,11 @@ func (s *receiveStream) getWindowUpdate() protocol.ByteCount { } func (s *receiveStream) streamCompleted() { - if !s.finRead { + s.mutex.Lock() + finRead := s.finRead + s.mutex.Unlock() + + if !finRead { s.flowController.Abandon() } s.sender.onStreamCompleted(s.streamID) From 2787a6051ab72bce3ae9036e598e5bf2bd124cef Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 24 Jan 2019 13:17:22 +0700 Subject: [PATCH 2/2] remove some defer statements in the stream --- receive_stream.go | 27 ++++++++++++++------------- send_stream.go | 16 +++++++++------- 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/receive_stream.go b/receive_stream.go index f166ee56..de76335e 100644 --- a/receive_stream.go +++ b/receive_stream.go @@ -78,7 +78,10 @@ func (s *receiveStream) StreamID() protocol.StreamID { // Read implements io.Reader. It is not thread safe! func (s *receiveStream) Read(p []byte) (int, error) { + s.mutex.Lock() completed, n, err := s.readImpl(p) + s.mutex.Unlock() + if completed { s.streamCompleted() } @@ -86,9 +89,6 @@ func (s *receiveStream) Read(p []byte) (int, error) { } func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, error) { - s.mutex.Lock() - defer s.mutex.Unlock() - if s.finRead { return false, 0, io.EOF } @@ -191,15 +191,16 @@ func (s *receiveStream) dequeueNextFrame() { } func (s *receiveStream) CancelRead(errorCode protocol.ApplicationErrorCode) { - if completed := s.cancelReadImpl(errorCode); completed { + s.mutex.Lock() + completed := s.cancelReadImpl(errorCode) + s.mutex.Unlock() + + if completed { s.streamCompleted() } } func (s *receiveStream) cancelReadImpl(errorCode protocol.ApplicationErrorCode) bool /* completed */ { - s.mutex.Lock() - defer s.mutex.Unlock() - if s.finRead || s.canceledRead || s.resetRemotely { return false } @@ -215,7 +216,10 @@ func (s *receiveStream) cancelReadImpl(errorCode protocol.ApplicationErrorCode) } func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame) error { + s.mutex.Lock() completed, err := s.handleStreamFrameImpl(frame) + s.mutex.Unlock() + if completed { s.streamCompleted() } @@ -223,9 +227,6 @@ func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame) error { } func (s *receiveStream) handleStreamFrameImpl(frame *wire.StreamFrame) (bool /* completed */, error) { - s.mutex.Lock() - defer s.mutex.Unlock() - maxOffset := frame.Offset + frame.DataLen() if err := s.flowController.UpdateHighestReceived(maxOffset, frame.FinBit); err != nil { return false, err @@ -244,7 +245,10 @@ func (s *receiveStream) handleStreamFrameImpl(frame *wire.StreamFrame) (bool /* } func (s *receiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame) error { + s.mutex.Lock() completed, err := s.handleResetStreamFrameImpl(frame) + s.mutex.Unlock() + if completed { s.streamCompleted() } @@ -252,9 +256,6 @@ func (s *receiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame) err } func (s *receiveStream) handleResetStreamFrameImpl(frame *wire.ResetStreamFrame) (bool /*completed */, error) { - s.mutex.Lock() - defer s.mutex.Unlock() - if s.closedForShutdown { return false, nil } diff --git a/send_stream.go b/send_stream.go index 056fcf56..92387c86 100644 --- a/send_stream.go +++ b/send_stream.go @@ -146,7 +146,10 @@ func (s *sendStream) Write(p []byte) (int, error) { // popStreamFrame returns the next STREAM frame that is supposed to be sent on this stream // maxBytes is the maximum length this frame (including frame header) will have. func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool /* has more data to send */) { + s.mutex.Lock() completed, frame, hasMoreData := s.popStreamFrameImpl(maxBytes) + s.mutex.Unlock() + if completed { s.sender.onStreamCompleted(s.streamID) } @@ -154,9 +157,6 @@ func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFr } func (s *sendStream) popStreamFrameImpl(maxBytes protocol.ByteCount) (bool /* completed */, *wire.StreamFrame, bool /* has more data to send */) { - s.mutex.Lock() - defer s.mutex.Unlock() - if s.canceledWrite || s.closeForShutdownErr != nil { return false, nil, false } @@ -273,6 +273,7 @@ func (s *sendStream) handleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) { s.mutex.Lock() hasStreamData := s.dataForWriting != nil s.mutex.Unlock() + s.flowController.UpdateSendWindow(frame.ByteOffset) if hasStreamData { s.sender.onHasStreamData(s.streamID) @@ -280,16 +281,17 @@ func (s *sendStream) handleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) { } func (s *sendStream) handleStopSendingFrame(frame *wire.StopSendingFrame) { - if completed := s.handleStopSendingFrameImpl(frame); completed { + s.mutex.Lock() + completed := s.handleStopSendingFrameImpl(frame) + s.mutex.Unlock() + + if completed { s.sender.onStreamCompleted(s.streamID) } } // must be called after locking the mutex func (s *sendStream) handleStopSendingFrameImpl(frame *wire.StopSendingFrame) bool /*completed*/ { - s.mutex.Lock() - defer s.mutex.Unlock() - writeErr := streamCanceledError{ errorCode: frame.ErrorCode, error: fmt.Errorf("Stream %d was reset with error code %d", s.streamID, frame.ErrorCode),