optimize packetization of DATA_BLOCKED frames (#4845)

DATA_BLOCKED frames should be sent in the same as the STREAM frames that
resulted in the connection becoming blocked on connection flow control.
If there's not enough space left in that packet, the DATA_BLOCKED frame
is sent in the next packet.
This commit is contained in:
Marten Seemann
2025-01-08 11:41:10 +08:00
committed by GitHub
parent 5a6187c870
commit c42f8456ab
4 changed files with 133 additions and 27 deletions

View File

@@ -8,6 +8,7 @@ import (
"golang.org/x/exp/rand"
"github.com/quic-go/quic-go/internal/ackhandler"
"github.com/quic-go/quic-go/internal/flowcontrol"
"github.com/quic-go/quic-go/internal/protocol"
"github.com/quic-go/quic-go/internal/wire"
@@ -19,7 +20,7 @@ func TestFramerControlFrames(t *testing.T) {
pc := &wire.PathChallengeFrame{Data: [8]byte{1, 2, 3, 4, 6, 7, 8}}
msf := &wire.MaxStreamsFrame{MaxStreamNum: 0x1337}
framer := newFramer()
framer := newFramer(flowcontrol.NewConnectionFlowController(0, 0, nil, nil, nil))
require.False(t, framer.HasData())
framer.QueueControlFrame(pc)
require.True(t, framer.HasData())
@@ -45,7 +46,7 @@ func TestFramerControlFrameSizing(t *testing.T) {
bf := &wire.DataBlockedFrame{MaximumData: 0x1337}
bfLen := bf.Length(protocol.Version1)
framer := newFramer()
framer := newFramer(flowcontrol.NewConnectionFlowController(0, 0, nil, nil, nil))
numFrames := int(maxSize / bfLen) // max number of frames that fit into maxSize
for i := 0; i < numFrames+1; i++ {
framer.QueueControlFrame(bf)
@@ -65,7 +66,7 @@ func TestFramerStreamControlFrames(t *testing.T) {
mdf1 := &wire.MaxStreamDataFrame{StreamID: streamID, MaximumStreamData: 1337}
mdf2 := &wire.MaxStreamDataFrame{StreamID: streamID, MaximumStreamData: 1338}
framer := newFramer()
framer := newFramer(flowcontrol.NewConnectionFlowController(0, 0, nil, nil, nil))
framer.QueueControlFrame(ping)
str := NewMockStreamControlFrameGetter(gomock.NewController(t))
framer.AddStreamWithControlFrames(streamID, str)
@@ -87,7 +88,7 @@ func TestFramerStreamControlFramesSizing(t *testing.T) {
mdf1 := &wire.MaxStreamDataFrame{MaximumStreamData: 1337}
str := NewMockStreamControlFrameGetter(gomock.NewController(t))
framer := newFramer()
framer := newFramer(flowcontrol.NewConnectionFlowController(0, 0, nil, nil, nil))
framer.AddStreamWithControlFrames(10, str)
str.EXPECT().getControlFrame(gomock.Any()).Return(ackhandler.Frame{Frame: mdf1}, true, true).AnyTimes()
frames, _, l := framer.Append(nil, nil, 100, time.Now(), protocol.Version1)
@@ -114,7 +115,7 @@ func TestFramerStreamDataBlocked(t *testing.T) {
func testFramerStreamDataBlocked(t *testing.T, fits bool) {
const streamID = 5
str := NewMockSendStreamI(gomock.NewController(t))
framer := newFramer()
framer := newFramer(flowcontrol.NewConnectionFlowController(0, 0, nil, nil, nil))
framer.AddActiveStream(streamID, str)
str.EXPECT().popStreamFrame(gomock.Any(), gomock.Any()).DoAndReturn(
func(size protocol.ByteCount, v protocol.Version) (ackhandler.StreamFrame, *wire.StreamDataBlockedFrame, bool) {
@@ -151,8 +152,65 @@ func testFramerStreamDataBlocked(t *testing.T, fits bool) {
}
}
func TestFramerDataBlocked(t *testing.T) {
t.Run("small STREAM frame", func(t *testing.T) {
testFramerDataBlocked(t, true)
})
t.Run("large STREAM frame", func(t *testing.T) {
testFramerDataBlocked(t, false)
})
}
// If the stream becomes blocked on connection flow control, we attempt to pack the
// DATA_BLOCKED frame into the same packet.
// However, there's the pathological case, where the STREAM frame and the DATA_BLOCKED frame
// don't fit into the same packet. In that case, the DATA_BLOCKED frame is queued and sent
// in the next packet.
func testFramerDataBlocked(t *testing.T, fits bool) {
const streamID = 5
const offset = 100
fc := flowcontrol.NewConnectionFlowController(0, 0, nil, nil, nil)
fc.UpdateSendWindow(offset)
fc.AddBytesSent(offset)
str := NewMockSendStreamI(gomock.NewController(t))
framer := newFramer(fc)
framer.AddActiveStream(streamID, str)
str.EXPECT().popStreamFrame(gomock.Any(), gomock.Any()).DoAndReturn(
func(size protocol.ByteCount, v protocol.Version) (ackhandler.StreamFrame, *wire.StreamDataBlockedFrame, bool) {
data := []byte("foobar")
if !fits {
// Leave 2 bytes in the packet.
// This is not enough to fit in the DATA_BLOCKED frame.
data = make([]byte, size-2)
}
f := &wire.StreamFrame{StreamID: streamID, DataLenPresent: true, Data: data}
return ackhandler.StreamFrame{Frame: f}, nil, false
},
)
const maxSize protocol.ByteCount = 1000
frames, streamFrames, l := framer.Append(nil, nil, maxSize, time.Now(), protocol.Version1)
require.Len(t, streamFrames, 1)
if fits {
require.Len(t, frames, 1)
require.Equal(t, &wire.DataBlockedFrame{MaximumData: offset}, frames[0].Frame)
} else {
require.Equal(t, streamFrames[0].Frame.Length(protocol.Version1), l)
require.Empty(t, frames)
frames, streamFrames, l2 := framer.Append(nil, nil, maxSize, time.Now(), protocol.Version1)
require.Greater(t, l+l2, maxSize)
require.Empty(t, streamFrames)
require.Len(t, frames, 1)
require.Equal(t, &wire.DataBlockedFrame{MaximumData: offset}, frames[0].Frame)
}
}
func TestFramerDetectsFrameDoS(t *testing.T) {
framer := newFramer()
framer := newFramer(flowcontrol.NewConnectionFlowController(0, 0, nil, nil, nil))
for i := 0; i < maxControlFrames-1; i++ {
framer.QueueControlFrame(&wire.PingFrame{})
framer.QueueControlFrame(&wire.PingFrame{})
@@ -170,7 +228,7 @@ func TestFramerDetectsFrameDoS(t *testing.T) {
}
func TestFramerDetectsFramePathResponseDoS(t *testing.T) {
framer := newFramer()
framer := newFramer(flowcontrol.NewConnectionFlowController(0, 0, nil, nil, nil))
var pathResponses []*wire.PathResponseFrame
for i := 0; i < 2*maxPathResponses; i++ {
var f wire.PathResponseFrame
@@ -192,7 +250,7 @@ func TestFramerDetectsFramePathResponseDoS(t *testing.T) {
}
func TestFramerPacksSinglePathResponsePerPacket(t *testing.T) {
framer := newFramer()
framer := newFramer(flowcontrol.NewConnectionFlowController(0, 0, nil, nil, nil))
f1 := &wire.PathResponseFrame{Data: [8]byte{1, 2, 3, 4, 5, 6, 7, 8}}
f2 := &wire.PathResponseFrame{Data: [8]byte{2, 3, 4, 5, 6, 7, 8, 9}}
cf1 := &wire.DataBlockedFrame{MaximumData: 1337}
@@ -224,7 +282,7 @@ func TestFramerAppendStreamFrames(t *testing.T) {
f2 := &wire.StreamFrame{StreamID: str2ID, Data: []byte("bar"), DataLenPresent: true}
totalLen := f1.Length(protocol.Version1) + f2.Length(protocol.Version1)
framer := newFramer()
framer := newFramer(flowcontrol.NewConnectionFlowController(0, 0, nil, nil, nil))
require.False(t, framer.HasData())
// no frames added yet
controlFrames, fs, length := framer.Append(nil, nil, protocol.MaxByteCount, time.Now(), protocol.Version1)
@@ -272,7 +330,7 @@ func TestFramerAppendStreamFrames(t *testing.T) {
func TestFramerRemoveActiveStream(t *testing.T) {
const id = protocol.StreamID(42)
framer := newFramer()
framer := newFramer(flowcontrol.NewConnectionFlowController(0, 0, nil, nil, nil))
require.False(t, framer.HasData())
framer.AddActiveStream(id, NewMockSendStreamI(gomock.NewController(t)))
require.True(t, framer.HasData())
@@ -285,7 +343,7 @@ func TestFramerRemoveActiveStream(t *testing.T) {
func TestFramerMinStreamFrameSize(t *testing.T) {
const id = protocol.StreamID(42)
framer := newFramer()
framer := newFramer(flowcontrol.NewConnectionFlowController(0, 0, nil, nil, nil))
str := NewMockSendStreamI(gomock.NewController(t))
framer.AddActiveStream(id, str)
@@ -310,7 +368,7 @@ func TestFramerMinStreamFrameSize(t *testing.T) {
func TestFramerMinStreamFrameSizeMultipleStreamFrames(t *testing.T) {
const id = protocol.StreamID(42)
framer := newFramer()
framer := newFramer(flowcontrol.NewConnectionFlowController(0, 0, nil, nil, nil))
str := NewMockSendStreamI(gomock.NewController(t))
framer.AddActiveStream(id, str)
@@ -331,7 +389,7 @@ func TestFramerMinStreamFrameSizeMultipleStreamFrames(t *testing.T) {
func TestFramerFillPacketOneStream(t *testing.T) {
const id = protocol.StreamID(42)
str := NewMockSendStreamI(gomock.NewController(t))
framer := newFramer()
framer := newFramer(flowcontrol.NewConnectionFlowController(0, 0, nil, nil, nil))
for i := protocol.MinStreamFrameSize; i < 2000; i++ {
str.EXPECT().popStreamFrame(gomock.Any(), protocol.Version1).DoAndReturn(
@@ -362,7 +420,7 @@ func TestFramerFillPacketMultipleStreams(t *testing.T) {
mockCtrl := gomock.NewController(t)
stream1 := NewMockSendStreamI(mockCtrl)
stream2 := NewMockSendStreamI(mockCtrl)
framer := newFramer()
framer := newFramer(flowcontrol.NewConnectionFlowController(0, 0, nil, nil, nil))
for i := 2 * protocol.MinStreamFrameSize; i < 2000; i++ {
stream1.EXPECT().popStreamFrame(gomock.Any(), protocol.Version1).DoAndReturn(
@@ -398,7 +456,7 @@ func TestFramer0RTTRejection(t *testing.T) {
ping := &wire.PingFrame{}
pc := &wire.PathChallengeFrame{Data: [8]byte{1, 2, 3, 4, 6, 7, 8}}
framer := newFramer()
framer := newFramer(flowcontrol.NewConnectionFlowController(0, 0, nil, nil, nil))
framer.QueueControlFrame(ncid)
framer.QueueControlFrame(&wire.DataBlockedFrame{MaximumData: 1337})
framer.QueueControlFrame(&wire.StreamDataBlockedFrame{StreamID: 42, MaximumStreamData: 1337})