reject new connection attempts if the server's accept queue is full

This commit is contained in:
Marten Seemann
2019-01-06 13:36:52 +07:00
parent 1301610a54
commit 90514d53d1
7 changed files with 186 additions and 13 deletions

View File

@@ -8,10 +8,12 @@ import (
"io"
"net"
"sync"
"sync/atomic"
"time"
"github.com/lucas-clemente/quic-go/internal/handshake"
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/qerr"
"github.com/lucas-clemente/quic-go/internal/utils"
"github.com/lucas-clemente/quic-go/internal/wire"
)
@@ -88,7 +90,8 @@ type server struct {
errorChan chan struct{}
closed bool
sessionQueue chan Session
sessionQueue chan Session
sessionQueueLen int32 // to be used as an atomic
sessionRunner sessionRunner
@@ -164,9 +167,15 @@ func listen(conn net.PacketConn, tlsConf *tls.Config, config *Config) (*server,
func (s *server) setup() error {
s.sessionRunner = &runner{
onHandshakeCompleteImpl: func(sess Session) { go func() { s.sessionQueue <- sess }() },
retireConnectionIDImpl: s.sessionHandler.Retire,
removeConnectionIDImpl: s.sessionHandler.Remove,
onHandshakeCompleteImpl: func(sess Session) {
go func() {
atomic.AddInt32(&s.sessionQueueLen, 1)
s.sessionQueue <- sess // blocks until the session is accepted
atomic.AddInt32(&s.sessionQueueLen, -1)
}()
},
retireConnectionIDImpl: s.sessionHandler.Retire,
removeConnectionIDImpl: s.sessionHandler.Remove,
}
cookieGenerator, err := handshake.NewCookieGenerator()
if err != nil {
@@ -333,7 +342,7 @@ func (s *server) handleInitial(p *receivedPacket) {
s.logger.Errorf("Error occurred handling initial packet: %s", err)
return
}
if sess == nil { // a retry was done
if sess == nil { // a retry was done, or the connection attempt was rejected
p.buffer.Release()
return
}
@@ -371,6 +380,11 @@ func (s *server) handleInitialImpl(p *receivedPacket) (quicSession, protocol.Con
return nil, nil, s.sendRetry(p.remoteAddr, hdr)
}
if queueLen := atomic.LoadInt32(&s.sessionQueueLen); queueLen >= protocol.MaxAcceptQueueSize {
s.logger.Debugf("Rejecting new connection. Server currently busy. Accept queue length: %d (max %d)", queueLen, protocol.MaxAcceptQueueSize)
return nil, nil, s.sendServerBusy(p.remoteAddr, hdr)
}
connID, err := protocol.GenerateConnectionID(s.config.ConnectionIDLength)
if err != nil {
return nil, nil, err
@@ -460,6 +474,54 @@ func (s *server) sendRetry(remoteAddr net.Addr, hdr *wire.Header) error {
return nil
}
func (s *server) sendServerBusy(remoteAddr net.Addr, hdr *wire.Header) error {
sealer, _, err := handshake.NewInitialAEAD(hdr.DestConnectionID, protocol.PerspectiveServer)
if err != nil {
return err
}
packetBuffer := getPacketBuffer()
defer packetBuffer.Release()
buf := bytes.NewBuffer(packetBuffer.Slice[:0])
// TODO(#1567): use the SERVER_BUSY error code
ccf := &wire.ConnectionCloseFrame{ErrorCode: qerr.PeerGoingAway}
replyHdr := &wire.ExtendedHeader{}
replyHdr.IsLongHeader = true
replyHdr.Type = protocol.PacketTypeInitial
replyHdr.Version = hdr.Version
replyHdr.SrcConnectionID = hdr.DestConnectionID
replyHdr.DestConnectionID = hdr.SrcConnectionID
replyHdr.PacketNumberLen = protocol.PacketNumberLen4
replyHdr.Length = 4 /* packet number len */ + ccf.Length(hdr.Version) + protocol.ByteCount(sealer.Overhead())
if err := replyHdr.Write(buf, hdr.Version); err != nil {
return err
}
payloadOffset := buf.Len()
if err := ccf.Write(buf, hdr.Version); err != nil {
return err
}
raw := buf.Bytes()
_ = sealer.Seal(raw[payloadOffset:payloadOffset], raw[payloadOffset:], replyHdr.PacketNumber, raw[:payloadOffset])
raw = raw[0 : buf.Len()+sealer.Overhead()]
pnOffset := payloadOffset - int(replyHdr.PacketNumberLen)
sealer.EncryptHeader(
raw[pnOffset+4:pnOffset+4+16],
&raw[0],
raw[pnOffset:payloadOffset],
)
replyHdr.Log(s.logger)
wire.LogFrame(s.logger, ccf, true)
if _, err := s.conn.WriteTo(raw, remoteAddr); err != nil {
s.logger.Debugf("Error rejecting connection: %s", err)
}
return nil
}
func (s *server) sendVersionNegotiationPacket(p *receivedPacket) {
defer p.buffer.Release()
hdr := p.hdr