implement sender side behavior for RESET_STREAM_AT (#5242)

* improve existing send stream test

* implement sender side behavior for RESET_STREAM_AT

* refactor send stream cancelation and shutdown error handling

* correctly deal with 0-RTT corner case
This commit is contained in:
Marten Seemann
2025-06-28 11:44:47 +08:00
committed by GitHub
parent b2f24318af
commit a2926a3603
10 changed files with 864 additions and 138 deletions

View File

@@ -30,12 +30,13 @@ type streamsMap struct {
queueControlFrame func(wire.Frame)
newFlowController func(protocol.StreamID) flowcontrol.StreamFlowController
mutex sync.Mutex
outgoingBidiStreams *outgoingStreamsMap[*Stream]
outgoingUniStreams *outgoingStreamsMap[*SendStream]
incomingBidiStreams *incomingStreamsMap[*Stream]
incomingUniStreams *incomingStreamsMap[*ReceiveStream]
reset bool
mutex sync.Mutex
outgoingBidiStreams *outgoingStreamsMap[*Stream]
outgoingUniStreams *outgoingStreamsMap[*SendStream]
incomingBidiStreams *incomingStreamsMap[*Stream]
incomingUniStreams *incomingStreamsMap[*ReceiveStream]
reset bool
supportsResetStreamAt bool
}
func newStreamsMap(
@@ -64,7 +65,7 @@ func (m *streamsMap) initMaps() {
m.outgoingBidiStreams = newOutgoingStreamsMap(
protocol.StreamTypeBidi,
func(id protocol.StreamID) *Stream {
return newStream(m.ctx, id, m.sender, m.newFlowController(id))
return newStream(m.ctx, id, m.sender, m.newFlowController(id), m.supportsResetStreamAt)
},
m.queueControlFrame,
m.perspective,
@@ -72,7 +73,7 @@ func (m *streamsMap) initMaps() {
m.incomingBidiStreams = newIncomingStreamsMap(
protocol.StreamTypeBidi,
func(id protocol.StreamID) *Stream {
return newStream(m.ctx, id, m.sender, m.newFlowController(id))
return newStream(m.ctx, id, m.sender, m.newFlowController(id), m.supportsResetStreamAt)
},
m.maxIncomingBidiStreams,
m.queueControlFrame,
@@ -81,7 +82,7 @@ func (m *streamsMap) initMaps() {
m.outgoingUniStreams = newOutgoingStreamsMap(
protocol.StreamTypeUni,
func(id protocol.StreamID) *SendStream {
return newSendStream(m.ctx, id, m.sender, m.newFlowController(id))
return newSendStream(m.ctx, id, m.sender, m.newFlowController(id), m.supportsResetStreamAt)
},
m.queueControlFrame,
m.perspective,
@@ -316,7 +317,10 @@ func (m *streamsMap) HandleStreamFrame(f *wire.StreamFrame, rcvTime time.Time) e
return str.handleStreamFrame(f, rcvTime)
}
func (m *streamsMap) UpdateLimits(p *wire.TransportParameters) {
func (m *streamsMap) HandleTransportParameters(p *wire.TransportParameters) {
m.supportsResetStreamAt = p.EnableResetStreamAt
m.outgoingBidiStreams.EnableResetStreamAt()
m.outgoingUniStreams.EnableResetStreamAt()
m.outgoingBidiStreams.UpdateSendWindow(p.InitialMaxStreamDataBidiRemote)
m.outgoingBidiStreams.SetMaxStream(p.MaxBidiStreamNum.StreamID(protocol.StreamTypeBidi, m.perspective))
m.outgoingUniStreams.UpdateSendWindow(p.InitialMaxStreamDataUni)