Merge pull request #2417 from lucas-clemente/qlog-continuous-encoding

continuously encode qlog events
This commit is contained in:
Marten Seemann
2020-03-18 12:48:40 +07:00
committed by GitHub
6 changed files with 131 additions and 41 deletions

View File

@@ -1,6 +1,7 @@
package main
import (
"bufio"
"bytes"
"crypto/tls"
"crypto/x509"
@@ -61,7 +62,13 @@ func main() {
log.Fatal(err)
}
log.Printf("Creating qlog file %s.\n", filename)
return f
return struct {
io.Writer
io.Closer
}{
bufio.NewWriter(f),
f,
}
}
}
roundTripper := &http3.RoundTripper{

View File

@@ -1,6 +1,7 @@
package main
import (
"bufio"
"crypto/md5"
"errors"
"flag"
@@ -215,7 +216,13 @@ func main() {
log.Fatal(err)
}
log.Printf("Creating qlog file %s.\n", filename)
return f
return struct {
io.Writer
io.Closer
}{
bufio.NewWriter(f),
f,
}
}
}

View File

@@ -248,6 +248,7 @@ type Config struct {
// GetLogWriter is used to pass in a writer for the qlog.
// If it is nil, no qlog will be collected and exported.
// If it returns nil, no qlog will be collected and exported for the respective connection.
// It is recommended to use a buffered writer here.
GetLogWriter func(connectionID []byte) io.WriteCloser
}

View File

@@ -1,6 +1,7 @@
package utils
import (
"bufio"
"fmt"
"io"
"log"
@@ -38,6 +39,12 @@ func GetQLOGWriter() (func(connID []byte) io.WriteCloser, error) {
if err != nil {
log.Fatalf("Failed to create qlog file %s: %s", path, err.Error())
}
return f
return struct {
io.Writer
io.Closer
}{
bufio.NewWriter(f),
f,
}
}, nil
}

View File

