diff --git a/blocked_manager.go b/blocked_manager.go deleted file mode 100644 index f4cc6c21..00000000 --- a/blocked_manager.go +++ /dev/null @@ -1,49 +0,0 @@ -package quic - -import ( - "sync" - - "github.com/lucas-clemente/quic-go/frames" - "github.com/lucas-clemente/quic-go/protocol" -) - -type blockedManager struct { - blockedStreams map[protocol.StreamID]protocol.ByteCount - mutex sync.Mutex -} - -func newBlockedManager() *blockedManager { - return &blockedManager{ - blockedStreams: make(map[protocol.StreamID]protocol.ByteCount), - } -} - -func (m *blockedManager) AddBlockedStream(streamID protocol.StreamID, offset protocol.ByteCount) { - m.mutex.Lock() - defer m.mutex.Unlock() - - m.blockedStreams[streamID] = offset -} - -func (m *blockedManager) RemoveBlockedStream(streamID protocol.StreamID) { - m.mutex.Lock() - defer m.mutex.Unlock() - - delete(m.blockedStreams, streamID) -} - -func (m *blockedManager) GetBlockedFrame(streamID protocol.StreamID, offset protocol.ByteCount) *frames.BlockedFrame { - m.mutex.Lock() - defer m.mutex.Unlock() - - blockedOffset, ok := m.blockedStreams[streamID] - if !ok { - return nil - } - if blockedOffset > offset { - return nil - } - - delete(m.blockedStreams, streamID) - return &frames.BlockedFrame{StreamID: streamID} -} diff --git a/blocked_manager_test.go b/blocked_manager_test.go deleted file mode 100644 index 4bc2b8d5..00000000 --- a/blocked_manager_test.go +++ /dev/null @@ -1,54 +0,0 @@ -package quic - -import ( - "github.com/lucas-clemente/quic-go/frames" - "github.com/lucas-clemente/quic-go/protocol" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" -) - -var _ = Describe("WindowUpdateManager", func() { - var bm *blockedManager - - BeforeEach(func() { - bm = newBlockedManager() - }) - - It("accepts new entries", func() { - bm.AddBlockedStream(1337, 0x1337) - Expect(bm.blockedStreams).To(HaveKey(protocol.StreamID(1337))) - Expect(bm.blockedStreams[1337]).To(Equal(protocol.ByteCount(0x1337))) - }) - - It("gets a blocked frame for the right offset", func() { - bm.AddBlockedStream(1337, 0x1337) - Expect(bm.GetBlockedFrame(1337, 0x1337)).To(Equal(&frames.BlockedFrame{StreamID: 1337})) - }) - - It("doesn't get a blocked frame twice for the same offset", func() { - bm.AddBlockedStream(1337, 0x1337) - Expect(bm.GetBlockedFrame(1337, 0x1337)).ToNot(BeNil()) - Expect(bm.GetBlockedFrame(1337, 0x1337)).To(BeNil()) - }) - - It("removes the blocked entry when the stream is not blocked anymore", func() { - bm.AddBlockedStream(1337, 0x1337) - bm.RemoveBlockedStream(1337) - Expect(bm.GetBlockedFrame(1337, 0x1337)).To(BeNil()) - }) - - It("doesn't care if the stream was previously blocked when removing the block", func() { - bm.RemoveBlockedStream(1337) - Expect(bm.GetBlockedFrame(1337, 0x1337)).To(BeNil()) - }) - - It("doesn't get a blocked frame for smaller offsets", func() { - bm.AddBlockedStream(1337, 0x1337) - Expect(bm.GetBlockedFrame(1337, 0x1336)).To(BeNil()) - }) - - It("doesn't get a blocked frame for the wrong stream", func() { - bm.AddBlockedStream(1337, 0x1337) - Expect(bm.GetBlockedFrame(1336, 0x1337)).To(BeNil()) - }) -}) diff --git a/packet_packer.go b/packet_packer.go index 6fd35ace..32a91584 100644 --- a/packet_packer.go +++ b/packet_packer.go @@ -26,20 +26,18 @@ type packetPacker struct { connectionParametersManager *handshake.ConnectionParametersManager - streamFramer *streamFramer - controlFrames []frames.Frame - blockedManager *blockedManager + streamFramer *streamFramer + controlFrames []frames.Frame lastPacketNumber protocol.PacketNumber } -func newPacketPacker(connectionID protocol.ConnectionID, cryptoSetup *handshake.CryptoSetup, connectionParametersHandler *handshake.ConnectionParametersManager, blockedManager *blockedManager, streamFramer *streamFramer, version protocol.VersionNumber) *packetPacker { +func newPacketPacker(connectionID protocol.ConnectionID, cryptoSetup *handshake.CryptoSetup, connectionParametersHandler *handshake.ConnectionParametersManager, streamFramer *streamFramer, version protocol.VersionNumber) *packetPacker { return &packetPacker{ cryptoSetup: cryptoSetup, connectionID: connectionID, connectionParametersManager: connectionParametersHandler, version: version, - blockedManager: blockedManager, streamFramer: streamFramer, } } @@ -212,7 +210,7 @@ func (p *packetPacker) composeNextPacket(stopWaitingFrame *frames.StopWaitingFra frameHeaderLen, _ := frame.MinLength(p.version) // StreamFrame.MinLength *never* returns an error payloadLength += frameHeaderLen + frame.DataLen() - blockedFrame := p.blockedManager.GetBlockedFrame(frame.StreamID, frame.Offset+frame.DataLen()) + blockedFrame := p.streamFramer.PopBlockedFrame() if blockedFrame != nil { blockedLength, _ := blockedFrame.MinLength(p.version) // BlockedFrame.MinLength *never* returns an error if payloadLength+blockedLength <= maxFrameSize { diff --git a/packet_packer_test.go b/packet_packer_test.go index b71fc942..d2167b0a 100644 --- a/packet_packer_test.go +++ b/packet_packer_test.go @@ -29,7 +29,6 @@ var _ = Describe("Packet packer", func() { packer = &packetPacker{ cryptoSetup: &handshake.CryptoSetup{}, connectionParametersManager: handshake.NewConnectionParamatersManager(), - blockedManager: newBlockedManager(), streamFramer: streamFramer, } publicHeaderLen = 1 + 8 + 1 // 1 flag byte, 8 connection ID, 1 packet number @@ -385,10 +384,10 @@ var _ = Describe("Packet packer", func() { }) }) - PContext("Blocked frames", func() { + Context("Blocked frames", func() { It("adds a blocked frame to a packet if there is enough space", func() { length := 100 - // packer.AddBlocked(5, protocol.ByteCount(length)) + streamFramer.blockedFrameQueue = []*frames.BlockedFrame{{StreamID: 5}} f := &frames.StreamFrame{ StreamID: 5, Data: bytes.Repeat([]byte{'f'}, length), @@ -402,7 +401,7 @@ var _ = Describe("Packet packer", func() { It("removes the dataLen attribute from the last StreamFrame, even if it inserted a BlockedFrame before", func() { length := 100 - // packer.AddBlocked(5, protocol.ByteCount(length)) + streamFramer.blockedFrameQueue = []*frames.BlockedFrame{{StreamID: 5}} f := &frames.StreamFrame{ StreamID: 5, Data: bytes.Repeat([]byte{'f'}, length), @@ -416,7 +415,7 @@ var _ = Describe("Packet packer", func() { It("packs a BlockedFrame in the next packet if the current packet doesn't have enough space", func() { dataLen := int(protocol.MaxFrameAndPublicHeaderSize-publicHeaderLen) - (1 + 1 + 2) + 1 - // packer.AddBlocked(5, protocol.ByteCount(dataLen)) + streamFramer.blockedFrameQueue = []*frames.BlockedFrame{{StreamID: 5}} f := &frames.StreamFrame{ StreamID: 5, Data: bytes.Repeat([]byte{'f'}, dataLen), @@ -441,7 +440,7 @@ var _ = Describe("Packet packer", func() { streamFrameHeaderLen, _ := f1.MinLength(0) // this is the maximum dataLen of a StreamFrames that fits into one packet dataLen := int(protocol.MaxFrameAndPublicHeaderSize - publicHeaderLen - streamFrameHeaderLen - blockedFrameLen) - // packer.AddBlocked(5, protocol.ByteCount(dataLen)) + streamFramer.blockedFrameQueue = []*frames.BlockedFrame{{StreamID: 5}} f1.Data = bytes.Repeat([]byte{'f'}, dataLen) streamFramer.AddFrameForRetransmission(f1) p, err := packer.PackPacket(nil, []frames.Frame{}, 0) @@ -453,10 +452,8 @@ var _ = Describe("Packet packer", func() { Expect(p).To(BeNil()) }) - // TODO: fix this once connection-level BlockedFrames are sent out at the right time - // see https://github.com/lucas-clemente/quic-go/issues/113 It("packs a connection-level BlockedFrame", func() { - // packer.AddBlocked(0, 0x1337) + streamFramer.blockedFrameQueue = []*frames.BlockedFrame{{StreamID: 0}} f := &frames.StreamFrame{ StreamID: 5, Data: []byte("foobar"), diff --git a/session.go b/session.go index 5db32219..32a42187 100644 --- a/session.go +++ b/session.go @@ -57,7 +57,6 @@ type Session struct { receivedPacketHandler ackhandlerlegacy.ReceivedPacketHandler stopWaitingManager ackhandlerlegacy.StopWaitingManager windowUpdateManager *windowUpdateManager - blockedManager *blockedManager streamFramer *streamFramer flowControlManager flowcontrol.FlowControlManager @@ -108,7 +107,6 @@ func newSession(conn connection, v protocol.VersionNumber, connectionID protocol stopWaitingManager: stopWaitingManager, flowControlManager: flowControlManager, windowUpdateManager: newWindowUpdateManager(), - blockedManager: newBlockedManager(), receivedPackets: make(chan receivedPacket, protocol.MaxSessionUnprocessedPackets), closeChan: make(chan struct{}, 1), sendingScheduled: make(chan struct{}, 1), @@ -127,7 +125,7 @@ func newSession(conn connection, v protocol.VersionNumber, connectionID protocol } session.streamFramer = newStreamFramer(&session.streams, &session.streamsMutex, flowControlManager) - session.packer = newPacketPacker(connectionID, session.cryptoSetup, session.connectionParametersManager, session.blockedManager, session.streamFramer, v) + session.packer = newPacketPacker(connectionID, session.cryptoSetup, session.connectionParametersManager, session.streamFramer, v) session.unpacker = &packetUnpacker{aead: session.cryptoSetup, version: v} return session, err @@ -634,9 +632,6 @@ func (s *Session) garbageCollectStreams() { if v == nil { continue } - if v.finishedWriting() { - s.blockedManager.RemoveBlockedStream(k) - } if v.finishedReading() { s.windowUpdateManager.RemoveStream(k) } diff --git a/session_test.go b/session_test.go index acaaa761..d2d52171 100644 --- a/session_test.go +++ b/session_test.go @@ -229,20 +229,6 @@ var _ = Describe("Session", func() { Expect(session.streams[5]).To(BeNil()) }) - PIt("removes closed streams from BlockedManager", func() { - session.handleStreamFrame(&frames.StreamFrame{ - StreamID: 5, - Data: []byte{0xde, 0xca, 0xfb, 0xad}, - }) - Expect(session.streams[5]).ToNot(BeNil()) - session.blockedManager.AddBlockedStream(5, 4) - Expect(session.blockedManager.blockedStreams).To(HaveKey(protocol.StreamID(5))) - err := session.streams[5].Close() - Expect(err).ToNot(HaveOccurred()) - session.garbageCollectStreams() - Expect(session.blockedManager.blockedStreams).ToNot(HaveKey(protocol.StreamID(5))) - }) - It("removes closed streams from WindowUpdateManager", func() { session.handleStreamFrame(&frames.StreamFrame{ StreamID: 5,