initialize StreamFrameQueue in Session, not in PacketPacker

This commit is contained in:
Marten Seemann
2016-06-10 19:06:40 +07:00
parent 478e4c95e1
commit b1c27b5eca
3 changed files with 7 additions and 24 deletions

View File

@@ -34,7 +34,7 @@ type packetPacker struct {
lastPacketNumber protocol.PacketNumber lastPacketNumber protocol.PacketNumber
} }
func newPacketPacker(connectionID protocol.ConnectionID, cryptoSetup *handshake.CryptoSetup, sentPacketHandler ackhandler.SentPacketHandler, connectionParametersHandler *handshake.ConnectionParametersManager, blockedManager *blockedManager, version protocol.VersionNumber) *packetPacker { func newPacketPacker(connectionID protocol.ConnectionID, cryptoSetup *handshake.CryptoSetup, sentPacketHandler ackhandler.SentPacketHandler, connectionParametersHandler *handshake.ConnectionParametersManager, blockedManager *blockedManager, streamFrameQueue *streamFrameQueue, version protocol.VersionNumber) *packetPacker {
return &packetPacker{ return &packetPacker{
cryptoSetup: cryptoSetup, cryptoSetup: cryptoSetup,
connectionID: connectionID, connectionID: connectionID,
@@ -42,7 +42,7 @@ func newPacketPacker(connectionID protocol.ConnectionID, cryptoSetup *handshake.
version: version, version: version,
sentPacketHandler: sentPacketHandler, sentPacketHandler: sentPacketHandler,
blockedManager: blockedManager, blockedManager: blockedManager,
streamFrameQueue: newStreamFrameQueue(), streamFrameQueue: streamFrameQueue,
} }
} }
@@ -245,12 +245,3 @@ func (p *packetPacker) composeNextPacket(stopWaitingFrame *frames.StopWaitingFra
return payloadFrames, nil return payloadFrames, nil
} }
// Empty returns true if no frames are queued
func (p *packetPacker) Empty() bool {
return p.streamFrameQueue.ByteLen() == 0
}
func (p *packetPacker) StreamFrameQueueByteLen() protocol.ByteCount {
return p.streamFrameQueue.ByteLen()
}

View File

@@ -454,14 +454,4 @@ var _ = Describe("Packet packer", func() {
Expect(p[0]).To(Equal(&frames.BlockedFrame{StreamID: 0})) Expect(p[0]).To(Equal(&frames.BlockedFrame{StreamID: 0}))
}) })
}) })
It("says whether it is empty", func() {
Expect(packer.Empty()).To(BeTrue())
f := frames.StreamFrame{
StreamID: 5,
Data: []byte{0xDE, 0xCA, 0xFB, 0xAD},
}
packer.AddStreamFrame(f)
Expect(packer.Empty()).To(BeFalse())
})
}) })

View File

@@ -53,6 +53,7 @@ type Session struct {
stopWaitingManager ackhandler.StopWaitingManager stopWaitingManager ackhandler.StopWaitingManager
windowUpdateManager *windowUpdateManager windowUpdateManager *windowUpdateManager
blockedManager *blockedManager blockedManager *blockedManager
streamFrameQueue *streamFrameQueue
flowController flowcontrol.FlowController // connection level flow controller flowController flowcontrol.FlowController // connection level flow controller
@@ -101,6 +102,7 @@ func newSession(conn connection, v protocol.VersionNumber, connectionID protocol
flowController: flowcontrol.NewFlowController(0, connectionParametersManager), flowController: flowcontrol.NewFlowController(0, connectionParametersManager),
windowUpdateManager: newWindowUpdateManager(), windowUpdateManager: newWindowUpdateManager(),
blockedManager: newBlockedManager(), blockedManager: newBlockedManager(),
streamFrameQueue: newStreamFrameQueue(),
receivedPackets: make(chan receivedPacket, protocol.MaxSessionUnprocessedPackets), receivedPackets: make(chan receivedPacket, protocol.MaxSessionUnprocessedPackets),
closeChan: make(chan struct{}, 1), closeChan: make(chan struct{}, 1),
sendingScheduled: make(chan struct{}, 1), sendingScheduled: make(chan struct{}, 1),
@@ -118,7 +120,7 @@ func newSession(conn connection, v protocol.VersionNumber, connectionID protocol
return nil, err return nil, err
} }
session.packer = newPacketPacker(connectionID, session.cryptoSetup, session.sentPacketHandler, session.connectionParametersManager, session.blockedManager, v) session.packer = newPacketPacker(connectionID, session.cryptoSetup, session.sentPacketHandler, session.connectionParametersManager, session.blockedManager, session.streamFrameQueue, v)
session.unpacker = &packetUnpacker{aead: session.cryptoSetup, version: v} session.unpacker = &packetUnpacker{aead: session.cryptoSetup, version: v}
return session, err return session, err
@@ -451,7 +453,7 @@ func (s *Session) maybeSendPacket() error {
} }
// note that maxPacketSize can get (much) larger than protocol.MaxPacketSize if there is a long queue of StreamFrames // note that maxPacketSize can get (much) larger than protocol.MaxPacketSize if there is a long queue of StreamFrames
maxPacketSize += s.packer.StreamFrameQueueByteLen() maxPacketSize += s.streamFrameQueue.ByteLen()
if maxPacketSize > protocol.SmallPacketPayloadSizeThreshold { if maxPacketSize > protocol.SmallPacketPayloadSizeThreshold {
return s.sendPacket() return s.sendPacket()
@@ -540,7 +542,7 @@ func (s *Session) sendPacket() error {
return err return err
} }
if !s.packer.Empty() { if s.streamFrameQueue.Len() > 0 {
s.scheduleSending() s.scheduleSending()
} }