implement an AcceptStream method in the Session

This commit is contained in:
Marten Seemann
2017-02-12 21:30:12 +07:00
parent 2fe9da6d27
commit c533a9adb8
2 changed files with 60 additions and 0 deletions

View File

@@ -490,6 +490,7 @@ func (s *Session) closeImpl(e error, remoteClose bool) error {
}
if e == errCloseSessionForNewVersion {
s.streamsMap.CloseWithError(e)
s.closeStreamsWithError(e)
// when the run loop exits, it will call the closeCallback
// replace it with an noop function to make sure this doesn't have any effect
@@ -511,6 +512,7 @@ func (s *Session) closeImpl(e error, remoteClose bool) error {
utils.Errorf("Closing session with error: %s", e.Error())
}
s.streamsMap.CloseWithError(quicErr)
s.closeStreamsWithError(quicErr)
if remoteClose {
@@ -666,6 +668,11 @@ func (s *Session) GetOrOpenStream(id protocol.StreamID) (utils.Stream, error) {
return nil, err
}
// AcceptStream returns the next stream openend by the peer
func (s *Session) AcceptStream() (utils.Stream, error) {
return s.streamsMap.AcceptStream()
}
// OpenStream opens a stream
func (s *Session) OpenStream() (utils.Stream, error) {
return s.streamsMap.OpenStream()

View File

@@ -549,6 +549,59 @@ var _ = Describe("Session", func() {
Expect(err).To(MatchError(qerr.Error(42, "foobar")))
})
Context("accepting streams", func() {
It("waits for new streams", func() {
// stream 1 was already opened
str, err := session.AcceptStream()
Expect(err).ToNot(HaveOccurred())
Expect(str.StreamID()).To(Equal(protocol.StreamID(1)))
str = nil
go func() {
defer GinkgoRecover()
var err error
str, err = session.AcceptStream()
Expect(err).ToNot(HaveOccurred())
}()
Consistently(func() utils.Stream { return str }).Should(BeNil())
session.handleStreamFrame(&frames.StreamFrame{
StreamID: 3,
})
Eventually(func() utils.Stream { return str }).ShouldNot(BeNil())
Expect(str.StreamID()).To(Equal(protocol.StreamID(3)))
})
It("stops accepting when the session is closed", func() {
session.AcceptStream() // accept stream 1
testErr := errors.New("testErr")
var err error
go func() {
_, err = session.AcceptStream()
}()
go session.run()
Consistently(func() error { return err }).ShouldNot(HaveOccurred())
session.Close(testErr)
Eventually(func() error { return err }).Should(HaveOccurred())
Expect(err).To(MatchError(qerr.ToQuicError(testErr)))
})
It("stops accepting when the session is closed after version negotiation", func() {
session.AcceptStream() // accept stream 1
testErr := errCloseSessionForNewVersion
var err error
go func() {
_, err = session.AcceptStream()
}()
go session.run()
Consistently(func() error { return err }).ShouldNot(HaveOccurred())
session.Close(testErr)
Eventually(func() error { return err }).Should(HaveOccurred())
Expect(err).To(MatchError(testErr))
})
})
Context("closing", func() {
var (
nGoRoutinesBefore int