diff --git a/internal/congestion/pacer.go b/internal/congestion/pacer.go new file mode 100644 index 00000000..b78ec1f2 --- /dev/null +++ b/internal/congestion/pacer.go @@ -0,0 +1,61 @@ +package congestion + +import ( + "math" + "time" + + "github.com/lucas-clemente/quic-go/internal/protocol" + "github.com/lucas-clemente/quic-go/internal/utils" +) + +const maxBurstSize = 10 * maxDatagramSize + +// The pacer implements a token bucket pacing algorithm. +type pacer struct { + budgetAtLastSent protocol.ByteCount + lastSentTime time.Time + bandwidth uint64 // in bytes / s +} + +func newPacer(bw uint64) *pacer { + return &pacer{ + bandwidth: bw, + budgetAtLastSent: maxBurstSize, + } +} + +func (p *pacer) SentPacket(sendTime time.Time, size protocol.ByteCount) { + budget := p.Budget(sendTime) + if size > budget { + p.budgetAtLastSent = 0 + } else { + p.budgetAtLastSent = budget - size + } + p.lastSentTime = sendTime +} + +func (p *pacer) SetBandwidth(bw uint64) { + if bw == 0 { + panic("zero bandwidth") + } + p.bandwidth = bw +} + +func (p *pacer) Budget(now time.Time) protocol.ByteCount { + if p.lastSentTime.IsZero() { + return p.budgetAtLastSent + } + return utils.MinByteCount( + maxBurstSize, + p.budgetAtLastSent+(protocol.ByteCount(p.bandwidth)*protocol.ByteCount(now.Sub(p.lastSentTime).Nanoseconds()))/1e9, + ) +} + +// TimeUntilSend returns when the next packet should be sent. +func (p *pacer) TimeUntilSend() time.Time { + if p.budgetAtLastSent >= maxDatagramSize { + return time.Time{} + } + // TODO: don't allow pacing faster than MinPacingDelay + return p.lastSentTime.Add(time.Duration(math.Ceil(float64(maxDatagramSize-p.budgetAtLastSent)*1e9/float64(p.bandwidth))) * time.Nanosecond) +} diff --git a/internal/congestion/pacer_test.go b/internal/congestion/pacer_test.go new file mode 100644 index 00000000..937213f3 --- /dev/null +++ b/internal/congestion/pacer_test.go @@ -0,0 +1,91 @@ +package congestion + +import ( + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Pacer", func() { + var p *pacer + + const packetsPerSecond = 42 + + BeforeEach(func() { + p = newPacer(packetsPerSecond * uint64(maxDatagramSize)) // bandwidth: 42 full-size packets per second + }) + + It("allows a burst at the beginning", func() { + t := time.Now() + Expect(p.TimeUntilSend()).To(BeZero()) + Expect(p.Budget(t)).To(BeEquivalentTo(maxBurstSize)) + }) + + It("reduces the budget when sending packets", func() { + t := time.Now() + budget := p.Budget(t) + for budget > 0 { + Expect(p.TimeUntilSend()).To(BeZero()) + Expect(p.Budget(t)).To(Equal(budget)) + p.SentPacket(t, maxDatagramSize) + budget -= maxDatagramSize + } + Expect(p.Budget(t)).To(BeZero()) + Expect(p.TimeUntilSend()).ToNot(BeZero()) + }) + + sendBurst := func(t time.Time) { + for p.Budget(t) > 0 { + p.SentPacket(t, maxDatagramSize) + } + } + + It("paces packets after a burst", func() { + t := time.Now() + sendBurst(t) + // send 100 exactly paced packets + for i := 0; i < 100; i++ { + t2 := p.TimeUntilSend() + Expect(t2.Sub(t)).To(BeNumerically("~", time.Second/packetsPerSecond, time.Nanosecond)) + Expect(p.Budget(t2)).To(BeEquivalentTo(maxDatagramSize)) + p.SentPacket(t2, maxDatagramSize) + t = t2 + } + }) + + It("accounts for non-full-size packets", func() { + t := time.Now() + sendBurst(t) + t2 := p.TimeUntilSend() + Expect(t2.Sub(t)).To(BeNumerically("~", time.Second/packetsPerSecond, time.Nanosecond)) + // send a half-full packet + Expect(p.Budget(t2)).To(BeEquivalentTo(maxDatagramSize)) + size := maxDatagramSize / 2 + p.SentPacket(t2, size) + Expect(p.Budget(t2)).To(Equal(maxDatagramSize - size)) + Expect(p.TimeUntilSend()).To(BeTemporally("~", t2.Add(time.Second/packetsPerSecond/2), time.Nanosecond)) + }) + + It("accumulates budget, if no packets are sent", func() { + t := time.Now() + sendBurst(t) + t2 := p.TimeUntilSend() + Expect(t2).To(BeTemporally(">", t)) + // wait for 5 times the duration + Expect(p.Budget(t.Add(5 * t2.Sub(t)))).To(BeEquivalentTo(5 * maxDatagramSize)) + }) + + It("never allows bursts larger than the maximum burst size", func() { + t := time.Now() + sendBurst(t) + Expect(p.Budget(t.Add(time.Hour))).To(BeEquivalentTo(maxBurstSize)) + }) + + It("changes the bandwidth", func() { + t := time.Now() + sendBurst(t) + p.SetBandwidth(uint64(maxDatagramSize)) // reduce the bandwidth to 1 packet per second + Expect(p.TimeUntilSend()).To(Equal(t.Add(time.Second))) + }) +})