forked from quic-go/quic-go
http3: convert Connection interface to Conn struct (#5204)
This commit is contained in:
117
http3/conn.go
117
http3/conn.go
@@ -24,28 +24,11 @@ const maxQuarterStreamID = 1<<60 - 1
|
||||
|
||||
var errGoAway = errors.New("connection in graceful shutdown")
|
||||
|
||||
// Connection is an HTTP/3 connection.
|
||||
// Conn is an HTTP/3 connection.
|
||||
// It has all methods from the quic.Conn expect for AcceptStream, AcceptUniStream,
|
||||
// SendDatagram and ReceiveDatagram.
|
||||
type Connection interface {
|
||||
OpenStream() (*quic.Stream, error)
|
||||
OpenStreamSync(context.Context) (*quic.Stream, error)
|
||||
OpenUniStream() (*quic.SendStream, error)
|
||||
OpenUniStreamSync(context.Context) (*quic.SendStream, error)
|
||||
LocalAddr() net.Addr
|
||||
RemoteAddr() net.Addr
|
||||
CloseWithError(quic.ApplicationErrorCode, string) error
|
||||
Context() context.Context
|
||||
ConnectionState() quic.ConnectionState
|
||||
|
||||
// ReceivedSettings returns a channel that is closed once the client's SETTINGS frame was received.
|
||||
ReceivedSettings() <-chan struct{}
|
||||
// Settings returns the settings received on this connection.
|
||||
Settings() *Settings
|
||||
}
|
||||
|
||||
type connection struct {
|
||||
*quic.Conn
|
||||
type Conn struct {
|
||||
conn *quic.Conn
|
||||
|
||||
ctx context.Context
|
||||
|
||||
@@ -75,10 +58,10 @@ func newConnection(
|
||||
perspective protocol.Perspective,
|
||||
logger *slog.Logger,
|
||||
idleTimeout time.Duration,
|
||||
) *connection {
|
||||
c := &connection{
|
||||
) *Conn {
|
||||
c := &Conn{
|
||||
ctx: ctx,
|
||||
Conn: quicConn,
|
||||
conn: quicConn,
|
||||
perspective: perspective,
|
||||
logger: logger,
|
||||
idleTimeout: idleTimeout,
|
||||
@@ -95,11 +78,43 @@ func newConnection(
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *connection) onIdleTimer() {
|
||||
func (c *Conn) OpenStream() (*quic.Stream, error) {
|
||||
return c.conn.OpenStream()
|
||||
}
|
||||
|
||||
func (c *Conn) OpenStreamSync(ctx context.Context) (*quic.Stream, error) {
|
||||
return c.conn.OpenStreamSync(ctx)
|
||||
}
|
||||
|
||||
func (c *Conn) OpenUniStream() (*quic.SendStream, error) {
|
||||
return c.conn.OpenUniStream()
|
||||
}
|
||||
|
||||
func (c *Conn) OpenUniStreamSync(ctx context.Context) (*quic.SendStream, error) {
|
||||
return c.conn.OpenUniStreamSync(ctx)
|
||||
}
|
||||
|
||||
func (c *Conn) LocalAddr() net.Addr {
|
||||
return c.conn.LocalAddr()
|
||||
}
|
||||
|
||||
func (c *Conn) RemoteAddr() net.Addr {
|
||||
return c.conn.RemoteAddr()
|
||||
}
|
||||
|
||||
func (c *Conn) HandshakeComplete() <-chan struct{} {
|
||||
return c.conn.HandshakeComplete()
|
||||
}
|
||||
|
||||
func (c *Conn) ConnectionState() quic.ConnectionState {
|
||||
return c.conn.ConnectionState()
|
||||
}
|
||||
|
||||
func (c *Conn) onIdleTimer() {
|
||||
c.CloseWithError(quic.ApplicationErrorCode(ErrCodeNoError), "idle timeout")
|
||||
}
|
||||
|
||||
func (c *connection) clearStream(id quic.StreamID) {
|
||||
func (c *Conn) clearStream(id quic.StreamID) {
|
||||
c.streamMx.Lock()
|
||||
defer c.streamMx.Unlock()
|
||||
|
||||
@@ -116,7 +131,7 @@ func (c *connection) clearStream(id quic.StreamID) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *connection) openRequestStream(
|
||||
func (c *Conn) openRequestStream(
|
||||
ctx context.Context,
|
||||
requestWriter *requestWriter,
|
||||
reqDone chan<- struct{},
|
||||
@@ -168,7 +183,7 @@ func (c *connection) openRequestStream(
|
||||
), nil
|
||||
}
|
||||
|
||||
func (c *connection) decodeTrailers(r io.Reader, l, maxHeaderBytes uint64) (http.Header, error) {
|
||||
func (c *Conn) decodeTrailers(r io.Reader, l, maxHeaderBytes uint64) (http.Header, error) {
|
||||
if l > maxHeaderBytes {
|
||||
return nil, fmt.Errorf("HEADERS frame too large: %d bytes (max: %d)", l, maxHeaderBytes)
|
||||
}
|
||||
@@ -185,8 +200,8 @@ func (c *connection) decodeTrailers(r io.Reader, l, maxHeaderBytes uint64) (http
|
||||
}
|
||||
|
||||
// only used by the server
|
||||
func (c *connection) acceptStream(ctx context.Context) (*stateTrackingStream, error) {
|
||||
str, err := c.AcceptStream(ctx)
|
||||
func (c *Conn) acceptStream(ctx context.Context) (*stateTrackingStream, error) {
|
||||
str, err := c.conn.AcceptStream(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -203,14 +218,14 @@ func (c *connection) acceptStream(ctx context.Context) (*stateTrackingStream, er
|
||||
return hstr, nil
|
||||
}
|
||||
|
||||
func (c *connection) CloseWithError(code quic.ApplicationErrorCode, msg string) error {
|
||||
func (c *Conn) CloseWithError(code quic.ApplicationErrorCode, msg string) error {
|
||||
if c.idleTimer != nil {
|
||||
c.idleTimer.Stop()
|
||||
}
|
||||
return c.Conn.CloseWithError(code, msg)
|
||||
return c.conn.CloseWithError(code, msg)
|
||||
}
|
||||
|
||||
func (c *connection) handleUnidirectionalStreams(hijack func(StreamType, quic.ConnectionTracingID, *quic.ReceiveStream, error) (hijacked bool)) {
|
||||
func (c *Conn) handleUnidirectionalStreams(hijack func(StreamType, quic.ConnectionTracingID, *quic.ReceiveStream, error) (hijacked bool)) {
|
||||
var (
|
||||
rcvdControlStr atomic.Bool
|
||||
rcvdQPACKEncoderStr atomic.Bool
|
||||
@@ -218,7 +233,7 @@ func (c *connection) handleUnidirectionalStreams(hijack func(StreamType, quic.Co
|
||||
)
|
||||
|
||||
for {
|
||||
str, err := c.AcceptUniStream(context.Background())
|
||||
str, err := c.conn.AcceptUniStream(context.Background())
|
||||
if err != nil {
|
||||
if c.logger != nil {
|
||||
c.logger.Debug("accepting unidirectional stream failed", "error", err)
|
||||
@@ -279,7 +294,7 @@ func (c *connection) handleUnidirectionalStreams(hijack func(StreamType, quic.Co
|
||||
}
|
||||
// Only a single control stream is allowed.
|
||||
if isFirstControlStr := rcvdControlStr.CompareAndSwap(false, true); !isFirstControlStr {
|
||||
c.Conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeStreamCreationError), "duplicate control stream")
|
||||
c.conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeStreamCreationError), "duplicate control stream")
|
||||
return
|
||||
}
|
||||
c.handleControlStream(str)
|
||||
@@ -287,21 +302,21 @@ func (c *connection) handleUnidirectionalStreams(hijack func(StreamType, quic.Co
|
||||
}
|
||||
}
|
||||
|
||||
func (c *connection) handleControlStream(str *quic.ReceiveStream) {
|
||||
fp := &frameParser{closeConn: c.Conn.CloseWithError, r: str}
|
||||
func (c *Conn) handleControlStream(str *quic.ReceiveStream) {
|
||||
fp := &frameParser{closeConn: c.conn.CloseWithError, r: str}
|
||||
f, err := fp.ParseNext()
|
||||
if err != nil {
|
||||
var serr *quic.StreamError
|
||||
if err == io.EOF || errors.As(err, &serr) {
|
||||
c.Conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeClosedCriticalStream), "")
|
||||
c.conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeClosedCriticalStream), "")
|
||||
return
|
||||
}
|
||||
c.Conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeFrameError), "")
|
||||
c.conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeFrameError), "")
|
||||
return
|
||||
}
|
||||
sf, ok := f.(*settingsFrame)
|
||||
if !ok {
|
||||
c.Conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeMissingSettings), "")
|
||||
c.conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeMissingSettings), "")
|
||||
return
|
||||
}
|
||||
c.settings = &Settings{
|
||||
@@ -337,10 +352,10 @@ func (c *connection) handleControlStream(str *quic.ReceiveStream) {
|
||||
if err != nil {
|
||||
var serr *quic.StreamError
|
||||
if err == io.EOF || errors.As(err, &serr) {
|
||||
c.Conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeClosedCriticalStream), "")
|
||||
c.conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeClosedCriticalStream), "")
|
||||
return
|
||||
}
|
||||
c.Conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeFrameError), "")
|
||||
c.conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeFrameError), "")
|
||||
return
|
||||
}
|
||||
// GOAWAY is the only frame allowed at this point:
|
||||
@@ -348,17 +363,17 @@ func (c *connection) handleControlStream(str *quic.ReceiveStream) {
|
||||
// * we don't support any extension that might add support for more frames
|
||||
goaway, ok := f.(*goAwayFrame)
|
||||
if !ok {
|
||||
c.Conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeFrameUnexpected), "")
|
||||
c.conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeFrameUnexpected), "")
|
||||
return
|
||||
}
|
||||
if goaway.StreamID%4 != 0 { // client-initiated, bidirectional streams
|
||||
c.Conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeIDError), "")
|
||||
c.conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeIDError), "")
|
||||
return
|
||||
}
|
||||
c.streamMx.Lock()
|
||||
if c.maxStreamID != protocol.InvalidStreamID && goaway.StreamID > c.maxStreamID {
|
||||
c.streamMx.Unlock()
|
||||
c.Conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeIDError), "")
|
||||
c.conn.CloseWithError(quic.ApplicationErrorCode(ErrCodeIDError), "")
|
||||
return
|
||||
}
|
||||
c.maxStreamID = goaway.StreamID
|
||||
@@ -373,17 +388,17 @@ func (c *connection) handleControlStream(str *quic.ReceiveStream) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *connection) sendDatagram(streamID protocol.StreamID, b []byte) error {
|
||||
func (c *Conn) sendDatagram(streamID protocol.StreamID, b []byte) error {
|
||||
// TODO: this creates a lot of garbage and an additional copy
|
||||
data := make([]byte, 0, len(b)+8)
|
||||
data = quicvarint.Append(data, uint64(streamID/4))
|
||||
data = append(data, b...)
|
||||
return c.SendDatagram(data)
|
||||
return c.conn.SendDatagram(data)
|
||||
}
|
||||
|
||||
func (c *connection) receiveDatagrams() error {
|
||||
func (c *Conn) receiveDatagrams() error {
|
||||
for {
|
||||
b, err := c.ReceiveDatagram(context.Background())
|
||||
b, err := c.conn.ReceiveDatagram(context.Background())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -409,11 +424,11 @@ func (c *connection) receiveDatagrams() error {
|
||||
|
||||
// ReceivedSettings returns a channel that is closed once the peer's SETTINGS frame was received.
|
||||
// Settings can be optained from the Settings method after the channel was closed.
|
||||
func (c *connection) ReceivedSettings() <-chan struct{} { return c.receivedSettings }
|
||||
func (c *Conn) ReceivedSettings() <-chan struct{} { return c.receivedSettings }
|
||||
|
||||
// Settings returns the settings received on this connection.
|
||||
// It is only valid to call this function after the channel returned by ReceivedSettings was closed.
|
||||
func (c *connection) Settings() *Settings { return c.settings }
|
||||
func (c *Conn) Settings() *Settings { return c.settings }
|
||||
|
||||
// Context returns the context of the underlying QUIC connection.
|
||||
func (c *connection) Context() context.Context { return c.ctx }
|
||||
func (c *Conn) Context() context.Context { return c.ctx }
|
||||
|
||||
Reference in New Issue
Block a user