diff --git a/client.go b/client.go index 6a7ad884..d45e2edc 100644 --- a/client.go +++ b/client.go @@ -107,7 +107,7 @@ func (c *Client) Listen() error { } // OpenStream opens a stream, for client-side created streams (i.e. odd streamIDs) -func (c *Client) OpenStream() (utils.Stream, error) { +func (c *Client) OpenStream() (Stream, error) { return c.session.OpenStream() } diff --git a/h2quic/client.go b/h2quic/client.go index 01ff516a..1e270421 100644 --- a/h2quic/client.go +++ b/h2quic/client.go @@ -21,7 +21,7 @@ import ( ) type quicClient interface { - OpenStream() (utils.Stream, error) + OpenStream() (quic.Stream, error) Close(error) error Listen() error } @@ -37,7 +37,7 @@ type Client struct { encryptionLevel protocol.EncryptionLevel client quicClient - headerStream utils.Stream + headerStream quic.Stream headerErr *qerr.QuicError requestWriter *requestWriter @@ -241,7 +241,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { return res, nil } -func (c *Client) writeRequestBody(dataStream utils.Stream, body io.ReadCloser) (err error) { +func (c *Client) writeRequestBody(dataStream quic.Stream, body io.ReadCloser) (err error) { defer func() { cerr := body.Close() if err == nil { diff --git a/h2quic/client_test.go b/h2quic/client_test.go index ca70e812..38d973ed 100644 --- a/h2quic/client_test.go +++ b/h2quic/client_test.go @@ -9,9 +9,9 @@ import ( "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" + quic "github.com/lucas-clemente/quic-go" "github.com/lucas-clemente/quic-go/protocol" "github.com/lucas-clemente/quic-go/qerr" - "github.com/lucas-clemente/quic-go/utils" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -25,7 +25,7 @@ type mockQuicClient struct { func (m *mockQuicClient) Close(e error) error { m.closeErr = e; return nil } func (m *mockQuicClient) Listen() error { panic("not implemented") } -func (m *mockQuicClient) OpenStream() (utils.Stream, error) { +func (m *mockQuicClient) OpenStream() (quic.Stream, error) { id := m.nextStream ms := &mockStream{id: id} m.streams[id] = ms diff --git a/h2quic/request_body.go b/h2quic/request_body.go index 41ff5c69..2d4d5954 100644 --- a/h2quic/request_body.go +++ b/h2quic/request_body.go @@ -3,18 +3,18 @@ package h2quic import ( "io" - "github.com/lucas-clemente/quic-go/utils" + quic "github.com/lucas-clemente/quic-go" ) type requestBody struct { requestRead bool - dataStream utils.Stream + dataStream quic.Stream } // make sure the requestBody can be used as a http.Request.Body var _ io.ReadCloser = &requestBody{} -func newRequestBody(stream utils.Stream) *requestBody { +func newRequestBody(stream quic.Stream) *requestBody { return &requestBody{dataStream: stream} } diff --git a/h2quic/request_writer.go b/h2quic/request_writer.go index e837b0f5..834ebbbe 100644 --- a/h2quic/request_writer.go +++ b/h2quic/request_writer.go @@ -12,13 +12,14 @@ import ( "golang.org/x/net/http2/hpack" "golang.org/x/net/lex/httplex" + quic "github.com/lucas-clemente/quic-go" "github.com/lucas-clemente/quic-go/protocol" "github.com/lucas-clemente/quic-go/utils" ) type requestWriter struct { mutex sync.Mutex - headerStream utils.Stream + headerStream quic.Stream henc *hpack.Encoder hbuf bytes.Buffer // HPACK encoder writes into this @@ -26,7 +27,7 @@ type requestWriter struct { const defaultUserAgent = "quic-go" -func newRequestWriter(headerStream utils.Stream) *requestWriter { +func newRequestWriter(headerStream quic.Stream) *requestWriter { rw := &requestWriter{ headerStream: headerStream, } diff --git a/h2quic/response_writer.go b/h2quic/response_writer.go index 7bd804ff..0f043890 100644 --- a/h2quic/response_writer.go +++ b/h2quic/response_writer.go @@ -7,6 +7,7 @@ import ( "strings" "sync" + quic "github.com/lucas-clemente/quic-go" "github.com/lucas-clemente/quic-go/protocol" "github.com/lucas-clemente/quic-go/utils" "golang.org/x/net/http2" @@ -15,9 +16,9 @@ import ( type responseWriter struct { dataStreamID protocol.StreamID - dataStream utils.Stream + dataStream quic.Stream - headerStream utils.Stream + headerStream quic.Stream headerStreamMutex *sync.Mutex header http.Header @@ -25,7 +26,7 @@ type responseWriter struct { headerWritten bool } -func newResponseWriter(headerStream utils.Stream, headerStreamMutex *sync.Mutex, dataStream utils.Stream, dataStreamID protocol.StreamID) *responseWriter { +func newResponseWriter(headerStream quic.Stream, headerStreamMutex *sync.Mutex, dataStream quic.Stream, dataStreamID protocol.StreamID) *responseWriter { return &responseWriter{ header: http.Header{}, headerStream: headerStream, diff --git a/h2quic/server.go b/h2quic/server.go index 087f8722..9796c683 100644 --- a/h2quic/server.go +++ b/h2quic/server.go @@ -21,7 +21,7 @@ import ( type streamCreator interface { quic.Session - GetOrOpenStream(protocol.StreamID) (utils.Stream, error) + GetOrOpenStream(protocol.StreamID) (quic.Stream, error) } // Server is a HTTP2 server listening for QUIC connections. @@ -128,7 +128,7 @@ func (s *Server) handleHeaderStream(session streamCreator) { }() } -func (s *Server) handleRequest(session streamCreator, headerStream utils.Stream, headerStreamMutex *sync.Mutex, hpackDecoder *hpack.Decoder, h2framer *http2.Framer) error { +func (s *Server) handleRequest(session streamCreator, headerStream quic.Stream, headerStreamMutex *sync.Mutex, hpackDecoder *hpack.Decoder, h2framer *http2.Framer) error { h2frame, err := h2framer.ReadFrame() if err != nil { return err diff --git a/h2quic/server_test.go b/h2quic/server_test.go index a3e642ce..c9b7a167 100644 --- a/h2quic/server_test.go +++ b/h2quic/server_test.go @@ -13,10 +13,10 @@ import ( "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" + quic "github.com/lucas-clemente/quic-go" "github.com/lucas-clemente/quic-go/protocol" "github.com/lucas-clemente/quic-go/qerr" "github.com/lucas-clemente/quic-go/testdata" - "github.com/lucas-clemente/quic-go/utils" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -25,20 +25,20 @@ import ( type mockSession struct { closed bool closedWithError error - dataStream utils.Stream - streamToAccept utils.Stream + dataStream quic.Stream + streamToAccept quic.Stream } -func (s *mockSession) GetOrOpenStream(id protocol.StreamID) (utils.Stream, error) { +func (s *mockSession) GetOrOpenStream(id protocol.StreamID) (quic.Stream, error) { return s.dataStream, nil } -func (s *mockSession) AcceptStream() (utils.Stream, error) { +func (s *mockSession) AcceptStream() (quic.Stream, error) { return s.streamToAccept, nil } -func (s *mockSession) OpenStream() (utils.Stream, error) { +func (s *mockSession) OpenStream() (quic.Stream, error) { panic("not implemented") } -func (s *mockSession) OpenStreamSync() (utils.Stream, error) { +func (s *mockSession) OpenStreamSync() (quic.Stream, error) { panic("not implemented") } func (s *mockSession) Close(e error) error { diff --git a/handshake/crypto_setup_client.go b/handshake/crypto_setup_client.go index 0fc46e50..b860dc94 100644 --- a/handshake/crypto_setup_client.go +++ b/handshake/crypto_setup_client.go @@ -25,7 +25,7 @@ type cryptoSetupClient struct { version protocol.VersionNumber negotiatedVersions []protocol.VersionNumber - cryptoStream utils.Stream + cryptoStream io.ReadWriter serverConfig *serverConfigClient @@ -64,7 +64,7 @@ func NewCryptoSetupClient( hostname string, connID protocol.ConnectionID, version protocol.VersionNumber, - cryptoStream utils.Stream, + cryptoStream io.ReadWriter, tlsConfig *tls.Config, connectionParameters ConnectionParametersManager, aeadChanged chan struct{}, diff --git a/handshake/crypto_setup_server.go b/handshake/crypto_setup_server.go index 09d24d25..0b7024eb 100644 --- a/handshake/crypto_setup_server.go +++ b/handshake/crypto_setup_server.go @@ -36,7 +36,7 @@ type cryptoSetupServer struct { keyDerivation KeyDerivationFunction keyExchange KeyExchangeFunction - cryptoStream utils.Stream + cryptoStream io.ReadWriter connectionParameters ConnectionParametersManager @@ -51,7 +51,7 @@ func NewCryptoSetup( sourceAddr []byte, version protocol.VersionNumber, scfg *ServerConfig, - cryptoStream utils.Stream, + cryptoStream io.ReadWriter, connectionParametersManager ConnectionParametersManager, aeadChanged chan struct{}, ) (CryptoSetup, error) { diff --git a/interface.go b/interface.go index c8b1af96..09c5ce90 100644 --- a/interface.go +++ b/interface.go @@ -2,21 +2,32 @@ package quic import ( "crypto/tls" + "io" "net" - "github.com/lucas-clemente/quic-go/utils" + "github.com/lucas-clemente/quic-go/protocol" ) +// Stream is the interface for QUIC streams +type Stream interface { + io.Reader + io.Writer + io.Closer + StreamID() protocol.StreamID + CloseRemote(offset protocol.ByteCount) + Reset(error) +} + // A Session is a QUIC Session type Session interface { // get the next stream opened by the client // first stream returned has StreamID 3 - AcceptStream() (utils.Stream, error) + AcceptStream() (Stream, error) // guaranteed to return the smallest unopened stream // special error for "too many streams, retry later" - OpenStream() (utils.Stream, error) + OpenStream() (Stream, error) // blocks until a new stream can be opened, if the maximum number of stream is opened - OpenStreamSync() (utils.Stream, error) + OpenStreamSync() (Stream, error) RemoteAddr() net.Addr Close(error) error } diff --git a/server_test.go b/server_test.go index dc867e93..98e1998c 100644 --- a/server_test.go +++ b/server_test.go @@ -32,13 +32,13 @@ func (s *mockSession) Close(e error) error { s.closed = true return nil } -func (s *mockSession) AcceptStream() (utils.Stream, error) { +func (s *mockSession) AcceptStream() (Stream, error) { panic("not implemented") } -func (s *mockSession) OpenStream() (utils.Stream, error) { +func (s *mockSession) OpenStream() (Stream, error) { return &stream{streamID: 1337}, nil } -func (s *mockSession) OpenStreamSync() (utils.Stream, error) { +func (s *mockSession) OpenStreamSync() (Stream, error) { panic("not implemented") } func (s *mockSession) RemoteAddr() net.Addr { diff --git a/session.go b/session.go index 17555d96..966ababe 100644 --- a/session.go +++ b/session.go @@ -662,26 +662,26 @@ func (s *session) logPacket(packet *packedPacket) { // GetOrOpenStream either returns an existing stream, a newly opened stream, or nil if a stream with the provided ID is already closed. // Newly opened streams should only originate from the client. To open a stream from the server, OpenStream should be used. -func (s *session) GetOrOpenStream(id protocol.StreamID) (utils.Stream, error) { +func (s *session) GetOrOpenStream(id protocol.StreamID) (Stream, error) { str, err := s.streamsMap.GetOrOpenStream(id) if str != nil { return str, err } - // make sure to return an actual nil value here, not an utils.Stream with value nil + // make sure to return an actual nil value here, not an Stream with value nil return nil, err } // AcceptStream returns the next stream openend by the peer -func (s *session) AcceptStream() (utils.Stream, error) { +func (s *session) AcceptStream() (Stream, error) { return s.streamsMap.AcceptStream() } // OpenStream opens a stream -func (s *session) OpenStream() (utils.Stream, error) { +func (s *session) OpenStream() (Stream, error) { return s.streamsMap.OpenStream() } -func (s *session) OpenStreamSync() (utils.Stream, error) { +func (s *session) OpenStreamSync() (Stream, error) { return s.streamsMap.OpenStreamSync() } diff --git a/session_test.go b/session_test.go index c25f028e..2af56538 100644 --- a/session_test.go +++ b/session_test.go @@ -21,7 +21,6 @@ import ( "github.com/lucas-clemente/quic-go/protocol" "github.com/lucas-clemente/quic-go/qerr" "github.com/lucas-clemente/quic-go/testdata" - "github.com/lucas-clemente/quic-go/utils" ) type mockConnection struct { @@ -571,18 +570,18 @@ var _ = Describe("Session", func() { Context("accepting streams", func() { It("waits for new streams", func() { - var str utils.Stream + var str Stream go func() { defer GinkgoRecover() var err error str, err = sess.AcceptStream() Expect(err).ToNot(HaveOccurred()) }() - Consistently(func() utils.Stream { return str }).Should(BeNil()) + Consistently(func() Stream { return str }).Should(BeNil()) sess.handleStreamFrame(&frames.StreamFrame{ StreamID: 3, }) - Eventually(func() utils.Stream { return str }).ShouldNot(BeNil()) + Eventually(func() Stream { return str }).ShouldNot(BeNil()) Expect(str.StreamID()).To(Equal(protocol.StreamID(3))) }) @@ -1236,8 +1235,8 @@ var _ = Describe("Session", func() { str, err := sess.GetOrOpenStream(9) Expect(err).ToNot(HaveOccurred()) Expect(str).To(BeNil()) - // make sure that the returned value is a plain nil, not an utils.Stream with value nil - _, ok := str.(utils.Stream) + // make sure that the returned value is a plain nil, not an Stream with value nil + _, ok := str.(Stream) Expect(ok).To(BeFalse()) }) diff --git a/streams_map.go b/streams_map.go index 0f102b10..0b9f1453 100644 --- a/streams_map.go +++ b/streams_map.go @@ -8,7 +8,6 @@ import ( "github.com/lucas-clemente/quic-go/handshake" "github.com/lucas-clemente/quic-go/protocol" "github.com/lucas-clemente/quic-go/qerr" - "github.com/lucas-clemente/quic-go/utils" ) type streamsMap struct { @@ -193,10 +192,10 @@ func (m *streamsMap) OpenStreamSync() (*stream, error) { // AcceptStream returns the next stream opened by the peer // it blocks until a new stream is opened -func (m *streamsMap) AcceptStream() (utils.Stream, error) { +func (m *streamsMap) AcceptStream() (*stream, error) { m.mutex.Lock() defer m.mutex.Unlock() - var str utils.Stream + var str *stream for { var ok bool if m.closeErr != nil { diff --git a/streams_map_test.go b/streams_map_test.go index 92ec2799..370d9b50 100644 --- a/streams_map_test.go +++ b/streams_map_test.go @@ -8,7 +8,6 @@ import ( "github.com/lucas-clemente/quic-go/handshake" "github.com/lucas-clemente/quic-go/protocol" "github.com/lucas-clemente/quic-go/qerr" - "github.com/lucas-clemente/quic-go/utils" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) @@ -296,7 +295,7 @@ var _ = Describe("Streams Map", func() { }) It("accepts stream 1 first", func() { - var str utils.Stream + var str *stream go func() { defer GinkgoRecover() var err error @@ -305,12 +304,12 @@ var _ = Describe("Streams Map", func() { }() _, err := m.GetOrOpenStream(1) Expect(err).ToNot(HaveOccurred()) - Eventually(func() utils.Stream { return str }).ShouldNot(BeNil()) + Eventually(func() Stream { return str }).ShouldNot(BeNil()) Expect(str.StreamID()).To(Equal(protocol.StreamID(1))) }) It("returns an implicitly opened stream, if a stream number is skipped", func() { - var str utils.Stream + var str *stream go func() { defer GinkgoRecover() var err error @@ -319,12 +318,12 @@ var _ = Describe("Streams Map", func() { }() _, err := m.GetOrOpenStream(5) Expect(err).ToNot(HaveOccurred()) - Eventually(func() utils.Stream { return str }).ShouldNot(BeNil()) + Eventually(func() Stream { return str }).ShouldNot(BeNil()) Expect(str.StreamID()).To(Equal(protocol.StreamID(1))) }) It("returns to multiple accepts", func() { - var str1, str2 utils.Stream + var str1, str2 *stream go func() { defer GinkgoRecover() var err error @@ -339,29 +338,29 @@ var _ = Describe("Streams Map", func() { }() _, err := m.GetOrOpenStream(3) // opens stream 1 and 3 Expect(err).ToNot(HaveOccurred()) - Eventually(func() utils.Stream { return str1 }).ShouldNot(BeNil()) - Eventually(func() utils.Stream { return str2 }).ShouldNot(BeNil()) + Eventually(func() *stream { return str1 }).ShouldNot(BeNil()) + Eventually(func() *stream { return str2 }).ShouldNot(BeNil()) Expect(str1.StreamID()).ToNot(Equal(str2.StreamID())) Expect(str1.StreamID() + str2.StreamID()).To(BeEquivalentTo(1 + 3)) }) It("waits a new stream is available", func() { - var str utils.Stream + var str *stream go func() { defer GinkgoRecover() var err error str, err = m.AcceptStream() Expect(err).ToNot(HaveOccurred()) }() - Consistently(func() utils.Stream { return str }).Should(BeNil()) + Consistently(func() *stream { return str }).Should(BeNil()) _, err := m.GetOrOpenStream(1) Expect(err).ToNot(HaveOccurred()) - Eventually(func() utils.Stream { return str }).ShouldNot(BeNil()) + Eventually(func() *stream { return str }).ShouldNot(BeNil()) Expect(str.StreamID()).To(Equal(protocol.StreamID(1))) }) It("returns multiple streams on subsequent Accept calls, if available", func() { - var str utils.Stream + var str *stream go func() { defer GinkgoRecover() var err error @@ -370,7 +369,7 @@ var _ = Describe("Streams Map", func() { }() _, err := m.GetOrOpenStream(3) Expect(err).ToNot(HaveOccurred()) - Eventually(func() utils.Stream { return str }).ShouldNot(BeNil()) + Eventually(func() *stream { return str }).ShouldNot(BeNil()) Expect(str.StreamID()).To(Equal(protocol.StreamID(1))) str, err = m.AcceptStream() Expect(err).ToNot(HaveOccurred()) @@ -461,7 +460,7 @@ var _ = Describe("Streams Map", func() { Context("accepting streams", func() { It("accepts stream 2 first", func() { - var str utils.Stream + var str *stream go func() { defer GinkgoRecover() var err error @@ -470,7 +469,7 @@ var _ = Describe("Streams Map", func() { }() _, err := m.GetOrOpenStream(2) Expect(err).ToNot(HaveOccurred()) - Eventually(func() utils.Stream { return str }).ShouldNot(BeNil()) + Eventually(func() *stream { return str }).ShouldNot(BeNil()) Expect(str.StreamID()).To(Equal(protocol.StreamID(2))) }) }) diff --git a/utils/stream.go b/utils/stream.go deleted file mode 100644 index b776a893..00000000 --- a/utils/stream.go +++ /dev/null @@ -1,17 +0,0 @@ -package utils - -import ( - "io" - - "github.com/lucas-clemente/quic-go/protocol" -) - -// Stream is the interface for QUIC streams -type Stream interface { - io.Reader - io.Writer - io.Closer - StreamID() protocol.StreamID - CloseRemote(offset protocol.ByteCount) - Reset(error) -}