forked from quic-go/quic-go
remove queued StreamFrames when a Stream is closed with an error
fixes #149
This commit is contained in:
13
session.go
13
session.go
@@ -364,7 +364,7 @@ func (s *Session) handleRstStreamFrame(frame *frames.RstStreamFrame) error {
|
||||
if !streamExists || str == nil {
|
||||
return errRstStreamOnInvalidStream
|
||||
}
|
||||
str.RegisterError(fmt.Errorf("RST_STREAM received with code %d", frame.ErrorCode))
|
||||
s.closeStreamWithError(str, fmt.Errorf("RST_STREAM received with code %d", frame.ErrorCode))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -409,14 +409,19 @@ func (s *Session) closeImpl(e error, remoteClose bool) error {
|
||||
func (s *Session) closeStreamsWithError(err error) {
|
||||
s.streamsMutex.Lock()
|
||||
defer s.streamsMutex.Unlock()
|
||||
for _, s := range s.streams {
|
||||
if s == nil {
|
||||
for _, str := range s.streams {
|
||||
if str == nil {
|
||||
continue
|
||||
}
|
||||
s.RegisterError(err)
|
||||
s.closeStreamWithError(str, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Session) closeStreamWithError(str *stream, err error) {
|
||||
s.streamFrameQueue.RemoveStream(str.StreamID())
|
||||
str.RegisterError(err)
|
||||
}
|
||||
|
||||
// TODO: try sending more than one packet
|
||||
func (s *Session) maybeSendPacket() error {
|
||||
if !s.smallPacketDelayedOccurranceTime.IsZero() && time.Now().Sub(s.smallPacketDelayedOccurranceTime) > protocol.SmallPacketSendDelay {
|
||||
|
||||
@@ -187,6 +187,22 @@ var _ = Describe("Session", func() {
|
||||
Expect(session.streams[5]).To(BeNil())
|
||||
})
|
||||
|
||||
It("removes queued StreamFrames from StreamFrameQueue when closing with an error", func() {
|
||||
testErr := errors.New("test")
|
||||
session.handleStreamFrame(&frames.StreamFrame{
|
||||
StreamID: 5,
|
||||
Data: []byte{0xde, 0xca, 0xfb, 0xad},
|
||||
})
|
||||
f := frames.StreamFrame{
|
||||
StreamID: 5,
|
||||
Data: []byte("foobar"),
|
||||
}
|
||||
session.streamFrameQueue.Push(&f, true)
|
||||
Expect(session.streams[5]).ToNot(BeNil())
|
||||
session.closeStreamsWithError(testErr)
|
||||
Expect(session.streamFrameQueue.Pop(1000)).To(BeNil())
|
||||
})
|
||||
|
||||
It("removes closed streams from BlockedManager", func() {
|
||||
session.handleStreamFrame(&frames.StreamFrame{
|
||||
StreamID: 5,
|
||||
@@ -260,6 +276,22 @@ var _ = Describe("Session", func() {
|
||||
Expect(err).To(MatchError("RST_STREAM received with code 42"))
|
||||
})
|
||||
|
||||
It("deletes queued StreamFrames from the StreamFrameQueue", func() {
|
||||
_, err := session.OpenStream(5)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
f := frames.StreamFrame{
|
||||
StreamID: 5,
|
||||
Data: []byte("foobar"),
|
||||
}
|
||||
session.streamFrameQueue.Push(&f, false)
|
||||
err = session.handleRstStreamFrame(&frames.RstStreamFrame{
|
||||
StreamID: 5,
|
||||
ErrorCode: 42,
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(session.streamFrameQueue.Pop(1000)).To(BeNil())
|
||||
})
|
||||
|
||||
It("errors when the stream is not known", func() {
|
||||
err := session.handleRstStreamFrame(&frames.RstStreamFrame{
|
||||
StreamID: 5,
|
||||
|
||||
Reference in New Issue
Block a user