From f74082b2fb904884e1c5746ba0b808da70eaf0c2 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Tue, 28 May 2019 16:36:21 +0200 Subject: [PATCH] use chan instead of sync.Cond to signal that new streams can be accepted --- streams_map_incoming_bidi.go | 22 ++++++++++++---------- streams_map_incoming_generic.go | 22 ++++++++++++---------- streams_map_incoming_uni.go | 22 ++++++++++++---------- 3 files changed, 36 insertions(+), 30 deletions(-) diff --git a/streams_map_incoming_bidi.go b/streams_map_incoming_bidi.go index f0aad6a27..aa431558d 100644 --- a/streams_map_incoming_bidi.go +++ b/streams_map_incoming_bidi.go @@ -12,8 +12,8 @@ import ( ) type incomingBidiStreamsMap struct { - mutex sync.RWMutex - cond sync.Cond + mutex sync.RWMutex + newStreamChan chan struct{} streams map[protocol.StreamNum]streamI // When a stream is deleted before it was accepted, we can't delete it immediately. @@ -36,9 +36,9 @@ func newIncomingBidiStreamsMap( newStream func(protocol.StreamNum) streamI, maxStreams uint64, queueControlFrame func(wire.Frame), - // streamNumToID func(protocol.StreamNum) protocol.StreamID, ) *incomingBidiStreamsMap { - m := &incomingBidiStreamsMap{ + return &incomingBidiStreamsMap{ + newStreamChan: make(chan struct{}), streams: make(map[protocol.StreamNum]streamI), streamsToDelete: make(map[protocol.StreamNum]struct{}), maxStream: protocol.StreamNum(maxStreams), @@ -47,10 +47,7 @@ func newIncomingBidiStreamsMap( nextStreamToOpen: 1, nextStreamToAccept: 1, queueMaxStreamID: func(f *wire.MaxStreamsFrame) { queueControlFrame(f) }, - // streamNumToID: streamNumToID, } - m.cond.L = &m.mutex - return m } func (m *incomingBidiStreamsMap) AcceptStream() (streamI, error) { @@ -69,7 +66,9 @@ func (m *incomingBidiStreamsMap) AcceptStream() (streamI, error) { if ok { break } - m.cond.Wait() + m.mutex.Unlock() + <-m.newStreamChan + m.mutex.Lock() } m.nextStreamToAccept++ // If this stream was completed before being accepted, we can delete it now. @@ -111,7 +110,10 @@ func (m *incomingBidiStreamsMap) GetOrOpenStream(num protocol.StreamNum) (stream // * highestStream is only modified by this function for newNum := m.nextStreamToOpen; newNum <= num; newNum++ { m.streams[newNum] = m.newStream(newNum) - m.cond.Signal() + select { + case m.newStreamChan <- struct{}{}: + default: + } } m.nextStreamToOpen = num + 1 s := m.streams[num] @@ -167,5 +169,5 @@ func (m *incomingBidiStreamsMap) CloseWithError(err error) { str.closeForShutdown(err) } m.mutex.Unlock() - m.cond.Broadcast() + close(m.newStreamChan) } diff --git a/streams_map_incoming_generic.go b/streams_map_incoming_generic.go index c7485aca3..004daed54 100644 --- a/streams_map_incoming_generic.go +++ b/streams_map_incoming_generic.go @@ -10,8 +10,8 @@ import ( //go:generate genny -in $GOFILE -out streams_map_incoming_bidi.go gen "item=streamI Item=BidiStream streamTypeGeneric=protocol.StreamTypeBidi" //go:generate genny -in $GOFILE -out streams_map_incoming_uni.go gen "item=receiveStreamI Item=UniStream streamTypeGeneric=protocol.StreamTypeUni" type incomingItemsMap struct { - mutex sync.RWMutex - cond sync.Cond + mutex sync.RWMutex + newStreamChan chan struct{} streams map[protocol.StreamNum]item // When a stream is deleted before it was accepted, we can't delete it immediately. @@ -34,9 +34,9 @@ func newIncomingItemsMap( newStream func(protocol.StreamNum) item, maxStreams uint64, queueControlFrame func(wire.Frame), - // streamNumToID func(protocol.StreamNum) protocol.StreamID, ) *incomingItemsMap { - m := &incomingItemsMap{ + return &incomingItemsMap{ + newStreamChan: make(chan struct{}), streams: make(map[protocol.StreamNum]item), streamsToDelete: make(map[protocol.StreamNum]struct{}), maxStream: protocol.StreamNum(maxStreams), @@ -45,10 +45,7 @@ func newIncomingItemsMap( nextStreamToOpen: 1, nextStreamToAccept: 1, queueMaxStreamID: func(f *wire.MaxStreamsFrame) { queueControlFrame(f) }, - // streamNumToID: streamNumToID, } - m.cond.L = &m.mutex - return m } func (m *incomingItemsMap) AcceptStream() (item, error) { @@ -67,7 +64,9 @@ func (m *incomingItemsMap) AcceptStream() (item, error) { if ok { break } - m.cond.Wait() + m.mutex.Unlock() + <-m.newStreamChan + m.mutex.Lock() } m.nextStreamToAccept++ // If this stream was completed before being accepted, we can delete it now. @@ -109,7 +108,10 @@ func (m *incomingItemsMap) GetOrOpenStream(num protocol.StreamNum) (item, error) // * highestStream is only modified by this function for newNum := m.nextStreamToOpen; newNum <= num; newNum++ { m.streams[newNum] = m.newStream(newNum) - m.cond.Signal() + select { + case m.newStreamChan <- struct{}{}: + default: + } } m.nextStreamToOpen = num + 1 s := m.streams[num] @@ -165,5 +167,5 @@ func (m *incomingItemsMap) CloseWithError(err error) { str.closeForShutdown(err) } m.mutex.Unlock() - m.cond.Broadcast() + close(m.newStreamChan) } diff --git a/streams_map_incoming_uni.go b/streams_map_incoming_uni.go index cc83eb667..1069ba191 100644 --- a/streams_map_incoming_uni.go +++ b/streams_map_incoming_uni.go @@ -12,8 +12,8 @@ import ( ) type incomingUniStreamsMap struct { - mutex sync.RWMutex - cond sync.Cond + mutex sync.RWMutex + newStreamChan chan struct{} streams map[protocol.StreamNum]receiveStreamI // When a stream is deleted before it was accepted, we can't delete it immediately. @@ -36,9 +36,9 @@ func newIncomingUniStreamsMap( newStream func(protocol.StreamNum) receiveStreamI, maxStreams uint64, queueControlFrame func(wire.Frame), - // streamNumToID func(protocol.StreamNum) protocol.StreamID, ) *incomingUniStreamsMap { - m := &incomingUniStreamsMap{ + return &incomingUniStreamsMap{ + newStreamChan: make(chan struct{}), streams: make(map[protocol.StreamNum]receiveStreamI), streamsToDelete: make(map[protocol.StreamNum]struct{}), maxStream: protocol.StreamNum(maxStreams), @@ -47,10 +47,7 @@ func newIncomingUniStreamsMap( nextStreamToOpen: 1, nextStreamToAccept: 1, queueMaxStreamID: func(f *wire.MaxStreamsFrame) { queueControlFrame(f) }, - // streamNumToID: streamNumToID, } - m.cond.L = &m.mutex - return m } func (m *incomingUniStreamsMap) AcceptStream() (receiveStreamI, error) { @@ -69,7 +66,9 @@ func (m *incomingUniStreamsMap) AcceptStream() (receiveStreamI, error) { if ok { break } - m.cond.Wait() + m.mutex.Unlock() + <-m.newStreamChan + m.mutex.Lock() } m.nextStreamToAccept++ // If this stream was completed before being accepted, we can delete it now. @@ -111,7 +110,10 @@ func (m *incomingUniStreamsMap) GetOrOpenStream(num protocol.StreamNum) (receive // * highestStream is only modified by this function for newNum := m.nextStreamToOpen; newNum <= num; newNum++ { m.streams[newNum] = m.newStream(newNum) - m.cond.Signal() + select { + case m.newStreamChan <- struct{}{}: + default: + } } m.nextStreamToOpen = num + 1 s := m.streams[num] @@ -167,5 +169,5 @@ func (m *incomingUniStreamsMap) CloseWithError(err error) { str.closeForShutdown(err) } m.mutex.Unlock() - m.cond.Broadcast() + close(m.newStreamChan) }