forked from quic-go/quic-go
This makes it consistent with the quic.Config and the config flag on the http3.Server and http3.RoundTripper.
430 lines
15 KiB
Go
430 lines
15 KiB
Go
package http3
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/quic-go/quic-go"
|
|
mockquic "github.com/quic-go/quic-go/internal/mocks/quic"
|
|
"github.com/quic-go/quic-go/internal/protocol"
|
|
"github.com/quic-go/quic-go/internal/qerr"
|
|
"github.com/quic-go/quic-go/quicvarint"
|
|
|
|
. "github.com/onsi/ginkgo/v2"
|
|
. "github.com/onsi/gomega"
|
|
"go.uber.org/mock/gomock"
|
|
)
|
|
|
|
var _ = Describe("Connection", func() {
|
|
Context("control stream handling", func() {
|
|
// Feeds a control stream carrying a SETTINGS frame to the connection and
// verifies that the advertised values show up in conn.Settings().
It("parses the SETTINGS frame", func() {
	mockConn := mockquic.NewMockEarlyConnection(mockCtrl)
	mockConn.EXPECT().ReceiveDatagram(gomock.Any()).Return(nil, errors.New("no datagrams"))
	conn := newConnection(mockConn, false, protocol.PerspectiveServer, nil)

	// Bytes served by the stream mock: the control stream type followed by the SETTINGS frame.
	frame := &settingsFrame{
		Datagram:        true,
		ExtendedConnect: true,
		Other:           map[uint64]uint64{1337: 42},
	}
	payload := frame.Append(quicvarint.Append(nil, streamTypeControlStream))
	reader := bytes.NewReader(payload)
	ctrlStream := mockquic.NewMockStream(mockCtrl)
	ctrlStream.EXPECT().Read(gomock.Any()).DoAndReturn(reader.Read).AnyTimes()

	// The first accept hands out the control stream, the second one fails with "test done".
	mockConn.EXPECT().AcceptUniStream(gomock.Any()).Return(ctrlStream, nil)
	mockConn.EXPECT().AcceptUniStream(gomock.Any()).Return(nil, errors.New("test done"))

	handlerReturned := make(chan struct{})
	go func() {
		defer GinkgoRecover()
		defer close(handlerReturned)
		conn.HandleUnidirectionalStreams(nil)
	}()

	// Only inspect the settings once the connection signals that they were received.
	Eventually(conn.ReceivedSettings()).Should(BeClosed())
	settings := conn.Settings()
	Expect(settings.EnableDatagrams).To(BeTrue())
	Expect(settings.EnableExtendedConnect).To(BeTrue())
	Expect(settings.Other).To(HaveKeyWithValue(uint64(1337), uint64(42)))
	Eventually(handlerReturned).Should(BeClosed())
})
|
|
|
|
// Delivers two control streams and expects the connection to be closed with
// ErrCodeStreamCreationError and the reason "duplicate control stream".
It("rejects duplicate control streams", func() {
	mockConn := mockquic.NewMockEarlyConnection(mockCtrl)
	conn := newConnection(mockConn, false, protocol.PerspectiveServer, nil)

	// Both streams serve the same bytes: the control stream type plus a zero-value SETTINGS frame.
	payload := (&settingsFrame{}).Append(quicvarint.Append(nil, streamTypeControlStream))
	firstReader, secondReader := bytes.NewReader(payload), bytes.NewReader(payload)
	first := mockquic.NewMockStream(mockCtrl)
	first.EXPECT().Read(gomock.Any()).DoAndReturn(firstReader.Read).AnyTimes()
	second := mockquic.NewMockStream(mockCtrl)
	second.EXPECT().Read(gomock.Any()).DoAndReturn(secondReader.Read).AnyTimes()

	// Two successful accepts followed by a failing one.
	mockConn.EXPECT().AcceptUniStream(gomock.Any()).Return(first, nil)
	mockConn.EXPECT().AcceptUniStream(gomock.Any()).Return(second, nil)
	mockConn.EXPECT().AcceptUniStream(gomock.Any()).Return(nil, errors.New("test done"))

	connClosed := make(chan struct{})
	mockConn.EXPECT().CloseWithError(
		qerr.ApplicationErrorCode(ErrCodeStreamCreationError),
		"duplicate control stream",
	).Do(func(qerr.ApplicationErrorCode, string) error {
		close(connClosed)
		return nil
	})

	handlerReturned := make(chan struct{})
	go func() {
		defer GinkgoRecover()
		defer close(handlerReturned)
		conn.HandleUnidirectionalStreams(nil)
	}()

	Eventually(connClosed).Should(BeClosed())
	Eventually(handlerReturned).Should(BeClosed())
})
|
|
|
|
// The QPACK encoder and decoder streams are subject to the same expectations,
// so both specs below are registered once per stream type.
for _, t := range []uint64{streamTypeQPACKEncoderStream, streamTypeQPACKDecoderStream} {
	streamType := t // per-iteration copy captured by the spec closures
	name := "encoder"
	if streamType == streamTypeQPACKDecoderStream {
		name = "decoder"
	}

	// A single QPACK stream must be left alone: no CancelRead expectation is
	// registered on the stream mock, so such a call would be an unexpected call.
	It(fmt.Sprintf("ignores the QPACK %s streams", name), func() {
		qconn := mockquic.NewMockEarlyConnection(mockCtrl)
		conn := newConnection(
			qconn,
			false,
			protocol.PerspectiveClient,
			nil,
		)
		buf := bytes.NewBuffer(quicvarint.Append(nil, streamType))
		str := mockquic.NewMockStream(mockCtrl)
		str.EXPECT().Read(gomock.Any()).DoAndReturn(buf.Read).AnyTimes()
		qconn.EXPECT().AcceptUniStream(gomock.Any()).Return(str, nil)
		// The second accept blocks until the spec releases it via testDone.
		testDone := make(chan struct{})
		qconn.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
			<-testDone
			return nil, errors.New("test done")
		})
		done := make(chan struct{})
		go func() {
			defer GinkgoRecover()
			defer close(done)
			conn.HandleUnidirectionalStreams(nil)
		}()
		// The handler has to be running while we wait; sleeping before it is
		// started would not give it any chance to (wrongly) cancel the stream.
		time.Sleep(scaleDuration(20 * time.Millisecond)) // don't EXPECT any calls to str.CancelRead
		close(testDone)
		Eventually(done).Should(BeClosed())
	})

	// A second stream of the same QPACK type must close the connection with
	// ErrCodeStreamCreationError.
	It(fmt.Sprintf("rejects duplicate QPACK %s streams", name), func() {
		qconn := mockquic.NewMockEarlyConnection(mockCtrl)
		conn := newConnection(
			qconn,
			false,
			protocol.PerspectiveClient,
			nil,
		)
		buf := bytes.NewBuffer(quicvarint.Append(nil, streamType))
		str1 := mockquic.NewMockStream(mockCtrl)
		str1.EXPECT().Read(gomock.Any()).DoAndReturn(buf.Read).AnyTimes()
		buf2 := bytes.NewBuffer(quicvarint.Append(nil, streamType))
		str2 := mockquic.NewMockStream(mockCtrl)
		str2.EXPECT().Read(gomock.Any()).DoAndReturn(buf2.Read).AnyTimes()
		qconn.EXPECT().AcceptUniStream(gomock.Any()).Return(str1, nil)
		qconn.EXPECT().AcceptUniStream(gomock.Any()).Return(str2, nil)
		// The third accept only returns after CloseWithError has been called.
		testDone := make(chan struct{})
		qconn.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
			<-testDone
			return nil, errors.New("test done")
		})
		qconn.EXPECT().CloseWithError(qerr.ApplicationErrorCode(ErrCodeStreamCreationError), gomock.Any()).Do(func(qerr.ApplicationErrorCode, string) error {
			close(testDone)
			return nil
		})
		done := make(chan struct{})
		go func() {
			defer GinkgoRecover()
			defer close(done)
			conn.HandleUnidirectionalStreams(nil)
		}()
		Eventually(done).Should(BeClosed())
	})
}
|
|
|
|
// A unidirectional stream announcing the type 0x1337 is expected to get
// CancelRead called with ErrCodeStreamCreationError.
It("resets streams other than the control stream and the QPACK streams", func() {
	mockConn := mockquic.NewMockEarlyConnection(mockCtrl)
	conn := newConnection(mockConn, false, protocol.PerspectiveServer, nil)

	typeBytes := bytes.NewBuffer(quicvarint.Append(nil, 0x1337))
	unknownStream := mockquic.NewMockStream(mockCtrl)
	unknownStream.EXPECT().Read(gomock.Any()).DoAndReturn(typeBytes.Read).AnyTimes()
	readCanceled := make(chan struct{})
	unknownStream.EXPECT().
		CancelRead(quic.StreamErrorCode(ErrCodeStreamCreationError)).
		Do(func(quic.StreamErrorCode) { close(readCanceled) })

	// One successful accept followed by a failing one.
	mockConn.EXPECT().AcceptUniStream(gomock.Any()).Return(unknownStream, nil)
	mockConn.EXPECT().AcceptUniStream(gomock.Any()).Return(nil, errors.New("test done"))

	handlerReturned := make(chan struct{})
	go func() {
		defer GinkgoRecover()
		defer close(handlerReturned)
		conn.HandleUnidirectionalStreams(nil)
	}()

	Eventually(handlerReturned).Should(BeClosed())
	Eventually(readCanceled).Should(BeClosed())
})
|
|
|
|
// The control stream starts with a DATA frame instead of SETTINGS; the
// connection is expected to be closed with ErrCodeMissingSettings.
It("errors when the first frame on the control stream is not a SETTINGS frame", func() {
	mockConn := mockquic.NewMockEarlyConnection(mockCtrl)
	conn := newConnection(mockConn, false, protocol.PerspectiveServer, nil)

	// Control stream type followed by a zero-value data frame.
	payload := (&dataFrame{}).Append(quicvarint.Append(nil, streamTypeControlStream))
	reader := bytes.NewReader(payload)
	ctrlStream := mockquic.NewMockStream(mockCtrl)
	ctrlStream.EXPECT().Read(gomock.Any()).DoAndReturn(reader.Read).AnyTimes()

	mockConn.EXPECT().AcceptUniStream(gomock.Any()).Return(ctrlStream, nil)
	mockConn.EXPECT().AcceptUniStream(gomock.Any()).Return(nil, errors.New("test done"))

	connClosed := make(chan struct{})
	mockConn.EXPECT().
		CloseWithError(quic.ApplicationErrorCode(ErrCodeMissingSettings), gomock.Any()).
		Do(func(quic.ApplicationErrorCode, string) error {
			close(connClosed)
			return nil
		})

	handlerReturned := make(chan struct{})
	go func() {
		defer GinkgoRecover()
		defer close(handlerReturned)
		conn.HandleUnidirectionalStreams(nil)
	}()

	Eventually(handlerReturned).Should(BeClosed())
	Eventually(connClosed).Should(BeClosed())
})
|
|
|
|
// The serialized SETTINGS frame is cut short by one byte; the connection is
// expected to be closed with ErrCodeFrameError.
It("errors when parsing the frame on the control stream fails", func() {
	mockConn := mockquic.NewMockEarlyConnection(mockCtrl)
	conn := newConnection(mockConn, false, protocol.PerspectiveServer, nil)

	full := (&settingsFrame{}).Append(quicvarint.Append(nil, streamTypeControlStream))
	truncated := full[:len(full)-1] // drop the last byte
	reader := bytes.NewReader(truncated)
	ctrlStream := mockquic.NewMockStream(mockCtrl)
	ctrlStream.EXPECT().Read(gomock.Any()).DoAndReturn(reader.Read).AnyTimes()

	mockConn.EXPECT().AcceptUniStream(gomock.Any()).Return(ctrlStream, nil)
	mockConn.EXPECT().AcceptUniStream(gomock.Any()).Return(nil, errors.New("test done"))

	connClosed := make(chan struct{})
	mockConn.EXPECT().
		CloseWithError(quic.ApplicationErrorCode(ErrCodeFrameError), gomock.Any()).
		Do(func(quic.ApplicationErrorCode, string) error {
			close(connClosed)
			return nil
		})

	handlerReturned := make(chan struct{})
	go func() {
		defer GinkgoRecover()
		defer close(handlerReturned)
		conn.HandleUnidirectionalStreams(nil)
	}()

	Eventually(handlerReturned).Should(BeClosed())
	Eventually(connClosed).Should(BeClosed())
})
|
|
|
|
// pers is the peer that opens the push stream; the connection under test is
// created with the opposite perspective. The expected close code is
// ErrCodeIDError when the server opens the push stream and
// ErrCodeStreamCreationError when the client does.
for _, pers := range []protocol.Perspective{protocol.PerspectiveServer, protocol.PerspectiveClient} {
	pers := pers // per-iteration copy captured by the spec closure
	expectedErr := ErrCodeIDError
	if pers == protocol.PerspectiveClient {
		expectedErr = ErrCodeStreamCreationError
	}

	It(fmt.Sprintf("errors when the %s opens a push stream", pers), func() {
		qconn := mockquic.NewMockEarlyConnection(mockCtrl)
		conn := newConnection(
			qconn,
			false,
			pers.Opposite(),
			nil,
		)
		// The stream only announces its type: push stream.
		buf := bytes.NewBuffer(quicvarint.Append(nil, streamTypePushStream))
		pushStr := mockquic.NewMockStream(mockCtrl)
		pushStr.EXPECT().Read(gomock.Any()).DoAndReturn(buf.Read).AnyTimes()
		qconn.EXPECT().AcceptUniStream(gomock.Any()).Return(pushStr, nil)
		qconn.EXPECT().AcceptUniStream(gomock.Any()).Return(nil, errors.New("test done"))
		closed := make(chan struct{})
		qconn.EXPECT().CloseWithError(quic.ApplicationErrorCode(expectedErr), gomock.Any()).Do(func(quic.ApplicationErrorCode, string) error {
			close(closed)
			return nil
		})
		done := make(chan struct{})
		go func() {
			defer GinkgoRecover()
			defer close(done)
			conn.HandleUnidirectionalStreams(nil)
		}()
		Eventually(done).Should(BeClosed())
		Eventually(closed).Should(BeClosed())
	})
}
|
|
|
|
// Datagrams are enabled locally and advertised in the peer's SETTINGS frame,
// but the mocked ConnectionState reports SupportsDatagrams=false. The
// connection is expected to be closed with ErrCodeSettingsError.
It("errors when the server advertises datagram support (and we enabled support for it)", func() {
	mockConn := mockquic.NewMockEarlyConnection(mockCtrl)
	conn := newConnection(mockConn, true, protocol.PerspectiveClient, nil)

	// Control stream type followed by a SETTINGS frame with Datagram set.
	payload := (&settingsFrame{Datagram: true}).Append(quicvarint.Append(nil, streamTypeControlStream))
	reader := bytes.NewReader(payload)
	ctrlStream := mockquic.NewMockStream(mockCtrl)
	ctrlStream.EXPECT().Read(gomock.Any()).DoAndReturn(reader.Read).AnyTimes()

	mockConn.EXPECT().AcceptUniStream(gomock.Any()).Return(ctrlStream, nil)
	mockConn.EXPECT().AcceptUniStream(gomock.Any()).Return(nil, errors.New("test done"))
	mockConn.EXPECT().ConnectionState().Return(quic.ConnectionState{SupportsDatagrams: false})

	connClosed := make(chan struct{})
	mockConn.EXPECT().
		CloseWithError(quic.ApplicationErrorCode(ErrCodeSettingsError), "missing QUIC Datagram support").
		Do(func(quic.ApplicationErrorCode, string) error {
			close(connClosed)
			return nil
		})

	handlerReturned := make(chan struct{})
	go func() {
		defer GinkgoRecover()
		defer close(handlerReturned)
		conn.HandleUnidirectionalStreams(nil)
	}()

	Eventually(handlerReturned).Should(BeClosed())
	Eventually(connClosed).Should(BeClosed())
})
|
|
})
|
|
|
|
Context("datagram handling", func() {
|
|
var (
|
|
qconn *mockquic.MockEarlyConnection
|
|
conn *connection
|
|
)
|
|
|
|
BeforeEach(func() {
|
|
qconn = mockquic.NewMockEarlyConnection(mockCtrl)
|
|
conn = newConnection(
|
|
qconn,
|
|
true,
|
|
protocol.PerspectiveClient,
|
|
nil,
|
|
)
|
|
b := quicvarint.Append(nil, streamTypeControlStream)
|
|
b = (&settingsFrame{Datagram: true}).Append(b)
|
|
r := bytes.NewReader(b)
|
|
controlStr := mockquic.NewMockStream(mockCtrl)
|
|
controlStr.EXPECT().Read(gomock.Any()).DoAndReturn(r.Read).AnyTimes()
|
|
qconn.EXPECT().AcceptUniStream(gomock.Any()).Return(controlStr, nil).MaxTimes(1)
|
|
qconn.EXPECT().AcceptUniStream(gomock.Any()).Return(nil, errors.New("test done")).MaxTimes(1)
|
|
qconn.EXPECT().ConnectionState().Return(quic.ConnectionState{SupportsDatagrams: true}).MaxTimes(1)
|
|
})
|
|
|
|
// A received datagram that does not start with a valid varint is expected to
// close the connection with ErrCodeDatagramError.
It("closes the connection if it can't parse the quarter stream ID", func() {
	invalidVarint := []byte{128} // an invalid varint
	qconn.EXPECT().ReceiveDatagram(gomock.Any()).Return(invalidVarint, nil)

	connClosed := make(chan struct{})
	qconn.EXPECT().
		CloseWithError(qerr.ApplicationErrorCode(ErrCodeDatagramError), gomock.Any()).
		Do(func(qerr.ApplicationErrorCode, string) error {
			close(connClosed)
			return nil
		})

	go func() {
		defer GinkgoRecover()
		conn.HandleUnidirectionalStreams(nil)
	}()
	Eventually(connClosed).Should(BeClosed())
})
|
|
|
|
// A datagram whose quarter stream ID is maxQuarterStreamID+1 is expected to
// close the connection with ErrCodeDatagramError.
It("closes the connection if the quarter stream ID is invalid", func() {
	tooLarge := quicvarint.Append([]byte{}, maxQuarterStreamID+1)
	qconn.EXPECT().ReceiveDatagram(gomock.Any()).Return(tooLarge, nil)

	connClosed := make(chan struct{})
	qconn.EXPECT().
		CloseWithError(qerr.ApplicationErrorCode(ErrCodeDatagramError), gomock.Any()).
		Do(func(qerr.ApplicationErrorCode, string) error {
			close(connClosed)
			return nil
		})

	go func() {
		defer GinkgoRecover()
		conn.HandleUnidirectionalStreams(nil)
	}()
	Eventually(connClosed).Should(BeClosed())
})
|
|
|
|
// A datagram handed out before its stream is opened must not be returned by a
// later ReceiveDatagram call on that stream.
It("drops datagrams for non-existent streams", func() {
	const streamID = 4

	// Step 1: let the connection receive a datagram addressed to streamID.
	datagram := append(quicvarint.Append([]byte{}, streamID/4), "foobar"...)
	handedOut := make(chan struct{})
	qconn.EXPECT().ReceiveDatagram(gomock.Any()).DoAndReturn(func(context.Context) ([]byte, error) {
		close(handedOut)
		return datagram, nil
	})
	go func() {
		defer GinkgoRecover()
		conn.HandleUnidirectionalStreams(nil)
	}()
	Eventually(handedOut).Should(BeClosed())

	// Step 2: open the stream only afterwards.
	mockStream := mockquic.NewMockStream(mockCtrl)
	mockStream.EXPECT().StreamID().Return(streamID).MinTimes(1)
	qconn.EXPECT().OpenStreamSync(gomock.Any()).Return(mockStream, nil)
	str, err := conn.openRequestStream(context.Background(), nil, nil, true, 1000)
	Expect(err).ToNot(HaveOccurred())

	// With an already canceled context the call is expected to fail with context.Canceled.
	canceledCtx, cancel := context.WithCancel(context.Background())
	cancel()
	_, err = str.ReceiveDatagram(canceledCtx)
	Expect(err).To(MatchError(context.Canceled))
})
|
|
|
|
// A datagram received after its stream was opened is expected to be returned,
// without the quarter stream ID prefix, by ReceiveDatagram on that stream.
It("delivers datagrams for existing streams", func() {
	const streamID = 4

	// Step 1: open the stream.
	mockStream := mockquic.NewMockStream(mockCtrl)
	mockStream.EXPECT().StreamID().Return(streamID).MinTimes(1)
	qconn.EXPECT().OpenStreamSync(gomock.Any()).Return(mockStream, nil)
	str, err := conn.openRequestStream(context.Background(), nil, nil, true, 1000)
	Expect(err).ToNot(HaveOccurred())

	// Step 2: let the connection receive one datagram for it, then an error.
	datagram := append(quicvarint.Append([]byte{}, streamID/4), "foobar"...)
	qconn.EXPECT().ReceiveDatagram(gomock.Any()).Return(datagram, nil)
	qconn.EXPECT().ReceiveDatagram(gomock.Any()).Return(nil, errors.New("test done"))
	go func() {
		defer GinkgoRecover()
		conn.HandleUnidirectionalStreams(nil)
	}()

	payload, err := str.ReceiveDatagram(context.Background())
	Expect(err).ToNot(HaveOccurred())
	Expect(payload).To(Equal([]byte("foobar")))
})
|
|
|
|
// sendDatagram is expected to pass the payload, prefixed with the
// varint-encoded strID/4, to SendDatagram and to return the error it gets back.
It("sends datagrams", func() {
	const streamID = 404

	wire := append(quicvarint.Append([]byte{}, streamID/4), "foobar"...)
	sendErr := errors.New("test error")
	qconn.EXPECT().SendDatagram(wire).Return(sendErr)

	err := conn.sendDatagram(streamID, []byte("foobar"))
	Expect(err).To(MatchError(sendErr))
})
|
|
})
|
|
})
|