diff --git a/flowcontrol/flow_control_manager.go b/flowcontrol/flow_control_manager.go index f2e565be..7e3a95d5 100644 --- a/flowcontrol/flow_control_manager.go +++ b/flowcontrol/flow_control_manager.go @@ -51,6 +51,14 @@ func (f *flowControlManager) NewStream(streamID protocol.StreamID, contributesTo f.contributesToConnectionFlowControl[streamID] = contributesToConnectionFlow } +// RemoveStream removes a closed stream from flow control +func (f *flowControlManager) RemoveStream(streamID protocol.StreamID) { + f.mutex.Lock() + delete(f.streamFlowController, streamID) + delete(f.contributesToConnectionFlowControl, streamID) + f.mutex.Unlock() +} + // UpdateHighestReceived updates the highest received byte offset for a stream // it adds the number of additional bytes to connection level flow control // streamID must not be 0 here diff --git a/flowcontrol/flow_control_manager_test.go b/flowcontrol/flow_control_manager_test.go index b0f97da6..21a955f4 100644 --- a/flowcontrol/flow_control_manager_test.go +++ b/flowcontrol/flow_control_manager_test.go @@ -33,6 +33,14 @@ var _ = Describe("Flow Control Manager", func() { }) }) + It("removes streams", func() { + fcm.NewStream(5, true) + Expect(fcm.streamFlowController).To(HaveKey(protocol.StreamID(5))) + fcm.RemoveStream(5) + Expect(fcm.streamFlowController).ToNot(HaveKey(protocol.StreamID(5))) + Expect(fcm.contributesToConnectionFlowControl).ToNot(HaveKey(protocol.StreamID(5))) + }) + Context("receiving data", func() { BeforeEach(func() { fcm.NewStream(1, false) diff --git a/flowcontrol/interface.go b/flowcontrol/interface.go index 289e1166..dd189101 100644 --- a/flowcontrol/interface.go +++ b/flowcontrol/interface.go @@ -5,6 +5,7 @@ import "github.com/lucas-clemente/quic-go/protocol" // A FlowControlManager manages the flow control type FlowControlManager interface { NewStream(streamID protocol.StreamID, contributesToConnectionFlow bool) + RemoveStream(streamID protocol.StreamID) // methods needed for receiving data UpdateHighestReceived(streamID protocol.StreamID, byteOffset protocol.ByteCount) error AddBytesRead(streamID protocol.StreamID, n protocol.ByteCount) error diff --git a/session.go b/session.go index 5f932fa7..846e3633 100644 --- a/session.go +++ b/session.go @@ -621,6 +621,7 @@ func (s *Session) garbageCollectStreams() { utils.Debugf("Garbage-collecting stream %d", k) atomic.AddUint32(&s.openStreamsCount, ^uint32(0)) // decrement s.streams[k] = nil + s.flowControlManager.RemoveStream(k) } } } diff --git a/session_test.go b/session_test.go index 42ba6236..1b1967b6 100644 --- a/session_test.go +++ b/session_test.go @@ -186,7 +186,7 @@ var _ = Describe("Session", func() { Expect(session.streams[5]).ToNot(BeNil()) }) - It("closes streams with FIN bit & close", func() { + It("deletes streams with FIN bit & close", func() { session.handleStreamFrame(&frames.StreamFrame{ StreamID: 5, Data: []byte{0xde, 0xca, 0xfb, 0xad}, @@ -209,6 +209,9 @@ var _ = Describe("Session", func() { session.garbageCollectStreams() Expect(session.streams).To(HaveLen(2)) Expect(session.streams[5]).To(BeNil()) + // flow controller should have been notified + _, err = session.flowControlManager.SendWindowSize(5) + Expect(err).To(MatchError("Error accessing the flowController map.")) }) It("closes streams with error", func() { diff --git a/stream_test.go b/stream_test.go index e0dfc987..b5b2bd44 100644 --- a/stream_test.go +++ b/stream_test.go @@ -40,6 +40,10 @@ func (m *mockFlowControlHandler) NewStream(streamID protocol.StreamID, contribut panic("not implemented") } +func (m *mockFlowControlHandler) RemoveStream(streamID protocol.StreamID) { + delete(m.sendWindowSizes, streamID) +} + func (m *mockFlowControlHandler) MaybeTriggerStreamWindowUpdate(streamID protocol.StreamID) (bool, protocol.ByteCount, error) { return m.triggerStreamWindowUpdate, 0x1337, nil }