@@ -1,6 +1,8 @@
package qlog
import (
"bytes"
"fmt"
"io"
"net"
"time"
@@ -12,6 +14,8 @@ import (
"github.com/francoispqt/gojay"
)
const eventChanSize = 50
// A Tracer records events to be exported to a qlog.
type Tracer interface {
Export() error
@@ -36,35 +40,71 @@ type tracer struct {
odcid protocol.ConnectionID
perspective protocol.Perspective
events []event
suffix []byte
events chan event
encodeErr error
runStopped chan struct{}
}
var _ Tracer = &tracer{}
// NewTracer creates a new tracer to record a qlog.
func NewTracer(w io.WriteCloser, p protocol.Perspective, odcid protocol.ConnectionID) Tracer {
return &tracer{
t := &tracer{
w: w,
perspective: p,
odcid: odcid,
runStopped: make(chan struct{}),
events: make(chan event, eventChanSize),
}
go t.run()
return t
}
func (t *tracer) Active() bool { return true }
// Export writes a qlog.
func (t *tracer) Export() error {
enc := gojay.NewEncoder(t.w)
func (t *tracer) run() {
defer close(t.runStopped)
buf := &bytes.Buffer{}
enc := gojay.NewEncoder(buf)
tl := &topLevel{
traces: traces{
{
VantagePoint: vantagePoint{Type: t.perspective},
CommonFields: commonFields{ODCID: connectionID(t.odcid), GroupID: connectionID(t.odcid)},
EventFields: eventFields[:],
Events: t.events,
},
}}
if err := enc.Encode(tl); err != nil {
panic(fmt.Sprintf("qlog encoding into a bytes.Buffer failed: %s", err))
}
data := buf.Bytes()
t.suffix = data[buf.Len()-4:]
if _, err := t.w.Write(data[:buf.Len()-4]); err != nil {
t.encodeErr = err
}
enc = gojay.NewEncoder(t.w)
isFirst := true
for ev := range t.events {
if t.encodeErr != nil { // if encoding failed, just continue draining the event channel
continue
}
if !isFirst {
t.w.Write([]byte(","))
}
if err := enc.Encode(ev); err != nil {
t.encodeErr = err
}
isFirst = false
}
}
// Export writes a qlog.
func (t *tracer) Export() error {
close(t.events)
<-t.runStopped
if t.encodeErr != nil {
return t.encodeErr
}
if _, err := t.w.Write(t.suffix); err != nil {
return err
}
return t.w.Close()
@@ -80,7 +120,7 @@ func (t *tracer) StartedConnection(time time.Time, local, remote net.Addr, versi
if !ok {
return
}
t.events = append(t.events, event{
t.events <- event{
Time: time,
eventDetails: eventConnectionStarted{
SrcAddr: localAddr,
@@ -89,7 +129,7 @@ func (t *tracer) StartedConnection(time time.Time, local, remote net.Addr, versi
SrcConnectionID: srcConnID,
DestConnectionID: destConnID,
},
})
}
}
func (t *tracer) SentTransportParameters(time time.Time, tp *wire.TransportParameters) {
@@ -101,7 +141,7 @@ func (t *tracer) ReceivedTransportParameters(time time.Time, tp *wire.TransportP
}
func (t *tracer) recordTransportParameters(time time.Time, owner owner, tp *wire.TransportParameters) {
t.events = append(t.events, event{
t.events <- event{
Time: time,
eventDetails: eventTransportParameters{
Owner: owner,
@@ -120,7 +160,7 @@ func (t *tracer) recordTransportParameters(time time.Time, owner owner, tp *wire
InitialMaxStreamsBidi: int64(tp.MaxBidiStreamNum),
InitialMaxStreamsUni: int64(tp.MaxUniStreamNum),
},
})
}
}
func (t *tracer) SentPacket(time time.Time, hdr *wire.ExtendedHeader, packetSize protocol.ByteCount, ack *wire.AckFrame, frames []wire.Frame) {
@@ -137,14 +177,14 @@ func (t *tracer) SentPacket(time time.Time, hdr *wire.ExtendedHeader, packetSize
}
header := *transformExtendedHeader(hdr)
header.PacketSize = packetSize
t.events = append(t.events, event{
t.events <- event{
Time: time,
eventDetails: eventPacketSent{
PacketType: PacketTypeFromHeader(&hdr.Header),
Header: header,
Frames: fs,
},
})
}
}
func (t *tracer) ReceivedPacket(time time.Time, hdr *wire.ExtendedHeader, packetSize protocol.ByteCount, frames []wire.Frame) {
@@ -154,45 +194,45 @@ func (t *tracer) ReceivedPacket(time time.Time, hdr *wire.ExtendedHeader, packet
}
header := *transformExtendedHeader(hdr)
header.PacketSize = packetSize
t.events = append(t.events, event{
t.events <- event{
Time: time,
eventDetails: eventPacketReceived{
PacketType: PacketTypeFromHeader(&hdr.Header),
Header: header,
Frames: fs,
},
})
}
}
func (t *tracer) ReceivedRetry(time time.Time, hdr *wire.Header) {
t.events = append(t.events, event{
t.events <- event{
Time: time,
eventDetails: eventRetryReceived{
Header: *transformHeader(hdr),
},
})
}
}
func (t *tracer) BufferedPacket(time time.Time, packetType PacketType) {
t.events = append(t.events, event{
t.events <- event{
Time: time,
eventDetails: eventPacketBuffered{PacketType: packetType},
})
}
}
func (t *tracer) DroppedPacket(time time.Time, packetType PacketType, size protocol.ByteCount, dropReason PacketDropReason) {
t.events = append(t.events, event{
t.events <- event{
Time: time,
eventDetails: eventPacketDropped{
PacketType: packetType,
PacketSize: size,
Trigger: dropReason,
},
})
}
}
func (t *tracer) UpdatedMetrics(time time.Time, rttStats *congestion.RTTStats, cwnd, bytesInFlight protocol.ByteCount, packetsInFlight int) {
t.events = append(t.events, event{
t.events <- event{
Time: time,
eventDetails: eventMetricsUpdated{
MinRTT: rttStats.MinRTT(),
@@ -203,35 +243,35 @@ func (t *tracer) UpdatedMetrics(time time.Time, rttStats *congestion.RTTStats, c
BytesInFlight: bytesInFlight,
PacketsInFlight: packetsInFlight,
},
})
}
}
func (t *tracer) LostPacket(time time.Time, encLevel protocol.EncryptionLevel, pn protocol.PacketNumber, lossReason PacketLossReason) {
t.events = append(t.events, event{
t.events <- event{
Time: time,
eventDetails: eventPacketLost{
PacketType: getPacketTypeFromEncryptionLevel(encLevel),
PacketNumber: pn,
Trigger: lossReason,
},
})
}
}
func (t *tracer) UpdatedPTOCount(time time.Time, value uint32) {
t.events = append(t.events, event{
t.events <- event{
Time: time,
eventDetails: eventUpdatedPTO{Value: value},
})
}
}
func (t *tracer) UpdatedKeyFromTLS(time time.Time, encLevel protocol.EncryptionLevel, pers protocol.Perspective) {
t.events = append(t.events, event{
t.events <- event{
Time: time,
eventDetails: eventKeyUpdated{
Trigger: keyUpdateTLS,
KeyType: encLevelToKeyType(encLevel, pers),
},
})
}
}
func (t *tracer) UpdatedKey(time time.Time, generation protocol.KeyPhase, remote bool) {
@@ -239,35 +279,35 @@ func (t *tracer) UpdatedKey(time time.Time, generation protocol.KeyPhase, remote
if remote {
trigger = keyUpdateRemote
}
t.events = append(t.events, event{
t.events <- event{
Time: time,
eventDetails: eventKeyUpdated{
Trigger: trigger,
KeyType: keyTypeClient1RTT,
Generation: generation,
},
})
t.events = append(t.events, event{
}
t.events <- event{
Time: time,
eventDetails: eventKeyUpdated{
Trigger: trigger,
KeyType: keyTypeServer1RTT,
Generation: generation,
},
})
}
}
func (t *tracer) DroppedEncryptionLevel(time time.Time, encLevel protocol.EncryptionLevel) {
t.events = append(t.events, event{
t.events <- event{
Time: time,
eventDetails: eventKeyRetired{
KeyType: encLevelToKeyType(encLevel, protocol.PerspectiveServer),
},
})
t.events = append(t.events, event{
}
t.events <- event{
Time: time,
eventDetails: eventKeyRetired{
KeyType: encLevelToKeyType(encLevel, protocol.PerspectiveClient),
},
})
}
}

View File

@@ -3,6 +3,7 @@ package qlog
import (
"bytes"
"encoding/json"
"errors"
"io"
"net"
"time"
@@ -23,6 +24,21 @@ func nopWriteCloser(w io.Writer) io.WriteCloser {
return &nopWriteCloserImpl{Writer: w}
}
type limitedWriter struct {
io.WriteCloser
N int
written int
}
func (w *limitedWriter) Write(p []byte) (int, error) {
if w.written+len(p) > w.N {
return 0, errors.New("writer full")
}
n, err := w.WriteCloser.Write(p)
w.written += n
return n, err
}
type entry struct {
Time time.Time
Category string
@@ -69,6 +85,18 @@ var _ = Describe("Tracer", func() {
Expect(vantagePoint).To(HaveKeyWithValue("type", "server"))
})
It("stops writing when encountering an error", func() {
tracer = NewTracer(
&limitedWriter{WriteCloser: nopWriteCloser(buf), N: 250},
protocol.PerspectiveServer,
protocol.ConnectionID{0xde, 0xad, 0xbe, 0xef},
)
for i := uint32(0); i < 1000; i++ {
tracer.UpdatedPTOCount(time.Now(), i)
}
Expect(tracer.Export()).To(MatchError("writer full"))
})
Context("Events", func() {
exportAndParse := func() []entry {
Expect(tracer.Export()).To(Succeed())