From 072b84870c20e1b973869e7b97034d39e45e5c2a Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 16 Dec 2018 20:59:32 +0630 Subject: [PATCH 1/4] don't fire a newly created timer --- internal/utils/timer.go | 7 +++++-- internal/utils/timer_test.go | 5 +++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/internal/utils/timer.go b/internal/utils/timer.go index 20eaacd0..15e6fb05 100644 --- a/internal/utils/timer.go +++ b/internal/utils/timer.go @@ -1,6 +1,9 @@ package utils -import "time" +import ( + "math" + "time" +) // A Timer wrapper that behaves correctly when resetting type Timer struct { @@ -11,7 +14,7 @@ type Timer struct { // NewTimer creates a new timer that is not set func NewTimer() *Timer { - return &Timer{t: time.NewTimer(0)} + return &Timer{t: time.NewTimer(time.Duration(math.MaxInt64))} } // Chan returns the channel of the wrapped timer diff --git a/internal/utils/timer_test.go b/internal/utils/timer_test.go index c1581919..cf26785b 100644 --- a/internal/utils/timer_test.go +++ b/internal/utils/timer_test.go @@ -10,6 +10,11 @@ import ( var _ = Describe("Timer", func() { const d = 10 * time.Millisecond + It("doesn't fire a newly created timer", func() { + t := NewTimer() + Consistently(t.Chan()).ShouldNot(Receive()) + }) + It("works", func() { t := NewTimer() t.Reset(time.Now().Add(d)) From 516b427d4633d6d4396a697c46810fcde1220323 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 16 Dec 2018 21:04:39 +0630 Subject: [PATCH 2/4] don't set a timer when the deadline is the zero value --- internal/utils/timer.go | 4 +++- internal/utils/timer_test.go | 6 ++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/internal/utils/timer.go b/internal/utils/timer.go index 15e6fb05..1fefc6ec 100644 --- a/internal/utils/timer.go +++ b/internal/utils/timer.go @@ -34,7 +34,9 @@ func (t *Timer) Reset(deadline time.Time) { if !t.t.Stop() && !t.read { <-t.t.C } - t.t.Reset(time.Until(deadline)) + if !deadline.IsZero() { + t.t.Reset(time.Until(deadline)) + } t.read = false t.deadline = deadline diff --git a/internal/utils/timer_test.go b/internal/utils/timer_test.go index cf26785b..67949279 100644 --- a/internal/utils/timer_test.go +++ b/internal/utils/timer_test.go @@ -54,6 +54,12 @@ var _ = Describe("Timer", func() { Eventually(t.Chan()).Should(Receive()) }) + It("doesn't set a timer if the deadline is the zero value", func() { + t := NewTimer() + t.Reset(time.Time{}) + Consistently(t.Chan()).ShouldNot(Receive()) + }) + It("fires the timer twice, if reset to the same deadline", func() { deadline := time.Now().Add(-time.Millisecond) t := NewTimer() From 0be8e033ab7527c3cb691bb75f7c1f7c125e879f Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Tue, 18 Dec 2018 20:59:50 +0630 Subject: [PATCH 3/4] fix race conditions when setting read and write deadlines --- receive_stream.go | 40 ++++++++++++++---------------- receive_stream_test.go | 27 +++++++++++++++++++++ send_stream.go | 55 +++++++++++++++++++----------------------- send_stream_test.go | 28 +++++++++++++++++++++ 4 files changed, 98 insertions(+), 52 deletions(-) diff --git a/receive_stream.go b/receive_stream.go index 13674e8a..17512520 100644 --- a/receive_stream.go +++ b/receive_stream.go @@ -8,6 +8,7 @@ import ( "github.com/lucas-clemente/quic-go/internal/flowcontrol" "github.com/lucas-clemente/quic-go/internal/protocol" + "github.com/lucas-clemente/quic-go/internal/utils" "github.com/lucas-clemente/quic-go/internal/wire" ) @@ -43,9 +44,8 @@ type receiveStream struct { canceledRead bool // set when CancelRead() is called resetRemotely bool // set when HandleResetStreamFrame() is called - readChan chan struct{} - deadline time.Time - deadlineTimer *time.Timer // initialized by SetReadDeadline() + readChan chan struct{} + deadline time.Time flowController flowcontrol.StreamFlowController version protocol.VersionNumber @@ -109,6 +109,7 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err return false, bytesRead, s.closeForShutdownErr } + var deadlineTimer *utils.Timer for { // Stop waiting on errors if s.closedForShutdown { @@ -121,8 +122,15 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err return false, bytesRead, s.resetRemotelyErr } - if !s.deadline.IsZero() && !time.Now().Before(s.deadline) { - return false, bytesRead, errDeadline + deadline := s.deadline + if !deadline.IsZero() { + if !time.Now().Before(deadline) { + return false, bytesRead, errDeadline + } + if deadlineTimer == nil { + deadlineTimer = utils.NewTimer() + } + deadlineTimer.Reset(deadline) } if s.currentFrame != nil || s.currentFrameIsLast { @@ -130,12 +138,13 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err } s.mutex.Unlock() - if s.deadline.IsZero() { + if deadline.IsZero() { <-s.readChan } else { select { case <-s.readChan: - case <-s.deadlineTimer.C: + case <-deadlineTimer.Chan(): + deadlineTimer.SetRead() } } s.mutex.Lock() @@ -252,22 +261,9 @@ func (s *receiveStream) CloseRemote(offset protocol.ByteCount) { func (s *receiveStream) SetReadDeadline(t time.Time) error { s.mutex.Lock() - defer s.mutex.Unlock() s.deadline = t - if s.deadline.IsZero() { // skip if there's no deadline to set - s.signalRead() - return nil - } - // Lazily initialize the deadline timer. - if s.deadlineTimer == nil { - s.deadlineTimer = time.NewTimer(time.Until(t)) - return nil - } - // reset the timer to the new deadline - if !s.deadlineTimer.Stop() { - <-s.deadlineTimer.C - } - s.deadlineTimer.Reset(time.Until(t)) + s.mutex.Unlock() + s.signalRead() return nil } diff --git a/receive_stream_test.go b/receive_stream_test.go index 27543143..7707aba2 100644 --- a/receive_stream_test.go +++ b/receive_stream_test.go @@ -303,6 +303,33 @@ var _ = Describe("Receive Stream", func() { Expect(err).To(MatchError(errDeadline)) Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(25*time.Millisecond))) }) + + It("doesn't unblock if the deadline is removed", func() { + deadline := time.Now().Add(scaleDuration(50 * time.Millisecond)) + str.SetReadDeadline(deadline) + deadlineUnset := make(chan struct{}) + go func() { + defer GinkgoRecover() + time.Sleep(scaleDuration(20 * time.Millisecond)) + str.SetReadDeadline(time.Time{}) + // make sure that this was actually execute before the deadline expires + Expect(time.Now()).To(BeTemporally("<", deadline)) + close(deadlineUnset) + }() + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + _, err := strWithTimeout.Read(make([]byte, 1)) + Expect(err).To(MatchError("test done")) + close(done) + }() + runtime.Gosched() + Eventually(deadlineUnset).Should(BeClosed()) + Consistently(done, scaleDuration(100*time.Millisecond)).ShouldNot(BeClosed()) + // make the go routine return + str.closeForShutdown(errors.New("test done")) + Eventually(done).Should(BeClosed()) + }) }) Context("closing", func() { diff --git a/send_stream.go b/send_stream.go index ccb58a93..94883392 100644 --- a/send_stream.go +++ b/send_stream.go @@ -42,9 +42,8 @@ type sendStream struct { dataForWriting []byte - writeChan chan struct{} - deadline time.Time - deadlineTimer *time.Timer // initialized by SetReadDeadline() + writeChan chan struct{} + deadline time.Time flowController flowcontrol.StreamFlowController @@ -97,15 +96,23 @@ func (s *sendStream) Write(p []byte) (int, error) { s.dataForWriting = p - var bytesWritten int - var err error - var notifiedSender bool + var ( + deadlineTimer *utils.Timer + bytesWritten int + notifiedSender bool + ) for { bytesWritten = len(p) - len(s.dataForWriting) - if !s.deadline.IsZero() && !time.Now().Before(s.deadline) { - s.dataForWriting = nil - err = errDeadline - break + deadline := s.deadline + if !deadline.IsZero() { + if !time.Now().Before(deadline) { + s.dataForWriting = nil + return bytesWritten, errDeadline + } + if deadlineTimer == nil { + deadlineTimer = utils.NewTimer() + } + deadlineTimer.Reset(deadline) } if s.dataForWriting == nil || s.canceledWrite || s.closedForShutdown { break @@ -116,23 +123,24 @@ func (s *sendStream) Write(p []byte) (int, error) { s.sender.onHasStreamData(s.streamID) // must be called without holding the mutex notifiedSender = true } - if s.deadline.IsZero() { + if deadline.IsZero() { <-s.writeChan } else { select { case <-s.writeChan: - case <-s.deadlineTimer.C: + case <-deadlineTimer.Chan(): + deadlineTimer.SetRead() } } s.mutex.Lock() } if s.closeForShutdownErr != nil { - err = s.closeForShutdownErr + return bytesWritten, s.closeForShutdownErr } else if s.cancelWriteErr != nil { - err = s.cancelWriteErr + return bytesWritten, s.cancelWriteErr } - return bytesWritten, err + return bytesWritten, nil } // popStreamFrame returns the next STREAM frame that is supposed to be sent on this stream @@ -301,22 +309,9 @@ func (s *sendStream) Context() context.Context { func (s *sendStream) SetWriteDeadline(t time.Time) error { s.mutex.Lock() - defer s.mutex.Unlock() s.deadline = t - if s.deadline.IsZero() { // skip if there's no deadline to set - s.signalWrite() - return nil - } - // Lazily initialize the deadline timer. - if s.deadlineTimer == nil { - s.deadlineTimer = time.NewTimer(time.Until(t)) - return nil - } - // reset the timer to the new deadline - if !s.deadlineTimer.Stop() { - <-s.deadlineTimer.C - } - s.deadlineTimer.Reset(time.Until(t)) + s.mutex.Unlock() + s.signalWrite() return nil } diff --git a/send_stream_test.go b/send_stream_test.go index b0a578f3..c2955c29 100644 --- a/send_stream_test.go +++ b/send_stream_test.go @@ -360,6 +360,34 @@ var _ = Describe("Send Stream", func() { Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(20*time.Millisecond))) Eventually(done).Should(BeClosed()) }) + + It("doesn't unblock if the deadline is removed", func() { + mockSender.EXPECT().onHasStreamData(streamID) + deadline := time.Now().Add(scaleDuration(50 * time.Millisecond)) + str.SetWriteDeadline(deadline) + deadlineUnset := make(chan struct{}) + go func() { + defer GinkgoRecover() + time.Sleep(scaleDuration(20 * time.Millisecond)) + str.SetWriteDeadline(time.Time{}) + // make sure that this was actually execute before the deadline expires + Expect(time.Now()).To(BeTemporally("<", deadline)) + close(deadlineUnset) + }() + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + _, err := strWithTimeout.Write([]byte("foobar")) + Expect(err).To(MatchError("test done")) + close(done) + }() + runtime.Gosched() + Eventually(deadlineUnset).Should(BeClosed()) + Consistently(done, scaleDuration(100*time.Millisecond)).ShouldNot(BeClosed()) + // make the go routine return + str.closeForShutdown(errors.New("test done")) + Eventually(done).Should(BeClosed()) + }) }) Context("closing", func() { From a8ad6eea383853007a191c4a15283770b952a837 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Tue, 18 Dec 2018 22:08:05 +0630 Subject: [PATCH 4/4] add integration tests for setting read and write deadlines --- integrationtests/self/deadline_test.go | 213 +++++++++++++++++++++++++ 1 file changed, 213 insertions(+) create mode 100644 integrationtests/self/deadline_test.go diff --git a/integrationtests/self/deadline_test.go b/integrationtests/self/deadline_test.go new file mode 100644 index 00000000..e65d0b3a --- /dev/null +++ b/integrationtests/self/deadline_test.go @@ -0,0 +1,213 @@ +package self + +import ( + "crypto/tls" + "fmt" + "io/ioutil" + "net" + "time" + + quic "github.com/lucas-clemente/quic-go" + "github.com/lucas-clemente/quic-go/integrationtests/tools/testserver" + "github.com/lucas-clemente/quic-go/internal/testdata" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Stream deadline tests", func() { + var ( + server quic.Listener + serverStr quic.Stream + clientStr quic.Stream + ) + + BeforeEach(func() { + var err error + server, err = quic.ListenAddr("localhost:0", testdata.GetTLSConfig(), nil) + Expect(err).ToNot(HaveOccurred()) + acceptedStream := make(chan struct{}) + go func() { + defer GinkgoRecover() + sess, err := server.Accept() + Expect(err).ToNot(HaveOccurred()) + serverStr, err = sess.AcceptStream() + Expect(err).ToNot(HaveOccurred()) + _, err = serverStr.Read([]byte{0}) + Expect(err).ToNot(HaveOccurred()) + close(acceptedStream) + }() + + sess, err := quic.DialAddr( + fmt.Sprintf("localhost:%d", server.Addr().(*net.UDPAddr).Port), + &tls.Config{RootCAs: testdata.GetRootCA()}, + nil, + ) + Expect(err).ToNot(HaveOccurred()) + clientStr, err = sess.OpenStream() + Expect(err).ToNot(HaveOccurred()) + _, err = clientStr.Write([]byte{0}) // need to write one byte so the server learns about the stream + Expect(err).ToNot(HaveOccurred()) + Eventually(acceptedStream).Should(BeClosed()) + }) + + AfterEach(func() { + Expect(server.Close()).To(Succeed()) + }) + + Context("read deadlines", func() { + It("completes a transfer when the deadline is set", func() { + const timeout = 20 * time.Millisecond + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + _, err := serverStr.Write(testserver.PRDataLong) + Expect(err).ToNot(HaveOccurred()) + close(done) + }() + + var bytesRead int + var timeoutCounter int + buf := make([]byte, 1<<10) + data := make([]byte, len(testserver.PRDataLong)) + clientStr.SetReadDeadline(time.Now().Add(timeout)) + for bytesRead < len(testserver.PRDataLong) { + n, err := clientStr.Read(buf) + if nerr, ok := err.(net.Error); ok && nerr.Timeout() { + timeoutCounter++ + clientStr.SetReadDeadline(time.Now().Add(timeout)) + } else { + Expect(err).ToNot(HaveOccurred()) + } + copy(data[bytesRead:], buf[:n]) + bytesRead += n + } + Expect(data).To(Equal(testserver.PRDataLong)) + // make sure the test actually worked an Read actually ran into the deadline a few times + Expect(timeoutCounter).To(BeNumerically(">=", 10)) + Eventually(done).Should(BeClosed()) + }) + + It("completes a transfer when the deadline is set concurrently", func() { + const timeout = 20 * time.Millisecond + go func() { + defer GinkgoRecover() + _, err := serverStr.Write(testserver.PRDataLong) + Expect(err).ToNot(HaveOccurred()) + }() + + var bytesRead int + var timeoutCounter int + buf := make([]byte, 1<<10) + data := make([]byte, len(testserver.PRDataLong)) + clientStr.SetReadDeadline(time.Now().Add(timeout)) + deadlineDone := make(chan struct{}) + received := make(chan struct{}) + go func() { + defer close(deadlineDone) + for { + select { + case <-received: + return + default: + time.Sleep(timeout) + } + clientStr.SetReadDeadline(time.Now().Add(timeout)) + } + }() + + for bytesRead < len(testserver.PRDataLong) { + n, err := clientStr.Read(buf) + if nerr, ok := err.(net.Error); ok && nerr.Timeout() { + timeoutCounter++ + } else { + Expect(err).ToNot(HaveOccurred()) + } + copy(data[bytesRead:], buf[:n]) + bytesRead += n + } + close(received) + Expect(data).To(Equal(testserver.PRDataLong)) + // make sure the test actually worked an Read actually ran into the deadline a few times + Expect(timeoutCounter).To(BeNumerically(">=", 10)) + Eventually(deadlineDone).Should(BeClosed()) + }) + }) + + Context("write deadlines", func() { + It("completes a transfer when the deadline is set", func() { + const timeout = 20 * time.Millisecond + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + data, err := ioutil.ReadAll(serverStr) + Expect(err).ToNot(HaveOccurred()) + Expect(data).To(Equal(testserver.PRDataLong)) + close(done) + }() + + var bytesWritten int + var timeoutCounter int + clientStr.SetWriteDeadline(time.Now().Add(timeout)) + for bytesWritten < len(testserver.PRDataLong) { + n, err := clientStr.Write(testserver.PRDataLong[bytesWritten:]) + if nerr, ok := err.(net.Error); ok && nerr.Timeout() { + timeoutCounter++ + clientStr.SetWriteDeadline(time.Now().Add(timeout)) + } else { + Expect(err).ToNot(HaveOccurred()) + } + bytesWritten += n + } + clientStr.Close() + // make sure the test actually worked an Read actually ran into the deadline a few times + Expect(timeoutCounter).To(BeNumerically(">=", 10)) + Eventually(done).Should(BeClosed()) + }) + + It("completes a transfer when the deadline is set concurrently", func() { + const timeout = 20 * time.Millisecond + readDone := make(chan struct{}) + go func() { + defer GinkgoRecover() + data, err := ioutil.ReadAll(serverStr) + Expect(err).ToNot(HaveOccurred()) + Expect(data).To(Equal(testserver.PRDataLong)) + close(readDone) + }() + + clientStr.SetWriteDeadline(time.Now().Add(timeout)) + deadlineDone := make(chan struct{}) + go func() { + defer close(deadlineDone) + for { + select { + case <-readDone: + return + default: + time.Sleep(timeout) + } + clientStr.SetWriteDeadline(time.Now().Add(timeout)) + } + }() + + var bytesWritten int + var timeoutCounter int + clientStr.SetWriteDeadline(time.Now().Add(timeout)) + for bytesWritten < len(testserver.PRDataLong) { + n, err := clientStr.Write(testserver.PRDataLong[bytesWritten:]) + if nerr, ok := err.(net.Error); ok && nerr.Timeout() { + timeoutCounter++ + } else { + Expect(err).ToNot(HaveOccurred()) + } + bytesWritten += n + } + clientStr.Close() + // make sure the test actually worked an Read actually ran into the deadline a few times + Expect(timeoutCounter).To(BeNumerically(">=", 10)) + Eventually(readDone).Should(BeClosed()) + Eventually(deadlineDone).Should(BeClosed()) + }) + }) +})