diff --git a/streams_map.go b/streams_map.go index 40556d67..1469ac67 100644 --- a/streams_map.go +++ b/streams_map.go @@ -160,7 +160,7 @@ func (m *streamsMap) openStreamImpl() (streamI, error) { } // OpenStream opens the next available stream -func (m *streamsMap) OpenStream() (streamI, error) { +func (m *streamsMap) OpenStream() (Stream, error) { m.mutex.Lock() defer m.mutex.Unlock() @@ -170,7 +170,7 @@ func (m *streamsMap) OpenStream() (streamI, error) { return m.openStreamImpl() } -func (m *streamsMap) OpenStreamSync() (streamI, error) { +func (m *streamsMap) OpenStreamSync() (Stream, error) { m.mutex.Lock() defer m.mutex.Unlock() @@ -191,7 +191,7 @@ func (m *streamsMap) OpenStreamSync() (streamI, error) { // AcceptStream returns the next stream opened by the peer // it blocks until a new stream is opened -func (m *streamsMap) AcceptStream() (streamI, error) { +func (m *streamsMap) AcceptStream() (Stream, error) { m.mutex.Lock() defer m.mutex.Unlock() var str streamI diff --git a/streams_map_test.go b/streams_map_test.go index 0c90d627..be1c16f8 100644 --- a/streams_map_test.go +++ b/streams_map_test.go @@ -210,19 +210,18 @@ var _ = Describe("Streams Map", func() { It("waits until another stream is closed", func() { openMaxNumStreams() - var returned bool - var str streamI + var str Stream + done := make(chan struct{}) go func() { defer GinkgoRecover() var err error str, err = m.OpenStreamSync() Expect(err).ToNot(HaveOccurred()) - returned = true + close(done) }() - - Consistently(func() bool { return returned }).Should(BeFalse()) + Consistently(done).ShouldNot(BeClosed()) deleteStream(6) - Eventually(func() bool { return returned }).Should(BeTrue()) + Eventually(done).Should(BeClosed()) Expect(str.StreamID()).To(Equal(protocol.StreamID(2*maxOutgoingStreams + 2))) }) @@ -267,95 +266,109 @@ var _ = Describe("Streams Map", func() { It("starts with stream 1, if the crypto stream is stream 0", func() { setNewStreamsMap(protocol.PerspectiveServer, versionIETFFrames) - var str streamI + var str Stream + done := make(chan struct{}) go func() { defer GinkgoRecover() var err error str, err = m.AcceptStream() Expect(err).ToNot(HaveOccurred()) + close(done) }() _, err := m.GetOrOpenStream(1) Expect(err).ToNot(HaveOccurred()) - Eventually(func() Stream { return str }).ShouldNot(BeNil()) + Eventually(done).Should(BeClosed()) Expect(str.StreamID()).To(Equal(protocol.StreamID(1))) }) It("starts with stream 3, if the crypto stream is stream 1", func() { - var str streamI + var str Stream + done := make(chan struct{}) go func() { defer GinkgoRecover() var err error str, err = m.AcceptStream() Expect(err).ToNot(HaveOccurred()) + close(done) }() _, err := m.GetOrOpenStream(3) Expect(err).ToNot(HaveOccurred()) - Eventually(func() Stream { return str }).ShouldNot(BeNil()) + Eventually(done).Should(BeClosed()) Expect(str.StreamID()).To(Equal(protocol.StreamID(3))) }) It("returns an implicitly opened stream, if a stream number is skipped", func() { - var str streamI + var str Stream + done := make(chan struct{}) go func() { defer GinkgoRecover() var err error str, err = m.AcceptStream() Expect(err).ToNot(HaveOccurred()) + close(done) }() _, err := m.GetOrOpenStream(5) Expect(err).ToNot(HaveOccurred()) - Eventually(func() Stream { return str }).ShouldNot(BeNil()) + Eventually(done).Should(BeClosed()) Expect(str.StreamID()).To(Equal(protocol.StreamID(3))) }) It("returns to multiple accepts", func() { - var str1, str2 streamI + var str1, str2 Stream + done1 := make(chan struct{}) + done2 := make(chan struct{}) go func() { defer GinkgoRecover() var err error str1, err = m.AcceptStream() Expect(err).ToNot(HaveOccurred()) + close(done1) }() go func() { defer GinkgoRecover() var err error str2, err = m.AcceptStream() Expect(err).ToNot(HaveOccurred()) + close(done2) }() _, err := m.GetOrOpenStream(5) // opens stream 3 and 5 Expect(err).ToNot(HaveOccurred()) - Eventually(func() streamI { return str1 }).ShouldNot(BeNil()) - Eventually(func() streamI { return str2 }).ShouldNot(BeNil()) + Eventually(done1).Should(BeClosed()) + Eventually(done2).Should(BeClosed()) Expect(str1.StreamID()).ToNot(Equal(str2.StreamID())) Expect(str1.StreamID() + str2.StreamID()).To(BeEquivalentTo(3 + 5)) }) - It("waits a new stream is available", func() { - var str streamI + It("waits until a new stream is available", func() { + var str Stream + done := make(chan struct{}) go func() { defer GinkgoRecover() var err error str, err = m.AcceptStream() Expect(err).ToNot(HaveOccurred()) + close(done) }() - Consistently(func() streamI { return str }).Should(BeNil()) + Consistently(done).ShouldNot(BeClosed()) _, err := m.GetOrOpenStream(3) Expect(err).ToNot(HaveOccurred()) - Eventually(func() streamI { return str }).ShouldNot(BeNil()) + Eventually(done).Should(BeClosed()) Expect(str.StreamID()).To(Equal(protocol.StreamID(3))) }) It("returns multiple streams on subsequent Accept calls, if available", func() { - var str streamI + var str Stream + done := make(chan struct{}) go func() { defer GinkgoRecover() var err error str, err = m.AcceptStream() Expect(err).ToNot(HaveOccurred()) + close(done) }() _, err := m.GetOrOpenStream(5) Expect(err).ToNot(HaveOccurred()) - Eventually(func() streamI { return str }).ShouldNot(BeNil()) + Eventually(done).Should(BeClosed()) Expect(str.StreamID()).To(Equal(protocol.StreamID(3))) str, err = m.AcceptStream() Expect(err).ToNot(HaveOccurred()) @@ -489,16 +502,18 @@ var _ = Describe("Streams Map", func() { Context("accepting streams", func() { It("accepts stream 2 first", func() { - var str streamI + var str Stream + done := make(chan struct{}) go func() { defer GinkgoRecover() var err error str, err = m.AcceptStream() Expect(err).ToNot(HaveOccurred()) + close(done) }() _, err := m.GetOrOpenStream(2) Expect(err).ToNot(HaveOccurred()) - Eventually(func() streamI { return str }).ShouldNot(BeNil()) + Eventually(done).Should(BeClosed()) Expect(str.StreamID()).To(Equal(protocol.StreamID(2))) }) })