forked from quic-go/quic-go
fix race condition in flow control manager
This commit is contained in:
@@ -138,6 +138,7 @@ func (f *flowControlManager) AddBytesRead(streamID protocol.StreamID, n protocol
|
|||||||
func (f *flowControlManager) GetWindowUpdates() (res []WindowUpdate) {
|
func (f *flowControlManager) GetWindowUpdates() (res []WindowUpdate) {
|
||||||
f.mutex.Lock()
|
f.mutex.Lock()
|
||||||
defer f.mutex.Unlock()
|
defer f.mutex.Unlock()
|
||||||
|
|
||||||
connFlowController := f.streamFlowController[0]
|
connFlowController := f.streamFlowController[0]
|
||||||
|
|
||||||
// get WindowUpdates for streams
|
// get WindowUpdates for streams
|
||||||
@@ -161,8 +162,9 @@ func (f *flowControlManager) GetWindowUpdates() (res []WindowUpdate) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (f *flowControlManager) GetReceiveWindow(streamID protocol.StreamID) (protocol.ByteCount, error) {
|
func (f *flowControlManager) GetReceiveWindow(streamID protocol.StreamID) (protocol.ByteCount, error) {
|
||||||
f.mutex.Lock()
|
f.mutex.RLock()
|
||||||
defer f.mutex.Unlock()
|
defer f.mutex.RUnlock()
|
||||||
|
|
||||||
flowController, err := f.getFlowController(streamID)
|
flowController, err := f.getFlowController(streamID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
@@ -172,10 +174,10 @@ func (f *flowControlManager) GetReceiveWindow(streamID protocol.StreamID) (proto
|
|||||||
|
|
||||||
// streamID must not be 0 here
|
// streamID must not be 0 here
|
||||||
func (f *flowControlManager) AddBytesSent(streamID protocol.StreamID, n protocol.ByteCount) error {
|
func (f *flowControlManager) AddBytesSent(streamID protocol.StreamID, n protocol.ByteCount) error {
|
||||||
// Only lock the part reading from the map, since send-windows are only accessed from the session goroutine.
|
|
||||||
f.mutex.Lock()
|
f.mutex.Lock()
|
||||||
|
defer f.mutex.Unlock()
|
||||||
|
|
||||||
fc, err := f.getFlowController(streamID)
|
fc, err := f.getFlowController(streamID)
|
||||||
f.mutex.Unlock()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -190,10 +192,10 @@ func (f *flowControlManager) AddBytesSent(streamID protocol.StreamID, n protocol
|
|||||||
|
|
||||||
// must not be called with StreamID 0
|
// must not be called with StreamID 0
|
||||||
func (f *flowControlManager) SendWindowSize(streamID protocol.StreamID) (protocol.ByteCount, error) {
|
func (f *flowControlManager) SendWindowSize(streamID protocol.StreamID) (protocol.ByteCount, error) {
|
||||||
// Only lock the part reading from the map, since send-windows are only accessed from the session goroutine.
|
|
||||||
f.mutex.RLock()
|
f.mutex.RLock()
|
||||||
|
defer f.mutex.RUnlock()
|
||||||
|
|
||||||
fc, err := f.getFlowController(streamID)
|
fc, err := f.getFlowController(streamID)
|
||||||
f.mutex.RUnlock()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
@@ -207,24 +209,23 @@ func (f *flowControlManager) SendWindowSize(streamID protocol.StreamID) (protoco
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (f *flowControlManager) RemainingConnectionWindowSize() protocol.ByteCount {
|
func (f *flowControlManager) RemainingConnectionWindowSize() protocol.ByteCount {
|
||||||
// Only lock the part reading from the map, since send-windows are only accessed from the session goroutine.
|
|
||||||
f.mutex.RLock()
|
f.mutex.RLock()
|
||||||
res := f.streamFlowController[0].SendWindowSize()
|
defer f.mutex.RUnlock()
|
||||||
f.mutex.RUnlock()
|
|
||||||
return res
|
return f.streamFlowController[0].SendWindowSize()
|
||||||
}
|
}
|
||||||
|
|
||||||
// streamID may be 0 here
|
// streamID may be 0 here
|
||||||
func (f *flowControlManager) UpdateWindow(streamID protocol.StreamID, offset protocol.ByteCount) (bool, error) {
|
func (f *flowControlManager) UpdateWindow(streamID protocol.StreamID, offset protocol.ByteCount) (bool, error) {
|
||||||
// Only lock the part reading from the map, since send-windows are only accessed from the session goroutine.
|
|
||||||
f.mutex.Lock()
|
f.mutex.Lock()
|
||||||
streamFlowController, err := f.getFlowController(streamID)
|
defer f.mutex.Unlock()
|
||||||
f.mutex.Unlock()
|
|
||||||
|
fc, err := f.getFlowController(streamID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return streamFlowController.UpdateSendWindow(offset), nil
|
return fc.UpdateSendWindow(offset), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *flowControlManager) getFlowController(streamID protocol.StreamID) (*flowController, error) {
|
func (f *flowControlManager) getFlowController(streamID protocol.StreamID) (*flowController, error) {
|
||||||
|
|||||||
Reference in New Issue
Block a user