From 1c1101de0eea69d55633cdf696c4ca18f42c0ac7 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Fri, 10 Jun 2016 19:18:51 +0700 Subject: [PATCH] remove queued StreamFrames when a Stream is closed with an error fixes #149 --- session.go | 13 +++++++++---- session_test.go | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/session.go b/session.go index 53ca3d48..c15364b2 100644 --- a/session.go +++ b/session.go @@ -364,7 +364,7 @@ func (s *Session) handleRstStreamFrame(frame *frames.RstStreamFrame) error { if !streamExists || str == nil { return errRstStreamOnInvalidStream } - str.RegisterError(fmt.Errorf("RST_STREAM received with code %d", frame.ErrorCode)) + s.closeStreamWithError(str, fmt.Errorf("RST_STREAM received with code %d", frame.ErrorCode)) return nil } @@ -409,14 +409,19 @@ func (s *Session) closeImpl(e error, remoteClose bool) error { func (s *Session) closeStreamsWithError(err error) { s.streamsMutex.Lock() defer s.streamsMutex.Unlock() - for _, s := range s.streams { - if s == nil { + for _, str := range s.streams { + if str == nil { continue } - s.RegisterError(err) + s.closeStreamWithError(str, err) } } +func (s *Session) closeStreamWithError(str *stream, err error) { + s.streamFrameQueue.RemoveStream(str.StreamID()) + str.RegisterError(err) +} + // TODO: try sending more than one packet func (s *Session) maybeSendPacket() error { if !s.smallPacketDelayedOccurranceTime.IsZero() && time.Now().Sub(s.smallPacketDelayedOccurranceTime) > protocol.SmallPacketSendDelay { diff --git a/session_test.go b/session_test.go index 835a808c..24570f31 100644 --- a/session_test.go +++ b/session_test.go @@ -187,6 +187,22 @@ var _ = Describe("Session", func() { Expect(session.streams[5]).To(BeNil()) }) + It("removes queued StreamFrames from StreamFrameQueue when closing with an error", func() { + testErr := errors.New("test") + session.handleStreamFrame(&frames.StreamFrame{ + StreamID: 5, + Data: []byte{0xde, 0xca, 0xfb, 0xad}, + }) + f := frames.StreamFrame{ + StreamID: 5, + Data: []byte("foobar"), + } + session.streamFrameQueue.Push(&f, true) + Expect(session.streams[5]).ToNot(BeNil()) + session.closeStreamsWithError(testErr) + Expect(session.streamFrameQueue.Pop(1000)).To(BeNil()) + }) + It("removes closed streams from BlockedManager", func() { session.handleStreamFrame(&frames.StreamFrame{ StreamID: 5, @@ -260,6 +276,22 @@ var _ = Describe("Session", func() { Expect(err).To(MatchError("RST_STREAM received with code 42")) }) + It("deletes queued StreamFrames from the StreamFrameQueue", func() { + _, err := session.OpenStream(5) + Expect(err).ToNot(HaveOccurred()) + f := frames.StreamFrame{ + StreamID: 5, + Data: []byte("foobar"), + } + session.streamFrameQueue.Push(&f, false) + err = session.handleRstStreamFrame(&frames.RstStreamFrame{ + StreamID: 5, + ErrorCode: 42, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(session.streamFrameQueue.Pop(1000)).To(BeNil()) + }) + It("errors when the stream is not known", func() { err := session.handleRstStreamFrame(&frames.RstStreamFrame{ StreamID: 5,