forked from quic-go/quic-go
use a time.Timer for read deadlines
This commit is contained in:
@@ -43,8 +43,9 @@ type receiveStream struct {
|
|||||||
canceledRead bool // set when CancelRead() is called
|
canceledRead bool // set when CancelRead() is called
|
||||||
resetRemotely bool // set when HandleRstStreamFrame() is called
|
resetRemotely bool // set when HandleRstStreamFrame() is called
|
||||||
|
|
||||||
readChan chan struct{}
|
readChan chan struct{}
|
||||||
readDeadline time.Time
|
deadline time.Time
|
||||||
|
deadlineTimer *time.Timer // initialized by SetReadDeadline()
|
||||||
|
|
||||||
flowController flowcontrol.StreamFlowController
|
flowController flowcontrol.StreamFlowController
|
||||||
version protocol.VersionNumber
|
version protocol.VersionNumber
|
||||||
@@ -120,8 +121,7 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err
|
|||||||
return false, bytesRead, s.resetRemotelyErr
|
return false, bytesRead, s.resetRemotelyErr
|
||||||
}
|
}
|
||||||
|
|
||||||
deadline := s.readDeadline
|
if !s.deadline.IsZero() && !time.Now().Before(s.deadline) {
|
||||||
if !deadline.IsZero() && !time.Now().Before(deadline) {
|
|
||||||
return false, bytesRead, errDeadline
|
return false, bytesRead, errDeadline
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -130,12 +130,12 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err
|
|||||||
}
|
}
|
||||||
|
|
||||||
s.mutex.Unlock()
|
s.mutex.Unlock()
|
||||||
if deadline.IsZero() {
|
if s.deadline.IsZero() {
|
||||||
<-s.readChan
|
<-s.readChan
|
||||||
} else {
|
} else {
|
||||||
select {
|
select {
|
||||||
case <-s.readChan:
|
case <-s.readChan:
|
||||||
case <-time.After(time.Until(deadline)):
|
case <-s.deadlineTimer.C:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
s.mutex.Lock()
|
s.mutex.Lock()
|
||||||
@@ -272,13 +272,22 @@ func (s *receiveStream) onClose(offset protocol.ByteCount) {
|
|||||||
|
|
||||||
func (s *receiveStream) SetReadDeadline(t time.Time) error {
|
func (s *receiveStream) SetReadDeadline(t time.Time) error {
|
||||||
s.mutex.Lock()
|
s.mutex.Lock()
|
||||||
oldDeadline := s.readDeadline
|
defer s.mutex.Unlock()
|
||||||
s.readDeadline = t
|
s.deadline = t
|
||||||
s.mutex.Unlock()
|
if s.deadline.IsZero() { // skip if there's no deadline to set
|
||||||
// if the new deadline is before the currently set deadline, wake up Read()
|
|
||||||
if t.Before(oldDeadline) {
|
|
||||||
s.signalRead()
|
s.signalRead()
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
// Lazily initialize the deadline timer.
|
||||||
|
if s.deadlineTimer == nil {
|
||||||
|
s.deadlineTimer = time.NewTimer(time.Until(t))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// reset the timer to the new deadline
|
||||||
|
if !s.deadlineTimer.Stop() {
|
||||||
|
<-s.deadlineTimer.C
|
||||||
|
}
|
||||||
|
s.deadlineTimer.Reset(time.Until(t))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -243,6 +243,20 @@ var _ = Describe("Receive Stream", func() {
|
|||||||
Expect(n).To(BeZero())
|
Expect(n).To(BeZero())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("unblocks when the deadline is changed to the past", func() {
|
||||||
|
str.SetReadDeadline(time.Now().Add(time.Hour))
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer GinkgoRecover()
|
||||||
|
_, err := str.Read(make([]byte, 6))
|
||||||
|
Expect(err).To(MatchError(errDeadline))
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
Consistently(done).ShouldNot(BeClosed())
|
||||||
|
str.SetReadDeadline(time.Now().Add(-time.Hour))
|
||||||
|
Eventually(done).Should(BeClosed())
|
||||||
|
})
|
||||||
|
|
||||||
It("unblocks after the deadline", func() {
|
It("unblocks after the deadline", func() {
|
||||||
deadline := time.Now().Add(scaleDuration(50 * time.Millisecond))
|
deadline := time.Now().Add(scaleDuration(50 * time.Millisecond))
|
||||||
str.SetReadDeadline(deadline)
|
str.SetReadDeadline(deadline)
|
||||||
|
|||||||
Reference in New Issue
Block a user