Merge pull request #1544 from lucas-clemente/optimize-timers

optimize stream deadline timers
This commit is contained in:
Marten Seemann
2018-10-20 10:50:14 +09:00
committed by GitHub
4 changed files with 71 additions and 22 deletions

View File

@@ -43,8 +43,9 @@ type receiveStream struct {
canceledRead bool // set when CancelRead() is called
resetRemotely bool // set when HandleRstStreamFrame() is called
readChan chan struct{}
readDeadline time.Time
readChan chan struct{}
deadline time.Time
deadlineTimer *time.Timer // initialized by SetReadDeadline()
flowController flowcontrol.StreamFlowController
version protocol.VersionNumber
@@ -120,8 +121,7 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err
return false, bytesRead, s.resetRemotelyErr
}
deadline := s.readDeadline
if !deadline.IsZero() && !time.Now().Before(deadline) {
if !s.deadline.IsZero() && !time.Now().Before(s.deadline) {
return false, bytesRead, errDeadline
}
@@ -130,12 +130,12 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err
}
s.mutex.Unlock()
if deadline.IsZero() {
if s.deadline.IsZero() {
<-s.readChan
} else {
select {
case <-s.readChan:
case <-time.After(time.Until(deadline)):
case <-s.deadlineTimer.C:
}
}
s.mutex.Lock()
@@ -272,13 +272,22 @@ func (s *receiveStream) onClose(offset protocol.ByteCount) {
func (s *receiveStream) SetReadDeadline(t time.Time) error {
s.mutex.Lock()
oldDeadline := s.readDeadline
s.readDeadline = t
s.mutex.Unlock()
// if the new deadline is before the currently set deadline, wake up Read()
if t.Before(oldDeadline) {
defer s.mutex.Unlock()
s.deadline = t
if s.deadline.IsZero() { // skip if there's no deadline to set
s.signalRead()
return nil
}
// Lazily initialize the deadline timer.
if s.deadlineTimer == nil {
s.deadlineTimer = time.NewTimer(time.Until(t))
return nil
}
// reset the timer to the new deadline
if !s.deadlineTimer.Stop() {
<-s.deadlineTimer.C
}
s.deadlineTimer.Reset(time.Until(t))
return nil
}

View File

@@ -243,6 +243,20 @@ var _ = Describe("Receive Stream", func() {
Expect(n).To(BeZero())
})
It("unblocks when the deadline is changed to the past", func() {
str.SetReadDeadline(time.Now().Add(time.Hour))
done := make(chan struct{})
go func() {
defer GinkgoRecover()
_, err := str.Read(make([]byte, 6))
Expect(err).To(MatchError(errDeadline))
close(done)
}()
Consistently(done).ShouldNot(BeClosed())
str.SetReadDeadline(time.Now().Add(-time.Hour))
Eventually(done).Should(BeClosed())
})
It("unblocks after the deadline", func() {
deadline := time.Now().Add(scaleDuration(50 * time.Millisecond))
str.SetReadDeadline(deadline)

View File

@@ -41,8 +41,10 @@ type sendStream struct {
finSent bool // set when a STREAM_FRAME with FIN bit has b
dataForWriting []byte
writeChan chan struct{}
writeDeadline time.Time
writeChan chan struct{}
deadline time.Time
deadlineTimer *time.Timer // initialized by SetReadDeadline()
flowController flowcontrol.StreamFlowController
@@ -86,7 +88,7 @@ func (s *sendStream) Write(p []byte) (int, error) {
if s.closeForShutdownErr != nil {
return 0, s.closeForShutdownErr
}
if !s.writeDeadline.IsZero() && !time.Now().Before(s.writeDeadline) {
if !s.deadline.IsZero() && !time.Now().Before(s.deadline) {
return 0, errDeadline
}
if len(p) == 0 {
@@ -101,8 +103,7 @@ func (s *sendStream) Write(p []byte) (int, error) {
var err error
for {
bytesWritten = len(p) - len(s.dataForWriting)
deadline := s.writeDeadline
if !deadline.IsZero() && !time.Now().Before(deadline) {
if !s.deadline.IsZero() && !time.Now().Before(s.deadline) {
s.dataForWriting = nil
err = errDeadline
break
@@ -112,12 +113,12 @@ func (s *sendStream) Write(p []byte) (int, error) {
}
s.mutex.Unlock()
if deadline.IsZero() {
if s.deadline.IsZero() {
<-s.writeChan
} else {
select {
case <-s.writeChan:
case <-time.After(time.Until(deadline)):
case <-s.deadlineTimer.C:
}
}
s.mutex.Lock()
@@ -298,12 +299,22 @@ func (s *sendStream) Context() context.Context {
func (s *sendStream) SetWriteDeadline(t time.Time) error {
s.mutex.Lock()
oldDeadline := s.writeDeadline
s.writeDeadline = t
s.mutex.Unlock()
if t.Before(oldDeadline) {
defer s.mutex.Unlock()
s.deadline = t
if s.deadline.IsZero() { // skip if there's no deadline to set
s.signalWrite()
return nil
}
// Lazily initialize the deadline timer.
if s.deadlineTimer == nil {
s.deadlineTimer = time.NewTimer(time.Until(t))
return nil
}
// reset the timer to the new deadline
if !s.deadlineTimer.Stop() {
<-s.deadlineTimer.C
}
s.deadlineTimer.Reset(time.Until(t))
return nil
}

View File

@@ -255,6 +255,21 @@ var _ = Describe("Send Stream", func() {
Expect(time.Now()).To(BeTemporally("~", deadline, scaleDuration(20*time.Millisecond)))
})
It("unblocks when the deadline is changed to the past", func() {
mockSender.EXPECT().onHasStreamData(streamID)
str.SetWriteDeadline(time.Now().Add(time.Hour))
done := make(chan struct{})
go func() {
defer GinkgoRecover()
_, err := str.Write([]byte("foobar"))
Expect(err).To(MatchError(errDeadline))
close(done)
}()
Consistently(done).ShouldNot(BeClosed())
str.SetWriteDeadline(time.Now().Add(-time.Hour))
Eventually(done).Should(BeClosed())
})
It("returns the number of bytes written, when the deadline expires", func() {
mockSender.EXPECT().onHasStreamData(streamID)
mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(10000)).AnyTimes()