From 15e7fe6c735480226fddef504d3feff76f25d0bf Mon Sep 17 00:00:00 2001 From: Lucas Clemente Date: Tue, 17 May 2016 16:52:55 +0200 Subject: [PATCH] don't block the server when too many packets are queued in a session fixes #35 --- protocol/server_parameters.go | 3 +++ session.go | 9 +++++++-- session_test.go | 8 ++++++++ 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/protocol/server_parameters.go b/protocol/server_parameters.go index e90bc07ce..0695d1dd6 100644 --- a/protocol/server_parameters.go +++ b/protocol/server_parameters.go @@ -49,3 +49,6 @@ const WindowUpdateThreshold = ReceiveStreamFlowControlWindow / 2 // WindowUpdateNumRepitions is the number of times the same WindowUpdate frame will be sent to the client const WindowUpdateNumRepitions uint8 = 2 + +// MaxSessionUnprocessedPackets is the max number of packets stored in each session that are not yet processed. +const MaxSessionUnprocessedPackets = 128 diff --git a/session.go b/session.go index 47a7b84b4..565060159 100644 --- a/session.go +++ b/session.go @@ -89,7 +89,7 @@ func newSession(conn connection, v protocol.VersionNumber, connectionID protocol receivedPacketHandler: ackhandler.NewReceivedPacketHandler(), stopWaitingManager: stopWaitingManager, windowUpdateManager: newWindowUpdateManager(), - receivedPackets: make(chan receivedPacket, 1000), // TODO: What if server receives many packets and connection is already closed?! + receivedPackets: make(chan receivedPacket, protocol.MaxSessionUnprocessedPackets), closeChan: make(chan struct{}, 1), sendingScheduled: make(chan struct{}, 1), rttStats: congestion.RTTStats{}, @@ -242,7 +242,12 @@ func (s *Session) handlePacketImpl(remoteAddr interface{}, hdr *publicHeader, da // handlePacket handles a packet func (s *Session) handlePacket(remoteAddr interface{}, hdr *publicHeader, data []byte) { - s.receivedPackets <- receivedPacket{remoteAddr: remoteAddr, publicHeader: hdr, data: data} + // Discard packets once the amount of queued packets is larger than + // the channel size, protocol.MaxSessionUnprocessedPackets + select { + case s.receivedPackets <- receivedPacket{remoteAddr: remoteAddr, publicHeader: hdr, data: data}: + default: + } } func (s *Session) handleStreamFrame(frame *frames.StreamFrame) error { diff --git a/session_test.go b/session_test.go index ceb6b13b8..b7e4252fe 100644 --- a/session_test.go +++ b/session_test.go @@ -680,4 +680,12 @@ var _ = Describe("Session", func() { Expect(cong.argsOnCongestionEvent[3]).To(Equal(congestion.PacketVector{{2, 2}})) }) }) + + It("stored up to MaxSessionUnprocessedPackets packets", func(done Done) { + // Nothing here should block + for i := 0; i < protocol.MaxSessionUnprocessedPackets+10; i++ { + session.handlePacket(nil, nil, nil) + } + close(done) + }, 0.5) })