don't block the server when too many packets are queued in a session

fixes #35
This commit is contained in:
Lucas Clemente
2016-05-17 16:52:55 +02:00
parent 68b529a54c
commit 15e7fe6c73
3 changed files with 18 additions and 2 deletions

View File

@@ -49,3 +49,6 @@ const WindowUpdateThreshold = ReceiveStreamFlowControlWindow / 2
// WindowUpdateNumRepitions is the number of times the same WindowUpdate frame will be sent to the client
const WindowUpdateNumRepitions uint8 = 2
// MaxSessionUnprocessedPackets is the max number of packets stored in each session that are not yet processed.
const MaxSessionUnprocessedPackets = 128

View File

@@ -89,7 +89,7 @@ func newSession(conn connection, v protocol.VersionNumber, connectionID protocol
receivedPacketHandler: ackhandler.NewReceivedPacketHandler(),
stopWaitingManager: stopWaitingManager,
windowUpdateManager: newWindowUpdateManager(),
receivedPackets: make(chan receivedPacket, 1000), // TODO: What if server receives many packets and connection is already closed?!
receivedPackets: make(chan receivedPacket, protocol.MaxSessionUnprocessedPackets),
closeChan: make(chan struct{}, 1),
sendingScheduled: make(chan struct{}, 1),
rttStats: congestion.RTTStats{},
@@ -242,7 +242,12 @@ func (s *Session) handlePacketImpl(remoteAddr interface{}, hdr *publicHeader, da
// handlePacket handles a packet
func (s *Session) handlePacket(remoteAddr interface{}, hdr *publicHeader, data []byte) {
s.receivedPackets <- receivedPacket{remoteAddr: remoteAddr, publicHeader: hdr, data: data}
// Discard packets once the amount of queued packets is larger than
// the channel size, protocol.MaxSessionUnprocessedPackets
select {
case s.receivedPackets <- receivedPacket{remoteAddr: remoteAddr, publicHeader: hdr, data: data}:
default:
}
}
func (s *Session) handleStreamFrame(frame *frames.StreamFrame) error {

View File

@@ -680,4 +680,12 @@ var _ = Describe("Session", func() {
Expect(cong.argsOnCongestionEvent[3]).To(Equal(congestion.PacketVector{{2, 2}}))
})
})
It("stored up to MaxSessionUnprocessedPackets packets", func(done Done) {
// Nothing here should block
for i := 0; i < protocol.MaxSessionUnprocessedPackets+10; i++ {
session.handlePacket(nil, nil, nil)
}
close(done)
}, 0.5)
})