implement a quic.Listener, privatize the Server

This commit is contained in:
Marten Seemann
2017-02-17 15:48:27 +07:00
parent 6dd163020a
commit 5029ab0934
6 changed files with 221 additions and 139 deletions

View File

@@ -20,10 +20,8 @@ import (
)
type streamCreator interface {
AcceptStream() (utils.Stream, error)
quic.Session
GetOrOpenStream(protocol.StreamID) (utils.Stream, error)
Close(error) error
RemoteAddr() net.Addr
}
// Server is a HTTP2 server listening for QUIC connections.
@@ -35,8 +33,8 @@ type Server struct {
port uint32 // used atomically
server *quic.Server
serverMutex sync.Mutex
listenerMutex sync.Mutex
listener quic.Listener
}
// ListenAndServe listens on the UDP address s.Addr and calls s.Handler to handle HTTP/2 requests on incoming connections.
@@ -72,31 +70,41 @@ func (s *Server) serveImpl(tlsConfig *tls.Config, conn *net.UDPConn) error {
if s.Server == nil {
return errors.New("use of h2quic.Server without http.Server")
}
s.serverMutex.Lock()
if s.server != nil {
s.serverMutex.Unlock()
s.listenerMutex.Lock()
if s.listener != nil {
s.listenerMutex.Unlock()
return errors.New("ListenAndServe may only be called once")
}
var err error
server, err := quic.NewServer(s.Addr, tlsConfig, s.handleStreamCb)
config := quic.Config{
TLSConfig: tlsConfig,
ConnState: func(session quic.Session, connState quic.ConnState) {
sess := session.(streamCreator)
if connState == quic.ConnStateVersionNegotiated {
s.handleHeaderStream(sess)
}
},
}
ln, err := quic.NewListener(&config)
if err != nil {
s.serverMutex.Unlock()
s.listenerMutex.Unlock()
return err
}
s.server = server
s.serverMutex.Unlock()
s.listener = ln
s.listenerMutex.Unlock()
if conn == nil {
return server.ListenAndServe()
return ln.ListenAddr(s.Addr)
}
return server.Serve(conn)
return ln.Listen(conn)
}
func (s *Server) handleStreamCb(session quic.Session, stream utils.Stream) {
s.handleStream(session.(streamCreator), stream)
}
func (s *Server) handleStream(session streamCreator, stream utils.Stream) {
func (s *Server) handleHeaderStream(session streamCreator) {
stream, err := session.AcceptStream()
if err != nil {
session.Close(qerr.Error(qerr.InvalidHeadersStreamData, err.Error()))
return
}
if stream.StreamID() != 3 {
session.Close(qerr.Error(qerr.InternalError, "h2quic server BUG: header stream does not have stream ID 3"))
return
}
@@ -214,11 +222,11 @@ func (s *Server) handleRequest(session streamCreator, headerStream utils.Stream,
// Close the server immediately, aborting requests and sending CONNECTION_CLOSE frames to connected clients.
// Close in combination with ListenAndServe() (instead of Serve()) may race if it is called before a UDP socket is established.
func (s *Server) Close() error {
s.serverMutex.Lock()
defer s.serverMutex.Unlock()
if s.server != nil {
err := s.server.Close()
s.server = nil
s.listenerMutex.Lock()
defer s.listenerMutex.Unlock()
if s.listener != nil {
err := s.listener.Close()
s.listener = nil
return err
}
return nil