delete flow controllers for closed streams

fixes #177
This commit is contained in:
Lucas Clemente
2016-07-28 16:45:55 +02:00
parent 5a82b35ca8
commit 3a88a8cffa
6 changed files with 26 additions and 1 deletions

View File

@@ -51,6 +51,14 @@ func (f *flowControlManager) NewStream(streamID protocol.StreamID, contributesTo
f.contributesToConnectionFlowControl[streamID] = contributesToConnectionFlow f.contributesToConnectionFlowControl[streamID] = contributesToConnectionFlow
} }
// RemoveStream removes a closed stream from flow control
func (f *flowControlManager) RemoveStream(streamID protocol.StreamID) {
f.mutex.Lock()
delete(f.streamFlowController, streamID)
delete(f.contributesToConnectionFlowControl, streamID)
f.mutex.Unlock()
}
// UpdateHighestReceived updates the highest received byte offset for a stream // UpdateHighestReceived updates the highest received byte offset for a stream
// it adds the number of additional bytes to connection level flow control // it adds the number of additional bytes to connection level flow control
// streamID must not be 0 here // streamID must not be 0 here

View File

@@ -33,6 +33,14 @@ var _ = Describe("Flow Control Manager", func() {
}) })
}) })
It("removes streams", func() {
fcm.NewStream(5, true)
Expect(fcm.streamFlowController).To(HaveKey(protocol.StreamID(5)))
fcm.RemoveStream(5)
Expect(fcm.streamFlowController).ToNot(HaveKey(protocol.StreamID(5)))
Expect(fcm.contributesToConnectionFlowControl).ToNot(HaveKey(protocol.StreamID(5)))
})
Context("receiving data", func() { Context("receiving data", func() {
BeforeEach(func() { BeforeEach(func() {
fcm.NewStream(1, false) fcm.NewStream(1, false)

View File

@@ -5,6 +5,7 @@ import "github.com/lucas-clemente/quic-go/protocol"
// A FlowControlManager manages the flow control // A FlowControlManager manages the flow control
type FlowControlManager interface { type FlowControlManager interface {
NewStream(streamID protocol.StreamID, contributesToConnectionFlow bool) NewStream(streamID protocol.StreamID, contributesToConnectionFlow bool)
RemoveStream(streamID protocol.StreamID)
// methods needed for receiving data // methods needed for receiving data
UpdateHighestReceived(streamID protocol.StreamID, byteOffset protocol.ByteCount) error UpdateHighestReceived(streamID protocol.StreamID, byteOffset protocol.ByteCount) error
AddBytesRead(streamID protocol.StreamID, n protocol.ByteCount) error AddBytesRead(streamID protocol.StreamID, n protocol.ByteCount) error

View File

@@ -621,6 +621,7 @@ func (s *Session) garbageCollectStreams() {
utils.Debugf("Garbage-collecting stream %d", k) utils.Debugf("Garbage-collecting stream %d", k)
atomic.AddUint32(&s.openStreamsCount, ^uint32(0)) // decrement atomic.AddUint32(&s.openStreamsCount, ^uint32(0)) // decrement
s.streams[k] = nil s.streams[k] = nil
s.flowControlManager.RemoveStream(k)
} }
} }
} }

View File

@@ -186,7 +186,7 @@ var _ = Describe("Session", func() {
Expect(session.streams[5]).ToNot(BeNil()) Expect(session.streams[5]).ToNot(BeNil())
}) })
It("closes streams with FIN bit & close", func() { It("deletes streams with FIN bit & close", func() {
session.handleStreamFrame(&frames.StreamFrame{ session.handleStreamFrame(&frames.StreamFrame{
StreamID: 5, StreamID: 5,
Data: []byte{0xde, 0xca, 0xfb, 0xad}, Data: []byte{0xde, 0xca, 0xfb, 0xad},
@@ -209,6 +209,9 @@ var _ = Describe("Session", func() {
session.garbageCollectStreams() session.garbageCollectStreams()
Expect(session.streams).To(HaveLen(2)) Expect(session.streams).To(HaveLen(2))
Expect(session.streams[5]).To(BeNil()) Expect(session.streams[5]).To(BeNil())
// flow controller should have been notified
_, err = session.flowControlManager.SendWindowSize(5)
Expect(err).To(MatchError("Error accessing the flowController map."))
}) })
It("closes streams with error", func() { It("closes streams with error", func() {

View File

@@ -40,6 +40,10 @@ func (m *mockFlowControlHandler) NewStream(streamID protocol.StreamID, contribut
panic("not implemented") panic("not implemented")
} }
func (m *mockFlowControlHandler) RemoveStream(streamID protocol.StreamID) {
delete(m.sendWindowSizes, streamID)
}
func (m *mockFlowControlHandler) MaybeTriggerStreamWindowUpdate(streamID protocol.StreamID) (bool, protocol.ByteCount, error) { func (m *mockFlowControlHandler) MaybeTriggerStreamWindowUpdate(streamID protocol.StreamID) (bool, protocol.ByteCount, error) {
return m.triggerStreamWindowUpdate, 0x1337, nil return m.triggerStreamWindowUpdate, 0x1337, nil
} }