From 76bdd4a39779e695499f1b2ab6c86a0d924e9d32 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Fri, 13 Mar 2020 12:50:48 +0700 Subject: [PATCH] continuously encode qlog events --- interface.go | 1 + qlog/qlog.go | 116 +++++++++++++++++++++++++++++++--------------- qlog/qlog_test.go | 28 +++++++++++ 3 files changed, 107 insertions(+), 38 deletions(-) diff --git a/interface.go b/interface.go index c4abae62..429e1caa 100644 --- a/interface.go +++ b/interface.go @@ -248,6 +248,7 @@ type Config struct { // GetLogWriter is used to pass in a writer for the qlog. // If it is nil, no qlog will be collected and exported. // If it returns nil, no qlog will be collected and exported for the respective connection. + // It is recommended to use a buffered writer here. GetLogWriter func(connectionID []byte) io.WriteCloser } diff --git a/qlog/qlog.go b/qlog/qlog.go index 27eda8d3..e1852772 100644 --- a/qlog/qlog.go +++ b/qlog/qlog.go @@ -1,6 +1,8 @@ package qlog import ( + "bytes" + "fmt" "io" "net" "time" @@ -12,6 +14,8 @@ import ( "github.com/francoispqt/gojay" ) +const eventChanSize = 50 + // A Tracer records events to be exported to a qlog. type Tracer interface { Export() error @@ -36,35 +40,71 @@ type tracer struct { odcid protocol.ConnectionID perspective protocol.Perspective - events []event + suffix []byte + events chan event + encodeErr error + runStopped chan struct{} } var _ Tracer = &tracer{} // NewTracer creates a new tracer to record a qlog. func NewTracer(w io.WriteCloser, p protocol.Perspective, odcid protocol.ConnectionID) Tracer { - return &tracer{ + t := &tracer{ w: w, perspective: p, odcid: odcid, + runStopped: make(chan struct{}), + events: make(chan event, eventChanSize), } + go t.run() + return t } -func (t *tracer) Active() bool { return true } - -// Export writes a qlog. -func (t *tracer) Export() error { - enc := gojay.NewEncoder(t.w) +func (t *tracer) run() { + defer close(t.runStopped) + buf := &bytes.Buffer{} + enc := gojay.NewEncoder(buf) tl := &topLevel{ traces: traces{ { VantagePoint: vantagePoint{Type: t.perspective}, CommonFields: commonFields{ODCID: connectionID(t.odcid), GroupID: connectionID(t.odcid)}, EventFields: eventFields[:], - Events: t.events, }, }} if err := enc.Encode(tl); err != nil { + panic(fmt.Sprintf("qlog encoding into a bytes.Buffer failed: %s", err)) + } + data := buf.Bytes() + t.suffix = data[buf.Len()-4:] + if _, err := t.w.Write(data[:buf.Len()-4]); err != nil { + t.encodeErr = err + } + enc = gojay.NewEncoder(t.w) + isFirst := true + for ev := range t.events { + if t.encodeErr != nil { // if encoding failed, just continue draining the event channel + continue + } + if !isFirst { + t.w.Write([]byte(",")) + } + if err := enc.Encode(ev); err != nil { + t.encodeErr = err + } + isFirst = false + } +} + +// Export writes a qlog. +func (t *tracer) Export() error { + close(t.events) + <-t.runStopped + if t.encodeErr != nil { + return t.encodeErr + } + if _, err := t.w.Write(t.suffix); err != nil { return err } return t.w.Close() @@ -80,7 +120,7 @@ func (t *tracer) StartedConnection(time time.Time, local, remote net.Addr, versi if !ok { return } - t.events = append(t.events, event{ + t.events <- event{ Time: time, eventDetails: eventConnectionStarted{ SrcAddr: localAddr, @@ -89,7 +129,7 @@ func (t *tracer) StartedConnection(time time.Time, local, remote net.Addr, versi SrcConnectionID: srcConnID, DestConnectionID: destConnID, }, - }) + } } func (t *tracer) SentTransportParameters(time time.Time, tp *wire.TransportParameters) { @@ -101,7 +141,7 @@ func (t *tracer) ReceivedTransportParameters(time time.Time, tp *wire.TransportP } func (t *tracer) recordTransportParameters(time time.Time, owner owner, tp *wire.TransportParameters) { - t.events = append(t.events, event{ + t.events <- event{ Time: time, eventDetails: eventTransportParameters{ Owner: owner, @@ -120,7 +160,7 @@ func (t *tracer) recordTransportParameters(time time.Time, owner owner, tp *wire InitialMaxStreamsBidi: int64(tp.MaxBidiStreamNum), InitialMaxStreamsUni: int64(tp.MaxUniStreamNum), }, - }) + } } func (t *tracer) SentPacket(time time.Time, hdr *wire.ExtendedHeader, packetSize protocol.ByteCount, ack *wire.AckFrame, frames []wire.Frame) { @@ -137,14 +177,14 @@ func (t *tracer) SentPacket(time time.Time, hdr *wire.ExtendedHeader, packetSize } header := *transformExtendedHeader(hdr) header.PacketSize = packetSize - t.events = append(t.events, event{ + t.events <- event{ Time: time, eventDetails: eventPacketSent{ PacketType: PacketTypeFromHeader(&hdr.Header), Header: header, Frames: fs, }, - }) + } } func (t *tracer) ReceivedPacket(time time.Time, hdr *wire.ExtendedHeader, packetSize protocol.ByteCount, frames []wire.Frame) { @@ -154,45 +194,45 @@ func (t *tracer) ReceivedPacket(time time.Time, hdr *wire.ExtendedHeader, packet } header := *transformExtendedHeader(hdr) header.PacketSize = packetSize - t.events = append(t.events, event{ + t.events <- event{ Time: time, eventDetails: eventPacketReceived{ PacketType: PacketTypeFromHeader(&hdr.Header), Header: header, Frames: fs, }, - }) + } } func (t *tracer) ReceivedRetry(time time.Time, hdr *wire.Header) { - t.events = append(t.events, event{ + t.events <- event{ Time: time, eventDetails: eventRetryReceived{ Header: *transformHeader(hdr), }, - }) + } } func (t *tracer) BufferedPacket(time time.Time, packetType PacketType) { - t.events = append(t.events, event{ + t.events <- event{ Time: time, eventDetails: eventPacketBuffered{PacketType: packetType}, - }) + } } func (t *tracer) DroppedPacket(time time.Time, packetType PacketType, size protocol.ByteCount, dropReason PacketDropReason) { - t.events = append(t.events, event{ + t.events <- event{ Time: time, eventDetails: eventPacketDropped{ PacketType: packetType, PacketSize: size, Trigger: dropReason, }, - }) + } } func (t *tracer) UpdatedMetrics(time time.Time, rttStats *congestion.RTTStats, cwnd, bytesInFlight protocol.ByteCount, packetsInFlight int) { - t.events = append(t.events, event{ + t.events <- event{ Time: time, eventDetails: eventMetricsUpdated{ MinRTT: rttStats.MinRTT(), @@ -203,35 +243,35 @@ func (t *tracer) UpdatedMetrics(time time.Time, rttStats *congestion.RTTStats, c BytesInFlight: bytesInFlight, PacketsInFlight: packetsInFlight, }, - }) + } } func (t *tracer) LostPacket(time time.Time, encLevel protocol.EncryptionLevel, pn protocol.PacketNumber, lossReason PacketLossReason) { - t.events = append(t.events, event{ + t.events <- event{ Time: time, eventDetails: eventPacketLost{ PacketType: getPacketTypeFromEncryptionLevel(encLevel), PacketNumber: pn, Trigger: lossReason, }, - }) + } } func (t *tracer) UpdatedPTOCount(time time.Time, value uint32) { - t.events = append(t.events, event{ + t.events <- event{ Time: time, eventDetails: eventUpdatedPTO{Value: value}, - }) + } } func (t *tracer) UpdatedKeyFromTLS(time time.Time, encLevel protocol.EncryptionLevel, pers protocol.Perspective) { - t.events = append(t.events, event{ + t.events <- event{ Time: time, eventDetails: eventKeyUpdated{ Trigger: keyUpdateTLS, KeyType: encLevelToKeyType(encLevel, pers), }, - }) + } } func (t *tracer) UpdatedKey(time time.Time, generation protocol.KeyPhase, remote bool) { @@ -239,35 +279,35 @@ func (t *tracer) UpdatedKey(time time.Time, generation protocol.KeyPhase, remote if remote { trigger = keyUpdateRemote } - t.events = append(t.events, event{ + t.events <- event{ Time: time, eventDetails: eventKeyUpdated{ Trigger: trigger, KeyType: keyTypeClient1RTT, Generation: generation, }, - }) - t.events = append(t.events, event{ + } + t.events <- event{ Time: time, eventDetails: eventKeyUpdated{ Trigger: trigger, KeyType: keyTypeServer1RTT, Generation: generation, }, - }) + } } func (t *tracer) DroppedEncryptionLevel(time time.Time, encLevel protocol.EncryptionLevel) { - t.events = append(t.events, event{ + t.events <- event{ Time: time, eventDetails: eventKeyRetired{ KeyType: encLevelToKeyType(encLevel, protocol.PerspectiveServer), }, - }) - t.events = append(t.events, event{ + } + t.events <- event{ Time: time, eventDetails: eventKeyRetired{ KeyType: encLevelToKeyType(encLevel, protocol.PerspectiveClient), }, - }) + } } diff --git a/qlog/qlog_test.go b/qlog/qlog_test.go index 4b4173c0..3ff60da8 100644 --- a/qlog/qlog_test.go +++ b/qlog/qlog_test.go @@ -3,6 +3,7 @@ package qlog import ( "bytes" "encoding/json" + "errors" "io" "net" "time" @@ -23,6 +24,21 @@ func nopWriteCloser(w io.Writer) io.WriteCloser { return &nopWriteCloserImpl{Writer: w} } +type limitedWriter struct { + io.WriteCloser + N int + written int +} + +func (w *limitedWriter) Write(p []byte) (int, error) { + if w.written+len(p) > w.N { + return 0, errors.New("writer full") + } + n, err := w.WriteCloser.Write(p) + w.written += n + return n, err +} + type entry struct { Time time.Time Category string @@ -69,6 +85,18 @@ var _ = Describe("Tracer", func() { Expect(vantagePoint).To(HaveKeyWithValue("type", "server")) }) + It("stops writing when encountering an error", func() { + tracer = NewTracer( + &limitedWriter{WriteCloser: nopWriteCloser(buf), N: 250}, + protocol.PerspectiveServer, + protocol.ConnectionID{0xde, 0xad, 0xbe, 0xef}, + ) + for i := uint32(0); i < 1000; i++ { + tracer.UpdatedPTOCount(time.Now(), i) + } + Expect(tracer.Export()).To(MatchError("writer full")) + }) + Context("Events", func() { exportAndParse := func() []entry { Expect(tracer.Export()).To(Succeed())