forked from quic-go/quic-go
use a ring buffer in the framer (#3857)
* implement and use ringbuffer in framer * Add comments for ring buffer Co-authored-by: Marten Seemann <martenseemann@gmail.com> --------- Co-authored-by: Marten Seemann <martenseemann@gmail.com>
This commit is contained in:
16
framer.go
16
framer.go
@@ -6,6 +6,7 @@ import (
|
|||||||
|
|
||||||
"github.com/quic-go/quic-go/internal/ackhandler"
|
"github.com/quic-go/quic-go/internal/ackhandler"
|
||||||
"github.com/quic-go/quic-go/internal/protocol"
|
"github.com/quic-go/quic-go/internal/protocol"
|
||||||
|
"github.com/quic-go/quic-go/internal/utils/ringbuffer"
|
||||||
"github.com/quic-go/quic-go/internal/wire"
|
"github.com/quic-go/quic-go/internal/wire"
|
||||||
"github.com/quic-go/quic-go/quicvarint"
|
"github.com/quic-go/quic-go/quicvarint"
|
||||||
)
|
)
|
||||||
@@ -28,7 +29,7 @@ type framerI struct {
|
|||||||
streamGetter streamGetter
|
streamGetter streamGetter
|
||||||
|
|
||||||
activeStreams map[protocol.StreamID]struct{}
|
activeStreams map[protocol.StreamID]struct{}
|
||||||
streamQueue []protocol.StreamID
|
streamQueue ringbuffer.RingBuffer[protocol.StreamID]
|
||||||
|
|
||||||
controlFrameMutex sync.Mutex
|
controlFrameMutex sync.Mutex
|
||||||
controlFrames []wire.Frame
|
controlFrames []wire.Frame
|
||||||
@@ -45,7 +46,7 @@ func newFramer(streamGetter streamGetter) framer {
|
|||||||
|
|
||||||
func (f *framerI) HasData() bool {
|
func (f *framerI) HasData() bool {
|
||||||
f.mutex.Lock()
|
f.mutex.Lock()
|
||||||
hasData := len(f.streamQueue) > 0
|
hasData := !f.streamQueue.Empty()
|
||||||
f.mutex.Unlock()
|
f.mutex.Unlock()
|
||||||
if hasData {
|
if hasData {
|
||||||
return true
|
return true
|
||||||
@@ -84,7 +85,7 @@ func (f *framerI) AppendControlFrames(frames []*ackhandler.Frame, maxLen protoco
|
|||||||
func (f *framerI) AddActiveStream(id protocol.StreamID) {
|
func (f *framerI) AddActiveStream(id protocol.StreamID) {
|
||||||
f.mutex.Lock()
|
f.mutex.Lock()
|
||||||
if _, ok := f.activeStreams[id]; !ok {
|
if _, ok := f.activeStreams[id]; !ok {
|
||||||
f.streamQueue = append(f.streamQueue, id)
|
f.streamQueue.PushBack(id)
|
||||||
f.activeStreams[id] = struct{}{}
|
f.activeStreams[id] = struct{}{}
|
||||||
}
|
}
|
||||||
f.mutex.Unlock()
|
f.mutex.Unlock()
|
||||||
@@ -95,13 +96,12 @@ func (f *framerI) AppendStreamFrames(frames []*ackhandler.Frame, maxLen protocol
|
|||||||
var lastFrame *ackhandler.Frame
|
var lastFrame *ackhandler.Frame
|
||||||
f.mutex.Lock()
|
f.mutex.Lock()
|
||||||
// pop STREAM frames, until less than MinStreamFrameSize bytes are left in the packet
|
// pop STREAM frames, until less than MinStreamFrameSize bytes are left in the packet
|
||||||
numActiveStreams := len(f.streamQueue)
|
numActiveStreams := f.streamQueue.Len()
|
||||||
for i := 0; i < numActiveStreams; i++ {
|
for i := 0; i < numActiveStreams; i++ {
|
||||||
if protocol.MinStreamFrameSize+length > maxLen {
|
if protocol.MinStreamFrameSize+length > maxLen {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
id := f.streamQueue[0]
|
id := f.streamQueue.PopFront()
|
||||||
f.streamQueue = f.streamQueue[1:]
|
|
||||||
// This should never return an error. Better check it anyway.
|
// This should never return an error. Better check it anyway.
|
||||||
// The stream will only be in the streamQueue, if it enqueued itself there.
|
// The stream will only be in the streamQueue, if it enqueued itself there.
|
||||||
str, err := f.streamGetter.GetOrOpenSendStream(id)
|
str, err := f.streamGetter.GetOrOpenSendStream(id)
|
||||||
@@ -117,7 +117,7 @@ func (f *framerI) AppendStreamFrames(frames []*ackhandler.Frame, maxLen protocol
|
|||||||
remainingLen += quicvarint.Len(uint64(remainingLen))
|
remainingLen += quicvarint.Len(uint64(remainingLen))
|
||||||
frame, hasMoreData := str.popStreamFrame(remainingLen, v)
|
frame, hasMoreData := str.popStreamFrame(remainingLen, v)
|
||||||
if hasMoreData { // put the stream back in the queue (at the end)
|
if hasMoreData { // put the stream back in the queue (at the end)
|
||||||
f.streamQueue = append(f.streamQueue, id)
|
f.streamQueue.PushBack(id)
|
||||||
} else { // no more data to send. Stream is not active any more
|
} else { // no more data to send. Stream is not active any more
|
||||||
delete(f.activeStreams, id)
|
delete(f.activeStreams, id)
|
||||||
}
|
}
|
||||||
@@ -146,7 +146,7 @@ func (f *framerI) Handle0RTTRejection() error {
|
|||||||
defer f.mutex.Unlock()
|
defer f.mutex.Unlock()
|
||||||
|
|
||||||
f.controlFrameMutex.Lock()
|
f.controlFrameMutex.Lock()
|
||||||
f.streamQueue = f.streamQueue[:0]
|
f.streamQueue.Clear()
|
||||||
for id := range f.activeStreams {
|
for id := range f.activeStreams {
|
||||||
delete(f.activeStreams, id)
|
delete(f.activeStreams, id)
|
||||||
}
|
}
|
||||||
|
|||||||
86
internal/utils/ringbuffer/ringbuffer.go
Normal file
86
internal/utils/ringbuffer/ringbuffer.go
Normal file
@@ -0,0 +1,86 @@
|
|||||||
|
package ringbuffer
|
||||||
|
|
||||||
|
// A RingBuffer is a ring buffer.
|
||||||
|
// It acts as a heap that doesn't cause any allocations.
|
||||||
|
type RingBuffer[T any] struct {
|
||||||
|
ring []T
|
||||||
|
headPos, tailPos int
|
||||||
|
full bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Init preallocs a buffer with a certain size.
|
||||||
|
func (r *RingBuffer[T]) Init(size int) {
|
||||||
|
r.ring = make([]T, size)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Len returns the number of elements in the ring buffer.
|
||||||
|
func (r *RingBuffer[T]) Len() int {
|
||||||
|
if r.full {
|
||||||
|
return len(r.ring)
|
||||||
|
}
|
||||||
|
if r.tailPos >= r.headPos {
|
||||||
|
return r.tailPos - r.headPos
|
||||||
|
}
|
||||||
|
return r.tailPos - r.headPos + len(r.ring)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Empty says if the ring buffer is empty.
|
||||||
|
func (r *RingBuffer[T]) Empty() bool {
|
||||||
|
return !r.full && r.headPos == r.tailPos
|
||||||
|
}
|
||||||
|
|
||||||
|
// PushBack adds a new element.
|
||||||
|
// If the ring buffer is full, its capacity is increased first.
|
||||||
|
func (r *RingBuffer[T]) PushBack(t T) {
|
||||||
|
if r.full || len(r.ring) == 0 {
|
||||||
|
r.grow()
|
||||||
|
}
|
||||||
|
r.ring[r.tailPos] = t
|
||||||
|
r.tailPos++
|
||||||
|
if r.tailPos == len(r.ring) {
|
||||||
|
r.tailPos = 0
|
||||||
|
}
|
||||||
|
if r.tailPos == r.headPos {
|
||||||
|
r.full = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// PopFront returns the next element.
|
||||||
|
// It must not be called when the buffer is empty, that means that
|
||||||
|
// callers might need to check if there are elements in the buffer first.
|
||||||
|
func (r *RingBuffer[T]) PopFront() T {
|
||||||
|
if r.Empty() {
|
||||||
|
panic("github.com/quic-go/quic-go/internal/utils/ringbuffer: pop from an empty queue")
|
||||||
|
}
|
||||||
|
r.full = false
|
||||||
|
t := r.ring[r.headPos]
|
||||||
|
r.ring[r.headPos] = *new(T)
|
||||||
|
r.headPos++
|
||||||
|
if r.headPos == len(r.ring) {
|
||||||
|
r.headPos = 0
|
||||||
|
}
|
||||||
|
return t
|
||||||
|
}
|
||||||
|
|
||||||
|
// Grow the maximum size of the queue.
|
||||||
|
// This method assume the queue is full.
|
||||||
|
func (r *RingBuffer[T]) grow() {
|
||||||
|
oldRing := r.ring
|
||||||
|
newSize := len(oldRing) * 2
|
||||||
|
if newSize == 0 {
|
||||||
|
newSize = 1
|
||||||
|
}
|
||||||
|
r.ring = make([]T, newSize)
|
||||||
|
headLen := copy(r.ring, oldRing[r.headPos:])
|
||||||
|
copy(r.ring[headLen:], oldRing[:r.headPos])
|
||||||
|
r.headPos, r.tailPos, r.full = 0, len(oldRing), false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear removes all elements.
|
||||||
|
func (r *RingBuffer[T]) Clear() {
|
||||||
|
var zeroValue T
|
||||||
|
for i := range r.ring {
|
||||||
|
r.ring[i] = zeroValue
|
||||||
|
}
|
||||||
|
r.headPos, r.tailPos, r.full = 0, 0, false
|
||||||
|
}
|
||||||
12
internal/utils/ringbuffer/ringbuffer_bench_test.go
Normal file
12
internal/utils/ringbuffer/ringbuffer_bench_test.go
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
package ringbuffer
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
func BenchmarkRingBuffer(b *testing.B) {
|
||||||
|
r := RingBuffer[int]{}
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
r.PushBack(i)
|
||||||
|
r.PopFront()
|
||||||
|
}
|
||||||
|
}
|
||||||
13
internal/utils/ringbuffer/ringbuffer_suite_test.go
Normal file
13
internal/utils/ringbuffer/ringbuffer_suite_test.go
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
package ringbuffer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
. "github.com/onsi/ginkgo/v2"
|
||||||
|
. "github.com/onsi/gomega"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestTestdata(t *testing.T) {
|
||||||
|
RegisterFailHandler(Fail)
|
||||||
|
RunSpecs(t, "ringbuffer suite")
|
||||||
|
}
|
||||||
38
internal/utils/ringbuffer/ringbuffer_test.go
Normal file
38
internal/utils/ringbuffer/ringbuffer_test.go
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
package ringbuffer
|
||||||
|
|
||||||
|
import (
|
||||||
|
. "github.com/onsi/ginkgo/v2"
|
||||||
|
. "github.com/onsi/gomega"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ = Describe("RingBuffer", func() {
|
||||||
|
It("push and pop", func() {
|
||||||
|
r := RingBuffer[int]{}
|
||||||
|
Expect(len(r.ring)).To(Equal(0))
|
||||||
|
Expect(func() { r.PopFront() }).To(Panic())
|
||||||
|
r.PushBack(1)
|
||||||
|
r.PushBack(2)
|
||||||
|
r.PushBack(3)
|
||||||
|
Expect(r.PopFront()).To(Equal(1))
|
||||||
|
Expect(r.PopFront()).To(Equal(2))
|
||||||
|
r.PushBack(4)
|
||||||
|
r.PushBack(5)
|
||||||
|
Expect(r.Len()).To(Equal(3))
|
||||||
|
r.PushBack(6)
|
||||||
|
Expect(r.Len()).To(Equal(4))
|
||||||
|
Expect(r.PopFront()).To(Equal(3))
|
||||||
|
Expect(r.PopFront()).To(Equal(4))
|
||||||
|
Expect(r.PopFront()).To(Equal(5))
|
||||||
|
Expect(r.PopFront()).To(Equal(6))
|
||||||
|
})
|
||||||
|
It("clear", func() {
|
||||||
|
r := RingBuffer[int]{}
|
||||||
|
r.Init(2)
|
||||||
|
r.PushBack(1)
|
||||||
|
r.PushBack(2)
|
||||||
|
Expect(r.full).To(BeTrue())
|
||||||
|
r.Clear()
|
||||||
|
Expect(r.full).To(BeFalse())
|
||||||
|
Expect(r.Len()).To(Equal(0))
|
||||||
|
})
|
||||||
|
})
|
||||||
Reference in New Issue
Block a user