forked from quic-go/quic-go
add a function to reset the streams map after 0-RTT rejection
This commit is contained in:
153
streams_map.go
153
streams_map.go
@@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
"github.com/lucas-clemente/quic-go/internal/flowcontrol"
|
||||
"github.com/lucas-clemente/quic-go/internal/protocol"
|
||||
@@ -45,14 +46,20 @@ var errTooManyOpenStreams = errors.New("too many open streams")
|
||||
|
||||
type streamsMap struct {
|
||||
perspective protocol.Perspective
|
||||
version protocol.VersionNumber
|
||||
|
||||
maxIncomingBidiStreams uint64
|
||||
maxIncomingUniStreams uint64
|
||||
|
||||
sender streamSender
|
||||
newFlowController func(protocol.StreamID) flowcontrol.StreamFlowController
|
||||
|
||||
mutex sync.Mutex
|
||||
outgoingBidiStreams *outgoingBidiStreamsMap
|
||||
outgoingUniStreams *outgoingUniStreamsMap
|
||||
incomingBidiStreams *incomingBidiStreamsMap
|
||||
incomingUniStreams *incomingUniStreamsMap
|
||||
reset bool
|
||||
}
|
||||
|
||||
var _ streamManager = &streamsMap{}
|
||||
@@ -66,70 +73,119 @@ func newStreamsMap(
|
||||
version protocol.VersionNumber,
|
||||
) streamManager {
|
||||
m := &streamsMap{
|
||||
perspective: perspective,
|
||||
newFlowController: newFlowController,
|
||||
sender: sender,
|
||||
perspective: perspective,
|
||||
newFlowController: newFlowController,
|
||||
maxIncomingBidiStreams: maxIncomingBidiStreams,
|
||||
maxIncomingUniStreams: maxIncomingUniStreams,
|
||||
sender: sender,
|
||||
version: version,
|
||||
}
|
||||
m.outgoingBidiStreams = newOutgoingBidiStreamsMap(
|
||||
func(num protocol.StreamNum) streamI {
|
||||
id := num.StreamID(protocol.StreamTypeBidi, perspective)
|
||||
return newStream(id, m.sender, m.newFlowController(id), version)
|
||||
},
|
||||
sender.queueControlFrame,
|
||||
)
|
||||
m.incomingBidiStreams = newIncomingBidiStreamsMap(
|
||||
func(num protocol.StreamNum) streamI {
|
||||
id := num.StreamID(protocol.StreamTypeBidi, perspective.Opposite())
|
||||
return newStream(id, m.sender, m.newFlowController(id), version)
|
||||
},
|
||||
maxIncomingBidiStreams,
|
||||
sender.queueControlFrame,
|
||||
)
|
||||
m.outgoingUniStreams = newOutgoingUniStreamsMap(
|
||||
func(num protocol.StreamNum) sendStreamI {
|
||||
id := num.StreamID(protocol.StreamTypeUni, perspective)
|
||||
return newSendStream(id, m.sender, m.newFlowController(id), version)
|
||||
},
|
||||
sender.queueControlFrame,
|
||||
)
|
||||
m.incomingUniStreams = newIncomingUniStreamsMap(
|
||||
func(num protocol.StreamNum) receiveStreamI {
|
||||
id := num.StreamID(protocol.StreamTypeUni, perspective.Opposite())
|
||||
return newReceiveStream(id, m.sender, m.newFlowController(id), version)
|
||||
},
|
||||
maxIncomingUniStreams,
|
||||
sender.queueControlFrame,
|
||||
)
|
||||
m.initMaps()
|
||||
return m
|
||||
}
|
||||
|
||||
func (m *streamsMap) initMaps() {
|
||||
m.outgoingBidiStreams = newOutgoingBidiStreamsMap(
|
||||
func(num protocol.StreamNum) streamI {
|
||||
id := num.StreamID(protocol.StreamTypeBidi, m.perspective)
|
||||
return newStream(id, m.sender, m.newFlowController(id), m.version)
|
||||
},
|
||||
m.sender.queueControlFrame,
|
||||
)
|
||||
m.incomingBidiStreams = newIncomingBidiStreamsMap(
|
||||
func(num protocol.StreamNum) streamI {
|
||||
id := num.StreamID(protocol.StreamTypeBidi, m.perspective.Opposite())
|
||||
return newStream(id, m.sender, m.newFlowController(id), m.version)
|
||||
},
|
||||
m.maxIncomingBidiStreams,
|
||||
m.sender.queueControlFrame,
|
||||
)
|
||||
m.outgoingUniStreams = newOutgoingUniStreamsMap(
|
||||
func(num protocol.StreamNum) sendStreamI {
|
||||
id := num.StreamID(protocol.StreamTypeUni, m.perspective)
|
||||
return newSendStream(id, m.sender, m.newFlowController(id), m.version)
|
||||
},
|
||||
m.sender.queueControlFrame,
|
||||
)
|
||||
m.incomingUniStreams = newIncomingUniStreamsMap(
|
||||
func(num protocol.StreamNum) receiveStreamI {
|
||||
id := num.StreamID(protocol.StreamTypeUni, m.perspective.Opposite())
|
||||
return newReceiveStream(id, m.sender, m.newFlowController(id), m.version)
|
||||
},
|
||||
m.maxIncomingUniStreams,
|
||||
m.sender.queueControlFrame,
|
||||
)
|
||||
}
|
||||
|
||||
func (m *streamsMap) OpenStream() (Stream, error) {
|
||||
str, err := m.outgoingBidiStreams.OpenStream()
|
||||
m.mutex.Lock()
|
||||
reset := m.reset
|
||||
mm := m.outgoingBidiStreams
|
||||
m.mutex.Unlock()
|
||||
if reset {
|
||||
return nil, Err0RTTRejected
|
||||
}
|
||||
str, err := mm.OpenStream()
|
||||
return str, convertStreamError(err, protocol.StreamTypeBidi, m.perspective)
|
||||
}
|
||||
|
||||
func (m *streamsMap) OpenStreamSync(ctx context.Context) (Stream, error) {
|
||||
str, err := m.outgoingBidiStreams.OpenStreamSync(ctx)
|
||||
m.mutex.Lock()
|
||||
reset := m.reset
|
||||
mm := m.outgoingBidiStreams
|
||||
m.mutex.Unlock()
|
||||
if reset {
|
||||
return nil, Err0RTTRejected
|
||||
}
|
||||
str, err := mm.OpenStreamSync(ctx)
|
||||
return str, convertStreamError(err, protocol.StreamTypeBidi, m.perspective)
|
||||
}
|
||||
|
||||
func (m *streamsMap) OpenUniStream() (SendStream, error) {
|
||||
str, err := m.outgoingUniStreams.OpenStream()
|
||||
m.mutex.Lock()
|
||||
reset := m.reset
|
||||
mm := m.outgoingUniStreams
|
||||
m.mutex.Unlock()
|
||||
if reset {
|
||||
return nil, Err0RTTRejected
|
||||
}
|
||||
str, err := mm.OpenStream()
|
||||
return str, convertStreamError(err, protocol.StreamTypeBidi, m.perspective)
|
||||
}
|
||||
|
||||
func (m *streamsMap) OpenUniStreamSync(ctx context.Context) (SendStream, error) {
|
||||
str, err := m.outgoingUniStreams.OpenStreamSync(ctx)
|
||||
m.mutex.Lock()
|
||||
reset := m.reset
|
||||
mm := m.outgoingUniStreams
|
||||
m.mutex.Unlock()
|
||||
if reset {
|
||||
return nil, Err0RTTRejected
|
||||
}
|
||||
str, err := mm.OpenStreamSync(ctx)
|
||||
return str, convertStreamError(err, protocol.StreamTypeUni, m.perspective)
|
||||
}
|
||||
|
||||
func (m *streamsMap) AcceptStream(ctx context.Context) (Stream, error) {
|
||||
str, err := m.incomingBidiStreams.AcceptStream(ctx)
|
||||
m.mutex.Lock()
|
||||
reset := m.reset
|
||||
mm := m.incomingBidiStreams
|
||||
m.mutex.Unlock()
|
||||
if reset {
|
||||
return nil, Err0RTTRejected
|
||||
}
|
||||
str, err := mm.AcceptStream(ctx)
|
||||
return str, convertStreamError(err, protocol.StreamTypeBidi, m.perspective.Opposite())
|
||||
}
|
||||
|
||||
func (m *streamsMap) AcceptUniStream(ctx context.Context) (ReceiveStream, error) {
|
||||
str, err := m.incomingUniStreams.AcceptStream(ctx)
|
||||
m.mutex.Lock()
|
||||
reset := m.reset
|
||||
mm := m.incomingUniStreams
|
||||
m.mutex.Unlock()
|
||||
if reset {
|
||||
return nil, Err0RTTRejected
|
||||
}
|
||||
str, err := mm.AcceptStream(ctx)
|
||||
return str, convertStreamError(err, protocol.StreamTypeUni, m.perspective.Opposite())
|
||||
}
|
||||
|
||||
@@ -232,3 +288,22 @@ func (m *streamsMap) CloseWithError(err error) {
|
||||
m.incomingBidiStreams.CloseWithError(err)
|
||||
m.incomingUniStreams.CloseWithError(err)
|
||||
}
|
||||
|
||||
// ResetFor0RTT resets is used when 0-RTT is rejected. In that case, the streams maps are
|
||||
// 1. closed with an Err0RTTRejected, making calls to Open{Uni}Stream{Sync} / Accept{Uni}Stream return that error.
|
||||
// 2. reset to their initial state, such that we can immediately process new incoming stream data.
|
||||
// Afterwards, calls to Open{Uni}Stream{Sync} / Accept{Uni}Stream will continue to return the error,
|
||||
// until UseResetMaps() has been called.
|
||||
func (m *streamsMap) ResetFor0RTT() {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
m.reset = true
|
||||
m.CloseWithError(Err0RTTRejected)
|
||||
m.initMaps()
|
||||
}
|
||||
|
||||
func (m *streamsMap) UseResetMaps() {
|
||||
m.mutex.Lock()
|
||||
m.reset = false
|
||||
m.mutex.Unlock()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user