forked from quic-go/quic-go
remove session pointer from stream and use a lambda to signal new data
This commit is contained in:
@@ -617,7 +617,7 @@ func (s *Session) newStreamImpl(id protocol.StreamID) (*stream, error) {
|
||||
if _, ok := s.streams[id]; ok {
|
||||
return nil, fmt.Errorf("Session: stream with ID %d already exists", id)
|
||||
}
|
||||
stream, err := newStream(s, s.connectionParametersManager, s.flowControlManager, id)
|
||||
stream, err := newStream(s.scheduleSending, s.connectionParametersManager, s.flowControlManager, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
14
stream.go
14
stream.go
@@ -13,10 +13,6 @@ import (
|
||||
"github.com/lucas-clemente/quic-go/utils"
|
||||
)
|
||||
|
||||
type streamHandler interface {
|
||||
scheduleSending()
|
||||
}
|
||||
|
||||
var (
|
||||
errFlowControlViolation = qerr.FlowControlReceivedTooMuchData
|
||||
errConnectionFlowControlViolation = qerr.FlowControlReceivedTooMuchData
|
||||
@@ -27,7 +23,7 @@ var (
|
||||
// Read() and Write() may be called concurrently, but multiple calls to Read() or Write() individually must be synchronized manually.
|
||||
type stream struct {
|
||||
streamID protocol.StreamID
|
||||
session streamHandler
|
||||
onData func()
|
||||
|
||||
readPosInFrame int
|
||||
writeOffset protocol.ByteCount
|
||||
@@ -55,9 +51,9 @@ type stream struct {
|
||||
}
|
||||
|
||||
// newStream creates a new Stream
|
||||
func newStream(session streamHandler, connectionParameterManager *handshake.ConnectionParametersManager, flowControlManager flowcontrol.FlowControlManager, StreamID protocol.StreamID) (*stream, error) {
|
||||
func newStream(onData func(), connectionParameterManager *handshake.ConnectionParametersManager, flowControlManager flowcontrol.FlowControlManager, StreamID protocol.StreamID) (*stream, error) {
|
||||
s := &stream{
|
||||
session: session,
|
||||
onData: onData,
|
||||
streamID: StreamID,
|
||||
flowControlManager: flowControlManager,
|
||||
contributesToConnectionFlowControl: true,
|
||||
@@ -161,7 +157,7 @@ func (s *stream) Write(p []byte) (int, error) {
|
||||
s.dataForWriting = make([]byte, len(p))
|
||||
copy(s.dataForWriting, p)
|
||||
|
||||
s.session.scheduleSending()
|
||||
s.onData()
|
||||
|
||||
for s.dataForWriting != nil && s.err == nil {
|
||||
s.doneWritingOrErrCond.Wait()
|
||||
@@ -201,7 +197,7 @@ func (s *stream) getDataForWriting(maxBytes protocol.ByteCount) []byte {
|
||||
// Close implements io.Closer
|
||||
func (s *stream) Close() error {
|
||||
atomic.StoreInt32(&s.closed, 1)
|
||||
s.session.scheduleSending()
|
||||
s.onData()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -13,12 +13,6 @@ import (
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
type mockStreamHandler struct {
|
||||
scheduledSending bool
|
||||
}
|
||||
|
||||
func (m *mockStreamHandler) scheduleSending() { m.scheduledSending = true }
|
||||
|
||||
type mockFlowControlHandler struct {
|
||||
streamsContributing []protocol.StreamID
|
||||
|
||||
@@ -96,17 +90,21 @@ func (m *mockFlowControlHandler) StreamContributesToConnectionFlowControl(stream
|
||||
|
||||
var _ = Describe("Stream", func() {
|
||||
var (
|
||||
str *stream
|
||||
handler *mockStreamHandler
|
||||
str *stream
|
||||
onDataCalled bool
|
||||
)
|
||||
|
||||
onData := func() {
|
||||
onDataCalled = true
|
||||
}
|
||||
|
||||
BeforeEach(func() {
|
||||
onDataCalled = false
|
||||
var streamID protocol.StreamID = 1337
|
||||
handler = &mockStreamHandler{}
|
||||
cpm := handshake.NewConnectionParamatersManager()
|
||||
flowControlManager := flowcontrol.NewFlowControlManager(cpm)
|
||||
flowControlManager.NewStream(streamID, true)
|
||||
str, _ = newStream(handler, cpm, flowControlManager, streamID)
|
||||
str, _ = newStream(onData, cpm, flowControlManager, streamID)
|
||||
})
|
||||
|
||||
It("gets stream id", func() {
|
||||
@@ -299,7 +297,7 @@ var _ = Describe("Stream", func() {
|
||||
defer str.mutex.Unlock()
|
||||
return str.dataForWriting
|
||||
}).Should(Equal([]byte("foobar")))
|
||||
Expect(handler.scheduledSending).To(BeTrue())
|
||||
Expect(onDataCalled).To(BeTrue())
|
||||
Expect(str.lenOfDataForWriting()).To(Equal(protocol.ByteCount(6)))
|
||||
data := str.getDataForWriting(1000)
|
||||
Expect(data).To(Equal([]byte("foobar")))
|
||||
|
||||
Reference in New Issue
Block a user