qlogwriter: pass the event time to Event.Encode (#5362)

This is needed for events such as recovery:loss_timer_updated, which
contain the timer expiration timestamp encoded as a difference from the
event time.
This commit is contained in:
Marten Seemann
2025-10-09 13:57:14 +08:00
committed by GitHub
parent c26e86c547
commit e0f9663be4
6 changed files with 173 additions and 225 deletions

View File

@@ -26,32 +26,6 @@ func (h *encoderHelper) WriteToken(t jsontext.Token) {
h.err = h.enc.WriteToken(t)
}
type eventDetails interface {
Name() string
Encode(*jsontext.Encoder) error
}
type event struct {
EventTime time.Time
RelativeTime time.Duration
eventDetails
}
func (e event) Encode(enc *jsontext.Encoder) error {
h := encoderHelper{enc: enc}
h.WriteToken(jsontext.BeginObject)
h.WriteToken(jsontext.String("time"))
h.WriteToken(jsontext.Float(milliseconds(e.RelativeTime)))
h.WriteToken(jsontext.String("name"))
h.WriteToken(jsontext.String(e.Name()))
h.WriteToken(jsontext.String("data"))
if err := e.eventDetails.Encode(enc); err != nil {
return err
}
h.WriteToken(jsontext.EndObject)
return h.err
}
type versions []Version
func (v versions) Encode(enc *jsontext.Encoder) error {
@@ -91,7 +65,7 @@ type StartedConnection struct {
func (e StartedConnection) Name() string { return "transport:connection_started" }
func (e StartedConnection) Encode(enc *jsontext.Encoder) error {
func (e StartedConnection) Encode(enc *jsontext.Encoder, _ time.Time) error {
h := encoderHelper{enc: enc}
h.WriteToken(jsontext.BeginObject)
if e.SrcAddr.IP.To4() != nil {
@@ -124,7 +98,7 @@ type VersionInformation struct {
func (e VersionInformation) Name() string { return "transport:version_information" }
func (e VersionInformation) Encode(enc *jsontext.Encoder) error {
func (e VersionInformation) Encode(enc *jsontext.Encoder, _ time.Time) error {
h := encoderHelper{enc: enc}
h.WriteToken(jsontext.BeginObject)
if len(e.ClientVersions) > 0 {
@@ -151,7 +125,7 @@ type ConnectionClosed struct {
func (e ConnectionClosed) Name() string { return "transport:connection_closed" }
func (e ConnectionClosed) Encode(enc *jsontext.Encoder) error {
func (e ConnectionClosed) Encode(enc *jsontext.Encoder, _ time.Time) error {
h := encoderHelper{enc: enc}
h.WriteToken(jsontext.BeginObject)
var (
@@ -220,7 +194,7 @@ type PacketSent struct {
func (e PacketSent) Name() string { return "transport:packet_sent" }
func (e PacketSent) Encode(enc *jsontext.Encoder) error {
func (e PacketSent) Encode(enc *jsontext.Encoder, _ time.Time) error {
h := encoderHelper{enc: enc}
h.WriteToken(jsontext.BeginObject)
h.WriteToken(jsontext.String("header"))
@@ -264,7 +238,7 @@ type PacketReceived struct {
func (e PacketReceived) Name() string { return "transport:packet_received" }
func (e PacketReceived) Encode(enc *jsontext.Encoder) error {
func (e PacketReceived) Encode(enc *jsontext.Encoder, _ time.Time) error {
h := encoderHelper{enc: enc}
h.WriteToken(jsontext.BeginObject)
h.WriteToken(jsontext.String("header"))
@@ -304,7 +278,7 @@ type VersionNegotiationReceived struct {
func (e VersionNegotiationReceived) Name() string { return "transport:packet_received" }
func (e VersionNegotiationReceived) Encode(enc *jsontext.Encoder) error {
func (e VersionNegotiationReceived) Encode(enc *jsontext.Encoder, _ time.Time) error {
h := encoderHelper{enc: enc}
h.WriteToken(jsontext.BeginObject)
h.WriteToken(jsontext.String("header"))
@@ -326,7 +300,7 @@ type VersionNegotiationSent struct {
func (e VersionNegotiationSent) Name() string { return "transport:packet_sent" }
func (e VersionNegotiationSent) Encode(enc *jsontext.Encoder) error {
func (e VersionNegotiationSent) Encode(enc *jsontext.Encoder, _ time.Time) error {
h := encoderHelper{enc: enc}
h.WriteToken(jsontext.BeginObject)
h.WriteToken(jsontext.String("header"))
@@ -348,7 +322,7 @@ type PacketBuffered struct {
func (e PacketBuffered) Name() string { return "transport:packet_buffered" }
func (e PacketBuffered) Encode(enc *jsontext.Encoder) error {
func (e PacketBuffered) Encode(enc *jsontext.Encoder, _ time.Time) error {
h := encoderHelper{enc: enc}
h.WriteToken(jsontext.BeginObject)
h.WriteToken(jsontext.String("header"))
@@ -374,7 +348,7 @@ type PacketDropped struct {
func (e PacketDropped) Name() string { return "transport:packet_dropped" }
func (e PacketDropped) Encode(enc *jsontext.Encoder) error {
func (e PacketDropped) Encode(enc *jsontext.Encoder, _ time.Time) error {
h := encoderHelper{enc: enc}
h.WriteToken(jsontext.BeginObject)
h.WriteToken(jsontext.String("header"))
@@ -398,7 +372,7 @@ type MTUUpdated struct {
func (e MTUUpdated) Name() string { return "recovery:mtu_updated" }
func (e MTUUpdated) Encode(enc *jsontext.Encoder) error {
func (e MTUUpdated) Encode(enc *jsontext.Encoder, _ time.Time) error {
h := encoderHelper{enc: enc}
h.WriteToken(jsontext.BeginObject)
h.WriteToken(jsontext.String("mtu"))
@@ -422,7 +396,7 @@ type MetricsUpdated struct {
func (e MetricsUpdated) Name() string { return "recovery:metrics_updated" }
func (e MetricsUpdated) Encode(enc *jsontext.Encoder) error {
func (e MetricsUpdated) Encode(enc *jsontext.Encoder, _ time.Time) error {
h := encoderHelper{enc: enc}
h.WriteToken(jsontext.BeginObject)
if e.MinRTT != nil {
@@ -468,7 +442,7 @@ type PacketLost struct {
func (e PacketLost) Name() string { return "recovery:packet_lost" }
func (e PacketLost) Encode(enc *jsontext.Encoder) error {
func (e PacketLost) Encode(enc *jsontext.Encoder, _ time.Time) error {
h := encoderHelper{enc: enc}
h.WriteToken(jsontext.BeginObject)
h.WriteToken(jsontext.String("header"))
@@ -490,7 +464,7 @@ type SpuriousLoss struct {
func (e SpuriousLoss) Name() string { return "recovery:spurious_loss" }
func (e SpuriousLoss) Encode(enc *jsontext.Encoder) error {
func (e SpuriousLoss) Encode(enc *jsontext.Encoder, _ time.Time) error {
h := encoderHelper{enc: enc}
h.WriteToken(jsontext.BeginObject)
h.WriteToken(jsontext.String("packet_number_space"))
@@ -514,7 +488,7 @@ type KeyUpdated struct {
func (e KeyUpdated) Name() string { return "security:key_updated" }
func (e KeyUpdated) Encode(enc *jsontext.Encoder) error {
func (e KeyUpdated) Encode(enc *jsontext.Encoder, _ time.Time) error {
h := encoderHelper{enc: enc}
h.WriteToken(jsontext.BeginObject)
h.WriteToken(jsontext.String("trigger"))
@@ -536,7 +510,7 @@ type KeyDiscarded struct {
func (e KeyDiscarded) Name() string { return "security:key_discarded" }
func (e KeyDiscarded) Encode(enc *jsontext.Encoder) error {
func (e KeyDiscarded) Encode(enc *jsontext.Encoder, _ time.Time) error {
h := encoderHelper{enc: enc}
h.WriteToken(jsontext.BeginObject)
if e.KeyType != KeyTypeClient1RTT && e.KeyType != KeyTypeServer1RTT {
@@ -585,7 +559,7 @@ func (e ParametersSet) Name() string {
return "transport:parameters_set"
}
func (e ParametersSet) Encode(enc *jsontext.Encoder) error {
func (e ParametersSet) Encode(enc *jsontext.Encoder, _ time.Time) error {
h := encoderHelper{enc: enc}
h.WriteToken(jsontext.BeginObject)
if !e.Restore {
@@ -708,7 +682,7 @@ type LossTimerUpdated struct {
func (e LossTimerUpdated) Name() string { return "recovery:loss_timer_updated" }
func (e LossTimerUpdated) Encode(enc *jsontext.Encoder) error {
func (e LossTimerUpdated) Encode(enc *jsontext.Encoder, t time.Time) error {
h := encoderHelper{enc: enc}
h.WriteToken(jsontext.BeginObject)
h.WriteToken(jsontext.String("event_type"))
@@ -719,7 +693,7 @@ func (e LossTimerUpdated) Encode(enc *jsontext.Encoder) error {
h.WriteToken(jsontext.String(encLevelToPacketNumberSpace(e.EncLevel)))
if e.Type == LossTimerUpdateTypeSet {
h.WriteToken(jsontext.String("delta"))
h.WriteToken(jsontext.Float(milliseconds(time.Until(e.Time))))
h.WriteToken(jsontext.Float(milliseconds(e.Time.Sub(t))))
}
h.WriteToken(jsontext.EndObject)
return h.err
@@ -729,7 +703,7 @@ type eventLossTimerCanceled struct{}
func (e eventLossTimerCanceled) Name() string { return "recovery:loss_timer_updated" }
func (e eventLossTimerCanceled) Encode(enc *jsontext.Encoder) error {
func (e eventLossTimerCanceled) Encode(enc *jsontext.Encoder, _ time.Time) error {
h := encoderHelper{enc: enc}
h.WriteToken(jsontext.BeginObject)
h.WriteToken(jsontext.String("event_type"))
@@ -744,7 +718,7 @@ type CongestionStateUpdated struct {
func (e CongestionStateUpdated) Name() string { return "recovery:congestion_state_updated" }
func (e CongestionStateUpdated) Encode(enc *jsontext.Encoder) error {
func (e CongestionStateUpdated) Encode(enc *jsontext.Encoder, _ time.Time) error {
h := encoderHelper{enc: enc}
h.WriteToken(jsontext.BeginObject)
h.WriteToken(jsontext.String("new"))
@@ -760,7 +734,7 @@ type ECNStateUpdated struct {
func (e ECNStateUpdated) Name() string { return "recovery:ecn_state_updated" }
func (e ECNStateUpdated) Encode(enc *jsontext.Encoder) error {
func (e ECNStateUpdated) Encode(enc *jsontext.Encoder, _ time.Time) error {
h := encoderHelper{enc: enc}
h.WriteToken(jsontext.BeginObject)
h.WriteToken(jsontext.String("new"))
@@ -779,7 +753,7 @@ type ALPNInformation struct {
func (e ALPNInformation) Name() string { return "transport:alpn_information" }
func (e ALPNInformation) Encode(enc *jsontext.Encoder) error {
func (e ALPNInformation) Encode(enc *jsontext.Encoder, _ time.Time) error {
h := encoderHelper{enc: enc}
h.WriteToken(jsontext.BeginObject)
h.WriteToken(jsontext.String("chosen_alpn"))

View File

@@ -10,44 +10,55 @@ import (
"github.com/quic-go/quic-go/internal/protocol"
"github.com/quic-go/quic-go/internal/qerr"
"github.com/quic-go/quic-go/internal/synctest"
"github.com/quic-go/quic-go/internal/utils"
"github.com/quic-go/quic-go/internal/wire"
"github.com/quic-go/quic-go/qlogwriter/jsontext"
"github.com/quic-go/quic-go/qlogwriter"
"github.com/stretchr/testify/require"
)
// Helper function for direct event testing
func testEventEncoding(t *testing.T, eventDetails eventDetails) (string, map[string]any, time.Duration) {
return testEventEncodingWithTime(t, eventDetails, time.Now(), 42*time.Second)
func testEventEncoding(t *testing.T, ev qlogwriter.Event) (string, map[string]any) {
t.Helper()
var buf bytes.Buffer
synctest.Test(t, func(t *testing.T) {
tr := qlogwriter.NewConnectionFileSeq(
nopWriteCloser(&buf),
true,
protocol.ParseConnectionID([]byte{1, 2, 3, 4}),
[]string{EventSchema},
)
go tr.Run()
producer := tr.AddProducer()
synctest.Wait()
time.Sleep(42 * time.Second)
producer.RecordEvent(ev)
producer.Close()
})
return decode(t, buf.String())
}
// Helper function for direct event testing with custom time
func testEventEncodingWithTime(t *testing.T, eventDetails eventDetails, eventTime time.Time, relativeTime time.Duration) (string, map[string]any, time.Duration) {
func decode(t *testing.T, data string) (string, map[string]any) {
t.Helper()
event := event{
EventTime: eventTime,
RelativeTime: relativeTime,
eventDetails: eventDetails,
}
var buf bytes.Buffer
enc := jsontext.NewEncoder(&buf)
require.NoError(t, event.Encode(enc))
var result map[string]any
require.NoError(t, json.Unmarshal(buf.Bytes(), &result))
// Extract the event data
eventData := result["data"].(map[string]any)
encodedRelativeTime := time.Duration(result["time"].(float64)*1e6) * time.Nanosecond
lines := bytes.Split([]byte(data), []byte{'\n'})
require.Len(t, lines, 3) // the first line is the trace header, the second line is the event, the third line is empty
require.Empty(t, lines[2])
require.Equal(t, qlogwriter.RecordSeparator, lines[1][0], "expected record separator at start of line")
require.NoError(t, json.Unmarshal(lines[1][1:], &result))
require.Equal(t, 42*time.Second, time.Duration(result["time"].(float64)*1e6)*time.Nanosecond)
return result["name"].(string), eventData, encodedRelativeTime
return result["name"].(string), result["data"].(map[string]any)
}
func TestStartedConnection(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &StartedConnection{
name, ev := testEventEncoding(t, &StartedConnection{
SrcAddr: &net.UDPAddr{IP: net.IPv4(192, 168, 13, 37), Port: 42},
DestAddr: &net.UDPAddr{IP: net.IPv4(192, 168, 12, 34), Port: 24},
SrcConnectionID: protocol.ParseConnectionID([]byte{1, 2, 3, 4}),
@@ -55,7 +66,6 @@ func TestStartedConnection(t *testing.T) {
})
require.Equal(t, "transport:connection_started", name)
require.Equal(t, 42*time.Second, relativeTime) // Should be 42 seconds since we set RelativeTime to 42s
require.Equal(t, "ipv4", ev["ip_version"])
require.Equal(t, "192.168.13.37", ev["src_ip"])
require.Equal(t, float64(42), ev["src_port"])
@@ -66,23 +76,21 @@ func TestStartedConnection(t *testing.T) {
}
func TestVersionInformation(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &VersionInformation{ChosenVersion: 0x1337})
name, ev := testEventEncoding(t, &VersionInformation{ChosenVersion: 0x1337})
require.Equal(t, "transport:version_information", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Len(t, ev, 1)
require.Equal(t, "1337", ev["chosen_version"])
}
func TestVersionInformationWithNegotiation(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &VersionInformation{
name, ev := testEventEncoding(t, &VersionInformation{
ChosenVersion: 0x1337,
ClientVersions: []Version{1, 2, 3},
ServerVersions: []Version{4, 5, 6},
})
require.Equal(t, "transport:version_information", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Len(t, ev, 3)
require.Equal(t, "1337", ev["chosen_version"])
require.Equal(t, []any{"1", "2", "3"}, ev["client_versions"])
@@ -90,54 +98,50 @@ func TestVersionInformationWithNegotiation(t *testing.T) {
}
func TestIdleTimeouts(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &ConnectionClosed{
name, ev := testEventEncoding(t, &ConnectionClosed{
Error: &qerr.IdleTimeoutError{},
})
require.Equal(t, "transport:connection_closed", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Len(t, ev, 2)
require.Equal(t, "local", ev["owner"])
require.Equal(t, "idle_timeout", ev["trigger"])
}
func TestHandshakeTimeouts(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &ConnectionClosed{
name, ev := testEventEncoding(t, &ConnectionClosed{
Error: &qerr.HandshakeTimeoutError{},
})
require.Equal(t, "transport:connection_closed", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Len(t, ev, 2)
require.Equal(t, "local", ev["owner"])
require.Equal(t, "handshake_timeout", ev["trigger"])
}
func TestReceivedStatelessResetPacket(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &ConnectionClosed{
name, ev := testEventEncoding(t, &ConnectionClosed{
Error: &qerr.StatelessResetError{},
})
require.Equal(t, "transport:connection_closed", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Len(t, ev, 2)
require.Equal(t, "remote", ev["owner"])
require.Equal(t, "stateless_reset", ev["trigger"])
}
func TestVersionNegotiationFailure(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &ConnectionClosed{
name, ev := testEventEncoding(t, &ConnectionClosed{
Error: &qerr.VersionNegotiationError{},
})
require.Equal(t, "transport:connection_closed", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Len(t, ev, 1)
require.Equal(t, "version_mismatch", ev["trigger"])
}
func TestApplicationErrors(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &ConnectionClosed{
name, ev := testEventEncoding(t, &ConnectionClosed{
Error: &qerr.ApplicationError{
Remote: true,
ErrorCode: 1337,
@@ -146,7 +150,6 @@ func TestApplicationErrors(t *testing.T) {
})
require.Equal(t, "transport:connection_closed", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Len(t, ev, 3)
require.Equal(t, "remote", ev["owner"])
require.Equal(t, float64(1337), ev["application_code"])
@@ -154,7 +157,7 @@ func TestApplicationErrors(t *testing.T) {
}
func TestTransportErrors(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &ConnectionClosed{
name, ev := testEventEncoding(t, &ConnectionClosed{
Error: &qerr.TransportError{
ErrorCode: qerr.AEADLimitReached,
ErrorMessage: "foobar",
@@ -162,7 +165,6 @@ func TestTransportErrors(t *testing.T) {
})
require.Equal(t, "transport:connection_closed", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Len(t, ev, 3)
require.Equal(t, "local", ev["owner"])
require.Equal(t, "aead_limit_reached", ev["connection_code"])
@@ -171,7 +173,7 @@ func TestTransportErrors(t *testing.T) {
func TestSentTransportParameters(t *testing.T) {
rcid := protocol.ParseConnectionID([]byte{0xde, 0xca, 0xfb, 0xad})
name, ev, relativeTime := testEventEncoding(t, &ParametersSet{
name, ev := testEventEncoding(t, &ParametersSet{
Owner: OwnerLocal,
SentBy: protocol.PerspectiveServer,
OriginalDestinationConnectionID: protocol.ParseConnectionID([]byte{0xde, 0xad, 0xc0, 0xde}),
@@ -195,7 +197,6 @@ func TestSentTransportParameters(t *testing.T) {
})
require.Equal(t, "transport:parameters_set", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Equal(t, "local", ev["owner"])
require.Equal(t, "deadc0de", ev["original_destination_connection_id"])
require.Equal(t, "deadbeef", ev["initial_source_connection_id"])
@@ -217,7 +218,7 @@ func TestSentTransportParameters(t *testing.T) {
}
func TestServerTransportParametersWithoutStatelessResetToken(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &ParametersSet{
name, ev := testEventEncoding(t, &ParametersSet{
Owner: OwnerLocal,
SentBy: protocol.PerspectiveServer,
OriginalDestinationConnectionID: protocol.ParseConnectionID([]byte{0xde, 0xad, 0xc0, 0xde}),
@@ -225,19 +226,17 @@ func TestServerTransportParametersWithoutStatelessResetToken(t *testing.T) {
})
require.Equal(t, "transport:parameters_set", name)
require.Equal(t, 42*time.Second, relativeTime)
require.NotContains(t, ev, "stateless_reset_token")
}
func TestTransportParametersWithoutRetrySourceConnectionID(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &ParametersSet{
name, ev := testEventEncoding(t, &ParametersSet{
Owner: OwnerLocal,
SentBy: protocol.PerspectiveServer,
StatelessResetToken: &protocol.StatelessResetToken{0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, 0x00},
})
require.Equal(t, "transport:parameters_set", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Equal(t, "local", ev["owner"])
require.NotContains(t, ev, "retry_source_connection_id")
}
@@ -267,14 +266,13 @@ func testTransportParametersWithPreferredAddress(t *testing.T, hasIPv4, hasIPv6
if hasIPv6 {
preferredAddress.IPv6 = addr6
}
name, ev, relativeTime := testEventEncoding(t, &ParametersSet{
name, ev := testEventEncoding(t, &ParametersSet{
Owner: OwnerLocal,
SentBy: protocol.PerspectiveServer,
PreferredAddress: preferredAddress,
})
require.Equal(t, "transport:parameters_set", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Equal(t, "local", ev["owner"])
require.Contains(t, ev, "preferred_address")
pa := ev["preferred_address"].(map[string]any)
@@ -297,31 +295,29 @@ func testTransportParametersWithPreferredAddress(t *testing.T, hasIPv4, hasIPv6
}
func TestTransportParametersWithDatagramExtension(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &ParametersSet{
name, ev := testEventEncoding(t, &ParametersSet{
Owner: OwnerLocal,
SentBy: protocol.PerspectiveServer,
MaxDatagramFrameSize: 1337,
})
require.Equal(t, "transport:parameters_set", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Equal(t, float64(1337), ev["max_datagram_frame_size"])
}
func TestReceivedTransportParameters(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &ParametersSet{
name, ev := testEventEncoding(t, &ParametersSet{
Owner: OwnerRemote,
SentBy: protocol.PerspectiveClient,
})
require.Equal(t, "transport:parameters_set", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Equal(t, "remote", ev["owner"])
require.NotContains(t, ev, "original_destination_connection_id")
}
func TestRestoredTransportParameters(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &ParametersSet{
name, ev := testEventEncoding(t, &ParametersSet{
Restore: true,
InitialMaxStreamDataBidiLocal: 100,
InitialMaxStreamDataBidiRemote: 200,
@@ -331,7 +327,6 @@ func TestRestoredTransportParameters(t *testing.T) {
})
require.Equal(t, "transport:parameters_restored", name)
require.Equal(t, 42*time.Second, relativeTime)
require.NotContains(t, ev, "owner")
require.NotContains(t, ev, "original_destination_connection_id")
require.NotContains(t, ev, "stateless_reset_token")
@@ -345,7 +340,7 @@ func TestRestoredTransportParameters(t *testing.T) {
}
func TestPacketSent(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &PacketSent{
name, ev := testEventEncoding(t, &PacketSent{
Header: PacketHeader{
PacketType: PacketTypeHandshake,
PacketNumber: 1337,
@@ -362,7 +357,6 @@ func TestPacketSent(t *testing.T) {
})
require.Equal(t, "transport:packet_sent", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Contains(t, ev, "raw")
raw := ev["raw"].(map[string]any)
require.Equal(t, float64(987), raw["length"])
@@ -381,7 +375,7 @@ func TestPacketSent(t *testing.T) {
}
func TestPacketSentShort(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &PacketSent{
name, ev := testEventEncoding(t, &PacketSent{
Header: PacketHeader{
PacketType: PacketType1RTT,
PacketNumber: 1337,
@@ -397,7 +391,6 @@ func TestPacketSentShort(t *testing.T) {
})
require.Equal(t, "transport:packet_sent", name)
require.Equal(t, 42*time.Second, relativeTime)
raw := ev["raw"].(map[string]any)
require.Equal(t, float64(123), raw["length"])
require.NotContains(t, raw, "payload_length")
@@ -414,7 +407,7 @@ func TestPacketSentShort(t *testing.T) {
}
func TestPacketReceived(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &PacketReceived{
name, ev := testEventEncoding(t, &PacketReceived{
Header: PacketHeader{
PacketType: PacketTypeInitial,
PacketNumber: 1337,
@@ -435,7 +428,6 @@ func TestPacketReceived(t *testing.T) {
})
require.Equal(t, "transport:packet_received", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Contains(t, ev, "raw")
raw := ev["raw"].(map[string]any)
require.Equal(t, float64(789), raw["length"])
@@ -454,7 +446,7 @@ func TestPacketReceived(t *testing.T) {
}
func TestPacketReceived1RTT(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &PacketReceived{
name, ev := testEventEncoding(t, &PacketReceived{
Header: PacketHeader{
PacketType: PacketType1RTT,
PacketNumber: 1337,
@@ -470,7 +462,6 @@ func TestPacketReceived1RTT(t *testing.T) {
})
require.Equal(t, "transport:packet_received", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Contains(t, ev, "raw")
raw := ev["raw"].(map[string]any)
require.Equal(t, float64(789), raw["length"])
@@ -486,7 +477,7 @@ func TestPacketReceived1RTT(t *testing.T) {
}
func TestPacketReceivedRetry(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &PacketReceived{
name, ev := testEventEncoding(t, &PacketReceived{
Header: PacketHeader{
PacketType: PacketTypeRetry,
Version: protocol.Version1,
@@ -498,7 +489,6 @@ func TestPacketReceivedRetry(t *testing.T) {
})
require.Equal(t, "transport:packet_received", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Contains(t, ev, "raw")
raw := ev["raw"].(map[string]any)
require.Len(t, raw, 1)
@@ -517,7 +507,7 @@ func TestPacketReceivedRetry(t *testing.T) {
}
func TestVersionNegotiationReceived(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &VersionNegotiationReceived{
name, ev := testEventEncoding(t, &VersionNegotiationReceived{
Header: PacketHeaderVersionNegotiation{
SrcConnectionID: ArbitraryLenConnectionID{4, 3, 2, 1},
DestConnectionID: ArbitraryLenConnectionID{1, 2, 3, 4, 5, 6, 7, 8},
@@ -526,7 +516,6 @@ func TestVersionNegotiationReceived(t *testing.T) {
})
require.Equal(t, "transport:packet_received", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Contains(t, ev, "header")
require.NotContains(t, ev, "frames")
require.Contains(t, ev, "supported_versions")
@@ -540,7 +529,7 @@ func TestVersionNegotiationReceived(t *testing.T) {
}
func TestPacketBuffered(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &PacketBuffered{
name, ev := testEventEncoding(t, &PacketBuffered{
Header: PacketHeader{
PacketType: PacketTypeHandshake,
PacketNumber: protocol.InvalidPacketNumber,
@@ -551,7 +540,6 @@ func TestPacketBuffered(t *testing.T) {
})
require.Equal(t, "transport:packet_buffered", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Contains(t, ev, "header")
require.Contains(t, ev, "raw")
require.Equal(t, float64(1337), ev["raw"].(map[string]any)["length"])
@@ -560,14 +548,13 @@ func TestPacketBuffered(t *testing.T) {
}
func TestPacketDropped(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &PacketDropped{
name, ev := testEventEncoding(t, &PacketDropped{
Header: PacketHeader{PacketType: PacketTypeRetry},
Raw: RawInfo{Length: 1337},
Trigger: PacketDropPayloadDecryptError,
})
require.Equal(t, "transport:packet_dropped", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Contains(t, ev, "raw")
require.Equal(t, float64(1337), ev["raw"].(map[string]any)["length"])
require.Contains(t, ev, "header")
@@ -586,7 +573,7 @@ func TestMetricsUpdated(t *testing.T) {
cwndInt := 4321
bytesInt := 1234
packetsInt := 42
name, ev, relativeTime := testEventEncoding(t, &MetricsUpdated{
name, ev := testEventEncoding(t, &MetricsUpdated{
MinRTT: &minRTT,
SmoothedRTT: &smoothedRTT,
LatestRTT: &latestRTT,
@@ -597,7 +584,6 @@ func TestMetricsUpdated(t *testing.T) {
})
require.Equal(t, "recovery:metrics_updated", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Equal(t, float64(15), ev["min_rtt"])
require.Equal(t, float64(25), ev["latest_rtt"])
require.Contains(t, ev, "smoothed_rtt")
@@ -610,19 +596,18 @@ func TestMetricsUpdated(t *testing.T) {
}
func TestPacketLost(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &PacketLost{
name, ev := testEventEncoding(t, &PacketLost{
Header: PacketHeader{PacketType: PacketTypeHandshake, PacketNumber: 42},
Trigger: PacketLossReorderingThreshold,
})
require.Equal(t, "recovery:packet_lost", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Contains(t, ev, "header")
require.Equal(t, "reordering_threshold", ev["trigger"])
}
func TestSpuriousLoss(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &SpuriousLoss{
name, ev := testEventEncoding(t, &SpuriousLoss{
EncryptionLevel: protocol.Encryption1RTT,
PacketNumber: 42,
PacketReordering: 1,
@@ -630,7 +615,6 @@ func TestSpuriousLoss(t *testing.T) {
})
require.Equal(t, "recovery:spurious_loss", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Contains(t, ev, "packet_number")
require.Equal(t, float64(42), ev["packet_number"])
require.Contains(t, ev, "reordering_packets")
@@ -640,45 +624,41 @@ func TestSpuriousLoss(t *testing.T) {
}
func TestMTUUpdated(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &MTUUpdated{
name, ev := testEventEncoding(t, &MTUUpdated{
Value: 1337,
Done: true,
})
require.Equal(t, "recovery:mtu_updated", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Equal(t, float64(1337), ev["mtu"])
require.Equal(t, true, ev["done"])
}
func TestCongestionStateUpdated(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &CongestionStateUpdated{
name, ev := testEventEncoding(t, &CongestionStateUpdated{
State: CongestionStateCongestionAvoidance,
})
require.Equal(t, "recovery:congestion_state_updated", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Equal(t, "congestion_avoidance", ev["new"])
}
func TestMetricsUpdatedPTO(t *testing.T) {
value := uint32(42)
name, ev, relativeTime := testEventEncoding(t, &MetricsUpdated{PTOCount: &value})
name, ev := testEventEncoding(t, &MetricsUpdated{PTOCount: &value})
require.Equal(t, "recovery:metrics_updated", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Equal(t, float64(42), ev["pto_count"])
}
func TestKeyUpdatedTLS(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &KeyUpdated{
name, ev := testEventEncoding(t, &KeyUpdated{
Trigger: KeyUpdateTLS,
KeyType: KeyTypeClientHandshake,
KeyPhase: 0,
})
require.Equal(t, "security:key_updated", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Equal(t, "client_handshake_secret", ev["key_type"])
require.Equal(t, "tls", ev["trigger"])
require.NotContains(t, ev, "key_phase")
@@ -687,14 +667,13 @@ func TestKeyUpdatedTLS(t *testing.T) {
}
func TestKeyUpdatedTLS1RTT(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &KeyUpdated{
name, ev := testEventEncoding(t, &KeyUpdated{
Trigger: KeyUpdateTLS,
KeyType: KeyTypeServer1RTT,
KeyPhase: 0,
})
require.Equal(t, "security:key_updated", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Equal(t, "server_1rtt_secret", ev["key_type"])
require.Equal(t, "tls", ev["trigger"])
require.Equal(t, float64(0), ev["key_phase"])
@@ -703,14 +682,13 @@ func TestKeyUpdatedTLS1RTT(t *testing.T) {
}
func TestKeyUpdated(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &KeyUpdated{
name, ev := testEventEncoding(t, &KeyUpdated{
Trigger: KeyUpdateRemote,
KeyType: KeyTypeClient1RTT,
KeyPhase: 1337,
})
require.Equal(t, "security:key_updated", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Equal(t, float64(1337), ev["key_phase"])
require.Equal(t, "remote_update", ev["trigger"])
require.Contains(t, ev, "key_type")
@@ -718,25 +696,23 @@ func TestKeyUpdated(t *testing.T) {
}
func TestKeyDiscarded0RTT(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &KeyDiscarded{
name, ev := testEventEncoding(t, &KeyDiscarded{
KeyType: KeyTypeServer0RTT,
KeyPhase: 0,
})
require.Equal(t, "security:key_discarded", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Equal(t, "tls", ev["trigger"])
require.Equal(t, "server_0rtt_secret", ev["key_type"])
}
func TestKeyDiscarded(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &KeyDiscarded{
name, ev := testEventEncoding(t, &KeyDiscarded{
KeyType: KeyTypeClient1RTT,
KeyPhase: 42,
})
require.Equal(t, "security:key_discarded", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Equal(t, float64(42), ev["key_phase"])
require.NotContains(t, ev, "trigger")
require.Contains(t, ev, "key_type")
@@ -744,35 +720,48 @@ func TestKeyDiscarded(t *testing.T) {
}
func TestLossTimerUpdated(t *testing.T) {
eventTime := time.Now()
timeout := eventTime.Add(137 * time.Millisecond)
name, ev, relativeTime := testEventEncodingWithTime(t, &LossTimerUpdated{
Type: LossTimerUpdateTypeSet,
TimerType: TimerTypePTO,
EncLevel: protocol.EncryptionHandshake,
Time: timeout,
}, eventTime, 42*time.Second)
synctest.Test(t, func(t *testing.T) {
var buf bytes.Buffer
tr := qlogwriter.NewConnectionFileSeq(
nopWriteCloser(&buf),
true,
protocol.ParseConnectionID([]byte{1, 2, 3, 4}),
[]string{EventSchema},
)
go tr.Run()
producer := tr.AddProducer()
require.Equal(t, "recovery:loss_timer_updated", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Len(t, ev, 4)
require.Equal(t, "set", ev["event_type"])
require.Equal(t, "pto", ev["timer_type"])
require.Equal(t, "handshake", ev["packet_number_space"])
require.Contains(t, ev, "delta")
delta := time.Duration(ev["delta"].(float64)*1e6) * time.Nanosecond
require.WithinDuration(t, timeout, eventTime.Add(delta), time.Millisecond)
synctest.Wait()
time.Sleep(42 * time.Second)
producer.RecordEvent(&LossTimerUpdated{
Type: LossTimerUpdateTypeSet,
TimerType: TimerTypePTO,
EncLevel: protocol.EncryptionHandshake,
Time: time.Now().Add(1337 * time.Second),
})
producer.Close()
name, ev := decode(t, buf.String())
require.Equal(t, "recovery:loss_timer_updated", name)
require.Len(t, ev, 4)
require.Equal(t, "set", ev["event_type"])
require.Equal(t, "pto", ev["timer_type"])
require.Equal(t, "handshake", ev["packet_number_space"])
require.Contains(t, ev, "delta")
delta := time.Duration(ev["delta"].(float64)*1e6) * time.Nanosecond
require.Equal(t, 1337*time.Second, delta)
})
}
func TestLossTimerUpdatedExpired(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &LossTimerUpdated{
name, ev := testEventEncoding(t, &LossTimerUpdated{
Type: LossTimerUpdateTypeExpired,
TimerType: TimerTypeACK,
EncLevel: protocol.Encryption1RTT,
})
require.Equal(t, "recovery:loss_timer_updated", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Len(t, ev, 3)
require.Equal(t, "expired", ev["event_type"])
require.Equal(t, "ack", ev["timer_type"])
@@ -780,46 +769,42 @@ func TestLossTimerUpdatedExpired(t *testing.T) {
}
func TestLossTimerUpdatedCanceled(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &eventLossTimerCanceled{})
name, ev := testEventEncoding(t, &eventLossTimerCanceled{})
require.Equal(t, "recovery:loss_timer_updated", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Len(t, ev, 1)
require.Equal(t, "cancelled", ev["event_type"])
}
func TestECNStateUpdated(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &ECNStateUpdated{
name, ev := testEventEncoding(t, &ECNStateUpdated{
State: ECNStateUnknown,
Trigger: "",
})
require.Equal(t, "recovery:ecn_state_updated", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Len(t, ev, 1)
require.Equal(t, "unknown", ev["new"])
}
func TestECNStateUpdatedWithTrigger(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &ECNStateUpdated{
name, ev := testEventEncoding(t, &ECNStateUpdated{
State: ECNStateFailed,
Trigger: "ACK doesn't contain ECN marks",
})
require.Equal(t, "recovery:ecn_state_updated", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Len(t, ev, 2)
require.Equal(t, "failed", ev["new"])
require.Equal(t, "ACK doesn't contain ECN marks", ev["trigger"])
}
func TestALPNInformation(t *testing.T) {
name, ev, relativeTime := testEventEncoding(t, &ALPNInformation{
name, ev := testEventEncoding(t, &ALPNInformation{
ChosenALPN: "h3",
})
require.Equal(t, "transport:alpn_information", name)
require.Equal(t, 42*time.Second, relativeTime)
require.Len(t, ev, 1)
require.Equal(t, "h3", ev["chosen_alpn"])
}

View File

@@ -1,50 +0,0 @@
package qlogwriter
import (
"time"
"github.com/quic-go/quic-go/qlogwriter/jsontext"
)
func milliseconds(dur time.Duration) float64 { return float64(dur.Nanoseconds()) / 1e6 }
// Event represents a qlog event that can be encoded to JSON.
// Each event must provide its name and a method to encode itself using a jsontext.Encoder.
type Event interface {
// Name returns the name of the event, as it should appear in the qlog output
Name() string
// Encode writes the event's data to the provided jsontext.Encoder
Encode(*jsontext.Encoder) error
}
type event struct {
RelativeTime time.Duration
Event
}
type encoderHelper struct {
enc *jsontext.Encoder
err error
}
func (h *encoderHelper) WriteToken(t jsontext.Token) {
if h.err != nil {
return
}
h.err = h.enc.WriteToken(t)
}
func (e *event) Encode(enc *jsontext.Encoder) error {
h := encoderHelper{enc: enc}
h.WriteToken(jsontext.BeginObject)
h.WriteToken(jsontext.String("time"))
h.WriteToken(jsontext.Float(milliseconds(e.RelativeTime)))
h.WriteToken(jsontext.String("name"))
h.WriteToken(jsontext.String(e.Name()))
h.WriteToken(jsontext.String("data"))
if err := e.Event.Encode(enc); err != nil {
return err
}
h.WriteToken(jsontext.EndObject)
return h.err
}

View File

@@ -38,6 +38,18 @@ func init() {
}
}
type encoderHelper struct {
enc *jsontext.Encoder
err error
}
func (h *encoderHelper) WriteToken(t jsontext.Token) {
if h.err != nil {
return
}
h.err = h.enc.WriteToken(t)
}
type traceHeader struct {
VantagePointType string
GroupID *ConnectionID

View File

@@ -30,9 +30,26 @@ type Recorder interface {
io.Closer
}
const eventChanSize = 50
// Event represents a qlog event that can be encoded to JSON.
// Each event must provide its name and a method to encode itself using a jsontext.Encoder.
type Event interface {
// Name returns the name of the event, as it should appear in the qlog output
Name() string
// Encode writes the event's data to the provided jsontext.Encoder
Encode(encoder *jsontext.Encoder, eventTime time.Time) error
}
var recordSeparator = []byte{0x1e}
// RecordSeparator is the record separator byte for the JSON-SEQ format
const RecordSeparator byte = 0x1e
var recordSeparator = []byte{RecordSeparator}
type event struct {
Time time.Time
Event Event
}
const eventChanSize = 50
// FileSeq represents a qlog trace using the JSON-SEQ format,
// https://www.ietf.org/archive/id/draft-ietf-quic-qlog-main-schema-12.html#section-5
@@ -116,17 +133,14 @@ func (t *FileSeq) record(eventTime time.Time, details Event) {
}
t.mx.Unlock()
t.events <- event{
RelativeTime: eventTime.Sub(t.referenceTime),
Event: details,
}
t.events <- event{Time: eventTime, Event: details}
}
func (t *FileSeq) Run() {
defer close(t.runStopped)
enc := jsontext.NewEncoder(t.w)
for ev := range t.events {
for e := range t.events {
if t.encodeErr != nil { // if encoding failed, just continue draining the event channel
continue
}
@@ -134,10 +148,22 @@ func (t *FileSeq) Run() {
t.encodeErr = err
continue
}
if err := ev.Encode(enc); err != nil {
h := encoderHelper{enc: enc}
h.WriteToken(jsontext.BeginObject)
h.WriteToken(jsontext.String("time"))
h.WriteToken(jsontext.Float(float64(e.Time.Sub(t.referenceTime).Nanoseconds()) / 1e6))
h.WriteToken(jsontext.String("name"))
h.WriteToken(jsontext.String(e.Event.Name()))
h.WriteToken(jsontext.String("data"))
if err := e.Event.Encode(enc, e.Time); err != nil {
t.encodeErr = err
continue
}
h.WriteToken(jsontext.EndObject)
if h.err != nil {
t.encodeErr = h.err
}
}
}

View File

@@ -8,6 +8,7 @@ import (
"log"
"os"
"testing"
"time"
"github.com/quic-go/quic-go/qlogwriter/jsontext"
@@ -22,7 +23,7 @@ func (e testEvent) Name() string {
return "transport:test_event"
}
func (e testEvent) Encode(enc *jsontext.Encoder) error {
func (e testEvent) Encode(enc *jsontext.Encoder, _ time.Time) error {
h := encoderHelper{enc: enc}
h.WriteToken(jsontext.BeginObject)
h.WriteToken(jsontext.String("message"))