forked from quic-go/quic-go
http3: add http3.Server.ServeQUICConn to serve a single QUIC connection (#3587)
This commit is contained in:
@@ -226,11 +226,22 @@ func (s *Server) ListenAndServeTLS(certFile, keyFile string) error {
|
|||||||
|
|
||||||
// Serve an existing UDP connection.
|
// Serve an existing UDP connection.
|
||||||
// It is possible to reuse the same connection for outgoing connections.
|
// It is possible to reuse the same connection for outgoing connections.
|
||||||
// Closing the server does not close the packet conn.
|
// Closing the server does not close the connection.
|
||||||
func (s *Server) Serve(conn net.PacketConn) error {
|
func (s *Server) Serve(conn net.PacketConn) error {
|
||||||
return s.serveConn(s.TLSConfig, conn)
|
return s.serveConn(s.TLSConfig, conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ServeQUICConn serves a single QUIC connection.
|
||||||
|
func (s *Server) ServeQUICConn(conn quic.Connection) error {
|
||||||
|
s.mutex.Lock()
|
||||||
|
if s.logger == nil {
|
||||||
|
s.logger = utils.DefaultLogger.WithPrefix("server")
|
||||||
|
}
|
||||||
|
s.mutex.Unlock()
|
||||||
|
|
||||||
|
return s.handleConn(conn)
|
||||||
|
}
|
||||||
|
|
||||||
// ServeListener serves an existing QUIC listener.
|
// ServeListener serves an existing QUIC listener.
|
||||||
// Make sure you use http3.ConfigureTLSConfig to configure a tls.Config
|
// Make sure you use http3.ConfigureTLSConfig to configure a tls.Config
|
||||||
// and use it to construct a http3-friendly QUIC listener.
|
// and use it to construct a http3-friendly QUIC listener.
|
||||||
@@ -297,7 +308,11 @@ func (s *Server) serveListener(ln quic.EarlyListener) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
go s.handleConn(conn)
|
go func() {
|
||||||
|
if err := s.handleConn(conn); err != nil {
|
||||||
|
s.logger.Debugf(err.Error())
|
||||||
|
}
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -405,14 +420,13 @@ func (s *Server) removeListener(l *quic.EarlyListener) {
|
|||||||
s.mutex.Unlock()
|
s.mutex.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) handleConn(conn quic.EarlyConnection) {
|
func (s *Server) handleConn(conn quic.Connection) error {
|
||||||
decoder := qpack.NewDecoder(nil)
|
decoder := qpack.NewDecoder(nil)
|
||||||
|
|
||||||
// send a SETTINGS frame
|
// send a SETTINGS frame
|
||||||
str, err := conn.OpenUniStream()
|
str, err := conn.OpenUniStream()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.logger.Debugf("Opening the control stream failed.")
|
return fmt.Errorf("opening the control stream failed: %w", err)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
b := make([]byte, 0, 64)
|
b := make([]byte, 0, 64)
|
||||||
b = quicvarint.Append(b, streamTypeControlStream) // stream type
|
b = quicvarint.Append(b, streamTypeControlStream) // stream type
|
||||||
@@ -426,8 +440,11 @@ func (s *Server) handleConn(conn quic.EarlyConnection) {
|
|||||||
for {
|
for {
|
||||||
str, err := conn.AcceptStream(context.Background())
|
str, err := conn.AcceptStream(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.logger.Debugf("Accepting stream failed: %s", err)
|
var appErr *quic.ApplicationError
|
||||||
return
|
if errors.As(err, &appErr) && appErr.ErrorCode == quic.ApplicationErrorCode(errorNoError) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("accepting stream failed: %w", err)
|
||||||
}
|
}
|
||||||
go func() {
|
go func() {
|
||||||
rerr := s.handleRequest(conn, str, decoder, func() {
|
rerr := s.handleRequest(conn, str, decoder, func() {
|
||||||
@@ -455,7 +472,7 @@ func (s *Server) handleConn(conn quic.EarlyConnection) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) handleUnidirectionalStreams(conn quic.EarlyConnection) {
|
func (s *Server) handleUnidirectionalStreams(conn quic.Connection) {
|
||||||
for {
|
for {
|
||||||
str, err := conn.AcceptUniStream(context.Background())
|
str, err := conn.AcceptUniStream(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -1151,6 +1151,30 @@ var _ = Describe("Server", func() {
|
|||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
Context("ServeQUICConn", func() {
|
||||||
|
It("serves a QUIC connection", func() {
|
||||||
|
mux := http.NewServeMux()
|
||||||
|
mux.HandleFunc("/hello", func(w http.ResponseWriter, _ *http.Request) {
|
||||||
|
w.Write([]byte("foobar"))
|
||||||
|
})
|
||||||
|
s.Handler = mux
|
||||||
|
tlsConf := testdata.GetTLSConfig()
|
||||||
|
tlsConf.NextProtos = []string{NextProtoH3}
|
||||||
|
conn := mockquic.NewMockEarlyConnection(mockCtrl)
|
||||||
|
controlStr := mockquic.NewMockStream(mockCtrl)
|
||||||
|
controlStr.EXPECT().Write(gomock.Any())
|
||||||
|
conn.EXPECT().OpenUniStream().Return(controlStr, nil)
|
||||||
|
testDone := make(chan struct{})
|
||||||
|
conn.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
|
||||||
|
<-testDone
|
||||||
|
return nil, errors.New("test done")
|
||||||
|
}).MaxTimes(1)
|
||||||
|
conn.EXPECT().AcceptStream(gomock.Any()).Return(nil, &quic.ApplicationError{ErrorCode: quic.ApplicationErrorCode(errorNoError)})
|
||||||
|
s.ServeQUICConn(conn)
|
||||||
|
close(testDone)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
Context("ListenAndServe", func() {
|
Context("ListenAndServe", func() {
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
s.Addr = "localhost:0"
|
s.Addr = "localhost:0"
|
||||||
|
|||||||
@@ -351,6 +351,29 @@ var _ = Describe("HTTP tests", func() {
|
|||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
Expect(repl).To(Equal(data))
|
Expect(repl).To(Equal(data))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
if version != protocol.VersionDraft29 {
|
||||||
|
It("serves other QUIC connections", func() {
|
||||||
|
tlsConf := testdata.GetTLSConfig()
|
||||||
|
tlsConf.NextProtos = []string{"h3"}
|
||||||
|
ln, err := quic.ListenAddr("localhost:0", tlsConf, nil)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer GinkgoRecover()
|
||||||
|
defer close(done)
|
||||||
|
conn, err := ln.Accept(context.Background())
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(server.ServeQUICConn(conn)).To(Succeed())
|
||||||
|
}()
|
||||||
|
|
||||||
|
resp, err := client.Get(fmt.Sprintf("https://localhost:%d/hello", ln.Addr().(*net.UDPAddr).Port))
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(resp.StatusCode).To(Equal(http.StatusOK))
|
||||||
|
client.Transport.(io.Closer).Close()
|
||||||
|
Eventually(done).Should(BeClosed())
|
||||||
|
})
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user