convert SendStream to a struct (#5172)

This commit is contained in:
Marten Seemann
2025-06-01 11:53:53 +08:00
committed by GitHub
parent eb656df2fe
commit 78e77bcfdb
18 changed files with 253 additions and 636 deletions

View File

@@ -52,7 +52,7 @@ type streamsMap struct {
mutex sync.Mutex
outgoingBidiStreams *outgoingStreamsMap[*Stream]
outgoingUniStreams *outgoingStreamsMap[sendStreamI]
outgoingUniStreams *outgoingStreamsMap[*SendStream]
incomingBidiStreams *incomingStreamsMap[*Stream]
incomingUniStreams *incomingStreamsMap[receiveStreamI]
reset bool
@@ -102,7 +102,7 @@ func (m *streamsMap) initMaps() {
)
m.outgoingUniStreams = newOutgoingStreamsMap(
protocol.StreamTypeUni,
func(num protocol.StreamNum) sendStreamI {
func(num protocol.StreamNum) *SendStream {
id := num.StreamID(protocol.StreamTypeUni, m.perspective)
return newSendStream(m.ctx, id, m.sender, m.newFlowController(id))
},
@@ -143,7 +143,7 @@ func (m *streamsMap) OpenStreamSync(ctx context.Context) (*Stream, error) {
return str, convertStreamError(err, protocol.StreamTypeBidi, m.perspective)
}
func (m *streamsMap) OpenUniStream() (SendStream, error) {
func (m *streamsMap) OpenUniStream() (*SendStream, error) {
m.mutex.Lock()
reset := m.reset
mm := m.outgoingUniStreams
@@ -155,7 +155,7 @@ func (m *streamsMap) OpenUniStream() (SendStream, error) {
return str, convertStreamError(err, protocol.StreamTypeBidi, m.perspective)
}
func (m *streamsMap) OpenUniStreamSync(ctx context.Context) (SendStream, error) {
func (m *streamsMap) OpenUniStreamSync(ctx context.Context) (*SendStream, error) {
m.mutex.Lock()
reset := m.reset
mm := m.outgoingUniStreams
@@ -247,7 +247,7 @@ func (m *streamsMap) getOrOpenReceiveStream(id protocol.StreamID) (receiveStream
panic("")
}
func (m *streamsMap) GetOrOpenSendStream(id protocol.StreamID) (sendStreamI, error) {
func (m *streamsMap) GetOrOpenSendStream(id protocol.StreamID) (*SendStream, error) {
str, err := m.getOrOpenSendStream(id)
if err != nil {
return nil, &qerr.TransportError{
@@ -258,12 +258,15 @@ func (m *streamsMap) GetOrOpenSendStream(id protocol.StreamID) (sendStreamI, err
return str, nil
}
func (m *streamsMap) getOrOpenSendStream(id protocol.StreamID) (sendStreamI, error) {
func (m *streamsMap) getOrOpenSendStream(id protocol.StreamID) (*SendStream, error) {
num := id.StreamNum()
switch id.Type() {
case protocol.StreamTypeUni:
if id.InitiatedBy() == m.perspective {
str, err := m.outgoingUniStreams.GetStream(num)
if str == nil && err == nil {
return nil, nil
}
return str, convertStreamError(err, protocol.StreamTypeUni, m.perspective)
}
// an incoming unidirectional stream is a receive stream, not a send stream
@@ -274,13 +277,19 @@ func (m *streamsMap) getOrOpenSendStream(id protocol.StreamID) (sendStreamI, err
if str == nil && err == nil {
return nil, nil
}
return str, convertStreamError(err, protocol.StreamTypeBidi, id.InitiatedBy())
if err != nil {
return nil, convertStreamError(err, protocol.StreamTypeBidi, id.InitiatedBy())
}
return str.SendStream, nil
} else {
str, err := m.incomingBidiStreams.GetOrOpenStream(num)
if str == nil && err == nil {
return nil, nil
}
return str, convertStreamError(err, protocol.StreamTypeBidi, id.InitiatedBy())
if err != nil {
return nil, convertStreamError(err, protocol.StreamTypeBidi, id.InitiatedBy())
}
return str.SendStream, nil
}
}
panic("")