simplify stream ID handling in the outgoing streams map (#5209)

No functional change expected.
This commit is contained in:
Marten Seemann
2025-06-08 20:27:14 +08:00
committed by GitHub
parent 576aa1747f
commit 41fce88a63
4 changed files with 209 additions and 201 deletions

View File

@@ -2,6 +2,7 @@ package quic
import (
"context"
"fmt"
"slices"
"sync"
@@ -18,15 +19,15 @@ type outgoingStreamsMap[T outgoingStream] struct {
mutex sync.RWMutex
streamType protocol.StreamType
streams map[protocol.StreamNum]T
streams map[protocol.StreamID]T
openQueue []chan struct{}
nextStream protocol.StreamNum // stream ID of the stream returned by OpenStream(Sync)
maxStream protocol.StreamNum // the maximum stream ID we're allowed to open
blockedSent bool // was a STREAMS_BLOCKED sent for the current maxStream
nextStream protocol.StreamID // stream ID of the stream returned by OpenStream(Sync)
maxStream protocol.StreamID // the maximum stream ID we're allowed to open
blockedSent bool // was a STREAMS_BLOCKED sent for the current maxStream
newStream func(protocol.StreamNum) T
newStream func(protocol.StreamID) T
queueStreamIDBlocked func(*wire.StreamsBlockedFrame)
closeErr error
@@ -34,14 +35,26 @@ type outgoingStreamsMap[T outgoingStream] struct {
func newOutgoingStreamsMap[T outgoingStream](
streamType protocol.StreamType,
newStream func(protocol.StreamNum) T,
newStream func(protocol.StreamID) T,
queueControlFrame func(wire.Frame),
pers protocol.Perspective,
) *outgoingStreamsMap[T] {
var nextStream protocol.StreamID
switch {
case streamType == protocol.StreamTypeBidi && pers == protocol.PerspectiveServer:
nextStream = protocol.FirstOutgoingBidiStreamServer
case streamType == protocol.StreamTypeBidi && pers == protocol.PerspectiveClient:
nextStream = protocol.FirstOutgoingBidiStreamClient
case streamType == protocol.StreamTypeUni && pers == protocol.PerspectiveServer:
nextStream = protocol.FirstOutgoingUniStreamServer
case streamType == protocol.StreamTypeUni && pers == protocol.PerspectiveClient:
nextStream = protocol.FirstOutgoingUniStreamClient
}
return &outgoingStreamsMap[T]{
streamType: streamType,
streams: make(map[protocol.StreamNum]T),
streams: make(map[protocol.StreamID]T),
maxStream: protocol.InvalidStreamNum,
nextStream: 1,
nextStream: nextStream,
newStream: newStream,
queueStreamIDBlocked: func(f *wire.StreamsBlockedFrame) { queueControlFrame(f) },
}
@@ -114,7 +127,7 @@ func (m *outgoingStreamsMap[T]) OpenStreamSync(ctx context.Context) (T, error) {
func (m *outgoingStreamsMap[T]) openStream() T {
s := m.newStream(m.nextStream)
m.streams[m.nextStream] = s
m.nextStream++
m.nextStream += 4
return s
}
@@ -125,55 +138,49 @@ func (m *outgoingStreamsMap[T]) maybeSendBlockedFrame() {
return
}
var streamNum protocol.StreamNum
if m.maxStream != protocol.InvalidStreamNum {
streamNum = m.maxStream
var streamLimit protocol.StreamNum
if m.maxStream != protocol.InvalidStreamID {
streamLimit = m.maxStream.StreamNum()
}
m.queueStreamIDBlocked(&wire.StreamsBlockedFrame{
Type: m.streamType,
StreamLimit: streamNum,
StreamLimit: streamLimit,
})
m.blockedSent = true
}
func (m *outgoingStreamsMap[T]) GetStream(num protocol.StreamNum) (T, error) {
func (m *outgoingStreamsMap[T]) GetStream(id protocol.StreamID) (T, error) {
m.mutex.RLock()
if num >= m.nextStream {
if id >= m.nextStream {
m.mutex.RUnlock()
return *new(T), streamError{
message: "peer attempted to open stream %d",
nums: []protocol.StreamNum{num},
}
return *new(T), fmt.Errorf("peer attempted to open stream %d", id)
}
s := m.streams[num]
s := m.streams[id]
m.mutex.RUnlock()
return s, nil
}
func (m *outgoingStreamsMap[T]) DeleteStream(num protocol.StreamNum) error {
func (m *outgoingStreamsMap[T]) DeleteStream(id protocol.StreamID) error {
m.mutex.Lock()
defer m.mutex.Unlock()
if _, ok := m.streams[num]; !ok {
return streamError{
message: "tried to delete unknown outgoing stream %d",
nums: []protocol.StreamNum{num},
}
if _, ok := m.streams[id]; !ok {
return fmt.Errorf("tried to delete unknown outgoing stream %d", id)
}
delete(m.streams, num)
delete(m.streams, id)
return nil
}
func (m *outgoingStreamsMap[T]) SetMaxStream(num protocol.StreamNum) {
func (m *outgoingStreamsMap[T]) SetMaxStream(id protocol.StreamID) {
m.mutex.Lock()
defer m.mutex.Unlock()
if num <= m.maxStream {
if id <= m.maxStream {
return
}
m.maxStream = num
m.maxStream = id
m.blockedSent = false
if m.maxStream < m.nextStream-1+protocol.StreamNum(len(m.openQueue)) {
if m.maxStream < m.nextStream-4+4*protocol.StreamID(len(m.openQueue)) {
m.maybeSendBlockedFrame()
}
m.maybeUnblockOpenSync()