only retransmit WindowUpdates if no higher WindowUpdate has been sent

fixes #394
This commit is contained in:
Marten Seemann
2017-01-14 15:16:12 +07:00
parent db22dae089
commit 71227437ee
5 changed files with 89 additions and 0 deletions

View File

@@ -162,6 +162,16 @@ func (f *flowControlManager) GetWindowUpdates() (res []WindowUpdate) {
return res
}
func (f *flowControlManager) GetReceiveWindow(streamID protocol.StreamID) (protocol.ByteCount, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
flowController, err := f.getFlowController(streamID)
if err != nil {
return 0, err
}
return flowController.receiveFlowControlWindow, nil
}
// streamID must not be 0 here
func (f *flowControlManager) AddBytesSent(streamID protocol.StreamID, n protocol.ByteCount) error {
// Only lock the part reading from the map, since send-windows are only accessed from the session goroutine.

View File

@@ -17,6 +17,7 @@ type FlowControlManager interface {
UpdateHighestReceived(streamID protocol.StreamID, byteOffset protocol.ByteCount) error
AddBytesRead(streamID protocol.StreamID, n protocol.ByteCount) error
GetWindowUpdates() []WindowUpdate
GetReceiveWindow(streamID protocol.StreamID) (protocol.ByteCount, error)
// methods needed for sending data
AddBytesSent(streamID protocol.StreamID, n protocol.ByteCount) error
SendWindowSize(streamID protocol.StreamID) (protocol.ByteCount, error)

View File

@@ -501,6 +501,14 @@ func (s *Session) sendPacket() error {
switch frame.(type) {
case *frames.StreamFrame:
s.streamFramer.AddFrameForRetransmission(frame.(*frames.StreamFrame))
case *frames.WindowUpdateFrame:
// only retransmit WindowUpdates if the stream is not yet closed and the we haven't sent another WindowUpdate with a higher ByteOffset for the stream
var currentOffset protocol.ByteCount
f := frame.(*frames.WindowUpdateFrame)
currentOffset, err = s.flowControlManager.GetReceiveWindow(f.StreamID)
if err == nil && f.ByteOffset >= currentOffset {
controlFrames = append(controlFrames, frame)
}
default:
controlFrames = append(controlFrames, frame)
}

View File

@@ -735,6 +735,71 @@ var _ = Describe("Session", func() {
Expect(err).NotTo(HaveOccurred())
Expect(sph.(*mockSentPacketHandler).maybeQueueRTOsCalled).To(BeTrue())
})
It("retransmits a WindowUpdates if it hasn't already sent a WindowUpdate with a higher ByteOffset", func() {
_, err := session.GetOrOpenStream(5)
Expect(err).ToNot(HaveOccurred())
fc := newMockFlowControlHandler()
fc.receiveWindow = 0x1000
session.flowControlManager = fc
sph := newMockSentPacketHandler()
session.sentPacketHandler = sph
wuf := &frames.WindowUpdateFrame{
StreamID: 5,
ByteOffset: 0x1000,
}
sph.(*mockSentPacketHandler).retransmissionQueue = []*ackhandler.Packet{{
Frames: []frames.Frame{wuf},
}}
err = session.sendPacket()
Expect(err).ToNot(HaveOccurred())
sentPackets := sph.(*mockSentPacketHandler).sentPackets
Expect(sentPackets).To(HaveLen(1))
Expect(sentPackets[0].Frames).To(ContainElement(wuf))
})
It("doesn't retransmit WindowUpdates if it already sent a WindowUpdate with a higher ByteOffset", func() {
_, err := session.GetOrOpenStream(5)
Expect(err).ToNot(HaveOccurred())
fc := newMockFlowControlHandler()
fc.receiveWindow = 0x2000
session.flowControlManager = fc
sph := newMockSentPacketHandler()
session.sentPacketHandler = sph
sph.(*mockSentPacketHandler).retransmissionQueue = []*ackhandler.Packet{{
Frames: []frames.Frame{&frames.WindowUpdateFrame{
StreamID: 5,
ByteOffset: 0x1000,
}},
}}
err = session.sendPacket()
Expect(err).ToNot(HaveOccurred())
Expect(sph.(*mockSentPacketHandler).sentPackets).To(BeEmpty())
})
It("doesn't retransmit WindowUpdates for closed streams", func() {
str, err := session.GetOrOpenStream(5)
Expect(err).ToNot(HaveOccurred())
// close the stream
str.(*stream).sentFin()
str.Close()
str.(*stream).RegisterRemoteError(nil)
session.garbageCollectStreams()
_, err = session.flowControlManager.SendWindowSize(5)
Expect(err).To(MatchError("Error accessing the flowController map."))
sph := newMockSentPacketHandler()
session.sentPacketHandler = sph
sph.(*mockSentPacketHandler).retransmissionQueue = []*ackhandler.Packet{{
Frames: []frames.Frame{&frames.WindowUpdateFrame{
StreamID: 5,
ByteOffset: 0x1337,
}},
}}
err = session.sendPacket()
Expect(err).ToNot(HaveOccurred())
sentPackets := sph.(*mockSentPacketHandler).sentPackets
Expect(sentPackets).To(BeEmpty())
})
})
Context("scheduling sending", func() {

View File

@@ -23,6 +23,7 @@ type mockFlowControlHandler struct {
bytesRead protocol.ByteCount
bytesSent protocol.ByteCount
receiveWindow protocol.ByteCount
highestReceivedForStream protocol.StreamID
highestReceived protocol.ByteCount
flowControlViolation error
@@ -57,6 +58,10 @@ func (m *mockFlowControlHandler) GetWindowUpdates() (res []flowcontrol.WindowUpd
return res
}
func (m *mockFlowControlHandler) GetReceiveWindow(protocol.StreamID) (protocol.ByteCount, error) {
return m.receiveWindow, nil
}
func (m *mockFlowControlHandler) AddBytesRead(streamID protocol.StreamID, n protocol.ByteCount) error {
m.bytesReadForStream = streamID
m.bytesRead = n