From f1c6421845a1e45345d7921a47d9901158234d4d Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 31 Dec 2020 13:34:34 +0800 Subject: [PATCH 1/2] introduce an interface for the send queue, use a mock in session tests --- mock_sender_test.go | 72 +++++++++++++++++++++++++++++++++++++++++++++ mockgen.go | 1 + send_queue.go | 10 ++++++- send_queue_test.go | 2 +- session.go | 2 +- session_test.go | 52 ++++++++++++++++++++------------ 6 files changed, 117 insertions(+), 22 deletions(-) create mode 100644 mock_sender_test.go diff --git a/mock_sender_test.go b/mock_sender_test.go new file mode 100644 index 000000000..462e9763c --- /dev/null +++ b/mock_sender_test.go @@ -0,0 +1,72 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/lucas-clemente/quic-go (interfaces: Sender) + +// Package quic is a generated GoMock package. +package quic + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockSender is a mock of Sender interface +type MockSender struct { + ctrl *gomock.Controller + recorder *MockSenderMockRecorder +} + +// MockSenderMockRecorder is the mock recorder for MockSender +type MockSenderMockRecorder struct { + mock *MockSender +} + +// NewMockSender creates a new mock instance +func NewMockSender(ctrl *gomock.Controller) *MockSender { + mock := &MockSender{ctrl: ctrl} + mock.recorder = &MockSenderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockSender) EXPECT() *MockSenderMockRecorder { + return m.recorder +} + +// Close mocks base method +func (m *MockSender) Close() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Close") +} + +// Close indicates an expected call of Close +func (mr *MockSenderMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockSender)(nil).Close)) +} + +// Run mocks base method +func (m *MockSender) Run() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Run") + ret0, _ := ret[0].(error) + return ret0 +} + +// Run indicates an expected call of Run +func (mr *MockSenderMockRecorder) Run() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockSender)(nil).Run)) +} + +// Send mocks base method +func (m *MockSender) Send(arg0 *packetBuffer) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Send", arg0) +} + +// Send indicates an expected call of Send +func (mr *MockSenderMockRecorder) Send(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockSender)(nil).Send), arg0) +} diff --git a/mockgen.go b/mockgen.go index 7f0ba432a..8f6b0578d 100644 --- a/mockgen.go +++ b/mockgen.go @@ -1,6 +1,7 @@ package quic //go:generate sh -c "./mockgen_private.sh quic mock_send_conn_test.go github.com/lucas-clemente/quic-go sendConn" +//go:generate sh -c "./mockgen_private.sh quic mock_sender_test.go github.com/lucas-clemente/quic-go sender" //go:generate sh -c "./mockgen_private.sh quic mock_stream_internal_test.go github.com/lucas-clemente/quic-go streamI" //go:generate sh -c "./mockgen_private.sh quic mock_crypto_stream_test.go github.com/lucas-clemente/quic-go cryptoStream" //go:generate sh -c "./mockgen_private.sh quic mock_receive_stream_internal_test.go github.com/lucas-clemente/quic-go receiveStreamI" diff --git a/send_queue.go b/send_queue.go index 97d04dbac..04ba48ed4 100644 --- a/send_queue.go +++ b/send_queue.go @@ -1,5 +1,11 @@ package quic +type sender interface { + Send(p *packetBuffer) + Run() error + Close() +} + type sendQueue struct { queue chan *packetBuffer closeCalled chan struct{} // runStopped when Close() is called @@ -7,7 +13,9 @@ type sendQueue struct { conn sendConn } -func newSendQueue(conn sendConn) *sendQueue { +var _ sender = &sendQueue{} + +func newSendQueue(conn sendConn) sender { s := &sendQueue{ conn: conn, runStopped: make(chan struct{}), diff --git a/send_queue_test.go b/send_queue_test.go index b82b6dfc4..710206fd0 100644 --- a/send_queue_test.go +++ b/send_queue_test.go @@ -9,7 +9,7 @@ import ( ) var _ = Describe("Send Queue", func() { - var q *sendQueue + var q sender var c *MockSendConn BeforeEach(func() { diff --git a/session.go b/session.go index eef65d2e7..3a8746f67 100644 --- a/session.go +++ b/session.go @@ -137,7 +137,7 @@ type session struct { config *Config conn sendConn - sendQueue *sendQueue + sendQueue sender streamsMap streamManager connIDManager *connIDManager diff --git a/session_test.go b/session_test.go index 9163d44d6..9761c0084 100644 --- a/session_test.go +++ b/session_test.go @@ -1207,10 +1207,16 @@ var _ = Describe("Session", func() { }) Context("sending packets", func() { - var sessionDone chan struct{} + var ( + sessionDone chan struct{} + sender *MockSender + ) BeforeEach(func() { + sender = NewMockSender(mockCtrl) + sess.sendQueue = sender sessionDone = make(chan struct{}) + sender.EXPECT().Run() }) AfterEach(func() { @@ -1221,6 +1227,7 @@ var _ = Describe("Session", func() { mconn.EXPECT().Write(gomock.Any()) tracer.EXPECT().ClosedConnection(gomock.Any()) tracer.EXPECT().Close() + sender.EXPECT().Close() sess.shutdown() Eventually(sess.Context().Done()).Should(BeClosed()) Eventually(sessionDone).Should(BeClosed()) @@ -1249,7 +1256,7 @@ var _ = Describe("Session", func() { packer.EXPECT().PackPacket().Return(p, nil) packer.EXPECT().PackPacket().Return(nil, nil).AnyTimes() sent := make(chan struct{}) - mconn.EXPECT().Write(gomock.Any()).Do(func([]byte) { close(sent) }) + sender.EXPECT().Send(gomock.Any()).Do(func(packet *packetBuffer) { close(sent) }) tracer.EXPECT().SentPacket(p.header, p.buffer.Len(), nil, []logging.Frame{}) sess.scheduleSending() Eventually(sent).Should(BeClosed()) @@ -1295,7 +1302,7 @@ var _ = Describe("Session", func() { sess.connFlowController = fc runSession() sent := make(chan struct{}) - mconn.EXPECT().Write(gomock.Any()).Do(func([]byte) { close(sent) }) + sender.EXPECT().Send(gomock.Any()).Do(func(packet *packetBuffer) { close(sent) }) tracer.EXPECT().SentPacket(p.header, p.length, nil, []logging.Frame{}) sess.scheduleSending() Eventually(sent).Should(BeClosed()) @@ -1351,7 +1358,7 @@ var _ = Describe("Session", func() { sess.sentPacketHandler = sph runSession() sent := make(chan struct{}) - mconn.EXPECT().Write(gomock.Any()).Do(func([]byte) { close(sent) }) + sender.EXPECT().Send(gomock.Any()).Do(func(packet *packetBuffer) { close(sent) }) tracer.EXPECT().SentPacket(p.header, p.length, gomock.Any(), gomock.Any()) sess.scheduleSending() Eventually(sent).Should(BeClosed()) @@ -1372,7 +1379,7 @@ var _ = Describe("Session", func() { sess.sentPacketHandler = sph runSession() sent := make(chan struct{}) - mconn.EXPECT().Write(gomock.Any()).Do(func([]byte) { close(sent) }) + sender.EXPECT().Send(gomock.Any()).Do(func(packet *packetBuffer) { close(sent) }) tracer.EXPECT().SentPacket(p.header, p.length, gomock.Any(), gomock.Any()) sess.scheduleSending() Eventually(sent).Should(BeClosed()) @@ -1385,7 +1392,10 @@ var _ = Describe("Session", func() { }) Context("packet pacing", func() { - var sph *mockackhandler.MockSentPacketHandler + var ( + sph *mockackhandler.MockSentPacketHandler + sender *MockSender + ) BeforeEach(func() { tracer.EXPECT().SentPacket(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() @@ -1394,6 +1404,9 @@ var _ = Describe("Session", func() { sess.handshakeConfirmed = true sess.handshakeComplete = true sess.sentPacketHandler = sph + sender = NewMockSender(mockCtrl) + sender.EXPECT().Run() + sess.sendQueue = sender streamManager.EXPECT().CloseWithError(gomock.Any()) }) @@ -1405,6 +1418,7 @@ var _ = Describe("Session", func() { mconn.EXPECT().Write(gomock.Any()) tracer.EXPECT().ClosedConnection(gomock.Any()) tracer.EXPECT().Close() + sender.EXPECT().Close() sess.shutdown() Eventually(sess.Context().Done()).Should(BeClosed()) }) @@ -1417,7 +1431,7 @@ var _ = Describe("Session", func() { sph.EXPECT().SendMode().Return(ackhandler.SendAny).Times(3) packer.EXPECT().PackPacket().Return(getPacket(10), nil) packer.EXPECT().PackPacket().Return(getPacket(11), nil) - mconn.EXPECT().Write(gomock.Any()).Times(2) + sender.EXPECT().Send(gomock.Any()).Times(2) go func() { defer GinkgoRecover() cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) @@ -1435,7 +1449,7 @@ var _ = Describe("Session", func() { sph.EXPECT().SendMode().Return(ackhandler.SendAny).Times(3) packer.EXPECT().PackPacket().Return(getPacket(10), nil) packer.EXPECT().PackPacket().Return(nil, nil) - mconn.EXPECT().Write(gomock.Any()) + sender.EXPECT().Send(gomock.Any()) go func() { defer GinkgoRecover() cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) @@ -1453,7 +1467,7 @@ var _ = Describe("Session", func() { sph.EXPECT().SendMode().Return(ackhandler.SendAny) sph.EXPECT().SendMode().Return(ackhandler.SendAck) packer.EXPECT().PackPacket().Return(getPacket(100), nil) - mconn.EXPECT().Write(gomock.Any()) + sender.EXPECT().Send(gomock.Any()) go func() { defer GinkgoRecover() cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) @@ -1479,10 +1493,7 @@ var _ = Describe("Session", func() { sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour)), ) written := make(chan struct{}, 2) - mconn.EXPECT().Write(gomock.Any()).DoAndReturn(func(p []byte) (int, error) { - written <- struct{}{} - return len(p), nil - }).Times(2) + sender.EXPECT().Send(gomock.Any()).DoAndReturn(func(p *packetBuffer) { written <- struct{}{} }).Times(2) go func() { defer GinkgoRecover() cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) @@ -1504,10 +1515,7 @@ var _ = Describe("Session", func() { packer.EXPECT().PackPacket().Return(getPacket(1001), nil) packer.EXPECT().PackPacket().Return(getPacket(1002), nil) written := make(chan struct{}, 3) - mconn.EXPECT().Write(gomock.Any()).DoAndReturn(func(p []byte) (int, error) { - written <- struct{}{} - return len(p), nil - }).Times(3) + sender.EXPECT().Send(gomock.Any()).DoAndReturn(func(p *packetBuffer) { written <- struct{}{} }).Times(3) go func() { defer GinkgoRecover() cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) @@ -1533,7 +1541,12 @@ var _ = Describe("Session", func() { }) Context("scheduling sending", func() { + var sender *MockSender + BeforeEach(func() { + sender = NewMockSender(mockCtrl) + sender.EXPECT().Run() + sess.sendQueue = sender sess.handshakeConfirmed = true }) @@ -1544,6 +1557,7 @@ var _ = Describe("Session", func() { packer.EXPECT().PackConnectionClose(gomock.Any()).Return(&coalescedPacket{buffer: getPacketBuffer()}, nil) cryptoSetup.EXPECT().Close() mconn.EXPECT().Write(gomock.Any()) + sender.EXPECT().Close() tracer.EXPECT().ClosedConnection(gomock.Any()) tracer.EXPECT().Close() sess.shutdown() @@ -1570,7 +1584,7 @@ var _ = Describe("Session", func() { time.Sleep(50 * time.Millisecond) // only EXPECT calls after scheduleSending is called written := make(chan struct{}) - mconn.EXPECT().Write(gomock.Any()).Do(func([]byte) { close(written) }) + sender.EXPECT().Send(gomock.Any()).Do(func(*packetBuffer) { close(written) }) tracer.EXPECT().SentPacket(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() sess.scheduleSending() Eventually(written).Should(BeClosed()) @@ -1594,7 +1608,7 @@ var _ = Describe("Session", func() { sess.receivedPacketHandler = rph written := make(chan struct{}) - mconn.EXPECT().Write(gomock.Any()).Do(func([]byte) { close(written) }) + sender.EXPECT().Send(gomock.Any()).Do(func(*packetBuffer) { close(written) }) tracer.EXPECT().SentPacket(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() go func() { defer GinkgoRecover() From b81a6f875b01d46ad6192b0515d11356707712aa Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Fri, 25 Dec 2020 12:27:54 +0700 Subject: [PATCH 2/2] don't generate new packets when the send queue is full --- mock_sender_test.go | 28 ++++++++++++++++++ send_queue.go | 28 ++++++++++++++++-- send_queue_test.go | 31 ++++++++++---------- session.go | 20 ++++++++++++- session_test.go | 71 ++++++++++++++++++++++++++++++++++++++++++++- 5 files changed, 158 insertions(+), 20 deletions(-) diff --git a/mock_sender_test.go b/mock_sender_test.go index 462e9763c..60a216a36 100644 --- a/mock_sender_test.go +++ b/mock_sender_test.go @@ -33,6 +33,20 @@ func (m *MockSender) EXPECT() *MockSenderMockRecorder { return m.recorder } +// Available mocks base method +func (m *MockSender) Available() <-chan struct{} { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Available") + ret0, _ := ret[0].(<-chan struct{}) + return ret0 +} + +// Available indicates an expected call of Available +func (mr *MockSenderMockRecorder) Available() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Available", reflect.TypeOf((*MockSender)(nil).Available)) +} + // Close mocks base method func (m *MockSender) Close() { m.ctrl.T.Helper() @@ -70,3 +84,17 @@ func (mr *MockSenderMockRecorder) Send(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockSender)(nil).Send), arg0) } + +// WouldBlock mocks base method +func (m *MockSender) WouldBlock() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WouldBlock") + ret0, _ := ret[0].(bool) + return ret0 +} + +// WouldBlock indicates an expected call of WouldBlock +func (mr *MockSenderMockRecorder) WouldBlock() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WouldBlock", reflect.TypeOf((*MockSender)(nil).WouldBlock)) +} diff --git a/send_queue.go b/send_queue.go index 04ba48ed4..4d31a2764 100644 --- a/send_queue.go +++ b/send_queue.go @@ -3,6 +3,8 @@ package quic type sender interface { Send(p *packetBuffer) Run() error + WouldBlock() bool + Available() <-chan struct{} Close() } @@ -10,28 +12,44 @@ type sendQueue struct { queue chan *packetBuffer closeCalled chan struct{} // runStopped when Close() is called runStopped chan struct{} // runStopped when the run loop returns + available chan struct{} conn sendConn } var _ sender = &sendQueue{} +const sendQueueCapacity = 1 + func newSendQueue(conn sendConn) sender { - s := &sendQueue{ + return &sendQueue{ conn: conn, runStopped: make(chan struct{}), closeCalled: make(chan struct{}), - queue: make(chan *packetBuffer, 1), + available: make(chan struct{}, 1), + queue: make(chan *packetBuffer, sendQueueCapacity), } - return s } +// Send sends out a packet. It's guaranteed to not block. +// Callers need to make sure that there's actually space in the send queue by calling WouldBlock. +// Otherwise Send will panic. func (h *sendQueue) Send(p *packetBuffer) { select { case h.queue <- p: case <-h.runStopped: + default: + panic("sendQueue.Send would have blocked") } } +func (h *sendQueue) WouldBlock() bool { + return len(h.queue) == sendQueueCapacity +} + +func (h *sendQueue) Available() <-chan struct{} { + return h.available +} + func (h *sendQueue) Run() error { defer close(h.runStopped) var shouldClose bool @@ -49,6 +67,10 @@ func (h *sendQueue) Run() error { return err } p.Release() + select { + case h.available <- struct{}{}: + default: + } } } } diff --git a/send_queue_test.go b/send_queue_test.go index 710206fd0..416c264b1 100644 --- a/send_queue_test.go +++ b/send_queue_test.go @@ -42,21 +42,20 @@ var _ = Describe("Send Queue", func() { Eventually(done).Should(BeClosed()) }) - It("blocks sending when too many packets are queued", func() { - q.Send(getPacket([]byte("foobar"))) + It("panics when Send() is called although there's no space in the queue", func() { + Expect(q.WouldBlock()).To(BeFalse()) + q.Send(getPacket([]byte("foobar1"))) + Expect(q.WouldBlock()).To(BeTrue()) + Expect(func() { q.Send(getPacket([]byte("foobar2"))) }).To(Panic()) + }) - written := make(chan []byte, 2) - c.EXPECT().Write(gomock.Any()).Do(func(p []byte) { written <- p }).Times(2) - - sent := make(chan struct{}) - go func() { - defer GinkgoRecover() - q.Send(getPacket([]byte("raboof"))) - close(sent) - }() - - Consistently(sent).ShouldNot(BeClosed()) + It("signals when sending is possible again", func() { + Expect(q.WouldBlock()).To(BeFalse()) + q.Send(getPacket([]byte("foobar1"))) + Consistently(q.Available()).ShouldNot(Receive()) + // now start sending out packets. This should free up queue space. + c.EXPECT().Write(gomock.Any()).MinTimes(1).MaxTimes(2) done := make(chan struct{}) go func() { defer GinkgoRecover() @@ -64,8 +63,10 @@ var _ = Describe("Send Queue", func() { close(done) }() - Eventually(written).Should(Receive(Equal([]byte("foobar")))) - Eventually(written).Should(Receive(Equal([]byte("raboof")))) + Eventually(q.Available()).Should(Receive()) + Expect(q.WouldBlock()).To(BeFalse()) + Expect(func() { q.Send(getPacket([]byte("foobar2"))) }).ToNot(Panic()) + q.Close() Eventually(done).Should(BeClosed()) }) diff --git a/session.go b/session.go index 3a8746f67..ddcd702ef 100644 --- a/session.go +++ b/session.go @@ -542,7 +542,10 @@ func (s *session) run() error { } } - var closeErr closeError + var ( + closeErr closeError + sendQueueAvailable <-chan struct{} + ) runLoop: for { @@ -583,6 +586,7 @@ runLoop: case <-s.sendingScheduled: // We do all the interesting stuff after the switch statement, so // nothing to see here. + case <-sendQueueAvailable: case firstPacket := <-s.receivedPackets: s.sentPacketHandler.ReceivedBytes(firstPacket.Size()) wasProcessed := s.handlePacketImpl(firstPacket) @@ -655,9 +659,20 @@ runLoop: } } + if s.sendQueue.WouldBlock() { + // The send queue is still busy sending out packets. + // Wait until there's space to enqueue new packets. + sendQueueAvailable = s.sendQueue.Available() + continue + } if err := s.sendPackets(); err != nil { s.closeLocal(err) } + if s.sendQueue.WouldBlock() { + sendQueueAvailable = s.sendQueue.Available() + } else { + sendQueueAvailable = nil + } } s.handleCloseError(closeErr) @@ -1541,6 +1556,9 @@ func (s *session) sendPackets() error { default: return fmt.Errorf("BUG: invalid send mode %d", sendMode) } + if s.sendQueue.WouldBlock() { + return nil + } } } diff --git a/session_test.go b/session_test.go index 9761c0084..3cb6eaf58 100644 --- a/session_test.go +++ b/session_test.go @@ -1214,9 +1214,10 @@ var _ = Describe("Session", func() { BeforeEach(func() { sender = NewMockSender(mockCtrl) + sender.EXPECT().Run() + sender.EXPECT().WouldBlock().AnyTimes() sess.sendQueue = sender sessionDone = make(chan struct{}) - sender.EXPECT().Run() }) AfterEach(func() { @@ -1256,6 +1257,7 @@ var _ = Describe("Session", func() { packer.EXPECT().PackPacket().Return(p, nil) packer.EXPECT().PackPacket().Return(nil, nil).AnyTimes() sent := make(chan struct{}) + sender.EXPECT().WouldBlock().AnyTimes() sender.EXPECT().Send(gomock.Any()).Do(func(packet *packetBuffer) { close(sent) }) tracer.EXPECT().SentPacket(p.header, p.buffer.Len(), nil, []logging.Frame{}) sess.scheduleSending() @@ -1431,6 +1433,7 @@ var _ = Describe("Session", func() { sph.EXPECT().SendMode().Return(ackhandler.SendAny).Times(3) packer.EXPECT().PackPacket().Return(getPacket(10), nil) packer.EXPECT().PackPacket().Return(getPacket(11), nil) + sender.EXPECT().WouldBlock().AnyTimes() sender.EXPECT().Send(gomock.Any()).Times(2) go func() { defer GinkgoRecover() @@ -1449,6 +1452,7 @@ var _ = Describe("Session", func() { sph.EXPECT().SendMode().Return(ackhandler.SendAny).Times(3) packer.EXPECT().PackPacket().Return(getPacket(10), nil) packer.EXPECT().PackPacket().Return(nil, nil) + sender.EXPECT().WouldBlock().AnyTimes() sender.EXPECT().Send(gomock.Any()) go func() { defer GinkgoRecover() @@ -1467,6 +1471,7 @@ var _ = Describe("Session", func() { sph.EXPECT().SendMode().Return(ackhandler.SendAny) sph.EXPECT().SendMode().Return(ackhandler.SendAck) packer.EXPECT().PackPacket().Return(getPacket(100), nil) + sender.EXPECT().WouldBlock().AnyTimes() sender.EXPECT().Send(gomock.Any()) go func() { defer GinkgoRecover() @@ -1493,6 +1498,7 @@ var _ = Describe("Session", func() { sph.EXPECT().TimeUntilSend().Return(time.Now().Add(time.Hour)), ) written := make(chan struct{}, 2) + sender.EXPECT().WouldBlock().AnyTimes() sender.EXPECT().Send(gomock.Any()).DoAndReturn(func(p *packetBuffer) { written <- struct{}{} }).Times(2) go func() { defer GinkgoRecover() @@ -1515,6 +1521,7 @@ var _ = Describe("Session", func() { packer.EXPECT().PackPacket().Return(getPacket(1001), nil) packer.EXPECT().PackPacket().Return(getPacket(1002), nil) written := make(chan struct{}, 3) + sender.EXPECT().WouldBlock().AnyTimes() sender.EXPECT().Send(gomock.Any()).DoAndReturn(func(p *packetBuffer) { written <- struct{}{} }).Times(3) go func() { defer GinkgoRecover() @@ -1525,9 +1532,70 @@ var _ = Describe("Session", func() { Eventually(written).Should(HaveLen(3)) }) + It("doesn't try to send if the send queue is full", func() { + available := make(chan struct{}, 1) + sender.EXPECT().WouldBlock().Return(true) + sender.EXPECT().Available().Return(available) + go func() { + defer GinkgoRecover() + cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) + sess.run() + }() + sess.scheduleSending() + time.Sleep(scaleDuration(50 * time.Millisecond)) + + written := make(chan struct{}) + sender.EXPECT().WouldBlock().AnyTimes() + sph.EXPECT().SentPacket(gomock.Any()) + sph.EXPECT().HasPacingBudget().Return(true).AnyTimes() + sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes() + packer.EXPECT().PackPacket().Return(getPacket(1000), nil) + packer.EXPECT().PackPacket().Return(nil, nil) + sender.EXPECT().Send(gomock.Any()).DoAndReturn(func(p *packetBuffer) { close(written) }) + available <- struct{}{} + Eventually(written).Should(BeClosed()) + }) + + It("stops sending when the send queue is full", func() { + sph.EXPECT().SentPacket(gomock.Any()) + sph.EXPECT().HasPacingBudget().Return(true).AnyTimes() + sph.EXPECT().SendMode().Return(ackhandler.SendAny) + packer.EXPECT().PackPacket().Return(getPacket(1000), nil) + written := make(chan struct{}, 1) + sender.EXPECT().WouldBlock() + sender.EXPECT().WouldBlock().Return(true).Times(2) + sender.EXPECT().Send(gomock.Any()).DoAndReturn(func(p *packetBuffer) { written <- struct{}{} }) + go func() { + defer GinkgoRecover() + cryptoSetup.EXPECT().RunHandshake().MaxTimes(1) + sess.run() + }() + available := make(chan struct{}, 1) + sender.EXPECT().Available().Return(available) + sess.scheduleSending() + Eventually(written).Should(Receive()) + time.Sleep(scaleDuration(50 * time.Millisecond)) + + // now make room in the send queue + sph.EXPECT().SentPacket(gomock.Any()) + sph.EXPECT().HasPacingBudget().Return(true).AnyTimes() + sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes() + sender.EXPECT().WouldBlock().AnyTimes() + packer.EXPECT().PackPacket().Return(getPacket(1001), nil) + packer.EXPECT().PackPacket().Return(nil, nil) + sender.EXPECT().Send(gomock.Any()).DoAndReturn(func(p *packetBuffer) { written <- struct{}{} }) + available <- struct{}{} + Eventually(written).Should(Receive()) + + // The send queue is not full any more. Sending on the available channel should have no effect. + available <- struct{}{} + time.Sleep(scaleDuration(50 * time.Millisecond)) + }) + It("doesn't set a pacing timer when there is no data to send", func() { sph.EXPECT().HasPacingBudget().Return(true) sph.EXPECT().SendMode().Return(ackhandler.SendAny).AnyTimes() + sender.EXPECT().WouldBlock().AnyTimes() packer.EXPECT().PackPacket() // don't EXPECT any calls to mconn.Write() go func() { @@ -1545,6 +1613,7 @@ var _ = Describe("Session", func() { BeforeEach(func() { sender = NewMockSender(mockCtrl) + sender.EXPECT().WouldBlock().AnyTimes() sender.EXPECT().Run() sess.sendQueue = sender sess.handshakeConfirmed = true