move utils.Stream to quic.Stream

This commit is contained in:
Marten Seemann
2017-02-20 18:05:49 +07:00
parent 592ef45fdf
commit cd465ae0b5
17 changed files with 73 additions and 80 deletions

View File

@@ -107,7 +107,7 @@ func (c *Client) Listen() error {
}
// OpenStream opens a stream, for client-side created streams (i.e. odd streamIDs)
func (c *Client) OpenStream() (utils.Stream, error) {
func (c *Client) OpenStream() (Stream, error) {
return c.session.OpenStream()
}

View File

@@ -21,7 +21,7 @@ import (
)
type quicClient interface {
OpenStream() (utils.Stream, error)
OpenStream() (quic.Stream, error)
Close(error) error
Listen() error
}
@@ -37,7 +37,7 @@ type Client struct {
encryptionLevel protocol.EncryptionLevel
client quicClient
headerStream utils.Stream
headerStream quic.Stream
headerErr *qerr.QuicError
requestWriter *requestWriter
@@ -241,7 +241,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
return res, nil
}
func (c *Client) writeRequestBody(dataStream utils.Stream, body io.ReadCloser) (err error) {
func (c *Client) writeRequestBody(dataStream quic.Stream, body io.ReadCloser) (err error) {
defer func() {
cerr := body.Close()
if err == nil {

View File

@@ -9,9 +9,9 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
quic "github.com/lucas-clemente/quic-go"
"github.com/lucas-clemente/quic-go/protocol"
"github.com/lucas-clemente/quic-go/qerr"
"github.com/lucas-clemente/quic-go/utils"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@@ -25,7 +25,7 @@ type mockQuicClient struct {
func (m *mockQuicClient) Close(e error) error { m.closeErr = e; return nil }
func (m *mockQuicClient) Listen() error { panic("not implemented") }
func (m *mockQuicClient) OpenStream() (utils.Stream, error) {
func (m *mockQuicClient) OpenStream() (quic.Stream, error) {
id := m.nextStream
ms := &mockStream{id: id}
m.streams[id] = ms

View File

@@ -3,18 +3,18 @@ package h2quic
import (
"io"
"github.com/lucas-clemente/quic-go/utils"
quic "github.com/lucas-clemente/quic-go"
)
type requestBody struct {
requestRead bool
dataStream utils.Stream
dataStream quic.Stream
}
// make sure the requestBody can be used as a http.Request.Body
var _ io.ReadCloser = &requestBody{}
func newRequestBody(stream utils.Stream) *requestBody {
func newRequestBody(stream quic.Stream) *requestBody {
return &requestBody{dataStream: stream}
}

View File

@@ -12,13 +12,14 @@ import (
"golang.org/x/net/http2/hpack"
"golang.org/x/net/lex/httplex"
quic "github.com/lucas-clemente/quic-go"
"github.com/lucas-clemente/quic-go/protocol"
"github.com/lucas-clemente/quic-go/utils"
)
type requestWriter struct {
mutex sync.Mutex
headerStream utils.Stream
headerStream quic.Stream
henc *hpack.Encoder
hbuf bytes.Buffer // HPACK encoder writes into this
@@ -26,7 +27,7 @@ type requestWriter struct {
const defaultUserAgent = "quic-go"
func newRequestWriter(headerStream utils.Stream) *requestWriter {
func newRequestWriter(headerStream quic.Stream) *requestWriter {
rw := &requestWriter{
headerStream: headerStream,
}

View File

@@ -7,6 +7,7 @@ import (
"strings"
"sync"
quic "github.com/lucas-clemente/quic-go"
"github.com/lucas-clemente/quic-go/protocol"
"github.com/lucas-clemente/quic-go/utils"
"golang.org/x/net/http2"
@@ -15,9 +16,9 @@ import (
type responseWriter struct {
dataStreamID protocol.StreamID
dataStream utils.Stream
dataStream quic.Stream
headerStream utils.Stream
headerStream quic.Stream
headerStreamMutex *sync.Mutex
header http.Header
@@ -25,7 +26,7 @@ type responseWriter struct {
headerWritten bool
}
func newResponseWriter(headerStream utils.Stream, headerStreamMutex *sync.Mutex, dataStream utils.Stream, dataStreamID protocol.StreamID) *responseWriter {
func newResponseWriter(headerStream quic.Stream, headerStreamMutex *sync.Mutex, dataStream quic.Stream, dataStreamID protocol.StreamID) *responseWriter {
return &responseWriter{
header: http.Header{},
headerStream: headerStream,

View File

@@ -21,7 +21,7 @@ import (
type streamCreator interface {
quic.Session
GetOrOpenStream(protocol.StreamID) (utils.Stream, error)
GetOrOpenStream(protocol.StreamID) (quic.Stream, error)
}
// Server is a HTTP2 server listening for QUIC connections.
@@ -128,7 +128,7 @@ func (s *Server) handleHeaderStream(session streamCreator) {
}()
}
func (s *Server) handleRequest(session streamCreator, headerStream utils.Stream, headerStreamMutex *sync.Mutex, hpackDecoder *hpack.Decoder, h2framer *http2.Framer) error {
func (s *Server) handleRequest(session streamCreator, headerStream quic.Stream, headerStreamMutex *sync.Mutex, hpackDecoder *hpack.Decoder, h2framer *http2.Framer) error {
h2frame, err := h2framer.ReadFrame()
if err != nil {
return err

View File

@@ -13,10 +13,10 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
quic "github.com/lucas-clemente/quic-go"
"github.com/lucas-clemente/quic-go/protocol"
"github.com/lucas-clemente/quic-go/qerr"
"github.com/lucas-clemente/quic-go/testdata"
"github.com/lucas-clemente/quic-go/utils"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@@ -25,20 +25,20 @@ import (
type mockSession struct {
closed bool
closedWithError error
dataStream utils.Stream
streamToAccept utils.Stream
dataStream quic.Stream
streamToAccept quic.Stream
}
func (s *mockSession) GetOrOpenStream(id protocol.StreamID) (utils.Stream, error) {
func (s *mockSession) GetOrOpenStream(id protocol.StreamID) (quic.Stream, error) {
return s.dataStream, nil
}
func (s *mockSession) AcceptStream() (utils.Stream, error) {
func (s *mockSession) AcceptStream() (quic.Stream, error) {
return s.streamToAccept, nil
}
func (s *mockSession) OpenStream() (utils.Stream, error) {
func (s *mockSession) OpenStream() (quic.Stream, error) {
panic("not implemented")
}
func (s *mockSession) OpenStreamSync() (utils.Stream, error) {
func (s *mockSession) OpenStreamSync() (quic.Stream, error) {
panic("not implemented")
}
func (s *mockSession) Close(e error) error {

View File

@@ -25,7 +25,7 @@ type cryptoSetupClient struct {
version protocol.VersionNumber
negotiatedVersions []protocol.VersionNumber
cryptoStream utils.Stream
cryptoStream io.ReadWriter
serverConfig *serverConfigClient
@@ -64,7 +64,7 @@ func NewCryptoSetupClient(
hostname string,
connID protocol.ConnectionID,
version protocol.VersionNumber,
cryptoStream utils.Stream,
cryptoStream io.ReadWriter,
tlsConfig *tls.Config,
connectionParameters ConnectionParametersManager,
aeadChanged chan struct{},

View File

@@ -36,7 +36,7 @@ type cryptoSetupServer struct {
keyDerivation KeyDerivationFunction
keyExchange KeyExchangeFunction
cryptoStream utils.Stream
cryptoStream io.ReadWriter
connectionParameters ConnectionParametersManager
@@ -51,7 +51,7 @@ func NewCryptoSetup(
sourceAddr []byte,
version protocol.VersionNumber,
scfg *ServerConfig,
cryptoStream utils.Stream,
cryptoStream io.ReadWriter,
connectionParametersManager ConnectionParametersManager,
aeadChanged chan struct{},
) (CryptoSetup, error) {

View File

@@ -2,21 +2,32 @@ package quic
import (
"crypto/tls"
"io"
"net"
"github.com/lucas-clemente/quic-go/utils"
"github.com/lucas-clemente/quic-go/protocol"
)
// Stream is the interface for QUIC streams
type Stream interface {
io.Reader
io.Writer
io.Closer
StreamID() protocol.StreamID
CloseRemote(offset protocol.ByteCount)
Reset(error)
}
// A Session is a QUIC Session
type Session interface {
// get the next stream opened by the client
// first stream returned has StreamID 3
AcceptStream() (utils.Stream, error)
AcceptStream() (Stream, error)
// guaranteed to return the smallest unopened stream
// special error for "too many streams, retry later"
OpenStream() (utils.Stream, error)
OpenStream() (Stream, error)
// blocks until a new stream can be opened, if the maximum number of stream is opened
OpenStreamSync() (utils.Stream, error)
OpenStreamSync() (Stream, error)
RemoteAddr() net.Addr
Close(error) error
}

View File

@@ -32,13 +32,13 @@ func (s *mockSession) Close(e error) error {
s.closed = true
return nil
}
func (s *mockSession) AcceptStream() (utils.Stream, error) {
func (s *mockSession) AcceptStream() (Stream, error) {
panic("not implemented")
}
func (s *mockSession) OpenStream() (utils.Stream, error) {
func (s *mockSession) OpenStream() (Stream, error) {
return &stream{streamID: 1337}, nil
}
func (s *mockSession) OpenStreamSync() (utils.Stream, error) {
func (s *mockSession) OpenStreamSync() (Stream, error) {
panic("not implemented")
}
func (s *mockSession) RemoteAddr() net.Addr {

View File

@@ -662,26 +662,26 @@ func (s *session) logPacket(packet *packedPacket) {
// GetOrOpenStream either returns an existing stream, a newly opened stream, or nil if a stream with the provided ID is already closed.
// Newly opened streams should only originate from the client. To open a stream from the server, OpenStream should be used.
func (s *session) GetOrOpenStream(id protocol.StreamID) (utils.Stream, error) {
func (s *session) GetOrOpenStream(id protocol.StreamID) (Stream, error) {
str, err := s.streamsMap.GetOrOpenStream(id)
if str != nil {
return str, err
}
// make sure to return an actual nil value here, not an utils.Stream with value nil
// make sure to return an actual nil value here, not an Stream with value nil
return nil, err
}
// AcceptStream returns the next stream openend by the peer
func (s *session) AcceptStream() (utils.Stream, error) {
func (s *session) AcceptStream() (Stream, error) {
return s.streamsMap.AcceptStream()
}
// OpenStream opens a stream
func (s *session) OpenStream() (utils.Stream, error) {
func (s *session) OpenStream() (Stream, error) {
return s.streamsMap.OpenStream()
}
func (s *session) OpenStreamSync() (utils.Stream, error) {
func (s *session) OpenStreamSync() (Stream, error) {
return s.streamsMap.OpenStreamSync()
}

View File

@@ -21,7 +21,6 @@ import (
"github.com/lucas-clemente/quic-go/protocol"
"github.com/lucas-clemente/quic-go/qerr"
"github.com/lucas-clemente/quic-go/testdata"
"github.com/lucas-clemente/quic-go/utils"
)
type mockConnection struct {
@@ -571,18 +570,18 @@ var _ = Describe("Session", func() {
Context("accepting streams", func() {
It("waits for new streams", func() {
var str utils.Stream
var str Stream
go func() {
defer GinkgoRecover()
var err error
str, err = sess.AcceptStream()
Expect(err).ToNot(HaveOccurred())
}()
Consistently(func() utils.Stream { return str }).Should(BeNil())
Consistently(func() Stream { return str }).Should(BeNil())
sess.handleStreamFrame(&frames.StreamFrame{
StreamID: 3,
})
Eventually(func() utils.Stream { return str }).ShouldNot(BeNil())
Eventually(func() Stream { return str }).ShouldNot(BeNil())
Expect(str.StreamID()).To(Equal(protocol.StreamID(3)))
})
@@ -1236,8 +1235,8 @@ var _ = Describe("Session", func() {
str, err := sess.GetOrOpenStream(9)
Expect(err).ToNot(HaveOccurred())
Expect(str).To(BeNil())
// make sure that the returned value is a plain nil, not an utils.Stream with value nil
_, ok := str.(utils.Stream)
// make sure that the returned value is a plain nil, not an Stream with value nil
_, ok := str.(Stream)
Expect(ok).To(BeFalse())
})

View File

@@ -8,7 +8,6 @@ import (
"github.com/lucas-clemente/quic-go/handshake"
"github.com/lucas-clemente/quic-go/protocol"
"github.com/lucas-clemente/quic-go/qerr"
"github.com/lucas-clemente/quic-go/utils"
)
type streamsMap struct {
@@ -193,10 +192,10 @@ func (m *streamsMap) OpenStreamSync() (*stream, error) {
// AcceptStream returns the next stream opened by the peer
// it blocks until a new stream is opened
func (m *streamsMap) AcceptStream() (utils.Stream, error) {
func (m *streamsMap) AcceptStream() (*stream, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
var str utils.Stream
var str *stream
for {
var ok bool
if m.closeErr != nil {

View File

@@ -8,7 +8,6 @@ import (
"github.com/lucas-clemente/quic-go/handshake"
"github.com/lucas-clemente/quic-go/protocol"
"github.com/lucas-clemente/quic-go/qerr"
"github.com/lucas-clemente/quic-go/utils"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
@@ -296,7 +295,7 @@ var _ = Describe("Streams Map", func() {
})
It("accepts stream 1 first", func() {
var str utils.Stream
var str *stream
go func() {
defer GinkgoRecover()
var err error
@@ -305,12 +304,12 @@ var _ = Describe("Streams Map", func() {
}()
_, err := m.GetOrOpenStream(1)
Expect(err).ToNot(HaveOccurred())
Eventually(func() utils.Stream { return str }).ShouldNot(BeNil())
Eventually(func() Stream { return str }).ShouldNot(BeNil())
Expect(str.StreamID()).To(Equal(protocol.StreamID(1)))
})
It("returns an implicitly opened stream, if a stream number is skipped", func() {
var str utils.Stream
var str *stream
go func() {
defer GinkgoRecover()
var err error
@@ -319,12 +318,12 @@ var _ = Describe("Streams Map", func() {
}()
_, err := m.GetOrOpenStream(5)
Expect(err).ToNot(HaveOccurred())
Eventually(func() utils.Stream { return str }).ShouldNot(BeNil())
Eventually(func() Stream { return str }).ShouldNot(BeNil())
Expect(str.StreamID()).To(Equal(protocol.StreamID(1)))
})
It("returns to multiple accepts", func() {
var str1, str2 utils.Stream
var str1, str2 *stream
go func() {
defer GinkgoRecover()
var err error
@@ -339,29 +338,29 @@ var _ = Describe("Streams Map", func() {
}()
_, err := m.GetOrOpenStream(3) // opens stream 1 and 3
Expect(err).ToNot(HaveOccurred())
Eventually(func() utils.Stream { return str1 }).ShouldNot(BeNil())
Eventually(func() utils.Stream { return str2 }).ShouldNot(BeNil())
Eventually(func() *stream { return str1 }).ShouldNot(BeNil())
Eventually(func() *stream { return str2 }).ShouldNot(BeNil())
Expect(str1.StreamID()).ToNot(Equal(str2.StreamID()))
Expect(str1.StreamID() + str2.StreamID()).To(BeEquivalentTo(1 + 3))
})
It("waits a new stream is available", func() {
var str utils.Stream
var str *stream
go func() {
defer GinkgoRecover()
var err error
str, err = m.AcceptStream()
Expect(err).ToNot(HaveOccurred())
}()
Consistently(func() utils.Stream { return str }).Should(BeNil())
Consistently(func() *stream { return str }).Should(BeNil())
_, err := m.GetOrOpenStream(1)
Expect(err).ToNot(HaveOccurred())
Eventually(func() utils.Stream { return str }).ShouldNot(BeNil())
Eventually(func() *stream { return str }).ShouldNot(BeNil())
Expect(str.StreamID()).To(Equal(protocol.StreamID(1)))
})
It("returns multiple streams on subsequent Accept calls, if available", func() {
var str utils.Stream
var str *stream
go func() {
defer GinkgoRecover()
var err error
@@ -370,7 +369,7 @@ var _ = Describe("Streams Map", func() {
}()
_, err := m.GetOrOpenStream(3)
Expect(err).ToNot(HaveOccurred())
Eventually(func() utils.Stream { return str }).ShouldNot(BeNil())
Eventually(func() *stream { return str }).ShouldNot(BeNil())
Expect(str.StreamID()).To(Equal(protocol.StreamID(1)))
str, err = m.AcceptStream()
Expect(err).ToNot(HaveOccurred())
@@ -461,7 +460,7 @@ var _ = Describe("Streams Map", func() {
Context("accepting streams", func() {
It("accepts stream 2 first", func() {
var str utils.Stream
var str *stream
go func() {
defer GinkgoRecover()
var err error
@@ -470,7 +469,7 @@ var _ = Describe("Streams Map", func() {
}()
_, err := m.GetOrOpenStream(2)
Expect(err).ToNot(HaveOccurred())
Eventually(func() utils.Stream { return str }).ShouldNot(BeNil())
Eventually(func() *stream { return str }).ShouldNot(BeNil())
Expect(str.StreamID()).To(Equal(protocol.StreamID(2)))
})
})

View File

@@ -1,17 +0,0 @@
package utils
import (
"io"
"github.com/lucas-clemente/quic-go/protocol"
)
// Stream is the interface for QUIC streams
type Stream interface {
io.Reader
io.Writer
io.Closer
StreamID() protocol.StreamID
CloseRemote(offset protocol.ByteCount)
Reset(error)
}