forked from quic-go/quic-go
implement handling of coalesced packets
This commit is contained in:
@@ -153,11 +153,7 @@ func (h *packetHandlerMap) listen() {
|
||||
h.close(err)
|
||||
return
|
||||
}
|
||||
data = data[:n]
|
||||
|
||||
if err := h.handlePacket(addr, buffer, data); err != nil {
|
||||
h.logger.Debugf("error handling packet from %s: %s", addr, err)
|
||||
}
|
||||
h.handlePacket(addr, buffer, data[:n])
|
||||
}
|
||||
}
|
||||
|
||||
@@ -165,56 +161,102 @@ func (h *packetHandlerMap) handlePacket(
|
||||
addr net.Addr,
|
||||
buffer *packetBuffer,
|
||||
data []byte,
|
||||
) error {
|
||||
r := bytes.NewReader(data)
|
||||
hdr, err := wire.ParseHeader(r, h.connIDLen)
|
||||
// drop the packet if we can't parse the header
|
||||
) {
|
||||
packets, err := h.parsePacket(addr, buffer, data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error parsing header: %s", err)
|
||||
h.logger.Debugf("error parsing packets from %s: %s", addr, err)
|
||||
// This is just the error from parsing the last packet.
|
||||
// We still need to process the packets that were successfully parsed before.
|
||||
}
|
||||
if len(packets) == 0 {
|
||||
putPacketBuffer(buffer)
|
||||
return
|
||||
}
|
||||
h.handleParsedPackets(packets)
|
||||
}
|
||||
|
||||
if hdr.IsLongHeader {
|
||||
if protocol.ByteCount(r.Len()) < hdr.Length {
|
||||
return fmt.Errorf("packet length (%d bytes) is smaller than the expected length (%d bytes)", len(data)-int(hdr.ParsedLen()), hdr.Length)
|
||||
func (h *packetHandlerMap) parsePacket(
|
||||
addr net.Addr,
|
||||
buffer *packetBuffer,
|
||||
data []byte,
|
||||
) ([]*receivedPacket, error) {
|
||||
rcvTime := time.Now()
|
||||
packets := make([]*receivedPacket, 0, 1)
|
||||
|
||||
var counter int
|
||||
var lastConnID protocol.ConnectionID
|
||||
for len(data) > 0 {
|
||||
if counter > 0 && h.logger.Debug() {
|
||||
h.logger.Debugf("Parsed a coalesced packet. Part %d: %d bytes", counter, len(packets[counter-1].data))
|
||||
}
|
||||
data = data[:int(hdr.ParsedLen()+hdr.Length)]
|
||||
// TODO(#1312): implement parsing of compound packets
|
||||
}
|
||||
|
||||
p := &receivedPacket{
|
||||
remoteAddr: addr,
|
||||
hdr: hdr,
|
||||
rcvTime: time.Now(),
|
||||
data: data,
|
||||
buffer: buffer,
|
||||
}
|
||||
hdr, err := wire.ParseHeader(bytes.NewReader(data), h.connIDLen)
|
||||
// drop the packet if we can't parse the header
|
||||
if err != nil {
|
||||
return packets, fmt.Errorf("error parsing header: %s", err)
|
||||
}
|
||||
if counter > 0 && !hdr.DestConnectionID.Equal(lastConnID) {
|
||||
return packets, fmt.Errorf("coalesced packet has different destination connection ID: %s, expected %s", hdr.DestConnectionID, lastConnID)
|
||||
}
|
||||
lastConnID = hdr.DestConnectionID
|
||||
|
||||
var rest []byte
|
||||
if hdr.IsLongHeader {
|
||||
if protocol.ByteCount(len(data)) < hdr.Length {
|
||||
return packets, fmt.Errorf("packet length (%d bytes) is smaller than the expected length (%d bytes)", len(data)-int(hdr.ParsedLen()), hdr.Length)
|
||||
}
|
||||
packetLen := int(hdr.ParsedLen() + hdr.Length)
|
||||
rest = data[packetLen:]
|
||||
data = data[:packetLen]
|
||||
}
|
||||
|
||||
if counter > 0 {
|
||||
buffer.Split()
|
||||
}
|
||||
counter++
|
||||
packets = append(packets, &receivedPacket{
|
||||
remoteAddr: addr,
|
||||
hdr: hdr,
|
||||
rcvTime: rcvTime,
|
||||
data: data,
|
||||
buffer: buffer,
|
||||
})
|
||||
data = rest
|
||||
}
|
||||
return packets, nil
|
||||
}
|
||||
|
||||
func (h *packetHandlerMap) handleParsedPackets(packets []*receivedPacket) {
|
||||
h.mutex.RLock()
|
||||
defer h.mutex.RUnlock()
|
||||
|
||||
handlerEntry, handlerFound := h.handlers[string(hdr.DestConnectionID)]
|
||||
// coalesced packets all have the same destination connection ID
|
||||
handlerEntry, handlerFound := h.handlers[string(packets[0].hdr.DestConnectionID)]
|
||||
|
||||
if handlerFound { // existing session
|
||||
handlerEntry.handler.handlePacket(p)
|
||||
return nil
|
||||
}
|
||||
// No session found.
|
||||
// This might be a stateless reset.
|
||||
if !hdr.IsLongHeader {
|
||||
if len(data) >= protocol.MinStatelessResetSize {
|
||||
var token [16]byte
|
||||
copy(token[:], data[len(data)-16:])
|
||||
if sess, ok := h.resetTokens[token]; ok {
|
||||
sess.destroy(errors.New("received a stateless reset"))
|
||||
return nil
|
||||
}
|
||||
for _, p := range packets {
|
||||
if handlerFound { // existing session
|
||||
handlerEntry.handler.handlePacket(p)
|
||||
continue
|
||||
}
|
||||
// TODO(#943): send a stateless reset
|
||||
return fmt.Errorf("received a short header packet with an unexpected connection ID %s", hdr.DestConnectionID)
|
||||
// No session found.
|
||||
// This might be a stateless reset.
|
||||
if !p.hdr.IsLongHeader {
|
||||
if len(p.data) >= protocol.MinStatelessResetSize {
|
||||
var token [16]byte
|
||||
copy(token[:], p.data[len(p.data)-16:])
|
||||
if sess, ok := h.resetTokens[token]; ok {
|
||||
sess.destroy(errors.New("received a stateless reset"))
|
||||
continue
|
||||
}
|
||||
}
|
||||
// TODO(#943): send a stateless reset
|
||||
h.logger.Debugf("received a short header packet with an unexpected connection ID %s", p.hdr.DestConnectionID)
|
||||
break // a short header packet is always the last in a coalesced packet
|
||||
|
||||
}
|
||||
if h.server != nil { // no server set
|
||||
h.server.handlePacket(p)
|
||||
}
|
||||
h.logger.Debugf("received a packet with an unexpected connection ID %s", p.hdr.DestConnectionID)
|
||||
}
|
||||
if h.server == nil { // no server set
|
||||
return fmt.Errorf("received a packet with an unexpected connection ID %s", hdr.DestConnectionID)
|
||||
}
|
||||
h.server.handlePacket(p)
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user