diff --git a/flow_controller.go b/flow_controller.go index 2e703ddc..bbc18daf 100644 --- a/flow_controller.go +++ b/flow_controller.go @@ -1,6 +1,8 @@ package quic import ( + "sync" + "github.com/lucas-clemente/quic-go/handshake" "github.com/lucas-clemente/quic-go/protocol" ) @@ -16,6 +18,8 @@ type flowController struct { receiveWindowUpdateThreshold protocol.ByteCount receiveFlowControlWindow protocol.ByteCount receiveFlowControlWindowIncrement protocol.ByteCount + + mutex sync.RWMutex } func newFlowController(connectionParametersManager *handshake.ConnectionParametersManager) *flowController { @@ -28,12 +32,18 @@ func newFlowController(connectionParametersManager *handshake.ConnectionParamete } func (c *flowController) AddBytesSent(n protocol.ByteCount) { + c.mutex.Lock() + defer c.mutex.Unlock() + c.bytesSent += n } // UpdateSendWindow should be called after receiving a WindowUpdateFrame // it returns true if the window was actually updated func (c *flowController) UpdateSendWindow(newOffset protocol.ByteCount) bool { + c.mutex.Lock() + defer c.mutex.Unlock() + if newOffset > c.sendFlowControlWindow { c.sendFlowControlWindow = newOffset return true @@ -42,6 +52,9 @@ func (c *flowController) UpdateSendWindow(newOffset protocol.ByteCount) bool { } func (c *flowController) SendWindowSize() protocol.ByteCount { + c.mutex.RLock() + defer c.mutex.RUnlock() + if c.bytesSent > c.sendFlowControlWindow { // should never happen, but make sure we don't do an underflow here return 0 } @@ -49,6 +62,9 @@ func (c *flowController) SendWindowSize() protocol.ByteCount { } func (c *flowController) AddBytesRead(n protocol.ByteCount) { + c.mutex.Lock() + defer c.mutex.Unlock() + c.bytesRead += n } @@ -59,6 +75,9 @@ func (c *flowController) MaybeTriggerBlocked() bool { return false } + c.mutex.Lock() + defer c.mutex.Unlock() + if c.lastBlockedSentForOffset == c.sendFlowControlWindow { return false } @@ -70,6 +89,9 @@ func (c *flowController) MaybeTriggerBlocked() bool { // MaybeTriggerWindowUpdate determines if it is necessary to send a WindowUpdate // if so, it returns true and the offset of the window func (c *flowController) MaybeTriggerWindowUpdate() (bool, protocol.ByteCount) { + c.mutex.Lock() + defer c.mutex.Unlock() + diff := c.receiveFlowControlWindow - c.bytesRead if diff < c.receiveWindowUpdateThreshold { c.receiveFlowControlWindow += c.receiveFlowControlWindowIncrement @@ -79,6 +101,9 @@ func (c *flowController) MaybeTriggerWindowUpdate() (bool, protocol.ByteCount) { } func (c *flowController) CheckFlowControlViolation(highestByte protocol.ByteCount) bool { + c.mutex.Lock() + defer c.mutex.Unlock() + if highestByte > c.receiveFlowControlWindow { return true } diff --git a/session.go b/session.go index 56506015..20a15be8 100644 --- a/session.go +++ b/session.go @@ -290,20 +290,18 @@ func (s *Session) handleWindowUpdateFrame(frame *frames.WindowUpdateFrame) error // return errors.New("Connection level flow control not yet implemented") return nil } + s.streamsMutex.RLock() - defer s.streamsMutex.RUnlock() - stream, streamExists := s.streams[frame.StreamID] - if !streamExists { return errWindowUpdateOnInvalidStream } if stream == nil { return errWindowUpdateOnClosedStream } + s.streamsMutex.RUnlock() stream.UpdateSendFlowControlWindow(frame.ByteOffset) - return nil } diff --git a/stream.go b/stream.go index 5459251c..c2719349 100644 --- a/stream.go +++ b/stream.go @@ -140,9 +140,6 @@ func (s *stream) ReadByte() (byte, error) { } func (s *stream) UpdateSendFlowControlWindow(n protocol.ByteCount) { - s.mutex.Lock() - defer s.mutex.Unlock() - if s.flowController.UpdateSendWindow(n) { s.windowUpdateOrErrCond.Broadcast() }