forked from quic-go/quic-go
http3: explicitly expose all method on the RequestStream (#5216)
This commit is contained in:
@@ -344,6 +344,44 @@ func (c *MockDatagramStreamSendDatagramCall) DoAndReturn(f func([]byte) error) *
|
||||
return c
|
||||
}
|
||||
|
||||
// SetDeadline mocks base method.
|
||||
func (m *MockDatagramStream) SetDeadline(arg0 time.Time) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "SetDeadline", arg0)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// SetDeadline indicates an expected call of SetDeadline.
|
||||
func (mr *MockDatagramStreamMockRecorder) SetDeadline(arg0 any) *MockDatagramStreamSetDeadlineCall {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetDeadline", reflect.TypeOf((*MockDatagramStream)(nil).SetDeadline), arg0)
|
||||
return &MockDatagramStreamSetDeadlineCall{Call: call}
|
||||
}
|
||||
|
||||
// MockDatagramStreamSetDeadlineCall wrap *gomock.Call
|
||||
type MockDatagramStreamSetDeadlineCall struct {
|
||||
*gomock.Call
|
||||
}
|
||||
|
||||
// Return rewrite *gomock.Call.Return
|
||||
func (c *MockDatagramStreamSetDeadlineCall) Return(arg0 error) *MockDatagramStreamSetDeadlineCall {
|
||||
c.Call = c.Call.Return(arg0)
|
||||
return c
|
||||
}
|
||||
|
||||
// Do rewrite *gomock.Call.Do
|
||||
func (c *MockDatagramStreamSetDeadlineCall) Do(f func(time.Time) error) *MockDatagramStreamSetDeadlineCall {
|
||||
c.Call = c.Call.Do(f)
|
||||
return c
|
||||
}
|
||||
|
||||
// DoAndReturn rewrite *gomock.Call.DoAndReturn
|
||||
func (c *MockDatagramStreamSetDeadlineCall) DoAndReturn(f func(time.Time) error) *MockDatagramStreamSetDeadlineCall {
|
||||
c.Call = c.Call.DoAndReturn(f)
|
||||
return c
|
||||
}
|
||||
|
||||
// SetReadDeadline mocks base method.
|
||||
func (m *MockDatagramStream) SetReadDeadline(arg0 time.Time) error {
|
||||
m.ctrl.T.Helper()
|
||||
|
||||
134
http3/stream.go
134
http3/stream.go
@@ -21,6 +21,7 @@ type datagramStream interface {
|
||||
CancelWrite(quic.StreamErrorCode)
|
||||
StreamID() quic.StreamID
|
||||
Context() context.Context
|
||||
SetDeadline(time.Time) error
|
||||
SetReadDeadline(time.Time) error
|
||||
SetWriteDeadline(time.Time) error
|
||||
SendDatagram(b []byte) error
|
||||
@@ -29,7 +30,8 @@ type datagramStream interface {
|
||||
QUICStream() *quic.Stream
|
||||
}
|
||||
|
||||
// A Stream is an HTTP/3 request stream.
|
||||
// A Stream is an HTTP/3 stream.
|
||||
//
|
||||
// When writing to and reading from the stream, data is framed in HTTP/3 DATA frames.
|
||||
type Stream struct {
|
||||
datagramStream
|
||||
@@ -121,12 +123,25 @@ func (s *Stream) StreamID() protocol.StreamID {
|
||||
return s.datagramStream.StreamID()
|
||||
}
|
||||
|
||||
func (s *Stream) SendDatagram(b []byte) error {
|
||||
// TODO: reject if datagrams are not negotiated (yet)
|
||||
return s.datagramStream.SendDatagram(b)
|
||||
}
|
||||
|
||||
func (s *Stream) ReceiveDatagram(ctx context.Context) ([]byte, error) {
|
||||
// TODO: reject if datagrams are not negotiated (yet)
|
||||
return s.datagramStream.ReceiveDatagram(ctx)
|
||||
}
|
||||
|
||||
// A RequestStream is a low-level abstraction representing an HTTP/3 request stream.
|
||||
// It decouples sending of the HTTP request from reading the HTTP response, allowing
|
||||
// the application to optimistically use the stream (and, for example, send datagrams)
|
||||
// before receiving the response.
|
||||
//
|
||||
// This is only needed for advanced use case, e.g. WebTransport and the various
|
||||
// MASQUE proxying protocols.
|
||||
type RequestStream struct {
|
||||
*Stream
|
||||
str *Stream
|
||||
|
||||
responseBody io.ReadCloser // set by ReadResponse
|
||||
|
||||
@@ -155,7 +170,7 @@ func newRequestStream(
|
||||
trace *httptrace.ClientTrace,
|
||||
) *RequestStream {
|
||||
return &RequestStream{
|
||||
Stream: str,
|
||||
str: str,
|
||||
requestWriter: requestWriter,
|
||||
reqDone: reqDone,
|
||||
decoder: decoder,
|
||||
@@ -166,6 +181,10 @@ func newRequestStream(
|
||||
}
|
||||
}
|
||||
|
||||
// Read reads data from the underlying stream.
|
||||
//
|
||||
// It can only be used after the request has been sent (using SendRequestHeader)
|
||||
// and the response has been consumed (using ReadResponse).
|
||||
func (s *RequestStream) Read(b []byte) (int, error) {
|
||||
if s.responseBody == nil {
|
||||
return 0, errors.New("http3: invalid use of RequestStream.Read: need to call ReadResponse first")
|
||||
@@ -173,7 +192,77 @@ func (s *RequestStream) Read(b []byte) (int, error) {
|
||||
return s.responseBody.Read(b)
|
||||
}
|
||||
|
||||
// StreamID returns the QUIC stream ID of the underlying QUIC stream.
|
||||
func (s *RequestStream) StreamID() protocol.StreamID {
|
||||
return s.str.StreamID()
|
||||
}
|
||||
|
||||
// Write writes data to the stream.
|
||||
//
|
||||
// It can only be used after the request has been sent (using SendRequestHeader).
|
||||
func (s *RequestStream) Write(b []byte) (int, error) {
|
||||
return s.str.Write(b)
|
||||
}
|
||||
|
||||
// Close closes the send-direction of the stream.
|
||||
// It does not close the receive-direction of the stream.
|
||||
func (s *RequestStream) Close() error {
|
||||
return s.str.Close()
|
||||
}
|
||||
|
||||
// CancelRead aborts receiving on this stream.
|
||||
// See [quic.Stream.CancelRead] for more details.
|
||||
func (s *RequestStream) CancelRead(errorCode quic.StreamErrorCode) {
|
||||
s.str.CancelRead(errorCode)
|
||||
}
|
||||
|
||||
// CancelWrite aborts sending on this stream.
|
||||
// See [quic.Stream.CancelWrite] for more details.
|
||||
func (s *RequestStream) CancelWrite(errorCode quic.StreamErrorCode) {
|
||||
s.str.CancelWrite(errorCode)
|
||||
}
|
||||
|
||||
// Context returns a context derived from the underlying QUIC stream's context.
|
||||
// See [quic.Stream.Context] for more details.
|
||||
func (s *RequestStream) Context() context.Context {
|
||||
return s.str.Context()
|
||||
}
|
||||
|
||||
// SetReadDeadline sets the deadline for Read calls.
|
||||
func (s *RequestStream) SetReadDeadline(t time.Time) error {
|
||||
return s.str.SetReadDeadline(t)
|
||||
}
|
||||
|
||||
// SetWriteDeadline sets the deadline for Write calls.
|
||||
func (s *RequestStream) SetWriteDeadline(t time.Time) error {
|
||||
return s.str.SetWriteDeadline(t)
|
||||
}
|
||||
|
||||
// SetDeadline sets the read and write deadlines associated with the stream.
|
||||
// It is equivalent to calling both SetReadDeadline and SetWriteDeadline.
|
||||
func (s *RequestStream) SetDeadline(t time.Time) error {
|
||||
return s.str.SetDeadline(t)
|
||||
}
|
||||
|
||||
// SendDatagrams send a new HTTP Datagram (RFC 9297).
|
||||
//
|
||||
// It is only possible to send datagrams if the server enabled support for this extension.
|
||||
// It is recommended (though not required) to send the request before calling this method,
|
||||
// as the server might drop datagrams which it can't associate with an existing request.
|
||||
func (s *RequestStream) SendDatagram(b []byte) error {
|
||||
return s.str.SendDatagram(b)
|
||||
}
|
||||
|
||||
// ReceiveDatagram receives HTTP Datagrams (RFC 9297).
|
||||
//
|
||||
// It is only possible if support for HTTP Datagrams was enabled, using the EnableDatagram
|
||||
// option on the [Transport].
|
||||
func (s *RequestStream) ReceiveDatagram(ctx context.Context) ([]byte, error) {
|
||||
return s.str.ReceiveDatagram(ctx)
|
||||
}
|
||||
|
||||
// SendRequestHeader sends the HTTP request.
|
||||
//
|
||||
// It is invalid to call it more than once.
|
||||
// It is invalid to call it after Write has been called.
|
||||
func (s *RequestStream) SendRequestHeader(req *http.Request) error {
|
||||
@@ -186,17 +275,18 @@ func (s *RequestStream) SendRequestHeader(req *http.Request) error {
|
||||
}
|
||||
s.isConnect = req.Method == http.MethodConnect
|
||||
s.sentRequest = true
|
||||
return s.requestWriter.WriteRequestHeader(s.datagramStream, req, s.requestedGzip)
|
||||
return s.requestWriter.WriteRequestHeader(s.str.datagramStream, req, s.requestedGzip)
|
||||
}
|
||||
|
||||
// ReadResponse reads the HTTP response from the stream.
|
||||
//
|
||||
// It is invalid to call it more than once.
|
||||
// It doesn't set Response.Request and Response.TLS.
|
||||
// It is invalid to call it after Read has been called.
|
||||
func (s *RequestStream) ReadResponse() (*http.Response, error) {
|
||||
qstr := s.datagramStream
|
||||
qstr := s.str.datagramStream
|
||||
fp := &frameParser{
|
||||
closeConn: s.conn.CloseWithError,
|
||||
closeConn: s.str.conn.CloseWithError,
|
||||
r: &tracingReader{
|
||||
Reader: qstr,
|
||||
first: &s.firstByte,
|
||||
@@ -205,42 +295,42 @@ func (s *RequestStream) ReadResponse() (*http.Response, error) {
|
||||
}
|
||||
frame, err := fp.ParseNext()
|
||||
if err != nil {
|
||||
s.CancelRead(quic.StreamErrorCode(ErrCodeFrameError))
|
||||
s.CancelWrite(quic.StreamErrorCode(ErrCodeFrameError))
|
||||
s.str.CancelRead(quic.StreamErrorCode(ErrCodeFrameError))
|
||||
s.str.CancelWrite(quic.StreamErrorCode(ErrCodeFrameError))
|
||||
return nil, fmt.Errorf("http3: parsing frame failed: %w", err)
|
||||
}
|
||||
hf, ok := frame.(*headersFrame)
|
||||
if !ok {
|
||||
s.conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeFrameUnexpected), "expected first frame to be a HEADERS frame")
|
||||
s.str.conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeFrameUnexpected), "expected first frame to be a HEADERS frame")
|
||||
return nil, errors.New("http3: expected first frame to be a HEADERS frame")
|
||||
}
|
||||
if hf.Length > s.maxHeaderBytes {
|
||||
s.CancelRead(quic.StreamErrorCode(ErrCodeFrameError))
|
||||
s.CancelWrite(quic.StreamErrorCode(ErrCodeFrameError))
|
||||
s.str.CancelRead(quic.StreamErrorCode(ErrCodeFrameError))
|
||||
s.str.CancelWrite(quic.StreamErrorCode(ErrCodeFrameError))
|
||||
return nil, fmt.Errorf("http3: HEADERS frame too large: %d bytes (max: %d)", hf.Length, s.maxHeaderBytes)
|
||||
}
|
||||
headerBlock := make([]byte, hf.Length)
|
||||
if _, err := io.ReadFull(qstr, headerBlock); err != nil {
|
||||
s.CancelRead(quic.StreamErrorCode(ErrCodeRequestIncomplete))
|
||||
s.CancelWrite(quic.StreamErrorCode(ErrCodeRequestIncomplete))
|
||||
s.str.CancelRead(quic.StreamErrorCode(ErrCodeRequestIncomplete))
|
||||
s.str.CancelWrite(quic.StreamErrorCode(ErrCodeRequestIncomplete))
|
||||
return nil, fmt.Errorf("http3: failed to read response headers: %w", err)
|
||||
}
|
||||
hfs, err := s.decoder.DecodeFull(headerBlock)
|
||||
if err != nil {
|
||||
// TODO: use the right error code
|
||||
s.conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeGeneralProtocolError), "")
|
||||
s.str.conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeGeneralProtocolError), "")
|
||||
return nil, fmt.Errorf("http3: failed to decode response headers: %w", err)
|
||||
}
|
||||
res := s.response
|
||||
if err := updateResponseFromHeaders(res, hfs); err != nil {
|
||||
s.CancelRead(quic.StreamErrorCode(ErrCodeMessageError))
|
||||
s.CancelWrite(quic.StreamErrorCode(ErrCodeMessageError))
|
||||
s.str.CancelRead(quic.StreamErrorCode(ErrCodeMessageError))
|
||||
s.str.CancelWrite(quic.StreamErrorCode(ErrCodeMessageError))
|
||||
return nil, fmt.Errorf("http3: invalid response: %w", err)
|
||||
}
|
||||
|
||||
// Check that the server doesn't send more data in DATA frames than indicated by the Content-Length header (if set).
|
||||
// See section 4.1.2 of RFC 9114.
|
||||
respBody := newResponseBody(s.Stream, res.ContentLength, s.reqDone)
|
||||
respBody := newResponseBody(s.str, res.ContentLength, s.reqDone)
|
||||
|
||||
// Rules for when to set Content-Length are defined in https://tools.ietf.org/html/rfc7230#section-3.3.2.
|
||||
isInformational := res.StatusCode >= 100 && res.StatusCode < 200
|
||||
@@ -262,16 +352,6 @@ func (s *RequestStream) ReadResponse() (*http.Response, error) {
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (s *Stream) SendDatagram(b []byte) error {
|
||||
// TODO: reject if datagrams are not negotiated (yet)
|
||||
return s.datagramStream.SendDatagram(b)
|
||||
}
|
||||
|
||||
func (s *Stream) ReceiveDatagram(ctx context.Context) ([]byte, error) {
|
||||
// TODO: reject if datagrams are not negotiated (yet)
|
||||
return s.datagramStream.ReceiveDatagram(ctx)
|
||||
}
|
||||
|
||||
type tracingReader struct {
|
||||
io.Reader
|
||||
first *bool
|
||||
|
||||
Reference in New Issue
Block a user