diff --git a/internal/flowcontrol/base_flow_controller.go b/internal/flowcontrol/base_flow_controller.go index d4e5c8a54..b4da7d6df 100644 --- a/internal/flowcontrol/base_flow_controller.go +++ b/internal/flowcontrol/base_flow_controller.go @@ -11,8 +11,9 @@ import ( type baseFlowController struct { // for sending data - bytesSent protocol.ByteCount - sendWindow protocol.ByteCount + bytesSent protocol.ByteCount + sendWindow protocol.ByteCount + lastBlockedAt protocol.ByteCount // for receiving data mutex sync.RWMutex @@ -72,12 +73,14 @@ func (c *baseFlowController) getWindowUpdate() protocol.ByteCount { return c.receiveWindow } -// IsBlocked says if it is blocked by flow control. +// IsBlocked says if it is newly blocked by flow control. +// For every offset, it only returns true once. // If it is blocked, the offset is returned. -func (c *baseFlowController) IsBlocked() (bool, protocol.ByteCount) { - if c.sendWindowSize() != 0 { +func (c *baseFlowController) IsNewlyBlocked() (bool, protocol.ByteCount) { + if c.sendWindowSize() != 0 || c.sendWindow == c.lastBlockedAt { return false, 0 } + c.lastBlockedAt = c.sendWindow return true, c.sendWindow } diff --git a/internal/flowcontrol/base_flow_controller_test.go b/internal/flowcontrol/base_flow_controller_test.go index 790d31f3e..7869f2a61 100644 --- a/internal/flowcontrol/base_flow_controller_test.go +++ b/internal/flowcontrol/base_flow_controller_test.go @@ -52,12 +52,26 @@ var _ = Describe("Base Flow controller", func() { It("says when it's blocked", func() { controller.UpdateSendWindow(100) - Expect(controller.IsBlocked()).To(BeFalse()) + Expect(controller.IsNewlyBlocked()).To(BeFalse()) controller.AddBytesSent(100) - blocked, offset := controller.IsBlocked() + blocked, offset := controller.IsNewlyBlocked() Expect(blocked).To(BeTrue()) Expect(offset).To(Equal(protocol.ByteCount(100))) }) + + It("doesn't say that it's newly blocked multiple times for the same offset", func() { + controller.UpdateSendWindow(100) + controller.AddBytesSent(100) + newlyBlocked, offset := controller.IsNewlyBlocked() + Expect(newlyBlocked).To(BeTrue()) + Expect(offset).To(Equal(protocol.ByteCount(100))) + newlyBlocked, _ = controller.IsNewlyBlocked() + Expect(newlyBlocked).To(BeFalse()) + controller.UpdateSendWindow(150) + controller.AddBytesSent(150) + newlyBlocked, offset = controller.IsNewlyBlocked() + Expect(newlyBlocked).To(BeTrue()) + }) }) Context("receive flow control", func() { diff --git a/internal/flowcontrol/interface.go b/internal/flowcontrol/interface.go index f55772830..a753b4e45 100644 --- a/internal/flowcontrol/interface.go +++ b/internal/flowcontrol/interface.go @@ -5,7 +5,7 @@ import "github.com/lucas-clemente/quic-go/internal/protocol" type flowController interface { // for sending SendWindowSize() protocol.ByteCount - IsBlocked() (bool, protocol.ByteCount) + IsNewlyBlocked() (bool, protocol.ByteCount) UpdateSendWindow(protocol.ByteCount) AddBytesSent(protocol.ByteCount) // for receiving diff --git a/internal/flowcontrol/stream_flow_controller_test.go b/internal/flowcontrol/stream_flow_controller_test.go index 5938e5e65..35e07b18c 100644 --- a/internal/flowcontrol/stream_flow_controller_test.go +++ b/internal/flowcontrol/stream_flow_controller_test.go @@ -233,9 +233,9 @@ var _ = Describe("Stream Flow controller", func() { controller.connection.UpdateSendWindow(50) controller.UpdateSendWindow(100) controller.AddBytesSent(50) - blocked, _ := controller.connection.IsBlocked() + blocked, _ := controller.connection.IsNewlyBlocked() Expect(blocked).To(BeTrue()) - Expect(controller.IsBlocked()).To(BeFalse()) + Expect(controller.IsNewlyBlocked()).To(BeFalse()) }) }) }) diff --git a/internal/mocks/connection_flow_controller.go b/internal/mocks/connection_flow_controller.go index cf99e9d12..6a54f829a 100644 --- a/internal/mocks/connection_flow_controller.go +++ b/internal/mocks/connection_flow_controller.go @@ -65,17 +65,17 @@ func (_mr *MockConnectionFlowControllerMockRecorder) GetWindowUpdate() *gomock.C return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "GetWindowUpdate", reflect.TypeOf((*MockConnectionFlowController)(nil).GetWindowUpdate)) } -// IsBlocked mocks base method -func (_m *MockConnectionFlowController) IsBlocked() (bool, protocol.ByteCount) { - ret := _m.ctrl.Call(_m, "IsBlocked") +// IsNewlyBlocked mocks base method +func (_m *MockConnectionFlowController) IsNewlyBlocked() (bool, protocol.ByteCount) { + ret := _m.ctrl.Call(_m, "IsNewlyBlocked") ret0, _ := ret[0].(bool) ret1, _ := ret[1].(protocol.ByteCount) return ret0, ret1 } -// IsBlocked indicates an expected call of IsBlocked -func (_mr *MockConnectionFlowControllerMockRecorder) IsBlocked() *gomock.Call { - return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "IsBlocked", reflect.TypeOf((*MockConnectionFlowController)(nil).IsBlocked)) +// IsNewlyBlocked indicates an expected call of IsNewlyBlocked +func (_mr *MockConnectionFlowControllerMockRecorder) IsNewlyBlocked() *gomock.Call { + return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "IsNewlyBlocked", reflect.TypeOf((*MockConnectionFlowController)(nil).IsNewlyBlocked)) } // SendWindowSize mocks base method diff --git a/internal/mocks/stream_flow_controller.go b/internal/mocks/stream_flow_controller.go index 8bca7b9e4..f5c49f574 100644 --- a/internal/mocks/stream_flow_controller.go +++ b/internal/mocks/stream_flow_controller.go @@ -65,17 +65,17 @@ func (_mr *MockStreamFlowControllerMockRecorder) GetWindowUpdate() *gomock.Call return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "GetWindowUpdate", reflect.TypeOf((*MockStreamFlowController)(nil).GetWindowUpdate)) } -// IsBlocked mocks base method -func (_m *MockStreamFlowController) IsBlocked() (bool, protocol.ByteCount) { - ret := _m.ctrl.Call(_m, "IsBlocked") +// IsNewlyBlocked mocks base method +func (_m *MockStreamFlowController) IsNewlyBlocked() (bool, protocol.ByteCount) { + ret := _m.ctrl.Call(_m, "IsNewlyBlocked") ret0, _ := ret[0].(bool) ret1, _ := ret[1].(protocol.ByteCount) return ret0, ret1 } -// IsBlocked indicates an expected call of IsBlocked -func (_mr *MockStreamFlowControllerMockRecorder) IsBlocked() *gomock.Call { - return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "IsBlocked", reflect.TypeOf((*MockStreamFlowController)(nil).IsBlocked)) +// IsNewlyBlocked indicates an expected call of IsNewlyBlocked +func (_mr *MockStreamFlowControllerMockRecorder) IsNewlyBlocked() *gomock.Call { + return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "IsNewlyBlocked", reflect.TypeOf((*MockStreamFlowController)(nil).IsNewlyBlocked)) } // SendWindowSize mocks base method diff --git a/packet_packer.go b/packet_packer.go index 01681d2a5..1a56e196b 100644 --- a/packet_packer.go +++ b/packet_packer.go @@ -265,17 +265,9 @@ func (p *packetPacker) composeNextPacket( fs[len(fs)-1].DataLenPresent = false } - // TODO: Simplify for _, f := range fs { payloadFrames = append(payloadFrames, f) } - - for b := p.streamFramer.PopBlockedFrame(); b != nil; b = p.streamFramer.PopBlockedFrame() { - p.controlFrameMutex.Lock() - p.controlFrames = append(p.controlFrames, b) - p.controlFrameMutex.Unlock() - } - return payloadFrames, nil } diff --git a/packet_packer_test.go b/packet_packer_test.go index e4c80fcb3..f1f5023dd 100644 --- a/packet_packer_test.go +++ b/packet_packer_test.go @@ -63,7 +63,7 @@ var _ = Describe("Packet packer", func() { version := versionGQUICFrames cryptoStream = newCryptoStream(func() {}, flowcontrol.NewStreamFlowController(version.CryptoStreamID(), false, flowcontrol.NewConnectionFlowController(1000, 1000, nil), 1000, 1000, 1000, nil), version) streamsMap := newStreamsMap(nil, protocol.PerspectiveServer, versionGQUICFrames) - streamFramer = newStreamFramer(cryptoStream, streamsMap, nil, versionGQUICFrames) + streamFramer = newStreamFramer(cryptoStream, streamsMap, versionGQUICFrames) packer = &packetPacker{ cryptoSetup: &mockCryptoSetup{encLevelSeal: protocol.EncryptionForwardSecure}, @@ -690,47 +690,6 @@ var _ = Describe("Packet packer", func() { }) }) - Context("BLOCKED frames", func() { - It("queues a BLOCKED frame", func() { - length := 100 - streamFramer.blockedFrameQueue = []*wire.BlockedFrame{&wire.BlockedFrame{Offset: 555}} - f := &wire.StreamFrame{ - StreamID: 5, - Data: bytes.Repeat([]byte{'f'}, length), - } - streamFramer.AddFrameForRetransmission(f) - _, err := packer.composeNextPacket(maxFrameSize, true) - Expect(err).ToNot(HaveOccurred()) - Expect(packer.controlFrames[0]).To(Equal(&wire.BlockedFrame{Offset: 555})) - }) - - It("removes the dataLen attribute from the last STREAM frame, even if it queued a BLOCKED frame", func() { - length := 100 - streamFramer.blockedFrameQueue = []*wire.BlockedFrame{&wire.BlockedFrame{Offset: 50}} - f := &wire.StreamFrame{ - StreamID: 5, - Data: bytes.Repeat([]byte{'f'}, length), - } - streamFramer.AddFrameForRetransmission(f) - p, err := packer.composeNextPacket(maxFrameSize, true) - Expect(err).ToNot(HaveOccurred()) - Expect(p).To(HaveLen(1)) - Expect(p[0].(*wire.StreamFrame).DataLenPresent).To(BeFalse()) - }) - - It("packs a connection-level BlockedFrame", func() { - streamFramer.blockedFrameQueue = []*wire.BlockedFrame{&wire.BlockedFrame{}} - f := &wire.StreamFrame{ - StreamID: 5, - Data: []byte("foobar"), - } - streamFramer.AddFrameForRetransmission(f) - _, err := packer.composeNextPacket(maxFrameSize, true) - Expect(err).ToNot(HaveOccurred()) - Expect(packer.controlFrames[0]).To(Equal(&wire.BlockedFrame{})) - }) - }) - It("returns nil if we only have a single STOP_WAITING", func() { packer.QueueControlFrame(&wire.StopWaitingFrame{}) p, err := packer.PackPacket() diff --git a/send_stream.go b/send_stream.go index f09e975f5..b3018fcee 100644 --- a/send_stream.go +++ b/send_stream.go @@ -146,7 +146,7 @@ func (s *sendStream) PopStreamFrame(maxBytes protocol.ByteCount) *wire.StreamFra if frame.FinBit { s.finSent = true } else if s.streamID != s.version.CryptoStreamID() { // TODO(#657): Flow control for the crypto stream - if isBlocked, offset := s.flowController.IsBlocked(); isBlocked { + if isBlocked, offset := s.flowController.IsNewlyBlocked(); isBlocked { s.queueControlFrame(&wire.StreamBlockedFrame{ StreamID: s.streamID, Offset: offset, diff --git a/send_stream_test.go b/send_stream_test.go index c80d91c90..01bded602 100644 --- a/send_stream_test.go +++ b/send_stream_test.go @@ -49,7 +49,7 @@ var _ = Describe("Send Stream", func() { It("writes and gets all data at once", func() { mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6)) - mockFC.EXPECT().IsBlocked() + mockFC.EXPECT().IsNewlyBlocked() done := make(chan struct{}) go func() { defer GinkgoRecover() @@ -78,7 +78,7 @@ var _ = Describe("Send Stream", func() { frameHeaderLen := protocol.ByteCount(4) mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2) mockFC.EXPECT().AddBytesSent(gomock.Any() /* protocol.ByteCount(3)*/).Times(2) - mockFC.EXPECT().IsBlocked().Times(2) + mockFC.EXPECT().IsNewlyBlocked().Times(2) done := make(chan struct{}) go func() { defer GinkgoRecover() @@ -115,7 +115,7 @@ var _ = Describe("Send Stream", func() { mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(1)) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(2)) - mockFC.EXPECT().IsBlocked().Times(2) + mockFC.EXPECT().IsNewlyBlocked().Times(2) s := []byte("foo") go func() { defer GinkgoRecover() @@ -155,7 +155,7 @@ var _ = Describe("Send Stream", func() { mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6)) // don't use offset 6 here, to make sure the BLOCKED frame contains the number returned by the flow controller - mockFC.EXPECT().IsBlocked().Return(true, protocol.ByteCount(10)) + mockFC.EXPECT().IsNewlyBlocked().Return(true, protocol.ByteCount(10)) done := make(chan struct{}) go func() { defer GinkgoRecover() @@ -181,7 +181,7 @@ var _ = Describe("Send Stream", func() { It("doesn't queue a BLOCKED frame if the stream is flow control blocked, but the frame popped has the FIN bit set", func() { mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)) mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6)) - // don't EXPECT a call to IsBlocked + // don't EXPECT a call to IsNewlyBlocked done := make(chan struct{}) go func() { defer GinkgoRecover() @@ -222,7 +222,7 @@ var _ = Describe("Send Stream", func() { It("returns the number of bytes written, when the deadline expires", func() { mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(10000)).AnyTimes() mockFC.EXPECT().AddBytesSent(gomock.Any()) - mockFC.EXPECT().IsBlocked() + mockFC.EXPECT().IsNewlyBlocked() deadline := time.Now().Add(scaleDuration(50 * time.Millisecond)) str.SetWriteDeadline(deadline) var n int @@ -299,7 +299,7 @@ var _ = Describe("Send Stream", func() { frameHeaderLen := protocol.ByteCount(4) mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2) mockFC.EXPECT().AddBytesSent(gomock.Any()).Times(2) - mockFC.EXPECT().IsBlocked() + mockFC.EXPECT().IsNewlyBlocked() str.dataForWriting = []byte("foobar") str.Close() f := str.PopStreamFrame(3 + frameHeaderLen) @@ -340,7 +340,7 @@ var _ = Describe("Send Stream", func() { It("doesn't get data for writing if an error occurred", func() { mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)) mockFC.EXPECT().AddBytesSent(gomock.Any()) - mockFC.EXPECT().IsBlocked() + mockFC.EXPECT().IsNewlyBlocked() done := make(chan struct{}) go func() { defer GinkgoRecover() @@ -380,7 +380,7 @@ var _ = Describe("Send Stream", func() { It("unblocks Write", func() { mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount) mockFC.EXPECT().AddBytesSent(gomock.Any()) - mockFC.EXPECT().IsBlocked() + mockFC.EXPECT().IsNewlyBlocked() writeReturned := make(chan struct{}) var n int go func() { diff --git a/session.go b/session.go index 2caf73ae8..d570c59bb 100644 --- a/session.go +++ b/session.go @@ -314,7 +314,7 @@ func (s *session) postSetup(initialPacketNumber protocol.PacketNumber) error { s.receivedPacketHandler = ackhandler.NewReceivedPacketHandler(s.version) s.streamsMap = newStreamsMap(s.newStream, s.perspective, s.version) - s.streamFramer = newStreamFramer(s.cryptoStream, s.streamsMap, s.connFlowController, s.version) + s.streamFramer = newStreamFramer(s.cryptoStream, s.streamsMap, s.version) s.packer = newPacketPacker(s.connectionID, initialPacketNumber, @@ -721,6 +721,9 @@ func (s *session) sendPacket() error { for _, f := range s.getWindowUpdates() { s.packer.QueueControlFrame(f) } + if isBlocked, offset := s.connFlowController.IsNewlyBlocked(); isBlocked { + s.packer.QueueControlFrame(&wire.BlockedFrame{Offset: offset}) + } ack := s.receivedPacketHandler.GetAckFrame() if ack != nil { diff --git a/session_test.go b/session_test.go index 74cbaa26f..34f6306bc 100644 --- a/session_test.go +++ b/session_test.go @@ -857,6 +857,27 @@ var _ = Describe("Session", func() { Expect(mconn.written).To(HaveLen(1)) }) + It("adds a BLOCKED frame when it is connection-level flow control blocked", func() { + fc := mocks.NewMockConnectionFlowController(mockCtrl) + fc.EXPECT().GetWindowUpdate() + fc.EXPECT().IsNewlyBlocked().Return(true, protocol.ByteCount(1337)) + sess.connFlowController = fc + sph := mocks.NewMockSentPacketHandler(mockCtrl) + sph.EXPECT().GetLeastUnacked().AnyTimes() + sph.EXPECT().SendingAllowed().Return(true) + sph.EXPECT().SendingAllowed() + sph.EXPECT().DequeuePacketForRetransmission() + sph.EXPECT().ShouldSendRetransmittablePacket() + sph.EXPECT().SentPacket(gomock.Any()).Do(func(p *ackhandler.Packet) { + Expect(p.Frames).To(Equal([]wire.Frame{ + &wire.BlockedFrame{Offset: 1337}, + })) + }) + sess.sentPacketHandler = sph + err := sess.sendPacket() + Expect(err).ToNot(HaveOccurred()) + }) + It("sends public reset", func() { err := sess.sendPublicReset(1) Expect(err).NotTo(HaveOccurred()) diff --git a/stream_framer.go b/stream_framer.go index 8e032f456..bb14ab112 100644 --- a/stream_framer.go +++ b/stream_framer.go @@ -1,7 +1,6 @@ package quic import ( - "github.com/lucas-clemente/quic-go/internal/flowcontrol" "github.com/lucas-clemente/quic-go/internal/protocol" "github.com/lucas-clemente/quic-go/internal/wire" ) @@ -11,23 +10,18 @@ type streamFramer struct { cryptoStream cryptoStreamI version protocol.VersionNumber - connFlowController flowcontrol.ConnectionFlowController - retransmissionQueue []*wire.StreamFrame - blockedFrameQueue []*wire.BlockedFrame } func newStreamFramer( cryptoStream cryptoStreamI, streamsMap *streamsMap, - cfc flowcontrol.ConnectionFlowController, v protocol.VersionNumber, ) *streamFramer { return &streamFramer{ - streamsMap: streamsMap, - cryptoStream: cryptoStream, - connFlowController: cfc, - version: v, + streamsMap: streamsMap, + cryptoStream: cryptoStream, + version: v, } } @@ -40,15 +34,6 @@ func (f *streamFramer) PopStreamFrames(maxLen protocol.ByteCount) []*wire.Stream return append(fs, f.maybePopNormalFrames(maxLen-currentLen)...) } -func (f *streamFramer) PopBlockedFrame() wire.Frame { - if len(f.blockedFrameQueue) == 0 { - return nil - } - frame := f.blockedFrameQueue[0] - f.blockedFrameQueue = f.blockedFrameQueue[1:] - return frame -} - func (f *streamFramer) HasFramesForRetransmission() bool { return len(f.retransmissionQueue) > 0 } @@ -103,19 +88,11 @@ func (f *streamFramer) maybePopNormalFrames(maxTotalLen protocol.ByteCount) (res if frame == nil { return true, nil } - - if blocked, offset := f.connFlowController.IsBlocked(); blocked { - f.blockedFrameQueue = append(f.blockedFrameQueue, &wire.BlockedFrame{Offset: offset}) - } - res = append(res, frame) currentLen += frame.MinLength(f.version) + frame.DataLen() - if currentLen == maxTotalLen { return false, nil } - - frame = &wire.StreamFrame{DataLenPresent: true} return true, nil } diff --git a/stream_framer_test.go b/stream_framer_test.go index a2d33f788..deabfdb2f 100644 --- a/stream_framer_test.go +++ b/stream_framer_test.go @@ -23,7 +23,6 @@ var _ = Describe("Stream Framer", func() { framer *streamFramer streamsMap *streamsMap stream1, stream2 *mocks.MockStreamI - connFC *mocks.MockConnectionFlowController ) setNoData := func(str *mocks.MockStreamI) { @@ -49,8 +48,7 @@ var _ = Describe("Stream Framer", func() { streamsMap.putStream(stream1) streamsMap.putStream(stream2) - connFC = mocks.NewMockConnectionFlowController(mockCtrl) - framer = newStreamFramer(nil, streamsMap, connFC, versionGQUICFrames) + framer = newStreamFramer(nil, streamsMap, versionGQUICFrames) }) It("says if it has retransmissions", func() { @@ -69,11 +67,6 @@ var _ = Describe("Stream Framer", func() { }) Context("Popping", func() { - BeforeEach(func() { - // we're not connection-level flow control blocked - connFC.EXPECT().IsBlocked().AnyTimes() - }) - It("returns nil when popping an empty framer", func() { setNoData(stream1) setNoData(stream2) @@ -257,34 +250,4 @@ var _ = Describe("Stream Framer", func() { }) }) }) - - Context("BLOCKED frames", func() { - It("doesn't queue a stream-level BLOCKED frame after sending the FIN bit frame", func() { - setNoData(stream2) - f := &wire.StreamFrame{ - StreamID: id1, - Data: []byte("foobar"), - FinBit: true, - } - connFC.EXPECT().IsBlocked() - stream1.EXPECT().PopStreamFrame(gomock.Any()).Return(f) - // no call to IsFlowControlBlocked() - frames := framer.PopStreamFrames(1000) - Expect(frames).To(Equal([]*wire.StreamFrame{f})) - blockedFrame := framer.PopBlockedFrame() - Expect(blockedFrame).To(BeNil()) - }) - - It("queues and pops BLOCKED frames for connection blocked streams", func() { - setNoData(stream2) - connFC.EXPECT().IsBlocked().Return(true, protocol.ByteCount(0x4321)) - stream1.EXPECT().PopStreamFrame(gomock.Any()).Return(&wire.StreamFrame{ - StreamID: id1, - Data: []byte("foo"), - }) - framer.PopStreamFrames(1000) - Expect(framer.PopBlockedFrame()).To(Equal(&wire.BlockedFrame{Offset: 0x4321})) - Expect(framer.PopBlockedFrame()).To(BeNil()) - }) - }) }) diff --git a/stream_test.go b/stream_test.go index 2e4a92049..17d0c16b8 100644 --- a/stream_test.go +++ b/stream_test.go @@ -138,7 +138,7 @@ var _ = Describe("Stream", func() { str.version = versionGQUICFrames mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).AnyTimes() mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6)) - mockFC.EXPECT().IsBlocked() + mockFC.EXPECT().IsNewlyBlocked() err := str.CancelRead(1234) Expect(err).ToNot(HaveOccurred()) Expect(queuedControlFrames).To(BeEmpty()) // no RST_STREAM frame queued yet