send coalesced packets

This commit is contained in:
Marten Seemann
2020-02-13 17:13:29 +07:00
parent d642bf9098
commit 5aad7cae5d
6 changed files with 452 additions and 186 deletions

View File

@@ -1309,13 +1309,24 @@ func (s *session) sendPacket() (bool, error) {
}
s.windowUpdateQueue.QueueAll()
var packet *packedPacket
var err error
if !s.handshakeConfirmed {
packet, err = s.packer.PackPacket()
} else {
packet, err = s.packer.PackAppDataPacket()
now := time.Now()
packet, err := s.packer.PackPacket()
if err != nil || packet == nil {
return false, err
}
for _, p := range packet.packets {
if s.firstAckElicitingPacketAfterIdleSentTime.IsZero() && p.IsAckEliciting() {
s.firstAckElicitingPacketAfterIdleSentTime = now
}
s.sentPacketHandler.SentPacket(p.ToAckHandlerPacket(now, s.retransmissionQueue))
}
s.connIDManager.SentPacket()
s.logCoalescedPacket(now, packet)
s.sendQueue.Send(packet.buffer)
return true, nil
}
packet, err := s.packer.PackAppDataPacket()
if err != nil || packet == nil {
return false, err
}
@@ -1324,35 +1335,13 @@ func (s *session) sendPacket() (bool, error) {
}
func (s *session) sendPackedPacket(packet *packedPacket) {
if s.firstAckElicitingPacketAfterIdleSentTime.IsZero() && packet.IsAckEliciting() {
s.firstAckElicitingPacketAfterIdleSentTime = time.Now()
}
now := time.Now()
s.sentPacketHandler.SentPacket(packet.ToAckHandlerPacket(now, s.retransmissionQueue))
if s.traceCallback != nil {
frames := make([]wire.Frame, 0, len(packet.frames))
for _, f := range packet.frames {
frames = append(frames, f.Frame)
}
s.traceCallback(quictrace.Event{
Time: now,
EventType: quictrace.PacketSent,
TransportState: s.sentPacketHandler.GetStats(),
EncryptionLevel: packet.EncryptionLevel(),
PacketNumber: packet.header.PacketNumber,
PacketSize: packet.buffer.Len(),
Frames: frames,
})
if s.firstAckElicitingPacketAfterIdleSentTime.IsZero() && packet.IsAckEliciting() {
s.firstAckElicitingPacketAfterIdleSentTime = now
}
if s.qlogger != nil {
frames := make([]wire.Frame, 0, len(packet.frames))
for _, f := range packet.frames {
frames = append(frames, f.Frame)
}
s.qlogger.SentPacket(now, packet.header, packet.buffer.Len(), packet.ack, frames)
}
s.logPacket(packet)
s.sentPacketHandler.SentPacket(packet.ToAckHandlerPacket(time.Now(), s.retransmissionQueue))
s.connIDManager.SentPacket()
s.logPacket(now, packet)
s.sendQueue.Send(packet.buffer)
}
@@ -1375,25 +1364,66 @@ func (s *session) sendConnectionClose(quicErr *qerr.QuicError) ([]byte, error) {
if err != nil {
return nil, err
}
s.logPacket(packet)
s.logPacket(time.Now(), packet)
return packet.buffer.Data, s.conn.Write(packet.buffer.Data)
}
func (s *session) logPacket(packet *packedPacket) {
func (s *session) logPacketContents(now time.Time, p *packetContents) {
// qlog
if s.qlogger != nil {
frames := make([]wire.Frame, 0, len(p.frames))
for _, f := range p.frames {
frames = append(frames, f.Frame)
}
s.qlogger.SentPacket(now, p.header, p.length, p.ack, frames)
}
// quic-trace
if s.traceCallback != nil {
frames := make([]wire.Frame, 0, len(p.frames))
for _, f := range p.frames {
frames = append(frames, f.Frame)
}
s.traceCallback(quictrace.Event{
Time: now,
EventType: quictrace.PacketSent,
TransportState: s.sentPacketHandler.GetStats(),
EncryptionLevel: p.EncryptionLevel(),
PacketNumber: p.header.PacketNumber,
PacketSize: p.length,
Frames: frames,
})
}
// quic-go logging
if !s.logger.Debug() {
// We don't need to allocate the slices for calling the format functions
return
}
s.logger.Debugf("-> Sending packet 0x%x (%d bytes) for connection %s, %s", packet.header.PacketNumber, packet.buffer.Len(), s.logID, packet.EncryptionLevel())
packet.header.Log(s.logger)
if packet.ack != nil {
wire.LogFrame(s.logger, packet.ack, true)
p.header.Log(s.logger)
if p.ack != nil {
wire.LogFrame(s.logger, p.ack, true)
}
for _, frame := range packet.frames {
for _, frame := range p.frames {
wire.LogFrame(s.logger, frame.Frame, true)
}
}
func (s *session) logCoalescedPacket(now time.Time, packet *coalescedPacket) {
if s.logger.Debug() {
s.logger.Debugf("-> Sending coalesced packet (%d parts, %d bytes) for connection %s", len(packet.packets), packet.buffer.Len(), s.logID)
}
for _, p := range packet.packets {
s.logPacketContents(now, p)
}
}
func (s *session) logPacket(now time.Time, packet *packedPacket) {
if s.logger.Debug() {
s.logger.Debugf("-> Sending packet %#x (%d bytes) for connection %s, %s", packet.header.PacketNumber, packet.buffer.Len(), s.logID, packet.EncryptionLevel())
}
s.logPacketContents(now, packet.packetContents)
}
// AcceptStream returns the next stream openend by the peer
func (s *session) AcceptStream(ctx context.Context) (Stream, error) {
return s.streamsMap.AcceptStream(ctx)