forked from quic-go/quic-go
http3: remove Settingser, StreamCreator, return Connection from Hijacker (#4425)
This commit is contained in:
@@ -3,7 +3,6 @@ package http3
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net"
|
||||
|
||||
"github.com/quic-go/quic-go"
|
||||
)
|
||||
@@ -17,33 +16,10 @@ type HTTPStreamer interface {
|
||||
HTTPStream() Stream
|
||||
}
|
||||
|
||||
type StreamCreator interface {
|
||||
// Context returns a context that is cancelled when the underlying connection is closed.
|
||||
Context() context.Context
|
||||
OpenStream() (quic.Stream, error)
|
||||
OpenStreamSync(context.Context) (quic.Stream, error)
|
||||
OpenUniStream() (quic.SendStream, error)
|
||||
OpenUniStreamSync(context.Context) (quic.SendStream, error)
|
||||
LocalAddr() net.Addr
|
||||
RemoteAddr() net.Addr
|
||||
ConnectionState() quic.ConnectionState
|
||||
}
|
||||
|
||||
var _ StreamCreator = quic.Connection(nil)
|
||||
|
||||
// A Hijacker allows hijacking of the stream creating part of a quic.Session from a http.Response.Body.
|
||||
// It is used by WebTransport to create WebTransport streams after a session has been established.
|
||||
type Hijacker interface {
|
||||
StreamCreator() StreamCreator
|
||||
}
|
||||
|
||||
// Settingser allows the server to retrieve the client's SETTINGS.
|
||||
// The http.Request.Body implements this interface.
|
||||
type Settingser interface {
|
||||
// Settings returns the client's HTTP settings.
|
||||
// It blocks until the SETTINGS frame has been received.
|
||||
// Note that it is not guaranteed that this happens during the lifetime of the request.
|
||||
Settings(context.Context) (*Settings, error)
|
||||
Connection() Connection
|
||||
}
|
||||
|
||||
// The body is used in the requestBody (for a http.Request) and the responseBody (for a http.Response).
|
||||
@@ -83,7 +59,6 @@ type requestBody struct {
|
||||
var (
|
||||
_ io.ReadCloser = &requestBody{}
|
||||
_ HTTPStreamer = &requestBody{}
|
||||
_ Settingser = &requestBody{}
|
||||
)
|
||||
|
||||
func newRequestBody(str Stream, connCtx context.Context, rcvdSettings <-chan struct{}, getSettings func() *Settings) *requestBody {
|
||||
@@ -95,20 +70,8 @@ func newRequestBody(str Stream, connCtx context.Context, rcvdSettings <-chan str
|
||||
}
|
||||
}
|
||||
|
||||
func (r *requestBody) Settings(ctx context.Context) (*Settings, error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, context.Cause(ctx)
|
||||
case <-r.connCtx.Done():
|
||||
return nil, context.Cause(r.connCtx)
|
||||
case <-r.rcvdSettings:
|
||||
return r.getSettings(), nil
|
||||
}
|
||||
}
|
||||
|
||||
type hijackableBody struct {
|
||||
body body
|
||||
conn quic.Connection // only needed to implement Hijacker
|
||||
|
||||
// only set for the http.Response
|
||||
// The channel is closed when the user is done with this response:
|
||||
@@ -117,16 +80,12 @@ type hijackableBody struct {
|
||||
reqDoneClosed bool
|
||||
}
|
||||
|
||||
var (
|
||||
_ io.ReadCloser = &hijackableBody{}
|
||||
_ Hijacker = &hijackableBody{}
|
||||
)
|
||||
var _ io.ReadCloser = &hijackableBody{}
|
||||
|
||||
func newResponseBody(str Stream, conn quic.Connection, done chan<- struct{}) *hijackableBody {
|
||||
func newResponseBody(str Stream, done chan<- struct{}) *hijackableBody {
|
||||
return &hijackableBody{
|
||||
body: body{str: str},
|
||||
reqDone: done,
|
||||
conn: conn,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -154,5 +113,3 @@ func (r *hijackableBody) Close() error {
|
||||
r.body.str.CancelRead(quic.StreamErrorCode(ErrCodeRequestCanceled))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *hijackableBody) StreamCreator() StreamCreator { return r.conn }
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package http3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/quic-go/quic-go"
|
||||
@@ -12,29 +11,6 @@ import (
|
||||
"go.uber.org/mock/gomock"
|
||||
)
|
||||
|
||||
var _ = Describe("Request Body", func() {
|
||||
It("makes the SETTINGS available", func() {
|
||||
str := mockquic.NewMockStream(mockCtrl)
|
||||
rcvdSettings := make(chan struct{})
|
||||
close(rcvdSettings)
|
||||
settings := &Settings{EnableExtendedConnect: true}
|
||||
body := newRequestBody(str, context.Background(), rcvdSettings, func() *Settings { return settings })
|
||||
s, err := body.Settings(context.Background())
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(s).To(Equal(settings))
|
||||
})
|
||||
|
||||
It("unblocks Settings() when the connection is closed", func() {
|
||||
str := mockquic.NewMockStream(mockCtrl)
|
||||
ctx, cancel := context.WithCancelCause(context.Background())
|
||||
testErr := errors.New("test error")
|
||||
cancel(testErr)
|
||||
body := newRequestBody(str, ctx, make(chan struct{}), func() *Settings { return nil })
|
||||
_, err := body.Settings(context.Background())
|
||||
Expect(err).To(MatchError(testErr))
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("Response Body", func() {
|
||||
var reqDone chan struct{}
|
||||
|
||||
@@ -43,7 +19,7 @@ var _ = Describe("Response Body", func() {
|
||||
It("closes the reqDone channel when Read errors", func() {
|
||||
str := mockquic.NewMockStream(mockCtrl)
|
||||
str.EXPECT().Read(gomock.Any()).Return(0, errors.New("test error"))
|
||||
rb := newResponseBody(str, nil, reqDone)
|
||||
rb := newResponseBody(str, reqDone)
|
||||
_, err := rb.Read([]byte{0})
|
||||
Expect(err).To(MatchError("test error"))
|
||||
Expect(reqDone).To(BeClosed())
|
||||
@@ -52,7 +28,7 @@ var _ = Describe("Response Body", func() {
|
||||
It("allows multiple calls to Read, when Read errors", func() {
|
||||
str := mockquic.NewMockStream(mockCtrl)
|
||||
str.EXPECT().Read(gomock.Any()).Return(0, errors.New("test error")).Times(2)
|
||||
rb := newResponseBody(str, nil, reqDone)
|
||||
rb := newResponseBody(str, reqDone)
|
||||
_, err := rb.Read([]byte{0})
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(reqDone).To(BeClosed())
|
||||
@@ -62,14 +38,14 @@ var _ = Describe("Response Body", func() {
|
||||
|
||||
It("closes responses", func() {
|
||||
str := mockquic.NewMockStream(mockCtrl)
|
||||
rb := newResponseBody(str, nil, reqDone)
|
||||
rb := newResponseBody(str, reqDone)
|
||||
str.EXPECT().CancelRead(quic.StreamErrorCode(ErrCodeRequestCanceled))
|
||||
Expect(rb.Close()).To(Succeed())
|
||||
})
|
||||
|
||||
It("allows multiple calls to Close", func() {
|
||||
str := mockquic.NewMockStream(mockCtrl)
|
||||
rb := newResponseBody(str, nil, reqDone)
|
||||
rb := newResponseBody(str, reqDone)
|
||||
str.EXPECT().CancelRead(quic.StreamErrorCode(ErrCodeRequestCanceled)).MaxTimes(2)
|
||||
Expect(rb.Close()).To(Succeed())
|
||||
Expect(reqDone).To(BeClosed())
|
||||
|
||||
@@ -10,6 +10,14 @@ import (
|
||||
"github.com/quic-go/quic-go/quicvarint"
|
||||
)
|
||||
|
||||
type Connection interface {
|
||||
quic.Connection
|
||||
// ReceivedSettings returns a channel that is closed once the client's SETTINGS frame was received.
|
||||
ReceivedSettings() <-chan struct{}
|
||||
// Settings returns the settings received on this connection.
|
||||
Settings() *Settings
|
||||
}
|
||||
|
||||
type connection struct {
|
||||
quic.Connection
|
||||
|
||||
|
||||
@@ -210,7 +210,7 @@ func (s *requestStream) ReadResponse() (*http.Response, error) {
|
||||
} else {
|
||||
httpStr = s.stream
|
||||
}
|
||||
respBody := newResponseBody(httpStr, s.conn, s.reqDone)
|
||||
respBody := newResponseBody(httpStr, s.reqDone)
|
||||
|
||||
// Rules for when to set Content-Length are defined in https://tools.ietf.org/html/rfc7230#section-3.3.2.
|
||||
_, hasTransferEncoding := res.Header["Transfer-Encoding"]
|
||||
|
||||
@@ -63,7 +63,7 @@ func (hw *headerWriter) Write(p []byte) (int, error) {
|
||||
|
||||
type responseWriter struct {
|
||||
*headerWriter
|
||||
conn quic.Connection
|
||||
conn Connection
|
||||
bufferedStr *bufio.Writer
|
||||
buf []byte
|
||||
|
||||
@@ -79,7 +79,7 @@ var (
|
||||
_ Hijacker = &responseWriter{}
|
||||
)
|
||||
|
||||
func newResponseWriter(str quic.Stream, conn quic.Connection, logger utils.Logger) *responseWriter {
|
||||
func newResponseWriter(str quic.Stream, conn Connection, logger utils.Logger) *responseWriter {
|
||||
hw := &headerWriter{
|
||||
str: str,
|
||||
header: http.Header{},
|
||||
@@ -196,7 +196,7 @@ func (w *responseWriter) Flush() {
|
||||
}
|
||||
}
|
||||
|
||||
func (w *responseWriter) StreamCreator() StreamCreator {
|
||||
func (w *responseWriter) Connection() Connection {
|
||||
return w.conn
|
||||
}
|
||||
|
||||
|
||||
@@ -590,13 +590,13 @@ var _ = Describe("HTTP tests", func() {
|
||||
settingsChan := make(chan *http3.Settings, 1)
|
||||
mux.HandleFunc("/settings", func(w http.ResponseWriter, r *http.Request) {
|
||||
defer GinkgoRecover()
|
||||
// The http.Request.Body is guaranteed to implement the http3.Settingser interface.
|
||||
settings, err := r.Body.(http3.Settingser).Settings(context.Background())
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
conn := w.(http3.Hijacker).Connection()
|
||||
select {
|
||||
case <-conn.ReceivedSettings():
|
||||
case <-time.After(5 * time.Second):
|
||||
Fail("didn't receive SETTINGS")
|
||||
}
|
||||
settingsChan <- settings
|
||||
settingsChan <- conn.Settings()
|
||||
w.WriteHeader(http.StatusOK)
|
||||
})
|
||||
|
||||
@@ -615,7 +615,8 @@ var _ = Describe("HTTP tests", func() {
|
||||
_, err = rt.RoundTrip(req)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
var settings *http3.Settings
|
||||
Eventually(settingsChan).Should(Receive(&settings))
|
||||
Expect(settingsChan).To(Receive(&settings))
|
||||
Expect(settings).ToNot(BeNil())
|
||||
Expect(settings.EnableDatagram).To(BeTrue())
|
||||
Expect(settings.EnableExtendedConnect).To(BeFalse())
|
||||
Expect(settings.Other).To(HaveKeyWithValue(uint64(1337), uint64(42)))
|
||||
|
||||
Reference in New Issue
Block a user