diff --git a/session.go b/session.go index b843558ce..529caf480 100644 --- a/session.go +++ b/session.go @@ -602,17 +602,17 @@ func (s *Session) GetOrOpenStream(id protocol.StreamID) (utils.Stream, error) { // The streamsMutex is locked by OpenStream or GetOrOpenStream before calling this function. func (s *Session) newStreamImpl(id protocol.StreamID) (*stream, error) { maxAllowedStreams := uint32(protocol.MaxStreamsMultiplier * float32(s.connectionParametersManager.GetMaxStreamsPerConnection())) - if s.openStreamsCount >= maxAllowedStreams { + if atomic.LoadUint32(&s.openStreamsCount) >= maxAllowedStreams { return nil, qerr.TooManyOpenStreams } + if _, ok := s.streams[id]; ok { + return nil, fmt.Errorf("Session: stream with ID %d already exists", id) + } stream, err := newStream(s, s.connectionParametersManager, s.flowController, id) if err != nil { return nil, err } - if s.streams[id] != nil { - return nil, fmt.Errorf("Session: stream with ID %d already exists", id) - } - s.openStreamsCount++ + atomic.AddUint32(&s.openStreamsCount, 1) s.streams[id] = stream return stream, nil } @@ -633,7 +633,7 @@ func (s *Session) garbageCollectStreams() { s.windowUpdateManager.RemoveStream(k) } if v.finished() { - s.openStreamsCount-- + atomic.AddUint32(&s.openStreamsCount, ^uint32(0)) // decrement s.streams[k] = nil } }