use the HandshakeComplete() context for accepting sessions in the server

This commit is contained in:
Marten Seemann
2019-07-06 17:37:15 +07:00
parent 3724f91f33
commit b06d7b0cba
8 changed files with 70 additions and 107 deletions

View File

@@ -46,6 +46,7 @@ type packetHandlerManager interface {
type quicSession interface {
Session
HandshakeComplete() context.Context
handlePacket(*receivedPacket)
GetVersion() protocol.VersionNumber
getPerspective() protocol.Perspective
@@ -56,23 +57,12 @@ type quicSession interface {
}
type sessionRunner interface {
OnHandshakeComplete(Session)
Retire(protocol.ConnectionID)
Remove(protocol.ConnectionID)
AddResetToken([16]byte, packetHandler)
RemoveResetToken([16]byte)
}
type runner struct {
packetHandlerManager
onHandshakeCompleteImpl func(Session)
}
func (r *runner) OnHandshakeComplete(s Session) { r.onHandshakeCompleteImpl(s) }
var _ sessionRunner = &runner{}
// A Listener of QUIC
type server struct {
mutex sync.Mutex
@@ -99,8 +89,6 @@ type server struct {
sessionQueue chan Session
sessionQueueLen int32 // to be used as an atomic
sessionRunner sessionRunner
logger utils.Logger
}
@@ -169,21 +157,6 @@ func listen(conn net.PacketConn, tlsConf *tls.Config, config *Config) (*server,
newSession: newSession,
logger: utils.DefaultLogger.WithPrefix("server"),
}
s.sessionRunner = &runner{
packetHandlerManager: s.sessionHandler,
onHandshakeCompleteImpl: func(sess Session) {
go func() {
atomic.AddInt32(&s.sessionQueueLen, 1)
select {
case s.sessionQueue <- sess:
// blocks until the session is accepted
case <-sess.Context().Done():
atomic.AddInt32(&s.sessionQueueLen, -1)
// don't pass sessions that were already closed to Accept()
}
}()
},
}
sessionHandler.SetServer(s)
s.logger.Debugf("Listening for %s connections on %s", conn.LocalAddr().Network(), conn.LocalAddr().String())
return s, nil
@@ -454,7 +427,7 @@ func (s *server) createNewSession(
}
sess, err := s.newSession(
&conn{pconn: s.conn, currentAddr: remoteAddr},
s.sessionRunner,
s.sessionHandler,
clientDestConnID,
destConnID,
srcConnID,
@@ -469,9 +442,28 @@ func (s *server) createNewSession(
return nil, err
}
go sess.run()
go s.waitUntilHandshakeComplete(sess)
return sess, nil
}
func (s *server) waitUntilHandshakeComplete(sess quicSession) {
sessCtx := sess.Context()
select {
case <-sess.HandshakeComplete().Done():
case <-sessCtx.Done():
return
}
atomic.AddInt32(&s.sessionQueueLen, 1)
select {
case s.sessionQueue <- sess:
// blocks until the session is accepted
case <-sessCtx.Done():
atomic.AddInt32(&s.sessionQueueLen, -1)
// don't pass sessions that were already closed to Accept()
}
}
func (s *server) sendRetry(remoteAddr net.Addr, hdr *wire.Header) error {
token, err := s.tokenGenerator.NewRetryToken(remoteAddr, hdr.DestConnectionID)
if err != nil {