From bf54ffe0df8b08873bf8e459b6117fb6759371a4 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 21 Dec 2020 15:18:09 +0700 Subject: [PATCH] accept the control stream and parse SETTINGS frame, for the H3 server --- http3/http3_suite_test.go | 13 +++++ http3/server.go | 39 ++++++++++++- http3/server_test.go | 118 +++++++++++++++++++++++++++++++++++++- 3 files changed, 166 insertions(+), 4 deletions(-) diff --git a/http3/http3_suite_test.go b/http3/http3_suite_test.go index 14147c637..c94d932a0 100644 --- a/http3/http3_suite_test.go +++ b/http3/http3_suite_test.go @@ -1,7 +1,10 @@ package http3 import ( + "os" + "strconv" "testing" + "time" "github.com/golang/mock/gomock" @@ -23,3 +26,13 @@ var _ = BeforeEach(func() { var _ = AfterEach(func() { mockCtrl.Finish() }) + +//nolint:unparam +func scaleDuration(t time.Duration) time.Duration { + scaleFactor := 1 + if f, err := strconv.Atoi(os.Getenv("TIMESCALE_FACTOR")); err == nil { // parsing "" errors, so this works fine if the env is not set + scaleFactor = f + } + Expect(scaleFactor).ToNot(BeZero()) + return time.Duration(scaleFactor) * t +} diff --git a/http3/server.go b/http3/server.go index adc4fc861..fbc46b2cc 100644 --- a/http3/server.go +++ b/http3/server.go @@ -37,8 +37,9 @@ var ( ) const ( - nextProtoH3Draft29 = "h3-29" - nextProtoH3Draft32 = "h3-32" + nextProtoH3Draft29 = "h3-29" + nextProtoH3Draft32 = "h3-32" + streamTypeControlStream = 0 ) func versionToALPN(v protocol.VersionNumber) string { @@ -219,10 +220,13 @@ func (s *Server) handleConn(sess quic.EarlySession) { s.logger.Debugf("Opening the control stream failed.") return } - buf := bytes.NewBuffer([]byte{0}) + buf := &bytes.Buffer{} + utils.WriteVarInt(buf, streamTypeControlStream) // stream type (&settingsFrame{}).Write(buf) str.Write(buf.Bytes()) + go s.handleUnidirectionalStreams(sess) + // Process all requests immediately. // It's the client's responsibility to decide which requests are eligible for 0-RTT. for { @@ -254,6 +258,35 @@ func (s *Server) handleConn(sess quic.EarlySession) { } } +func (s *Server) handleUnidirectionalStreams(sess quic.EarlySession) { + for { + str, err := sess.AcceptUniStream(context.Background()) + if err != nil { + s.logger.Debugf("accepting unidirectional stream failed: %s", err) + return + } + + go func(str quic.ReceiveStream) { + streamType, err := utils.ReadVarInt(&byteReaderImpl{str}) + if err != nil { + s.logger.Debugf("reading stream type on stream %d failed: %s", str.StreamID(), err) + return + } + if streamType != streamTypeControlStream { + return + } + f, err := parseNextFrame(str) + if err != nil { + sess.CloseWithError(quic.ErrorCode(errorFrameError), "") + return + } + if _, ok := f.(*settingsFrame); !ok { + sess.CloseWithError(quic.ErrorCode(errorMissingSettings), "") + } + }(str) + } +} + func (s *Server) maxHeaderBytes() uint64 { if s.Server.MaxHeaderBytes <= 0 { return http.DefaultMaxHeaderBytes diff --git a/http3/server_test.go b/http3/server_test.go index 9d43443e2..b4afaf227 100644 --- a/http3/server_test.go +++ b/http3/server_test.go @@ -181,21 +181,137 @@ var _ = Describe("Server", func() { Expect(hfs).To(HaveKeyWithValue(":status", []string{"500"})) }) - Context("stream- and connection-level errors", func() { + Context("control stream handling", func() { var sess *mockquic.MockEarlySession + testDone := make(chan struct{}) BeforeEach(func() { + sess = mockquic.NewMockEarlySession(mockCtrl) + controlStr := mockquic.NewMockStream(mockCtrl) + controlStr.EXPECT().Write(gomock.Any()) + sess.EXPECT().OpenUniStream().Return(controlStr, nil) + sess.EXPECT().AcceptStream(gomock.Any()).Return(nil, errors.New("done")) + sess.EXPECT().RemoteAddr().Return(&net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 1337}).AnyTimes() + sess.EXPECT().LocalAddr().AnyTimes() + }) + + AfterEach(func() { testDone <- struct{}{} }) + + It("parses the SETTINGS frame", func() { + buf := &bytes.Buffer{} + utils.WriteVarInt(buf, streamTypeControlStream) + (&settingsFrame{}).Write(buf) + controlStr := mockquic.NewMockStream(mockCtrl) + controlStr.EXPECT().Read(gomock.Any()).DoAndReturn(buf.Read).AnyTimes() + sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) { + return controlStr, nil + }) + sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) { + <-testDone + return nil, errors.New("test done") + }) + s.handleConn(sess) + time.Sleep(scaleDuration(20 * time.Millisecond)) // don't EXPECT any calls to sess.CloseWithError + }) + + It("ignores streams other than the control stream", func() { + controlBuf := &bytes.Buffer{} + utils.WriteVarInt(controlBuf, streamTypeControlStream) + (&settingsFrame{}).Write(controlBuf) + controlStr := mockquic.NewMockStream(mockCtrl) + controlStr.EXPECT().Read(gomock.Any()).DoAndReturn(controlBuf.Read).AnyTimes() + + otherBuf := &bytes.Buffer{} + utils.WriteVarInt(otherBuf, 1337) + otherStr := mockquic.NewMockStream(mockCtrl) + otherStr.EXPECT().Read(gomock.Any()).DoAndReturn(otherBuf.Read).AnyTimes() + + sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) { + return otherStr, nil + }) + sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) { + return controlStr, nil + }) + sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) { + <-testDone + return nil, errors.New("test done") + }) + s.handleConn(sess) + time.Sleep(scaleDuration(20 * time.Millisecond)) // don't EXPECT any calls to sess.CloseWithError + }) + + It("errors when the first frame on the control stream is not a SETTINGS frame", func() { + buf := &bytes.Buffer{} + utils.WriteVarInt(buf, streamTypeControlStream) + (&dataFrame{}).Write(buf) + controlStr := mockquic.NewMockStream(mockCtrl) + controlStr.EXPECT().Read(gomock.Any()).DoAndReturn(buf.Read).AnyTimes() + sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) { + return controlStr, nil + }) + sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) { + <-testDone + return nil, errors.New("test done") + }) + done := make(chan struct{}) + sess.EXPECT().CloseWithError(gomock.Any(), gomock.Any()).Do(func(code quic.ErrorCode, _ string) { + defer GinkgoRecover() + Expect(code).To(BeEquivalentTo(errorMissingSettings)) + close(done) + }) + s.handleConn(sess) + Eventually(done).Should(BeClosed()) + }) + + It("errors when parsing the frame on the control stream fails", func() { + buf := &bytes.Buffer{} + utils.WriteVarInt(buf, streamTypeControlStream) + b := &bytes.Buffer{} + (&settingsFrame{}).Write(b) + buf.Write(b.Bytes()[:b.Len()-1]) + controlStr := mockquic.NewMockStream(mockCtrl) + controlStr.EXPECT().Read(gomock.Any()).DoAndReturn(buf.Read).AnyTimes() + sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) { + return controlStr, nil + }) + sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) { + <-testDone + return nil, errors.New("test done") + }) + done := make(chan struct{}) + sess.EXPECT().CloseWithError(gomock.Any(), gomock.Any()).Do(func(code quic.ErrorCode, _ string) { + defer GinkgoRecover() + Expect(code).To(BeEquivalentTo(errorFrameError)) + close(done) + }) + s.handleConn(sess) + Eventually(done).Should(BeClosed()) + }) + }) + + Context("stream- and connection-level errors", func() { + var sess *mockquic.MockEarlySession + testDone := make(chan struct{}) + + BeforeEach(func() { + testDone = make(chan struct{}) addr := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 1337} sess = mockquic.NewMockEarlySession(mockCtrl) controlStr := mockquic.NewMockStream(mockCtrl) controlStr.EXPECT().Write(gomock.Any()) sess.EXPECT().OpenUniStream().Return(controlStr, nil) + sess.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) { + <-testDone + return nil, errors.New("test done") + }) sess.EXPECT().AcceptStream(gomock.Any()).Return(str, nil) sess.EXPECT().AcceptStream(gomock.Any()).Return(nil, errors.New("done")) sess.EXPECT().RemoteAddr().Return(addr).AnyTimes() sess.EXPECT().LocalAddr().AnyTimes() }) + AfterEach(func() { testDone <- struct{}{} }) + It("cancels reading when client sends a body in GET request", func() { handlerCalled := make(chan struct{}) s.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {