use chan instead of sync.Cond to signal that new streams can be accepted

This commit is contained in:
Marten Seemann
2019-05-28 16:36:21 +02:00
parent 12bce1caaa
commit f74082b2fb
3 changed files with 36 additions and 30 deletions

View File

@@ -12,8 +12,8 @@ import (
)
type incomingBidiStreamsMap struct {
mutex sync.RWMutex
cond sync.Cond
mutex sync.RWMutex
newStreamChan chan struct{}
streams map[protocol.StreamNum]streamI
// When a stream is deleted before it was accepted, we can't delete it immediately.
@@ -36,9 +36,9 @@ func newIncomingBidiStreamsMap(
newStream func(protocol.StreamNum) streamI,
maxStreams uint64,
queueControlFrame func(wire.Frame),
// streamNumToID func(protocol.StreamNum) protocol.StreamID,
) *incomingBidiStreamsMap {
m := &incomingBidiStreamsMap{
return &incomingBidiStreamsMap{
newStreamChan: make(chan struct{}),
streams: make(map[protocol.StreamNum]streamI),
streamsToDelete: make(map[protocol.StreamNum]struct{}),
maxStream: protocol.StreamNum(maxStreams),
@@ -47,10 +47,7 @@ func newIncomingBidiStreamsMap(
nextStreamToOpen: 1,
nextStreamToAccept: 1,
queueMaxStreamID: func(f *wire.MaxStreamsFrame) { queueControlFrame(f) },
// streamNumToID: streamNumToID,
}
m.cond.L = &m.mutex
return m
}
func (m *incomingBidiStreamsMap) AcceptStream() (streamI, error) {
@@ -69,7 +66,9 @@ func (m *incomingBidiStreamsMap) AcceptStream() (streamI, error) {
if ok {
break
}
m.cond.Wait()
m.mutex.Unlock()
<-m.newStreamChan
m.mutex.Lock()
}
m.nextStreamToAccept++
// If this stream was completed before being accepted, we can delete it now.
@@ -111,7 +110,10 @@ func (m *incomingBidiStreamsMap) GetOrOpenStream(num protocol.StreamNum) (stream
// * highestStream is only modified by this function
for newNum := m.nextStreamToOpen; newNum <= num; newNum++ {
m.streams[newNum] = m.newStream(newNum)
m.cond.Signal()
select {
case m.newStreamChan <- struct{}{}:
default:
}
}
m.nextStreamToOpen = num + 1
s := m.streams[num]
@@ -167,5 +169,5 @@ func (m *incomingBidiStreamsMap) CloseWithError(err error) {
str.closeForShutdown(err)
}
m.mutex.Unlock()
m.cond.Broadcast()
close(m.newStreamChan)
}

View File

@@ -10,8 +10,8 @@ import (
//go:generate genny -in $GOFILE -out streams_map_incoming_bidi.go gen "item=streamI Item=BidiStream streamTypeGeneric=protocol.StreamTypeBidi"
//go:generate genny -in $GOFILE -out streams_map_incoming_uni.go gen "item=receiveStreamI Item=UniStream streamTypeGeneric=protocol.StreamTypeUni"
type incomingItemsMap struct {
mutex sync.RWMutex
cond sync.Cond
mutex sync.RWMutex
newStreamChan chan struct{}
streams map[protocol.StreamNum]item
// When a stream is deleted before it was accepted, we can't delete it immediately.
@@ -34,9 +34,9 @@ func newIncomingItemsMap(
newStream func(protocol.StreamNum) item,
maxStreams uint64,
queueControlFrame func(wire.Frame),
// streamNumToID func(protocol.StreamNum) protocol.StreamID,
) *incomingItemsMap {
m := &incomingItemsMap{
return &incomingItemsMap{
newStreamChan: make(chan struct{}),
streams: make(map[protocol.StreamNum]item),
streamsToDelete: make(map[protocol.StreamNum]struct{}),
maxStream: protocol.StreamNum(maxStreams),
@@ -45,10 +45,7 @@ func newIncomingItemsMap(
nextStreamToOpen: 1,
nextStreamToAccept: 1,
queueMaxStreamID: func(f *wire.MaxStreamsFrame) { queueControlFrame(f) },
// streamNumToID: streamNumToID,
}
m.cond.L = &m.mutex
return m
}
func (m *incomingItemsMap) AcceptStream() (item, error) {
@@ -67,7 +64,9 @@ func (m *incomingItemsMap) AcceptStream() (item, error) {
if ok {
break
}
m.cond.Wait()
m.mutex.Unlock()
<-m.newStreamChan
m.mutex.Lock()
}
m.nextStreamToAccept++
// If this stream was completed before being accepted, we can delete it now.
@@ -109,7 +108,10 @@ func (m *incomingItemsMap) GetOrOpenStream(num protocol.StreamNum) (item, error)
// * highestStream is only modified by this function
for newNum := m.nextStreamToOpen; newNum <= num; newNum++ {
m.streams[newNum] = m.newStream(newNum)
m.cond.Signal()
select {
case m.newStreamChan <- struct{}{}:
default:
}
}
m.nextStreamToOpen = num + 1
s := m.streams[num]
@@ -165,5 +167,5 @@ func (m *incomingItemsMap) CloseWithError(err error) {
str.closeForShutdown(err)
}
m.mutex.Unlock()
m.cond.Broadcast()
close(m.newStreamChan)
}

View File

@@ -12,8 +12,8 @@ import (
)
type incomingUniStreamsMap struct {
mutex sync.RWMutex
cond sync.Cond
mutex sync.RWMutex
newStreamChan chan struct{}
streams map[protocol.StreamNum]receiveStreamI
// When a stream is deleted before it was accepted, we can't delete it immediately.
@@ -36,9 +36,9 @@ func newIncomingUniStreamsMap(
newStream func(protocol.StreamNum) receiveStreamI,
maxStreams uint64,
queueControlFrame func(wire.Frame),
// streamNumToID func(protocol.StreamNum) protocol.StreamID,
) *incomingUniStreamsMap {
m := &incomingUniStreamsMap{
return &incomingUniStreamsMap{
newStreamChan: make(chan struct{}),
streams: make(map[protocol.StreamNum]receiveStreamI),
streamsToDelete: make(map[protocol.StreamNum]struct{}),
maxStream: protocol.StreamNum(maxStreams),
@@ -47,10 +47,7 @@ func newIncomingUniStreamsMap(
nextStreamToOpen: 1,
nextStreamToAccept: 1,
queueMaxStreamID: func(f *wire.MaxStreamsFrame) { queueControlFrame(f) },
// streamNumToID: streamNumToID,
}
m.cond.L = &m.mutex
return m
}
func (m *incomingUniStreamsMap) AcceptStream() (receiveStreamI, error) {
@@ -69,7 +66,9 @@ func (m *incomingUniStreamsMap) AcceptStream() (receiveStreamI, error) {
if ok {
break
}
m.cond.Wait()
m.mutex.Unlock()
<-m.newStreamChan
m.mutex.Lock()
}
m.nextStreamToAccept++
// If this stream was completed before being accepted, we can delete it now.
@@ -111,7 +110,10 @@ func (m *incomingUniStreamsMap) GetOrOpenStream(num protocol.StreamNum) (receive
// * highestStream is only modified by this function
for newNum := m.nextStreamToOpen; newNum <= num; newNum++ {
m.streams[newNum] = m.newStream(newNum)
m.cond.Signal()
select {
case m.newStreamChan <- struct{}{}:
default:
}
}
m.nextStreamToOpen = num + 1
s := m.streams[num]
@@ -167,5 +169,5 @@ func (m *incomingUniStreamsMap) CloseWithError(err error) {
str.closeForShutdown(err)
}
m.mutex.Unlock()
m.cond.Broadcast()
close(m.newStreamChan)
}