From 66e2539b2e2db557f3f8ec360d5508200bf2edc2 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 19 Jun 2025 18:16:26 +0800 Subject: [PATCH] http3: explicitly expose all method on the RequestStream (#5216) --- http3/mock_datagram_stream_test.go | 38 ++++++++ http3/stream.go | 134 +++++++++++++++++++++++------ 2 files changed, 145 insertions(+), 27 deletions(-) diff --git a/http3/mock_datagram_stream_test.go b/http3/mock_datagram_stream_test.go index 7ca0ae63..f9fbf895 100644 --- a/http3/mock_datagram_stream_test.go +++ b/http3/mock_datagram_stream_test.go @@ -344,6 +344,44 @@ func (c *MockDatagramStreamSendDatagramCall) DoAndReturn(f func([]byte) error) * return c } +// SetDeadline mocks base method. +func (m *MockDatagramStream) SetDeadline(arg0 time.Time) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetDeadline", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetDeadline indicates an expected call of SetDeadline. +func (mr *MockDatagramStreamMockRecorder) SetDeadline(arg0 any) *MockDatagramStreamSetDeadlineCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetDeadline", reflect.TypeOf((*MockDatagramStream)(nil).SetDeadline), arg0) + return &MockDatagramStreamSetDeadlineCall{Call: call} +} + +// MockDatagramStreamSetDeadlineCall wrap *gomock.Call +type MockDatagramStreamSetDeadlineCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockDatagramStreamSetDeadlineCall) Return(arg0 error) *MockDatagramStreamSetDeadlineCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockDatagramStreamSetDeadlineCall) Do(f func(time.Time) error) *MockDatagramStreamSetDeadlineCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockDatagramStreamSetDeadlineCall) DoAndReturn(f func(time.Time) error) *MockDatagramStreamSetDeadlineCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + // SetReadDeadline mocks base method. func (m *MockDatagramStream) SetReadDeadline(arg0 time.Time) error { m.ctrl.T.Helper() diff --git a/http3/stream.go b/http3/stream.go index bcc627d6..11b97679 100644 --- a/http3/stream.go +++ b/http3/stream.go @@ -21,6 +21,7 @@ type datagramStream interface { CancelWrite(quic.StreamErrorCode) StreamID() quic.StreamID Context() context.Context + SetDeadline(time.Time) error SetReadDeadline(time.Time) error SetWriteDeadline(time.Time) error SendDatagram(b []byte) error @@ -29,7 +30,8 @@ type datagramStream interface { QUICStream() *quic.Stream } -// A Stream is an HTTP/3 request stream. +// A Stream is an HTTP/3 stream. +// // When writing to and reading from the stream, data is framed in HTTP/3 DATA frames. type Stream struct { datagramStream @@ -121,12 +123,25 @@ func (s *Stream) StreamID() protocol.StreamID { return s.datagramStream.StreamID() } +func (s *Stream) SendDatagram(b []byte) error { + // TODO: reject if datagrams are not negotiated (yet) + return s.datagramStream.SendDatagram(b) +} + +func (s *Stream) ReceiveDatagram(ctx context.Context) ([]byte, error) { + // TODO: reject if datagrams are not negotiated (yet) + return s.datagramStream.ReceiveDatagram(ctx) +} + // A RequestStream is a low-level abstraction representing an HTTP/3 request stream. // It decouples sending of the HTTP request from reading the HTTP response, allowing // the application to optimistically use the stream (and, for example, send datagrams) // before receiving the response. +// +// This is only needed for advanced use case, e.g. WebTransport and the various +// MASQUE proxying protocols. type RequestStream struct { - *Stream + str *Stream responseBody io.ReadCloser // set by ReadResponse @@ -155,7 +170,7 @@ func newRequestStream( trace *httptrace.ClientTrace, ) *RequestStream { return &RequestStream{ - Stream: str, + str: str, requestWriter: requestWriter, reqDone: reqDone, decoder: decoder, @@ -166,6 +181,10 @@ func newRequestStream( } } +// Read reads data from the underlying stream. +// +// It can only be used after the request has been sent (using SendRequestHeader) +// and the response has been consumed (using ReadResponse). func (s *RequestStream) Read(b []byte) (int, error) { if s.responseBody == nil { return 0, errors.New("http3: invalid use of RequestStream.Read: need to call ReadResponse first") @@ -173,7 +192,77 @@ func (s *RequestStream) Read(b []byte) (int, error) { return s.responseBody.Read(b) } +// StreamID returns the QUIC stream ID of the underlying QUIC stream. +func (s *RequestStream) StreamID() protocol.StreamID { + return s.str.StreamID() +} + +// Write writes data to the stream. +// +// It can only be used after the request has been sent (using SendRequestHeader). +func (s *RequestStream) Write(b []byte) (int, error) { + return s.str.Write(b) +} + +// Close closes the send-direction of the stream. +// It does not close the receive-direction of the stream. +func (s *RequestStream) Close() error { + return s.str.Close() +} + +// CancelRead aborts receiving on this stream. +// See [quic.Stream.CancelRead] for more details. +func (s *RequestStream) CancelRead(errorCode quic.StreamErrorCode) { + s.str.CancelRead(errorCode) +} + +// CancelWrite aborts sending on this stream. +// See [quic.Stream.CancelWrite] for more details. +func (s *RequestStream) CancelWrite(errorCode quic.StreamErrorCode) { + s.str.CancelWrite(errorCode) +} + +// Context returns a context derived from the underlying QUIC stream's context. +// See [quic.Stream.Context] for more details. +func (s *RequestStream) Context() context.Context { + return s.str.Context() +} + +// SetReadDeadline sets the deadline for Read calls. +func (s *RequestStream) SetReadDeadline(t time.Time) error { + return s.str.SetReadDeadline(t) +} + +// SetWriteDeadline sets the deadline for Write calls. +func (s *RequestStream) SetWriteDeadline(t time.Time) error { + return s.str.SetWriteDeadline(t) +} + +// SetDeadline sets the read and write deadlines associated with the stream. +// It is equivalent to calling both SetReadDeadline and SetWriteDeadline. +func (s *RequestStream) SetDeadline(t time.Time) error { + return s.str.SetDeadline(t) +} + +// SendDatagrams send a new HTTP Datagram (RFC 9297). +// +// It is only possible to send datagrams if the server enabled support for this extension. +// It is recommended (though not required) to send the request before calling this method, +// as the server might drop datagrams which it can't associate with an existing request. +func (s *RequestStream) SendDatagram(b []byte) error { + return s.str.SendDatagram(b) +} + +// ReceiveDatagram receives HTTP Datagrams (RFC 9297). +// +// It is only possible if support for HTTP Datagrams was enabled, using the EnableDatagram +// option on the [Transport]. +func (s *RequestStream) ReceiveDatagram(ctx context.Context) ([]byte, error) { + return s.str.ReceiveDatagram(ctx) +} + // SendRequestHeader sends the HTTP request. +// // It is invalid to call it more than once. // It is invalid to call it after Write has been called. func (s *RequestStream) SendRequestHeader(req *http.Request) error { @@ -186,17 +275,18 @@ func (s *RequestStream) SendRequestHeader(req *http.Request) error { } s.isConnect = req.Method == http.MethodConnect s.sentRequest = true - return s.requestWriter.WriteRequestHeader(s.datagramStream, req, s.requestedGzip) + return s.requestWriter.WriteRequestHeader(s.str.datagramStream, req, s.requestedGzip) } // ReadResponse reads the HTTP response from the stream. +// // It is invalid to call it more than once. // It doesn't set Response.Request and Response.TLS. // It is invalid to call it after Read has been called. func (s *RequestStream) ReadResponse() (*http.Response, error) { - qstr := s.datagramStream + qstr := s.str.datagramStream fp := &frameParser{ - closeConn: s.conn.CloseWithError, + closeConn: s.str.conn.CloseWithError, r: &tracingReader{ Reader: qstr, first: &s.firstByte, @@ -205,42 +295,42 @@ func (s *RequestStream) ReadResponse() (*http.Response, error) { } frame, err := fp.ParseNext() if err != nil { - s.CancelRead(quic.StreamErrorCode(ErrCodeFrameError)) - s.CancelWrite(quic.StreamErrorCode(ErrCodeFrameError)) + s.str.CancelRead(quic.StreamErrorCode(ErrCodeFrameError)) + s.str.CancelWrite(quic.StreamErrorCode(ErrCodeFrameError)) return nil, fmt.Errorf("http3: parsing frame failed: %w", err) } hf, ok := frame.(*headersFrame) if !ok { - s.conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeFrameUnexpected), "expected first frame to be a HEADERS frame") + s.str.conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeFrameUnexpected), "expected first frame to be a HEADERS frame") return nil, errors.New("http3: expected first frame to be a HEADERS frame") } if hf.Length > s.maxHeaderBytes { - s.CancelRead(quic.StreamErrorCode(ErrCodeFrameError)) - s.CancelWrite(quic.StreamErrorCode(ErrCodeFrameError)) + s.str.CancelRead(quic.StreamErrorCode(ErrCodeFrameError)) + s.str.CancelWrite(quic.StreamErrorCode(ErrCodeFrameError)) return nil, fmt.Errorf("http3: HEADERS frame too large: %d bytes (max: %d)", hf.Length, s.maxHeaderBytes) } headerBlock := make([]byte, hf.Length) if _, err := io.ReadFull(qstr, headerBlock); err != nil { - s.CancelRead(quic.StreamErrorCode(ErrCodeRequestIncomplete)) - s.CancelWrite(quic.StreamErrorCode(ErrCodeRequestIncomplete)) + s.str.CancelRead(quic.StreamErrorCode(ErrCodeRequestIncomplete)) + s.str.CancelWrite(quic.StreamErrorCode(ErrCodeRequestIncomplete)) return nil, fmt.Errorf("http3: failed to read response headers: %w", err) } hfs, err := s.decoder.DecodeFull(headerBlock) if err != nil { // TODO: use the right error code - s.conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeGeneralProtocolError), "") + s.str.conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeGeneralProtocolError), "") return nil, fmt.Errorf("http3: failed to decode response headers: %w", err) } res := s.response if err := updateResponseFromHeaders(res, hfs); err != nil { - s.CancelRead(quic.StreamErrorCode(ErrCodeMessageError)) - s.CancelWrite(quic.StreamErrorCode(ErrCodeMessageError)) + s.str.CancelRead(quic.StreamErrorCode(ErrCodeMessageError)) + s.str.CancelWrite(quic.StreamErrorCode(ErrCodeMessageError)) return nil, fmt.Errorf("http3: invalid response: %w", err) } // Check that the server doesn't send more data in DATA frames than indicated by the Content-Length header (if set). // See section 4.1.2 of RFC 9114. - respBody := newResponseBody(s.Stream, res.ContentLength, s.reqDone) + respBody := newResponseBody(s.str, res.ContentLength, s.reqDone) // Rules for when to set Content-Length are defined in https://tools.ietf.org/html/rfc7230#section-3.3.2. isInformational := res.StatusCode >= 100 && res.StatusCode < 200 @@ -262,16 +352,6 @@ func (s *RequestStream) ReadResponse() (*http.Response, error) { return res, nil } -func (s *Stream) SendDatagram(b []byte) error { - // TODO: reject if datagrams are not negotiated (yet) - return s.datagramStream.SendDatagram(b) -} - -func (s *Stream) ReceiveDatagram(ctx context.Context) ([]byte, error) { - // TODO: reject if datagrams are not negotiated (yet) - return s.datagramStream.ReceiveDatagram(ctx) -} - type tracingReader struct { io.Reader first *bool