forked from quic-go/quic-go
queue undecryptable packets for later decryption & possibly send PRST
fixes #38, fixes #69, fixes #73
This commit is contained in:
47
session.go
47
session.go
@@ -19,7 +19,7 @@ import (
|
||||
type receivedPacket struct {
|
||||
remoteAddr interface{}
|
||||
publicHeader *PublicHeader
|
||||
r *bytes.Reader
|
||||
data []byte
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -59,6 +59,9 @@ type Session struct {
|
||||
closeChan chan struct{}
|
||||
closed bool
|
||||
|
||||
undecryptablePackets []receivedPacket
|
||||
aeadChanged chan struct{}
|
||||
|
||||
connectionParametersManager *handshake.ConnectionParametersManager
|
||||
|
||||
// Used to calculate the next packet number from the truncated wire
|
||||
@@ -86,10 +89,12 @@ func NewSession(conn connection, v protocol.VersionNumber, connectionID protocol
|
||||
sendingScheduled: make(chan struct{}, 1),
|
||||
rttStats: congestion.RTTStats{},
|
||||
connectionParametersManager: handshake.NewConnectionParamatersManager(),
|
||||
undecryptablePackets: make([]receivedPacket, 0, protocol.MaxUndecryptablePackets),
|
||||
aeadChanged: make(chan struct{}, 1),
|
||||
}
|
||||
|
||||
cryptoStream, _ := session.NewStream(1)
|
||||
cryptoSetup := handshake.NewCryptoSetup(connectionID, v, sCfg, cryptoStream, session.connectionParametersManager)
|
||||
cryptoSetup := handshake.NewCryptoSetup(connectionID, v, sCfg, cryptoStream, session.connectionParametersManager, session.aeadChanged)
|
||||
|
||||
go func() {
|
||||
if err := cryptoSetup.HandleCryptoStream(); err != nil {
|
||||
@@ -132,10 +137,16 @@ func (s *Session) Run() {
|
||||
case <-s.closeChan:
|
||||
return
|
||||
case p := <-s.receivedPackets:
|
||||
err = s.handlePacket(p.remoteAddr, p.publicHeader, p.r)
|
||||
err = s.handlePacket(p.remoteAddr, p.publicHeader, p.data)
|
||||
if qErr, ok := err.(*protocol.QuicError); ok && qErr.ErrorCode == errorcodes.QUIC_DECRYPTION_FAILURE {
|
||||
s.tryQueueingUndecryptablePacket(p)
|
||||
continue
|
||||
}
|
||||
s.scheduleSending()
|
||||
case <-s.sendingScheduled:
|
||||
err = s.sendPacket()
|
||||
case <-s.aeadChanged:
|
||||
s.tryDecryptingQueuedPackets()
|
||||
case <-time.After(s.connectionParametersManager.GetIdleConnectionStateLifetime()):
|
||||
s.Close(protocol.NewQuicError(errorcodes.QUIC_NETWORK_IDLE_TIMEOUT, "No recent network activity."), true)
|
||||
}
|
||||
@@ -158,7 +169,9 @@ func (s *Session) Run() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Session) handlePacket(remoteAddr interface{}, publicHeader *PublicHeader, r *bytes.Reader) error {
|
||||
func (s *Session) handlePacket(remoteAddr interface{}, publicHeader *PublicHeader, data []byte) error {
|
||||
r := bytes.NewReader(data)
|
||||
|
||||
// Calcualate packet number
|
||||
publicHeader.PacketNumber = protocol.InferPacketNumber(
|
||||
publicHeader.PacketNumberLen,
|
||||
@@ -173,13 +186,6 @@ func (s *Session) handlePacket(remoteAddr interface{}, publicHeader *PublicHeade
|
||||
|
||||
packet, err := s.unpacker.Unpack(publicHeader.Raw, publicHeader, r)
|
||||
if err != nil {
|
||||
// TODO: We currently treat un-decryptable packets as lost. We should
|
||||
// instead save them to a queue and retry later.
|
||||
// See issue https://github.com/lucas-clemente/quic-go/issues/38
|
||||
if qErr, ok := err.(*protocol.QuicError); ok && qErr.ErrorCode == errorcodes.QUIC_DECRYPTION_FAILURE {
|
||||
utils.Infof("Discarding packet due to decryption failure.")
|
||||
return nil // Discard packet
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -222,8 +228,8 @@ func (s *Session) handlePacket(remoteAddr interface{}, publicHeader *PublicHeade
|
||||
}
|
||||
|
||||
// HandlePacket handles a packet
|
||||
func (s *Session) HandlePacket(remoteAddr interface{}, publicHeader *PublicHeader, r *bytes.Reader) {
|
||||
s.receivedPackets <- receivedPacket{remoteAddr: remoteAddr, publicHeader: publicHeader, r: r}
|
||||
func (s *Session) HandlePacket(remoteAddr interface{}, publicHeader *PublicHeader, data []byte) {
|
||||
s.receivedPackets <- receivedPacket{remoteAddr: remoteAddr, publicHeader: publicHeader, data: data}
|
||||
}
|
||||
|
||||
// TODO: Ignore data for closed streams
|
||||
@@ -516,3 +522,18 @@ func (s *Session) scheduleSending() {
|
||||
func (s *Session) congestionAllowsSending() bool {
|
||||
return s.sentPacketHandler.BytesInFlight() <= s.congestion.GetCongestionWindow()
|
||||
}
|
||||
|
||||
func (s *Session) tryQueueingUndecryptablePacket(p receivedPacket) {
|
||||
utils.Debugf("Queueing packet 0x%x for later decryption", p.publicHeader.PacketNumber)
|
||||
if len(s.undecryptablePackets)+1 >= protocol.MaxUndecryptablePackets {
|
||||
s.Close(protocol.NewQuicError(errorcodes.QUIC_DECRYPTION_FAILURE, "too many undecryptable packets received"), true)
|
||||
}
|
||||
s.undecryptablePackets = append(s.undecryptablePackets, p)
|
||||
}
|
||||
|
||||
func (s *Session) tryDecryptingQueuedPackets() {
|
||||
for _, p := range s.undecryptablePackets {
|
||||
s.HandlePacket(p.remoteAddr, p.publicHeader, p.data)
|
||||
}
|
||||
s.undecryptablePackets = s.undecryptablePackets[:0]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user