forked from quic-go/quic-go
It is not sufficient to check for connection-level window updates every time a packet is sent. When a connection-level window update needs to be sent, we need to make sure that it gets sent immediately (i.e. call scheduleSending() in the session).
80 lines
2.0 KiB
Go
80 lines
2.0 KiB
Go
package quic
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"github.com/lucas-clemente/quic-go/internal/flowcontrol"
|
|
"github.com/lucas-clemente/quic-go/internal/protocol"
|
|
"github.com/lucas-clemente/quic-go/internal/wire"
|
|
)
|
|
|
|
type windowUpdateQueue struct {
|
|
mutex sync.Mutex
|
|
|
|
queue map[protocol.StreamID]bool // used as a set
|
|
queuedConn bool // connection-level window update
|
|
|
|
cryptoStream cryptoStreamI
|
|
streamGetter streamGetter
|
|
connFlowController flowcontrol.ConnectionFlowController
|
|
callback func(wire.Frame)
|
|
}
|
|
|
|
func newWindowUpdateQueue(
|
|
streamGetter streamGetter,
|
|
cryptoStream cryptoStreamI,
|
|
connFC flowcontrol.ConnectionFlowController,
|
|
cb func(wire.Frame),
|
|
) *windowUpdateQueue {
|
|
return &windowUpdateQueue{
|
|
queue: make(map[protocol.StreamID]bool),
|
|
streamGetter: streamGetter,
|
|
cryptoStream: cryptoStream,
|
|
connFlowController: connFC,
|
|
callback: cb,
|
|
}
|
|
}
|
|
|
|
func (q *windowUpdateQueue) AddStream(id protocol.StreamID) {
|
|
q.mutex.Lock()
|
|
q.queue[id] = true
|
|
q.mutex.Unlock()
|
|
}
|
|
|
|
func (q *windowUpdateQueue) AddConnection() {
|
|
q.mutex.Lock()
|
|
q.queuedConn = true
|
|
q.mutex.Unlock()
|
|
}
|
|
|
|
func (q *windowUpdateQueue) QueueAll() {
|
|
q.mutex.Lock()
|
|
// queue a connection-level window update
|
|
if q.queuedConn {
|
|
q.callback(&wire.MaxDataFrame{ByteOffset: q.connFlowController.GetWindowUpdate()})
|
|
q.queuedConn = false
|
|
}
|
|
// queue all stream-level window updates
|
|
var offset protocol.ByteCount
|
|
for id := range q.queue {
|
|
if id == q.cryptoStream.StreamID() {
|
|
offset = q.cryptoStream.getWindowUpdate()
|
|
} else {
|
|
str, err := q.streamGetter.GetOrOpenReceiveStream(id)
|
|
if err != nil || str == nil { // the stream can be nil if it was completed before dequeing the window update
|
|
continue
|
|
}
|
|
offset = str.getWindowUpdate()
|
|
}
|
|
if offset == 0 { // can happen if we received a final offset, right after queueing the window update
|
|
continue
|
|
}
|
|
q.callback(&wire.MaxStreamDataFrame{
|
|
StreamID: id,
|
|
ByteOffset: offset,
|
|
})
|
|
delete(q.queue, id)
|
|
}
|
|
q.mutex.Unlock()
|
|
}
|