http3: move qlogging of frames into the frame parser (#5378)

This commit is contained in:
Marten Seemann
2025-10-12 12:40:24 +08:00
committed by GitHub
parent ed194a0c5e
commit cd4b1307db
13 changed files with 97 additions and 64 deletions

View File

@@ -18,9 +18,14 @@ func TestResponseBodyReading(t *testing.T) {
var buf bytes.Buffer
buf.Write(getDataFrame([]byte("foobar")))
str := NewMockDatagramStream(mockCtrl)
str.EXPECT().StreamID().Return(quic.StreamID(42)).AnyTimes()
str.EXPECT().Read(gomock.Any()).DoAndReturn(buf.Read).AnyTimes()
reqDone := make(chan struct{})
rb := newResponseBody(&Stream{datagramStream: str}, -1, reqDone)
rb := newResponseBody(
newStream(str, nil, nil, func(r io.Reader, u uint64) error { return nil }, nil),
-1,
reqDone,
)
data, err := io.ReadAll(rb)
require.NoError(t, err)
@@ -30,9 +35,14 @@ func TestResponseBodyReading(t *testing.T) {
func TestResponseBodyReadError(t *testing.T) {
mockCtrl := gomock.NewController(t)
str := NewMockDatagramStream(mockCtrl)
str.EXPECT().StreamID().Return(quic.StreamID(42)).AnyTimes()
str.EXPECT().Read(gomock.Any()).Return(0, assert.AnError).Times(2)
reqDone := make(chan struct{})
rb := newResponseBody(&Stream{datagramStream: str}, -1, reqDone)
rb := newResponseBody(
newStream(str, nil, nil, func(r io.Reader, u uint64) error { return nil }, nil),
-1,
reqDone,
)
_, err := rb.Read([]byte{0})
require.ErrorIs(t, err, assert.AnError)
@@ -49,9 +59,14 @@ func TestResponseBodyReadError(t *testing.T) {
func TestResponseBodyClose(t *testing.T) {
mockCtrl := gomock.NewController(t)
str := NewMockDatagramStream(mockCtrl)
str.EXPECT().StreamID().Return(quic.StreamID(42)).AnyTimes()
str.EXPECT().CancelRead(quic.StreamErrorCode(ErrCodeRequestCanceled)).Times(2)
reqDone := make(chan struct{})
rb := newResponseBody(&Stream{datagramStream: str}, -1, reqDone)
rb := newResponseBody(
newStream(str, nil, nil, func(r io.Reader, u uint64) error { return nil }, nil),
-1,
reqDone,
)
require.NoError(t, rb.Close())
select {
case <-reqDone:
@@ -66,9 +81,14 @@ func TestResponseBodyClose(t *testing.T) {
func TestResponseBodyConcurrentClose(t *testing.T) {
mockCtrl := gomock.NewController(t)
str := NewMockDatagramStream(mockCtrl)
str.EXPECT().StreamID().Return(quic.StreamID(42)).AnyTimes()
str.EXPECT().CancelRead(quic.StreamErrorCode(ErrCodeRequestCanceled)).MaxTimes(3)
reqDone := make(chan struct{})
rb := newResponseBody(&Stream{datagramStream: str}, -1, reqDone)
rb := newResponseBody(
newStream(str, nil, nil, func(r io.Reader, u uint64) error { return nil }, nil),
-1,
reqDone,
)
for range 3 {
go rb.Close()
@@ -101,10 +121,15 @@ func testResponseBodyLengthLimiting(t *testing.T, alongFrameBoundary bool) {
}
mockCtrl := gomock.NewController(t)
str := NewMockDatagramStream(mockCtrl)
str.EXPECT().StreamID().Return(quic.StreamID(42)).AnyTimes()
str.EXPECT().CancelRead(quic.StreamErrorCode(ErrCodeMessageError))
str.EXPECT().CancelWrite(quic.StreamErrorCode(ErrCodeMessageError))
str.EXPECT().Read(gomock.Any()).DoAndReturn(buf.Read).AnyTimes()
rb := newResponseBody(&Stream{datagramStream: str}, l, make(chan struct{}))
rb := newResponseBody(
newStream(str, nil, nil, func(r io.Reader, u uint64) error { return nil }, nil),
l,
make(chan struct{}),
)
data, err := io.ReadAll(rb)
require.Equal(t, []byte("foobar")[:l], data)
require.ErrorIs(t, err, errTooMuchData)

View File

@@ -158,7 +158,7 @@ func (c *ClientConn) handleBidirectionalStreams(streamHijacker func(FrameType, q
},
}
go func() {
if _, err := fp.ParseNext(); err == errHijacked {
if _, err := fp.ParseNext(c.conn.qlogger); err == errHijacked {
return
}
if err != nil {

View File

@@ -48,7 +48,7 @@ func testClientSettings(t *testing.T, enableDatagrams bool, other map[uint64]uin
require.NoError(t, err)
require.EqualValues(t, streamTypeControlStream, typ)
fp := (&frameParser{r: str})
f, err := fp.ParseNext()
f, err := fp.ParseNext(nil)
require.NoError(t, err)
require.IsType(t, &settingsFrame{}, f)
settingsFrame := f.(*settingsFrame)
@@ -60,6 +60,7 @@ func encodeResponse(t *testing.T, status int) []byte {
mockCtrl := gomock.NewController(t)
buf := &bytes.Buffer{}
rstr := NewMockDatagramStream(mockCtrl)
rstr.EXPECT().StreamID().Return(quic.StreamID(42)).AnyTimes()
rstr.EXPECT().Write(gomock.Any()).Do(buf.Write).AnyTimes()
rw := newResponseWriter(newStream(rstr, nil, nil, func(r io.Reader, u uint64) error { return nil }, nil), nil, false, nil)
rw.WriteHeader(status)
@@ -325,6 +326,7 @@ func TestClient1xxHandling(t *testing.T) {
func testClient1xxHandling(t *testing.T, numEarlyHints int, terminalStatus int, tooMany bool) {
var rspBuf bytes.Buffer
rstr := NewMockDatagramStream(gomock.NewController(t))
rstr.EXPECT().StreamID().Return(quic.StreamID(42)).AnyTimes()
rstr.EXPECT().Write(gomock.Any()).Do(rspBuf.Write).AnyTimes()
rw := newResponseWriter(newStream(rstr, nil, nil, func(r io.Reader, u uint64) error { return nil }, nil), nil, false, nil)
rw.header.Add("Link", "foo")
@@ -404,6 +406,7 @@ func testClientGzip(t *testing.T,
) {
var rspBuf bytes.Buffer
rstr := NewMockDatagramStream(gomock.NewController(t))
rstr.EXPECT().StreamID().Return(quic.StreamID(42)).AnyTimes()
rstr.EXPECT().Write(gomock.Any()).Do(rspBuf.Write).AnyTimes()
rw := newResponseWriter(newStream(rstr, nil, nil, func(r io.Reader, u uint64) error { return nil }, nil), nil, false, nil)
rw.WriteHeader(http.StatusOK)

View File

@@ -317,8 +317,8 @@ func (c *Conn) handleUnidirectionalStreams(hijack func(StreamType, quic.Connecti
}
func (c *Conn) handleControlStream(str *quic.ReceiveStream) {
fp := &frameParser{closeConn: c.conn.CloseWithError, r: str}
f, err := fp.ParseNext()
fp := &frameParser{closeConn: c.conn.CloseWithError, r: str, streamID: str.StreamID()}
f, err := fp.ParseNext(c.qlogger)
if err != nil {
var serr *quic.StreamError
if err == io.EOF || errors.As(err, &serr) {
@@ -362,7 +362,7 @@ func (c *Conn) handleControlStream(str *quic.ReceiveStream) {
}
for {
f, err := fp.ParseNext()
f, err := fp.ParseNext(c.qlogger)
if err != nil {
var serr *quic.StreamError
if err == io.EOF || errors.As(err, &serr) {
@@ -380,12 +380,6 @@ func (c *Conn) handleControlStream(str *quic.ReceiveStream) {
c.conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeFrameUnexpected), "")
return
}
if c.qlogger != nil {
c.qlogger.RecordEvent(qlog.FrameParsed{
StreamID: str.StreamID(),
Frame: qlog.Frame{Frame: qlog.GoAwayFrame{StreamID: goaway.StreamID}},
})
}
if goaway.StreamID%4 != 0 { // client-initiated, bidirectional streams
c.conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeIDError), "")
return

View File

@@ -7,6 +7,8 @@ import (
"io"
"github.com/quic-go/quic-go"
"github.com/quic-go/quic-go/http3/qlog"
"github.com/quic-go/quic-go/qlogwriter"
"github.com/quic-go/quic-go/quicvarint"
)
@@ -21,11 +23,12 @@ var errHijacked = errors.New("hijacked")
type frameParser struct {
r io.Reader
streamID quic.StreamID
closeConn func(quic.ApplicationErrorCode, string) error
unknownFrameHandler unknownFrameHandlerFunc
}
func (p *frameParser) ParseNext() (frame, error) {
func (p *frameParser) ParseNext(qlogger qlogwriter.Recorder) (frame, error) {
qr := quicvarint.NewReader(p.r)
for {
t, err := quicvarint.Read(qr)
@@ -59,6 +62,13 @@ func (p *frameParser) ParseNext() (frame, error) {
switch t {
case 0x0:
if qlogger != nil {
qlogger.RecordEvent(qlog.FrameParsed{
StreamID: p.streamID,
Raw: qlog.RawInfo{PayloadLength: int(l)},
Frame: qlog.Frame{Frame: qlog.DataFrame{}},
})
}
return &dataFrame{Length: l}, nil
case 0x1:
return &headersFrame{Length: l}, nil
@@ -67,7 +77,17 @@ func (p *frameParser) ParseNext() (frame, error) {
case 0x3: // CANCEL_PUSH
case 0x5: // PUSH_PROMISE
case 0x7:
return parseGoAwayFrame(qr, l)
f, err := parseGoAwayFrame(qr, l)
if err != nil {
return nil, err
}
if qlogger != nil {
qlogger.RecordEvent(qlog.FrameParsed{
StreamID: p.streamID,
Frame: qlog.Frame{Frame: qlog.GoAwayFrame{StreamID: f.StreamID}},
})
}
return f, nil
case 0xd: // MAX_PUSH_ID
case 0x2, 0x6, 0x8, 0x9:
p.closeConn(quic.ApplicationErrorCode(ErrCodeFrameUnexpected), "")

View File

@@ -21,7 +21,7 @@ func testFrameParserEOF(t *testing.T, data []byte) {
b := make([]byte, i)
copy(b, data[:i])
fp := frameParser{r: bytes.NewReader(b)}
_, err := fp.ParseNext()
_, err := fp.ParseNext(nil)
require.Error(t, err)
require.ErrorIs(t, err, io.EOF)
}
@@ -40,7 +40,7 @@ func TestParserReservedFrameType(t *testing.T) {
r: bytes.NewReader(data),
closeConn: client.CloseWithError,
}
_, err := fp.ParseNext()
_, err := fp.ParseNext(nil)
require.Error(t, err)
require.ErrorContains(t, err, "http3: reserved frame type")
@@ -70,7 +70,7 @@ func TestParserUnknownFrameType(t *testing.T) {
r := bytes.NewReader(data)
fp := frameParser{r: r}
f, err := fp.ParseNext()
f, err := fp.ParseNext(nil)
require.NoError(t, err)
require.IsType(t, &headersFrame{}, f)
hf = f.(*headersFrame)
@@ -90,14 +90,14 @@ func TestParserHeadersFrame(t *testing.T) {
testFrameParserEOF(t, data)
// parse
f1, err := fp.ParseNext()
f1, err := fp.ParseNext(nil)
require.NoError(t, err)
require.IsType(t, &headersFrame{}, f1)
require.Equal(t, uint64(0x1337), f1.(*headersFrame).Length)
// write and parse
fp = frameParser{r: bytes.NewReader(f1.(*headersFrame).Append(nil))}
f2, err := fp.ParseNext()
f2, err := fp.ParseNext(nil)
require.NoError(t, err)
require.Equal(t, f1, f2)
}
@@ -111,14 +111,14 @@ func TestDataFrame(t *testing.T) {
testFrameParserEOF(t, data)
// parse
f1, err := fp.ParseNext()
f1, err := fp.ParseNext(nil)
require.NoError(t, err)
require.IsType(t, &dataFrame{}, f1)
require.Equal(t, uint64(0x1337), f1.(*dataFrame).Length)
// write and parse
fp = frameParser{r: bytes.NewReader(f1.(*dataFrame).Append(nil))}
f2, err := fp.ParseNext()
f2, err := fp.ParseNext(nil)
require.NoError(t, err)
require.Equal(t, f1, f2)
}
@@ -140,7 +140,7 @@ func TestParserSettingsFrame(t *testing.T) {
testFrameParserEOF(t, data)
fp := frameParser{r: bytes.NewReader(data)}
frame, err := fp.ParseNext()
frame, err := fp.ParseNext(nil)
require.NoError(t, err)
require.IsType(t, &settingsFrame{}, frame)
sf := frame.(*settingsFrame)
@@ -150,7 +150,7 @@ func TestParserSettingsFrame(t *testing.T) {
// write and parse
fp = frameParser{r: bytes.NewReader(sf.Append(nil))}
f2, err := fp.ParseNext()
f2, err := fp.ParseNext(nil)
require.NoError(t, err)
require.IsType(t, &settingsFrame{}, f2)
sf2 := f2.(*settingsFrame)
@@ -187,7 +187,7 @@ func TestParserSettingsFrameDuplicateSettings(t *testing.T) {
data = quicvarint.Append(data, uint64(len(settings)))
data = append(data, settings...)
fp := frameParser{r: bytes.NewReader(data)}
_, err := fp.ParseNext()
_, err := fp.ParseNext(nil)
require.Error(t, err)
require.EqualError(t, err, fmt.Sprintf("duplicate setting: %d", tc.num))
})
@@ -216,14 +216,14 @@ func testParserSettingsFrameDatagram(t *testing.T, enabled bool) {
data = append(data, settings...)
fp := frameParser{r: bytes.NewReader(data)}
f, err := fp.ParseNext()
f, err := fp.ParseNext(nil)
require.NoError(t, err)
require.IsType(t, &settingsFrame{}, f)
sf := f.(*settingsFrame)
require.Equal(t, enabled, sf.Datagram)
fp = frameParser{r: bytes.NewReader(sf.Append(nil))}
f2, err := fp.ParseNext()
f2, err := fp.ParseNext(nil)
require.NoError(t, err)
require.Equal(t, sf, f2)
}
@@ -235,7 +235,7 @@ func TestParserSettingsFrameDatagramInvalidValue(t *testing.T) {
data = quicvarint.Append(data, uint64(len(settings)))
data = append(data, settings...)
fp := frameParser{r: bytes.NewReader(data)}
_, err := fp.ParseNext()
_, err := fp.ParseNext(nil)
require.EqualError(t, err, "invalid value for SETTINGS_H3_DATAGRAM: 1337")
}
@@ -261,14 +261,14 @@ func testParserSettingsFrameExtendedConnect(t *testing.T, enabled bool) {
data = append(data, settings...)
fp := frameParser{r: bytes.NewReader(data)}
f, err := fp.ParseNext()
f, err := fp.ParseNext(nil)
require.NoError(t, err)
require.IsType(t, &settingsFrame{}, f)
sf := f.(*settingsFrame)
require.Equal(t, enabled, sf.ExtendedConnect)
fp = frameParser{r: bytes.NewReader(sf.Append(nil))}
f2, err := fp.ParseNext()
f2, err := fp.ParseNext(nil)
require.NoError(t, err)
require.Equal(t, sf, f2)
}
@@ -280,7 +280,7 @@ func TestParserSettingsFrameExtendedConnectInvalidValue(t *testing.T) {
data = quicvarint.Append(data, uint64(len(settings)))
data = append(data, settings...)
fp := frameParser{r: bytes.NewReader(data)}
_, err := fp.ParseNext()
_, err := fp.ParseNext(nil)
require.EqualError(t, err, "invalid value for SETTINGS_ENABLE_CONNECT_PROTOCOL: 1337")
}
@@ -293,14 +293,14 @@ func TestParserGoAwayFrame(t *testing.T) {
testFrameParserEOF(t, data)
fp := frameParser{r: bytes.NewReader(data)}
f, err := fp.ParseNext()
f, err := fp.ParseNext(nil)
require.NoError(t, err)
require.IsType(t, &goAwayFrame{}, f)
require.Equal(t, quic.StreamID(100), f.(*goAwayFrame).StreamID)
// write and parse
fp = frameParser{r: bytes.NewReader(f.(*goAwayFrame).Append(nil))}
f2, err := fp.ParseNext()
f2, err := fp.ParseNext(nil)
require.NoError(t, err)
require.Equal(t, f, f2)
}
@@ -344,7 +344,7 @@ func testParserHijacking(t *testing.T, hijack bool) {
return true, nil
},
}
f, err := fp.ParseNext()
f, err := fp.ParseNext(nil)
require.True(t, called)
if hijack {
require.ErrorIs(t, err, errHijacked)
@@ -375,7 +375,7 @@ func TestParserHijackError(t *testing.T) {
return true, nil
},
}
_, err := fp.ParseNext()
_, err := fp.ParseNext(nil)
require.ErrorIs(t, err, errHijacked)
require.True(t, called)
}

View File

@@ -292,7 +292,7 @@ func decodeHeader(t *testing.T, r io.Reader) map[string][]string {
fields := make(map[string][]string)
decoder := qpack.NewDecoder(nil)
frame, err := (&frameParser{r: r}).ParseNext()
frame, err := (&frameParser{r: r}).ParseNext(nil)
require.NoError(t, err)
require.IsType(t, &headersFrame{}, frame)
headersFrame := frame.(*headersFrame)

View File

@@ -21,7 +21,7 @@ func decodeRequest(t *testing.T, str io.Reader, streamID quic.StreamID, eventRec
r := io.LimitedReader{R: str, N: 1000}
fp := frameParser{r: &r}
frame, err := fp.ParseNext()
frame, err := fp.ParseNext(nil)
require.NoError(t, err)
require.IsType(t, &headersFrame{}, frame)
headersFrame := frame.(*headersFrame)

View File

@@ -31,7 +31,7 @@ func (rw *testResponseWriter) DecodeHeaders(t *testing.T, idx int) map[string][]
decoder := qpack.NewDecoder(nil)
startLen := rw.buf.Len()
frame, err := (&frameParser{r: rw.buf}).ParseNext()
frame, err := (&frameParser{r: rw.buf}).ParseNext(nil)
require.NoError(t, err)
require.IsType(t, &headersFrame{}, frame)
payloadLen := frame.(*headersFrame).Length
@@ -65,7 +65,7 @@ func (rw *testResponseWriter) DecodeHeaders(t *testing.T, idx int) map[string][]
func (rw *testResponseWriter) DecodeBody(t *testing.T) []byte {
t.Helper()
frame, err := (&frameParser{r: rw.buf}).ParseNext()
frame, err := (&frameParser{r: rw.buf}).ParseNext(nil)
if err == io.EOF {
return nil
}

View File

@@ -580,7 +580,7 @@ func (s *Server) handleRequest(
}
}
fp := &frameParser{closeConn: conn.CloseWithError, r: str, unknownFrameHandler: ufh}
frame, err := fp.ParseNext()
frame, err := fp.ParseNext(qlogger)
if err != nil {
if !errors.Is(err, errHijacked) {
str.CancelRead(quic.StreamErrorCode(ErrCodeRequestIncomplete))

View File

@@ -102,7 +102,7 @@ func testServerSettings(t *testing.T, enableDatagrams bool, other map[uint64]uin
require.NoError(t, err)
require.EqualValues(t, streamTypeControlStream, typ)
fp := (&frameParser{r: bytes.NewReader(b[l:])})
f, err := fp.ParseNext()
f, err := fp.ParseNext(nil)
require.NoError(t, err)
require.IsType(t, &settingsFrame{}, f)
settingsFrame := f.(*settingsFrame)
@@ -214,7 +214,7 @@ func testServerRequestHandling(t *testing.T,
fp := frameParser{r: str}
var content []byte
for {
frame, err := fp.ParseNext()
frame, err := fp.ParseNext(nil)
if err == io.EOF {
break
}
@@ -489,7 +489,7 @@ func TestServerHTTPStreamHijacking(t *testing.T) {
hfs := decodeHeader(t, r)
require.Equal(t, hfs[":status"], []string{"200"})
fp := frameParser{r: r}
frame, err := fp.ParseNext()
frame, err := fp.ParseNext(nil)
require.NoError(t, err)
require.IsType(t, &dataFrame{}, frame)
dataFrame := frame.(*dataFrame)
@@ -822,7 +822,7 @@ func TestServerGracefulShutdown(t *testing.T) {
require.NoError(t, err)
require.EqualValues(t, streamTypeControlStream, typ)
fp := &frameParser{r: controlStr}
f, err := fp.ParseNext()
f, err := fp.ParseNext(nil)
require.NoError(t, err)
require.IsType(t, &settingsFrame{}, f)
@@ -832,7 +832,7 @@ func TestServerGracefulShutdown(t *testing.T) {
errChan <- s.Shutdown(shutdownCtx)
}()
f, err = fp.ParseNext()
f, err = fp.ParseNext(nil)
require.NoError(t, err)
require.Equal(t, &goAwayFrame{StreamID: 4}, f)

View File

@@ -63,33 +63,23 @@ func newStream(
qlogger: qlogger,
parseTrailer: parseTrailer,
frameParser: &frameParser{
closeConn: conn.CloseWithError,
r: &tracingReader{Reader: str, trace: trace},
streamID: str.StreamID(),
closeConn: conn.CloseWithError,
},
}
}
func (s *Stream) Read(b []byte) (int, error) {
fp := &frameParser{
r: s.datagramStream,
closeConn: s.conn.CloseWithError,
}
if s.bytesRemainingInFrame == 0 {
parseLoop:
for {
frame, err := fp.ParseNext()
frame, err := s.frameParser.ParseNext(s.qlogger)
if err != nil {
return 0, err
}
switch f := frame.(type) {
case *dataFrame:
if s.qlogger != nil {
s.qlogger.RecordEvent(qlog.FrameParsed{
StreamID: s.StreamID(),
Raw: qlog.RawInfo{PayloadLength: int(f.Length)},
Frame: qlog.Frame{Frame: qlog.DataFrame{}},
})
}
if s.parsedTrailer {
return 0, errors.New("DATA frame received after trailers")
}
@@ -328,7 +318,7 @@ func (s *RequestStream) ReadResponse() (*http.Response, error) {
if !s.sentRequest {
return nil, errors.New("http3: invalid duplicate use of RequestStream.ReadResponse before SendRequestHeader")
}
frame, err := s.str.frameParser.ParseNext()
frame, err := s.str.frameParser.ParseNext(s.str.qlogger)
if err != nil {
s.str.CancelRead(quic.StreamErrorCode(ErrCodeFrameError))
s.str.CancelWrite(quic.StreamErrorCode(ErrCodeFrameError))

View File

@@ -108,6 +108,7 @@ func TestStreamInvalidFrame(t *testing.T) {
mockCtrl := gomock.NewController(t)
qstr := NewMockDatagramStream(mockCtrl)
qstr.EXPECT().StreamID().Return(quic.StreamID(42)).AnyTimes()
qstr.EXPECT().Write(gomock.Any()).DoAndReturn(buf.Write).AnyTimes()
qstr.EXPECT().Read(gomock.Any()).DoAndReturn(buf.Read).AnyTimes()
clientConn, serverConn := newConnPair(t)
@@ -146,7 +147,7 @@ func TestStreamWrite(t *testing.T) {
startLen := buf.Len()
fp := frameParser{r: &buf}
f, err := fp.ParseNext()
f, err := fp.ParseNext(nil)
require.NoError(t, err)
f1Len := startLen - buf.Len()
require.Equal(t, &dataFrame{Length: 3}, f)
@@ -157,7 +158,7 @@ func TestStreamWrite(t *testing.T) {
startLen = buf.Len()
fp = frameParser{r: &buf}
f, err = fp.ParseNext()
f, err = fp.ParseNext(nil)
require.NoError(t, err)
f2Len := startLen - buf.Len()
require.Equal(t, &dataFrame{Length: 6}, f)