add flow control to streamFramer

ref #83
This commit is contained in:
Lucas Clemente
2016-07-08 12:21:06 +02:00
parent 81ae6d44ec
commit 3cbf16b66f
3 changed files with 149 additions and 268 deletions

View File

@@ -80,12 +80,11 @@ func (f *streamFramer) EstimatedDataLen() protocol.ByteCount {
return l
}
// TODO: Maybe remove error return value?
func (f *streamFramer) PopStreamFrame(maxLen protocol.ByteCount) (*frames.StreamFrame, error) {
if frame := f.maybePopFrameForRetransmission(maxLen); frame != nil {
return frame, nil
}
return f.maybePopNormalFrame(maxLen), nil
return f.maybePopNormalFrame(maxLen)
}
func (f *streamFramer) maybePopFrameForRetransmission(maxLen protocol.ByteCount) *frames.StreamFrame {
@@ -110,7 +109,7 @@ func (f *streamFramer) maybePopFrameForRetransmission(maxLen protocol.ByteCount)
return frame
}
func (f *streamFramer) maybePopNormalFrame(maxLen protocol.ByteCount) *frames.StreamFrame {
func (f *streamFramer) maybePopNormalFrame(maxBytes protocol.ByteCount) (*frames.StreamFrame, error) {
frame := &frames.StreamFrame{DataLenPresent: true}
f.streamsMutex.RLock()
defer f.streamsMutex.RUnlock()
@@ -122,25 +121,54 @@ func (f *streamFramer) maybePopNormalFrame(maxLen protocol.ByteCount) *frames.St
frame.StreamID = s.streamID
// not perfect, but thread-safe since writeOffset is only written when getting data
frame.Offset = s.writeOffset
frameHeaderLen, _ := frame.MinLength(protocol.VersionWhatever) // can never error
if maxLen < frameHeaderLen {
frameHeaderBytes, _ := frame.MinLength(protocol.VersionWhatever) // can never error
if maxBytes < frameHeaderBytes {
continue
}
maxLen := maxBytes - frameHeaderBytes
flowControlWindow, err := f.flowControlManager.SendWindowSize(s.streamID)
if err != nil {
return nil, err
}
flowControlWindow -= s.writeOffset
if flowControlWindow < maxLen {
maxLen = flowControlWindow
}
contributes, err := f.flowControlManager.StreamContributesToConnectionFlowControl(s.StreamID())
if err != nil {
return nil, err
}
if contributes {
connectionWindow := f.flowControlManager.RemainingConnectionWindowSize()
if connectionWindow < maxLen {
maxLen = connectionWindow
}
}
if maxLen == 0 {
continue
}
data := s.getDataForWriting(maxLen - frameHeaderLen)
data := s.getDataForWriting(maxLen)
if data == nil {
if s.shouldSendFin() {
frame.FinBit = true
s.sentFin()
return frame
return frame, nil
}
continue
}
frame.Data = data
return frame
if err := f.flowControlManager.AddBytesSent(s.streamID, protocol.ByteCount(len(data))); err != nil {
return nil, err
}
return frame, nil
}
return nil
return nil, nil
}
// maybeSplitOffFrame removes the first n bytes and returns them as a separate frame. If n >= len(frame), nil is returned and nothing is modified.

View File

@@ -16,6 +16,7 @@ var _ = Describe("Stream Framer", func() {
framer *streamFramer
streamsMap map[protocol.StreamID]*stream
stream1, stream2 *stream
fcm *mockFlowControlHandler
)
BeforeEach(func() {
@@ -36,7 +37,7 @@ var _ = Describe("Stream Framer", func() {
11: stream2,
}
fcm := newMockFlowControlHandler()
fcm = newMockFlowControlHandler()
fcm.sendWindowSizes[stream1.streamID] = protocol.MaxByteCount
fcm.sendWindowSizes[stream2.streamID] = protocol.MaxByteCount
fcm.sendWindowSizes[retransmittedFrame1.StreamID] = protocol.MaxByteCount
@@ -133,13 +134,6 @@ var _ = Describe("Stream Framer", func() {
Expect(frame).To(BeNil())
})
It("doesn't add the bytes sent to the FlowControlManager if it was a retransmission", func() {
framer.AddFrameForRetransmission(retransmittedFrame1)
_, err := framer.PopStreamFrame(1000)
Expect(err).ToNot(HaveOccurred())
Expect(framer.flowControlManager.(*mockFlowControlHandler).bytesSent).To(BeZero())
})
It("returns normal frames", func() {
stream1.dataForWriting = []byte("foobar")
frame, err := framer.PopStreamFrame(1000)
@@ -281,262 +275,117 @@ var _ = Describe("Stream Framer", func() {
})
})
// It("tells the FlowControlManager how many bytes it sent", func() {
// framer.Push(frame1, false)
// _, err := framer.PopStreamFrame(1000)
// Expect(err).ToNot(HaveOccurred())
// Expect(framer.flowControlManager.(*mockFlowControlHandler).bytesSent).To(Equal(frame1.DataLen()))
// })
//
Context("flow control", func() {
It("tells the FlowControlManager how many bytes it sent", func() {
stream1.dataForWriting = []byte("foobar")
_, err := framer.PopStreamFrame(1000)
Expect(err).ToNot(HaveOccurred())
Expect(fcm.bytesSent).To(Equal(protocol.ByteCount(6)))
})
// Context("flow control", func() {
// It("returns the whole frame if it fits", func() {
// frame1.Offset = 10
// framer.flowControlManager.(*mockFlowControlHandler).sendWindowSizes[frame1.StreamID] = 10 + frame1.DataLen()
// framer.Push(frame1, false)
// frame, err := framer.PopStreamFrame(1000)
// Expect(err).ToNot(HaveOccurred())
// Expect(frame).To(Equal(frame1))
// })
//
// It("returns a split frame if the whole frame doesn't fit", func() {
// framer.Push(frame1, false)
// len := frame1.DataLen() - 1
// framer.flowControlManager.(*mockFlowControlHandler).sendWindowSizes[frame1.StreamID] = len
// frame, err := framer.PopStreamFrame(1000)
// Expect(err).ToNot(HaveOccurred())
// Expect(frame.DataLen()).To(Equal(protocol.ByteCount(len)))
// })
//
// It("returns a split frame if the whole frame doesn't fit in the stream flow control window, for non-zero StreamFrame offset", func() {
// frame1.Offset = 2
// framer.Push(frame1, false)
// framer.flowControlManager.(*mockFlowControlHandler).sendWindowSizes[frame1.StreamID] = 4
// frame, err := framer.PopStreamFrame(1000)
// Expect(err).ToNot(HaveOccurred())
// Expect(frame.DataLen()).To(Equal(protocol.ByteCount(2)))
// })
//
// It("returns a split frame if the whole frame doesn't fit in the connection flow control window", func() {
// frame1.Offset = 2
// framer.Push(frame1, false)
// framer.flowControlManager.(*mockFlowControlHandler).streamsContributing = []protocol.StreamID{frame1.StreamID}
// framer.flowControlManager.(*mockFlowControlHandler).remainingConnectionWindowSize = 3
// frame, err := framer.PopStreamFrame(1000)
// Expect(err).ToNot(HaveOccurred())
// Expect(frame.DataLen()).To(Equal(protocol.ByteCount(3)))
// })
//
// It("skips a frame if the stream is flow control blocked", func() {
// framer.flowControlManager.(*mockFlowControlHandler).sendWindowSizes[frame1.StreamID] = 0
// framer.Push(frame1, false)
// framer.Push(frame2, false)
// frame, err := framer.PopStreamFrame(1000)
// Expect(err).ToNot(HaveOccurred())
// Expect(frame).To(Equal(frame2))
// })
//
// It("skips a frame if the connection is flow control blocked", func() {
// framer.flowControlManager.(*mockFlowControlHandler).sendWindowSizes[frame1.StreamID] = 10000
// framer.flowControlManager.(*mockFlowControlHandler).streamsContributing = []protocol.StreamID{frame1.StreamID}
// framer.flowControlManager.(*mockFlowControlHandler).remainingConnectionWindowSize = 0
// framer.Push(frame1, false)
// framer.Push(frame2, false)
// frame, err := framer.PopStreamFrame(1000)
// Expect(err).ToNot(HaveOccurred())
// Expect(frame).To(Equal(frame2))
// })
//
// It("returns nil if no stream is not flow control blocked", func() {
// framer.flowControlManager.(*mockFlowControlHandler).sendWindowSizes[frame1.StreamID] = 0
// framer.flowControlManager.(*mockFlowControlHandler).sendWindowSizes[frame2.StreamID] = 0
// framer.Push(frame1, false)
// framer.Push(frame2, false)
// frame, err := framer.PopStreamFrame(1000)
// Expect(err).ToNot(HaveOccurred())
// Expect(frame).To(BeNil())
// })
// })
// })
It("does not count retransmitted frames as sent bytes", func() {
framer.AddFrameForRetransmission(retransmittedFrame1)
_, err := framer.PopStreamFrame(1000)
Expect(err).ToNot(HaveOccurred())
Expect(fcm.bytesSent).To(BeZero())
})
It("returns the whole frame if it fits", func() {
stream1.writeOffset = 10
stream1.dataForWriting = []byte("foobar")
fcm.sendWindowSizes[stream1.streamID] = 10 + 6
frame, err := framer.PopStreamFrame(1000)
Expect(err).ToNot(HaveOccurred())
Expect(frame.DataLen()).To(Equal(protocol.ByteCount(6)))
})
It("returns a smaller frame if the whole frame doesn't fit", func() {
stream1.dataForWriting = []byte("foobar")
fcm.sendWindowSizes[stream1.streamID] = 3
frame, err := framer.PopStreamFrame(1000)
Expect(err).ToNot(HaveOccurred())
Expect(frame.Data).To(Equal([]byte("foo")))
})
It("returns a smaller frame if the whole frame doesn't fit in the stream flow control window, for non-zero StreamFrame offset", func() {
stream1.writeOffset = 1
stream1.dataForWriting = []byte("foobar")
fcm.sendWindowSizes[stream1.StreamID()] = 4
frame, err := framer.PopStreamFrame(1000)
Expect(err).ToNot(HaveOccurred())
Expect(frame.Data).To(Equal([]byte("foo")))
})
It("returns a smaller frame if the whole frame doesn't fit in the connection flow control window", func() {
stream1.dataForWriting = []byte("foobar")
fcm.streamsContributing = []protocol.StreamID{stream1.StreamID()}
fcm.remainingConnectionWindowSize = 3
frame, err := framer.PopStreamFrame(1000)
Expect(err).ToNot(HaveOccurred())
Expect(frame.Data).To(Equal([]byte("foo")))
})
It("ignores the connection flow control window for non-contributing streams", func() {
stream1.dataForWriting = []byte("foobar")
fcm.remainingConnectionWindowSize = 0
frame, err := framer.PopStreamFrame(1000)
Expect(err).ToNot(HaveOccurred())
Expect(frame.Data).To(Equal([]byte("foobar")))
})
It("respects the connection flow control window for contributing streams", func() {
stream1.dataForWriting = []byte("foobar")
fcm.remainingConnectionWindowSize = 0
fcm.streamsContributing = []protocol.StreamID{stream1.StreamID()}
frame, err := framer.PopStreamFrame(1000)
Expect(err).ToNot(HaveOccurred())
Expect(frame).To(BeNil())
})
It("selects a stream that is not flow control blocked", func() {
fcm.sendWindowSizes[stream1.StreamID()] = 0
stream1.dataForWriting = []byte("foobar")
stream2.dataForWriting = []byte("foobaz")
frame, err := framer.PopStreamFrame(1000)
Expect(err).ToNot(HaveOccurred())
Expect(frame.StreamID).To(Equal(stream2.StreamID()))
Expect(frame.Data).To(Equal([]byte("foobaz")))
})
It("chooses a non-contributing stream if the connection is flow control blocked", func() {
stream1.dataForWriting = []byte("foobar")
stream2.dataForWriting = []byte("foobaz")
fcm.streamsContributing = []protocol.StreamID{stream1.StreamID()}
fcm.remainingConnectionWindowSize = 0
frame, err := framer.PopStreamFrame(1000)
Expect(err).ToNot(HaveOccurred())
Expect(frame.StreamID).To(Equal(stream2.StreamID()))
})
It("returns nil if every stream is individually flow control blocked", func() {
fcm.sendWindowSizes[stream1.StreamID()] = 0
fcm.sendWindowSizes[stream2.StreamID()] = 0
stream1.dataForWriting = []byte("foobar")
stream2.dataForWriting = []byte("foobaz")
frame, err := framer.PopStreamFrame(1000)
Expect(err).ToNot(HaveOccurred())
Expect(frame).To(BeNil())
})
It("returns nil if every stream is connection flow control blocked", func() {
fcm.remainingConnectionWindowSize = 0
stream1.dataForWriting = []byte("foobar")
stream2.dataForWriting = []byte("foobaz")
fcm.streamsContributing = []protocol.StreamID{stream1.StreamID(), stream2.StreamID()}
frame, err := framer.PopStreamFrame(1000)
Expect(err).ToNot(HaveOccurred())
Expect(frame).To(BeNil())
})
})
})
// Old stream tests
// It("writes everything if the flow control window is big enough", func() {
// data := []byte{0xDE, 0xCA, 0xFB, 0xAD}
// updated := str.flowController.UpdateSendWindow(4)
// Expect(updated).To(BeTrue())
// n, err := str.Write(data)
// Expect(n).To(Equal(4))
// Expect(err).ToNot(HaveOccurred())
// Expect(handler.frames).To(HaveLen(1))
// Expect(handler.frames[0].Data).To(Equal(data))
// })
//
// It("doesn't care about the connection flow control window if it is not contributing", func() {
// updated := str.flowController.UpdateSendWindow(4)
// Expect(updated).To(BeTrue())
// str.contributesToConnectionFlowControl = false
// updated = str.connectionFlowController.UpdateSendWindow(1)
// Expect(updated).To(BeTrue())
// n, err := str.Write([]byte{0xDE, 0xCA, 0xFB, 0xAD})
// Expect(err).ToNot(HaveOccurred())
// Expect(n).To(Equal(4))
// })
//
// It("returns true when the flow control window was updated", func() {
// updated := str.flowController.UpdateSendWindow(4)
// Expect(updated).To(BeTrue())
// updated = str.UpdateSendFlowControlWindow(5)
// Expect(updated).To(BeTrue())
// })
//
// It("returns false when the flow control window was not updated", func() {
// updated := str.flowController.UpdateSendWindow(4)
// Expect(updated).To(BeTrue())
// updated = str.UpdateSendFlowControlWindow(3)
// Expect(updated).To(BeFalse())
// })
//
// It("waits for a stream flow control window update", func() {
// var b bool
// updated := str.flowController.UpdateSendWindow(1)
// Expect(updated).To(BeTrue())
// _, err := str.Write([]byte{0x42})
// Expect(err).ToNot(HaveOccurred())
//
// go func() {
// time.Sleep(2 * time.Millisecond)
// b = true
// str.UpdateSendFlowControlWindow(3)
// }()
// n, err := str.Write([]byte{0x13, 0x37})
// Expect(err).ToNot(HaveOccurred())
// Expect(b).To(BeTrue())
// Expect(n).To(Equal(2))
// Expect(str.writeOffset).To(Equal(protocol.ByteCount(3)))
// Expect(handler.frames).To(HaveLen(2))
// Expect(handler.frames[0].Offset).To(Equal(protocol.ByteCount(0)))
// Expect(handler.frames[0].Data).To(Equal([]byte{0x42}))
// Expect(handler.frames[1].Offset).To(Equal(protocol.ByteCount(1)))
// Expect(handler.frames[1].Data).To(Equal([]byte{0x13, 0x37}))
// })
//
// It("does not write too much data after receiving a window update", func() {
// var b bool
// updated := str.flowController.UpdateSendWindow(1)
// Expect(updated).To(BeTrue())
//
// go func() {
// time.Sleep(2 * time.Millisecond)
// b = true
// str.UpdateSendFlowControlWindow(5)
// }()
// n, err := str.Write([]byte{0x13, 0x37})
// Expect(b).To(BeTrue())
// Expect(n).To(Equal(2))
// Expect(str.writeOffset).To(Equal(protocol.ByteCount(2)))
// Expect(err).ToNot(HaveOccurred())
// Expect(handler.frames).To(HaveLen(2))
// Expect(handler.frames[0].Data).To(Equal([]byte{0x13}))
// Expect(handler.frames[1].Data).To(Equal([]byte{0x37}))
// })
//
// It("waits for a connection flow control window update", func() {
// var b bool
// updated := str.flowController.UpdateSendWindow(1000)
// Expect(updated).To(BeTrue())
// updated = str.connectionFlowController.UpdateSendWindow(1)
// Expect(updated).To(BeTrue())
// str.contributesToConnectionFlowControl = true
//
// _, err := str.Write([]byte{0x42})
// Expect(err).ToNot(HaveOccurred())
// Expect(str.writeOffset).To(Equal(protocol.ByteCount(1)))
//
// var sendWindowUpdated bool
// go func() {
// time.Sleep(2 * time.Millisecond)
// b = true
// sendWindowUpdated = str.connectionFlowController.UpdateSendWindow(3)
// str.ConnectionFlowControlWindowUpdated()
// }()
//
// n, err := str.Write([]byte{0x13, 0x37})
// Expect(b).To(BeTrue())
// Expect(sendWindowUpdated).To(BeTrue())
// Expect(n).To(Equal(2))
// Expect(str.writeOffset).To(Equal(protocol.ByteCount(3)))
// Expect(err).ToNot(HaveOccurred())
// })
//
// It("splits writing of frames when given more data than the flow control windows size", func() {
// updated := str.flowController.UpdateSendWindow(2)
// Expect(updated).To(BeTrue())
// var b bool
//
// go func() {
// time.Sleep(time.Millisecond)
// b = true
// str.UpdateSendFlowControlWindow(4)
// }()
//
// n, err := str.Write([]byte{0xDE, 0xCA, 0xFB, 0xAD})
// Expect(err).ToNot(HaveOccurred())
// Expect(handler.frames).To(HaveLen(2))
// Expect(b).To(BeTrue())
// Expect(n).To(Equal(4))
// Expect(str.writeOffset).To(Equal(protocol.ByteCount(4)))
// })
//
// It("writes after a flow control window update", func() {
// var b bool
// updated := str.flowController.UpdateSendWindow(1)
// Expect(updated).To(BeTrue())
//
// _, err := str.Write([]byte{0x42})
// Expect(err).ToNot(HaveOccurred())
//
// go func() {
// time.Sleep(time.Millisecond)
// b = true
// str.UpdateSendFlowControlWindow(3)
// }()
// n, err := str.Write([]byte{0xDE, 0xAD})
// Expect(err).ToNot(HaveOccurred())
// Expect(b).To(BeTrue())
// Expect(n).To(Equal(2))
// Expect(str.writeOffset).To(Equal(protocol.ByteCount(3)))
// })
//
// It("immediately returns on remote errors", func() {
// var b bool
// updated := str.flowController.UpdateSendWindow(1)
// Expect(updated).To(BeTrue())
//
// testErr := errors.New("test error")
//
// go func() {
// time.Sleep(time.Millisecond)
// b = true
// str.RegisterError(testErr)
// }()
//
// _, err := str.Write([]byte{0xDE, 0xCA, 0xFB, 0xAD})
// Expect(err).To(MatchError(testErr))
// Expect(b).To(BeTrue())
// })
//
// It("works with large flow control windows", func() {
// // This paniced before due to a wrong cast,
// // see https://github.com/lucas-clemente/quic-go/issues/143
// str.contributesToConnectionFlowControl = false
// updated := str.UpdateSendFlowControlWindow(protocol.ByteCount(1) << 63)
// Expect(updated).To(BeTrue())
// _, err := str.Write([]byte("foobar"))
// Expect(err).NotTo(HaveOccurred())
// })
// PContext("Blocked streams", func() {
// It("notifies the session when a stream is flow control blocked", func() {
// updated, err := str.flowControlManager.UpdateWindow(str.streamID, 1337)

View File

@@ -342,6 +342,10 @@ var _ = Describe("Stream", func() {
Expect(err).To(MatchError(testErr))
close(done)
})
It("getDataForWriting returns nil if no data is available", func() {
Expect(str.getDataForWriting(1000)).To(BeNil())
})
})
Context("closing", func() {