From 71227437ee67149c79912208036dc0d80c13891d Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 14 Jan 2017 15:16:12 +0700 Subject: [PATCH] only retransmit WindowUpdates if no higher WindowUpdate has been sent fixes #394 --- flowcontrol/flow_control_manager.go | 10 +++++ flowcontrol/interface.go | 1 + session.go | 8 ++++ session_test.go | 65 +++++++++++++++++++++++++++++ stream_test.go | 5 +++ 5 files changed, 89 insertions(+) diff --git a/flowcontrol/flow_control_manager.go b/flowcontrol/flow_control_manager.go index 9b2616ffd..a6cc577d5 100644 --- a/flowcontrol/flow_control_manager.go +++ b/flowcontrol/flow_control_manager.go @@ -162,6 +162,16 @@ func (f *flowControlManager) GetWindowUpdates() (res []WindowUpdate) { return res } +func (f *flowControlManager) GetReceiveWindow(streamID protocol.StreamID) (protocol.ByteCount, error) { + f.mutex.Lock() + defer f.mutex.Unlock() + flowController, err := f.getFlowController(streamID) + if err != nil { + return 0, err + } + return flowController.receiveFlowControlWindow, nil +} + // streamID must not be 0 here func (f *flowControlManager) AddBytesSent(streamID protocol.StreamID, n protocol.ByteCount) error { // Only lock the part reading from the map, since send-windows are only accessed from the session goroutine. diff --git a/flowcontrol/interface.go b/flowcontrol/interface.go index f98f09744..e1ea3fac6 100644 --- a/flowcontrol/interface.go +++ b/flowcontrol/interface.go @@ -17,6 +17,7 @@ type FlowControlManager interface { UpdateHighestReceived(streamID protocol.StreamID, byteOffset protocol.ByteCount) error AddBytesRead(streamID protocol.StreamID, n protocol.ByteCount) error GetWindowUpdates() []WindowUpdate + GetReceiveWindow(streamID protocol.StreamID) (protocol.ByteCount, error) // methods needed for sending data AddBytesSent(streamID protocol.StreamID, n protocol.ByteCount) error SendWindowSize(streamID protocol.StreamID) (protocol.ByteCount, error) diff --git a/session.go b/session.go index cefe6369e..0f911f815 100644 --- a/session.go +++ b/session.go @@ -501,6 +501,14 @@ func (s *Session) sendPacket() error { switch frame.(type) { case *frames.StreamFrame: s.streamFramer.AddFrameForRetransmission(frame.(*frames.StreamFrame)) + case *frames.WindowUpdateFrame: + // only retransmit WindowUpdates if the stream is not yet closed and the we haven't sent another WindowUpdate with a higher ByteOffset for the stream + var currentOffset protocol.ByteCount + f := frame.(*frames.WindowUpdateFrame) + currentOffset, err = s.flowControlManager.GetReceiveWindow(f.StreamID) + if err == nil && f.ByteOffset >= currentOffset { + controlFrames = append(controlFrames, frame) + } default: controlFrames = append(controlFrames, frame) } diff --git a/session_test.go b/session_test.go index 30b04f988..84da7b878 100644 --- a/session_test.go +++ b/session_test.go @@ -735,6 +735,71 @@ var _ = Describe("Session", func() { Expect(err).NotTo(HaveOccurred()) Expect(sph.(*mockSentPacketHandler).maybeQueueRTOsCalled).To(BeTrue()) }) + + It("retransmits a WindowUpdates if it hasn't already sent a WindowUpdate with a higher ByteOffset", func() { + _, err := session.GetOrOpenStream(5) + Expect(err).ToNot(HaveOccurred()) + fc := newMockFlowControlHandler() + fc.receiveWindow = 0x1000 + session.flowControlManager = fc + sph := newMockSentPacketHandler() + session.sentPacketHandler = sph + wuf := &frames.WindowUpdateFrame{ + StreamID: 5, + ByteOffset: 0x1000, + } + sph.(*mockSentPacketHandler).retransmissionQueue = []*ackhandler.Packet{{ + Frames: []frames.Frame{wuf}, + }} + err = session.sendPacket() + Expect(err).ToNot(HaveOccurred()) + sentPackets := sph.(*mockSentPacketHandler).sentPackets + Expect(sentPackets).To(HaveLen(1)) + Expect(sentPackets[0].Frames).To(ContainElement(wuf)) + }) + + It("doesn't retransmit WindowUpdates if it already sent a WindowUpdate with a higher ByteOffset", func() { + _, err := session.GetOrOpenStream(5) + Expect(err).ToNot(HaveOccurred()) + fc := newMockFlowControlHandler() + fc.receiveWindow = 0x2000 + session.flowControlManager = fc + sph := newMockSentPacketHandler() + session.sentPacketHandler = sph + sph.(*mockSentPacketHandler).retransmissionQueue = []*ackhandler.Packet{{ + Frames: []frames.Frame{&frames.WindowUpdateFrame{ + StreamID: 5, + ByteOffset: 0x1000, + }}, + }} + err = session.sendPacket() + Expect(err).ToNot(HaveOccurred()) + Expect(sph.(*mockSentPacketHandler).sentPackets).To(BeEmpty()) + }) + + It("doesn't retransmit WindowUpdates for closed streams", func() { + str, err := session.GetOrOpenStream(5) + Expect(err).ToNot(HaveOccurred()) + // close the stream + str.(*stream).sentFin() + str.Close() + str.(*stream).RegisterRemoteError(nil) + session.garbageCollectStreams() + _, err = session.flowControlManager.SendWindowSize(5) + Expect(err).To(MatchError("Error accessing the flowController map.")) + sph := newMockSentPacketHandler() + session.sentPacketHandler = sph + sph.(*mockSentPacketHandler).retransmissionQueue = []*ackhandler.Packet{{ + Frames: []frames.Frame{&frames.WindowUpdateFrame{ + StreamID: 5, + ByteOffset: 0x1337, + }}, + }} + err = session.sendPacket() + Expect(err).ToNot(HaveOccurred()) + sentPackets := sph.(*mockSentPacketHandler).sentPackets + Expect(sentPackets).To(BeEmpty()) + }) }) Context("scheduling sending", func() { diff --git a/stream_test.go b/stream_test.go index 9a3265f5d..efe99c9ed 100644 --- a/stream_test.go +++ b/stream_test.go @@ -23,6 +23,7 @@ type mockFlowControlHandler struct { bytesRead protocol.ByteCount bytesSent protocol.ByteCount + receiveWindow protocol.ByteCount highestReceivedForStream protocol.StreamID highestReceived protocol.ByteCount flowControlViolation error @@ -57,6 +58,10 @@ func (m *mockFlowControlHandler) GetWindowUpdates() (res []flowcontrol.WindowUpd return res } +func (m *mockFlowControlHandler) GetReceiveWindow(protocol.StreamID) (protocol.ByteCount, error) { + return m.receiveWindow, nil +} + func (m *mockFlowControlHandler) AddBytesRead(streamID protocol.StreamID, n protocol.ByteCount) error { m.bytesReadForStream = streamID m.bytesRead = n