diff --git a/session.go b/session.go index 144f5efff..7068bcbce 100644 --- a/session.go +++ b/session.go @@ -733,20 +733,14 @@ func (s *session) queueResetStreamFrame(id protocol.StreamID, offset protocol.By s.scheduleSending() } -func (s *session) newStream(id protocol.StreamID) (*stream, error) { - stream, err := newStream(id, s.scheduleSending, s.queueResetStreamFrame, s.flowControlManager) - if err != nil { - return nil, err - } - +func (s *session) newStream(id protocol.StreamID) *stream { // TODO: find a better solution for determining which streams contribute to connection level flow control if id == 1 || id == 3 { s.flowControlManager.NewStream(id, false) } else { s.flowControlManager.NewStream(id, true) } - - return stream, nil + return newStream(id, s.scheduleSending, s.queueResetStreamFrame, s.flowControlManager) } // garbageCollectStreams goes through all streams and removes EOF'ed streams diff --git a/stream.go b/stream.go index 5596227f9..5e2cbaf87 100644 --- a/stream.go +++ b/stream.go @@ -52,7 +52,10 @@ type stream struct { } // newStream creates a new Stream -func newStream(StreamID protocol.StreamID, onData func(), onReset func(protocol.StreamID, protocol.ByteCount), flowControlManager flowcontrol.FlowControlManager) (*stream, error) { +func newStream(StreamID protocol.StreamID, + onData func(), + onReset func(protocol.StreamID, protocol.ByteCount), + flowControlManager flowcontrol.FlowControlManager) *stream { s := &stream{ onData: onData, onReset: onReset, @@ -60,11 +63,9 @@ func newStream(StreamID protocol.StreamID, onData func(), onReset func(protocol. flowControlManager: flowControlManager, frameQueue: newStreamFrameSorter(), } - s.newFrameOrErrCond.L = &s.mutex s.doneWritingOrErrCond.L = &s.mutex - - return s, nil + return s } // Read implements io.Reader. It is not thread safe! diff --git a/stream_test.go b/stream_test.go index 257b9cb5b..14cb7da15 100644 --- a/stream_test.go +++ b/stream_test.go @@ -139,7 +139,7 @@ var _ = Describe("Stream", func() { cpm := &mockConnectionParametersManager{} flowControlManager := flowcontrol.NewFlowControlManager(cpm, &congestion.RTTStats{}) flowControlManager.NewStream(streamID, true) - str, _ = newStream(streamID, onData, onReset, flowControlManager) + str = newStream(streamID, onData, onReset, flowControlManager) }) It("gets stream id", func() { diff --git a/streams_map.go b/streams_map.go index 31cfe5a84..c1a8c2793 100644 --- a/streams_map.go +++ b/streams_map.go @@ -36,7 +36,7 @@ type streamsMap struct { } type streamLambda func(*stream) (bool, error) -type newStreamLambda func(protocol.StreamID) (*stream, error) +type newStreamLambda func(protocol.StreamID) *stream var ( errMapAccess = errors.New("streamsMap: Error accessing the streams map") @@ -120,11 +120,6 @@ func (m *streamsMap) openRemoteStream(id protocol.StreamID) (*stream, error) { return nil, qerr.Error(qerr.InvalidStreamID, fmt.Sprintf("attempted to open stream %d, which is a lot smaller than the highest opened stream, %d", id, m.highestStreamOpenedByPeer)) } - s, err := m.newStream(id) - if err != nil { - return nil, err - } - if m.perspective == protocol.PerspectiveServer { m.numIncomingStreams++ } else { @@ -135,6 +130,7 @@ func (m *streamsMap) openRemoteStream(id protocol.StreamID) (*stream, error) { m.highestStreamOpenedByPeer = id } + s := m.newStream(id) m.putStream(s) return s, nil } @@ -145,11 +141,6 @@ func (m *streamsMap) openStreamImpl() (*stream, error) { return nil, qerr.TooManyOpenStreams } - s, err := m.newStream(id) - if err != nil { - return nil, err - } - if m.perspective == protocol.PerspectiveServer { m.numOutgoingStreams++ } else { @@ -157,6 +148,7 @@ func (m *streamsMap) openStreamImpl() (*stream, error) { } m.nextStream += 2 + s := m.newStream(id) m.putStream(s) return s, nil } diff --git a/streams_map_test.go b/streams_map_test.go index f810fd26b..31a893867 100644 --- a/streams_map_test.go +++ b/streams_map_test.go @@ -59,8 +59,8 @@ var _ = Describe("Streams Map", func() { setNewStreamsMap := func(p protocol.Perspective) { m = newStreamsMap(nil, p, cpm) - m.newStream = func(id protocol.StreamID) (*stream, error) { - return &stream{streamID: id}, nil + m.newStream = func(id protocol.StreamID) *stream { + return &stream{streamID: id} } } @@ -174,13 +174,6 @@ var _ = Describe("Streams Map", func() { Expect(m.numOutgoingStreams).To(BeEquivalentTo(1)) }) - It("errors if the stream can't be created", func() { - testErr := errors.New("test error") - m.newStream = func(protocol.StreamID) (*stream, error) { return nil, testErr } - _, err := m.OpenStream() - Expect(err).To(MatchError(testErr)) - }) - It("returns the error when the streamsMap was closed", func() { testErr := errors.New("test error") m.CloseWithError(testErr) @@ -259,13 +252,6 @@ var _ = Describe("Streams Map", func() { Expect(str.StreamID()).To(Equal(protocol.StreamID(2*maxNumStreams + 2))) }) - It("errors if the stream can't be created", func() { - testErr := errors.New("test error") - m.newStream = func(protocol.StreamID) (*stream, error) { return nil, testErr } - _, err := m.OpenStreamSync() - Expect(err).To(MatchError(testErr)) - }) - It("stops waiting when an error is registered", func() { openMaxNumStreams() testErr := errors.New("test error")