From 5fbd52158fe49ade53edcf4f8f079e2a12293261 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 15 Apr 2017 12:13:12 +0700 Subject: [PATCH 1/7] replace the sync.Cond for stream.Read() by a channel --- stream.go | 27 ++++++++++++++++++--------- stream_test.go | 8 +++----- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/stream.go b/stream.go index 183db9cbc..3e2a7303a 100644 --- a/stream.go +++ b/stream.go @@ -40,8 +40,8 @@ type stream struct { // resetRemotely is set if RegisterRemoteError() is called resetRemotely utils.AtomicBool - frameQueue *streamFrameSorter - newFrameOrErrCond sync.Cond + frameQueue *streamFrameSorter + readChan chan struct{} dataForWriting []byte finSent utils.AtomicBool @@ -62,8 +62,8 @@ func newStream(StreamID protocol.StreamID, streamID: StreamID, flowControlManager: flowControlManager, frameQueue: newStreamFrameSorter(), + readChan: make(chan struct{}, 1), } - s.newFrameOrErrCond.L = &s.mutex s.doneWritingOrErrCond.L = &s.mutex return s } @@ -84,9 +84,9 @@ func (s *stream) Read(p []byte) (int, error) { for bytesRead < len(p) { s.mutex.Lock() frame := s.frameQueue.Head() + s.mutex.Unlock() if frame == nil && bytesRead > 0 { - s.mutex.Unlock() return bytesRead, s.err } @@ -101,10 +101,11 @@ func (s *stream) Read(p []byte) (int, error) { s.readPosInFrame = int(s.readOffset - frame.Offset) break } - s.newFrameOrErrCond.Wait() + <-s.readChan + s.mutex.Lock() frame = s.frameQueue.Head() + s.mutex.Unlock() } - s.mutex.Unlock() if err != nil { return bytesRead, err @@ -250,10 +251,18 @@ func (s *stream) AddStreamFrame(frame *frames.StreamFrame) error { if err != nil && err != errDuplicateStreamData { return err } - s.newFrameOrErrCond.Signal() + s.signalRead() return nil } +// signalRead performs a non-blocking send on the readChan +func (s *stream) signalRead() { + select { + case s.readChan <- struct{}{}: + default: + } +} + // CloseRemote makes the stream receive a "virtual" FIN stream frame at a given offset func (s *stream) CloseRemote(offset protocol.ByteCount) { s.AddStreamFrame(&frames.StreamFrame{FinBit: true, Offset: offset}) @@ -267,7 +276,7 @@ func (s *stream) Cancel(err error) { // errors must not be changed! if s.err == nil { s.err = err - s.newFrameOrErrCond.Signal() + s.signalRead() s.doneWritingOrErrCond.Signal() } s.mutex.Unlock() @@ -283,7 +292,7 @@ func (s *stream) Reset(err error) { // errors must not be changed! if s.err == nil { s.err = err - s.newFrameOrErrCond.Signal() + s.signalRead() s.doneWritingOrErrCond.Signal() } if s.shouldSendReset() { diff --git a/stream_test.go b/stream_test.go index ad066f9ca..4f4f273a5 100644 --- a/stream_test.go +++ b/stream_test.go @@ -135,11 +135,9 @@ var _ = Describe("Stream", func() { mockFcm.EXPECT().UpdateHighestReceived(streamID, protocol.ByteCount(2)) mockFcm.EXPECT().AddBytesRead(streamID, protocol.ByteCount(2)) go func() { - frame := frames.StreamFrame{ - Offset: 0, - Data: []byte{0xDE, 0xAD}, - } - time.Sleep(time.Millisecond) + defer GinkgoRecover() + frame := frames.StreamFrame{Data: []byte{0xDE, 0xAD}} + time.Sleep(10 * time.Millisecond) err := str.AddStreamFrame(&frame) Expect(err).ToNot(HaveOccurred()) }() From a70ae86f5afd2b8fbf06db019fdff1c4e21dde94 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 15 Apr 2017 12:24:17 +0700 Subject: [PATCH 2/7] replace the sync.Cond for stream.Write() by a channel --- stream.go | 69 +++++++++++++++++++++++++++++++------------------------ 1 file changed, 39 insertions(+), 30 deletions(-) diff --git a/stream.go b/stream.go index 3e2a7303a..ce0ffdb28 100644 --- a/stream.go +++ b/stream.go @@ -43,10 +43,10 @@ type stream struct { frameQueue *streamFrameSorter readChan chan struct{} - dataForWriting []byte - finSent utils.AtomicBool - rstSent utils.AtomicBool - doneWritingOrErrCond sync.Cond + dataForWriting []byte + finSent utils.AtomicBool + rstSent utils.AtomicBool + writeChan chan struct{} flowControlManager flowcontrol.FlowControlManager } @@ -56,16 +56,15 @@ func newStream(StreamID protocol.StreamID, onData func(), onReset func(protocol.StreamID, protocol.ByteCount), flowControlManager flowcontrol.FlowControlManager) *stream { - s := &stream{ + return &stream{ onData: onData, onReset: onReset, streamID: StreamID, flowControlManager: flowControlManager, frameQueue: newStreamFrameSorter(), readChan: make(chan struct{}, 1), + writeChan: make(chan struct{}, 1), } - s.doneWritingOrErrCond.L = &s.mutex - return s } // Read implements io.Reader. It is not thread safe! @@ -147,30 +146,35 @@ func (s *stream) Read(p []byte) (int, error) { } func (s *stream) Write(p []byte) (int, error) { - if s.resetLocally.Get() { - return 0, s.err - } - s.mutex.Lock() - defer s.mutex.Unlock() - - if s.err != nil { - return 0, s.err + if s.resetLocally.Get() || s.err != nil { + err := s.err + s.mutex.Unlock() + return 0, err } - if len(p) == 0 { + s.mutex.Unlock() return 0, nil } s.dataForWriting = make([]byte, len(p)) copy(s.dataForWriting, p) - s.onData() + s.mutex.Unlock() - for s.dataForWriting != nil && s.err == nil { - s.doneWritingOrErrCond.Wait() + for { + s.mutex.Lock() + if s.dataForWriting == nil || s.err != nil { + s.mutex.Unlock() + break + } + s.mutex.Unlock() + <-s.writeChan } + s.mutex.Lock() + defer s.mutex.Unlock() + if s.err != nil { return len(p) - len(s.dataForWriting), s.err } @@ -190,14 +194,12 @@ func (s *stream) lenOfDataForWriting() protocol.ByteCount { func (s *stream) getDataForWriting(maxBytes protocol.ByteCount) []byte { s.mutex.Lock() - if s.err != nil { - s.mutex.Unlock() - return nil - } - if s.dataForWriting == nil { - s.mutex.Unlock() + defer s.mutex.Unlock() + + if s.err != nil || s.dataForWriting == nil { return nil } + var ret []byte if protocol.ByteCount(len(s.dataForWriting)) > maxBytes { ret = s.dataForWriting[:maxBytes] @@ -205,10 +207,9 @@ func (s *stream) getDataForWriting(maxBytes protocol.ByteCount) []byte { } else { ret = s.dataForWriting s.dataForWriting = nil - s.doneWritingOrErrCond.Signal() + s.signalWrite() } s.writeOffset += protocol.ByteCount(len(ret)) - s.mutex.Unlock() return ret } @@ -263,6 +264,14 @@ func (s *stream) signalRead() { } } +// signalRead performs a non-blocking send on the writeChan +func (s *stream) signalWrite() { + select { + case s.writeChan <- struct{}{}: + default: + } +} + // CloseRemote makes the stream receive a "virtual" FIN stream frame at a given offset func (s *stream) CloseRemote(offset protocol.ByteCount) { s.AddStreamFrame(&frames.StreamFrame{FinBit: true, Offset: offset}) @@ -277,7 +286,7 @@ func (s *stream) Cancel(err error) { if s.err == nil { s.err = err s.signalRead() - s.doneWritingOrErrCond.Signal() + s.signalWrite() } s.mutex.Unlock() } @@ -293,7 +302,7 @@ func (s *stream) Reset(err error) { if s.err == nil { s.err = err s.signalRead() - s.doneWritingOrErrCond.Signal() + s.signalWrite() } if s.shouldSendReset() { s.onReset(s.streamID, s.writeOffset) @@ -312,7 +321,7 @@ func (s *stream) RegisterRemoteError(err error) { // errors must not be changed! if s.err == nil { s.err = err - s.doneWritingOrErrCond.Signal() + s.signalWrite() } if s.shouldSendReset() { s.onReset(s.streamID, s.writeOffset) From 5720e8af7d7584f086cdbd75ad3f99b6c6211dbd Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 16 Apr 2017 11:12:21 +0700 Subject: [PATCH 3/7] implement read deadlines --- stream.go | 50 ++++++++++++++++++++++++++++++++++------ stream_test.go | 62 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+), 7 deletions(-) diff --git a/stream.go b/stream.go index ce0ffdb28..9ee9b3a37 100644 --- a/stream.go +++ b/stream.go @@ -1,9 +1,11 @@ package quic import ( + "errors" "fmt" "io" "sync" + "time" "github.com/lucas-clemente/quic-go/flowcontrol" "github.com/lucas-clemente/quic-go/frames" @@ -40,8 +42,9 @@ type stream struct { // resetRemotely is set if RegisterRemoteError() is called resetRemotely utils.AtomicBool - frameQueue *streamFrameSorter - readChan chan struct{} + frameQueue *streamFrameSorter + readChan chan struct{} + readDeadline time.Time dataForWriting []byte finSent utils.AtomicBool @@ -51,6 +54,8 @@ type stream struct { flowControlManager flowcontrol.FlowControlManager } +var errDeadline = errors.New("deadline exceeded") + // newStream creates a new Stream func newStream(StreamID protocol.StreamID, onData func(), @@ -83,10 +88,10 @@ func (s *stream) Read(p []byte) (int, error) { for bytesRead < len(p) { s.mutex.Lock() frame := s.frameQueue.Head() - s.mutex.Unlock() - if frame == nil && bytesRead > 0 { - return bytesRead, s.err + err = s.err + s.mutex.Unlock() + return bytesRead, err } var err error @@ -96,15 +101,31 @@ func (s *stream) Read(p []byte) (int, error) { err = s.err break } + + deadline := s.readDeadline + if !deadline.IsZero() && !time.Now().Before(deadline) { + err = errDeadline + break + } + if frame != nil { s.readPosInFrame = int(s.readOffset - frame.Offset) break } - <-s.readChan + + s.mutex.Unlock() + if deadline.IsZero() { + <-s.readChan + } else { + select { + case <-s.readChan: + case <-time.After(deadline.Sub(time.Now())): + } + } s.mutex.Lock() frame = s.frameQueue.Head() - s.mutex.Unlock() } + s.mutex.Unlock() if err != nil { return bytesRead, err @@ -272,6 +293,21 @@ func (s *stream) signalWrite() { } } +// SetReadDeadline sets the deadline for future Read calls and +// any currently-blocked Read call. +// A zero value for t means Read will not time out. +func (s *stream) SetReadDeadline(t time.Time) error { + s.mutex.Lock() + oldDeadline := s.readDeadline + s.readDeadline = t + s.mutex.Unlock() + // if the new deadline is before the currently set deadline, wake up Read() + if t.Before(oldDeadline) { + s.signalRead() + } + return nil +} + // CloseRemote makes the stream receive a "virtual" FIN stream frame at a given offset func (s *stream) CloseRemote(offset protocol.ByteCount) { s.AddStreamFrame(&frames.StreamFrame{FinBit: true, Offset: offset}) diff --git a/stream_test.go b/stream_test.go index 4f4f273a5..6b7cdf72a 100644 --- a/stream_test.go +++ b/stream_test.go @@ -3,6 +3,7 @@ package quic import ( "errors" "io" + "runtime" "time" "github.com/lucas-clemente/quic-go/frames" @@ -238,6 +239,67 @@ var _ = Describe("Stream", func() { Expect(onDataCalled).To(BeTrue()) }) + Context("deadlines", func() { + It("returns an error when Read is called after the deadline", func() { + mockFcm.EXPECT().UpdateHighestReceived(streamID, protocol.ByteCount(6)).AnyTimes() + f := &frames.StreamFrame{Data: []byte("foobar")} + err := str.AddStreamFrame(f) + Expect(err).ToNot(HaveOccurred()) + str.SetReadDeadline(time.Now().Add(-time.Second)) + b := make([]byte, 6) + n, err := str.Read(b) + Expect(err).To(MatchError(errDeadline)) + Expect(n).To(BeZero()) + }) + + It("unblocks read after the deadline", func() { + deadline := time.Now().Add(200 * time.Millisecond) + str.SetReadDeadline(deadline) + b := make([]byte, 6) + n, err := str.Read(b) + Expect(err).To(MatchError(errDeadline)) + Expect(n).To(BeZero()) + Expect(time.Now()).To(BeTemporally("~", deadline, 50*time.Millisecond)) + }) + + It("doesn't unblock if the deadline is changed before the first one expires", func() { + deadline1 := time.Now().Add(200 * time.Millisecond) + deadline2 := time.Now().Add(400 * time.Millisecond) + str.SetReadDeadline(deadline1) + go func() { + defer GinkgoRecover() + time.Sleep(50 * time.Millisecond) + str.SetReadDeadline(deadline2) + // make sure that this was actually execute before the deadline expires + Expect(time.Now()).To(BeTemporally("<", deadline1)) + }() + runtime.Gosched() + b := make([]byte, 10) + n, err := str.Read(b) + Expect(err).To(MatchError(errDeadline)) + Expect(n).To(BeZero()) + Expect(time.Now()).To(BeTemporally("~", deadline2, 50*time.Millisecond)) + }) + + It("unblocks earlier, when a new deadline is set", func() { + deadline1 := time.Now().Add(1200 * time.Millisecond) + deadline2 := time.Now().Add(300 * time.Millisecond) + go func() { + defer GinkgoRecover() + time.Sleep(50 * time.Millisecond) + str.SetReadDeadline(deadline2) + // make sure that this was actually execute before the deadline expires + Expect(time.Now()).To(BeTemporally("<", deadline2)) + }() + str.SetReadDeadline(deadline1) + runtime.Gosched() + b := make([]byte, 10) + _, err := str.Read(b) + Expect(err).To(MatchError(errDeadline)) + Expect(time.Now()).To(BeTemporally("~", deadline2, 50*time.Millisecond)) + }) + }) + Context("closing", func() { Context("with FIN bit", func() { It("returns EOFs", func() { From 1acdc5f18e9498a862ef2a2e63bb4a1064bc4df8 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 30 Apr 2017 12:01:59 +0700 Subject: [PATCH 4/7] implement write deadlines --- stream.go | 62 +++++++++++++++++++++++++++++++++--------- stream_test.go | 74 +++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 122 insertions(+), 14 deletions(-) diff --git a/stream.go b/stream.go index 9ee9b3a37..1348eb9cc 100644 --- a/stream.go +++ b/stream.go @@ -50,6 +50,7 @@ type stream struct { finSent utils.AtomicBool rstSent utils.AtomicBool writeChan chan struct{} + writeDeadline time.Time flowControlManager flowcontrol.FlowControlManager } @@ -168,38 +169,48 @@ func (s *stream) Read(p []byte) (int, error) { func (s *stream) Write(p []byte) (int, error) { s.mutex.Lock() + defer s.mutex.Unlock() + if s.resetLocally.Get() || s.err != nil { - err := s.err - s.mutex.Unlock() - return 0, err + return 0, s.err } if len(p) == 0 { - s.mutex.Unlock() return 0, nil } s.dataForWriting = make([]byte, len(p)) copy(s.dataForWriting, p) s.onData() - s.mutex.Unlock() + var err error for { - s.mutex.Lock() - if s.dataForWriting == nil || s.err != nil { - s.mutex.Unlock() + deadline := s.writeDeadline + if !deadline.IsZero() && !time.Now().Before(deadline) { + err = errDeadline break } + if s.dataForWriting == nil || s.err != nil { + break + } + s.mutex.Unlock() - <-s.writeChan + if deadline.IsZero() { + <-s.writeChan + } else { + select { + case <-s.writeChan: + case <-time.After(deadline.Sub(time.Now())): + } + } + s.mutex.Lock() } - s.mutex.Lock() - defer s.mutex.Unlock() - + if err != nil { + return 0, err + } if s.err != nil { return len(p) - len(s.dataForWriting), s.err } - return len(p), nil } @@ -308,6 +319,31 @@ func (s *stream) SetReadDeadline(t time.Time) error { return nil } +// SetWriteDeadline sets the deadline for future Write calls +// and any currently-blocked Write call. +// Even if write times out, it may return n > 0, indicating that +// some of the data was successfully written. +// A zero value for t means Write will not time out. +func (s *stream) SetWriteDeadline(t time.Time) error { + s.mutex.Lock() + oldDeadline := s.writeDeadline + s.writeDeadline = t + s.mutex.Unlock() + if t.Before(oldDeadline) { + s.signalWrite() + } + return nil +} + +// SetDeadline sets the read and write deadlines associated +// with the connection. It is equivalent to calling both +// SetReadDeadline and SetWriteDeadline. +func (s *stream) SetDeadline(t time.Time) error { + _ = s.SetReadDeadline(t) // SetReadDeadline never errors + _ = s.SetWriteDeadline(t) // SetWriteDeadline never errors + return nil +} + // CloseRemote makes the stream receive a "virtual" FIN stream frame at a given offset func (s *stream) CloseRemote(offset protocol.ByteCount) { s.AddStreamFrame(&frames.StreamFrame{FinBit: true, Offset: offset}) diff --git a/stream_test.go b/stream_test.go index 6b7cdf72a..c2a1bb308 100644 --- a/stream_test.go +++ b/stream_test.go @@ -252,7 +252,7 @@ var _ = Describe("Stream", func() { Expect(n).To(BeZero()) }) - It("unblocks read after the deadline", func() { + It("unblocks after the deadline", func() { deadline := time.Now().Add(200 * time.Millisecond) str.SetReadDeadline(deadline) b := make([]byte, 6) @@ -298,6 +298,18 @@ var _ = Describe("Stream", func() { Expect(err).To(MatchError(errDeadline)) Expect(time.Now()).To(BeTemporally("~", deadline2, 50*time.Millisecond)) }) + + It("sets a read deadline, when SetDeadline is called", func() { + mockFcm.EXPECT().UpdateHighestReceived(streamID, protocol.ByteCount(6)).AnyTimes() + f := &frames.StreamFrame{Data: []byte("foobar")} + err := str.AddStreamFrame(f) + Expect(err).ToNot(HaveOccurred()) + str.SetDeadline(time.Now().Add(-time.Second)) + b := make([]byte, 6) + n, err := str.Read(b) + Expect(err).To(MatchError(errDeadline)) + Expect(n).To(BeZero()) + }) }) Context("closing", func() { @@ -763,6 +775,66 @@ var _ = Describe("Stream", func() { Expect(err).ToNot(HaveOccurred()) }) + Context("deadlines", func() { + It("returns an error when Write is called after the deadline", func() { + str.SetWriteDeadline(time.Now().Add(-time.Second)) + n, err := str.Write([]byte("foobar")) + Expect(err).To(MatchError(errDeadline)) + Expect(n).To(BeZero()) + }) + + It("unblocks after the deadline", func() { + deadline := time.Now().Add(200 * time.Millisecond) + str.SetWriteDeadline(deadline) + n, err := str.Write([]byte("foobar")) + Expect(err).To(MatchError(errDeadline)) + Expect(n).To(BeZero()) + Expect(time.Now()).To(BeTemporally("~", deadline, 50*time.Millisecond)) + }) + + It("doesn't unblock if the deadline is changed before the first one expires", func() { + deadline1 := time.Now().Add(200 * time.Millisecond) + deadline2 := time.Now().Add(400 * time.Millisecond) + str.SetWriteDeadline(deadline1) + go func() { + defer GinkgoRecover() + time.Sleep(50 * time.Millisecond) + str.SetWriteDeadline(deadline2) + // make sure that this was actually execute before the deadline expires + Expect(time.Now()).To(BeTemporally("<", deadline1)) + }() + runtime.Gosched() + n, err := str.Write([]byte("foobar")) + Expect(err).To(MatchError(errDeadline)) + Expect(n).To(BeZero()) + Expect(time.Now()).To(BeTemporally("~", deadline2, 50*time.Millisecond)) + }) + + It("unblocks earlier, when a new deadline is set", func() { + deadline1 := time.Now().Add(1200 * time.Millisecond) + deadline2 := time.Now().Add(300 * time.Millisecond) + go func() { + defer GinkgoRecover() + time.Sleep(50 * time.Millisecond) + str.SetWriteDeadline(deadline2) + // make sure that this was actually execute before the deadline expires + Expect(time.Now()).To(BeTemporally("<", deadline2)) + }() + str.SetWriteDeadline(deadline1) + runtime.Gosched() + _, err := str.Write([]byte("foobar")) + Expect(err).To(MatchError(errDeadline)) + Expect(time.Now()).To(BeTemporally("~", deadline2, 50*time.Millisecond)) + }) + + It("sets a read deadline, when SetDeadline is called", func() { + str.SetDeadline(time.Now().Add(-time.Second)) + n, err := str.Write([]byte("foobar")) + Expect(err).To(MatchError(errDeadline)) + Expect(n).To(BeZero()) + }) + }) + Context("closing", func() { It("sets finishedWriting when calling Close", func() { str.Close() From e09993403dd92018b1a99391dfd86d3d724915a2 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 11 Jun 2017 15:43:52 +0200 Subject: [PATCH 5/7] add stream deadlines to the Stream interface --- Changelog.md | 1 + h2quic/response_writer_test.go | 4 ++++ interface.go | 14 ++++++++++++++ stream.go | 11 ----------- 4 files changed, 19 insertions(+), 11 deletions(-) diff --git a/Changelog.md b/Changelog.md index 591752b75..0b357ab8e 100644 --- a/Changelog.md +++ b/Changelog.md @@ -8,6 +8,7 @@ - Add a `quic.Config` option to configure the source address validation - Add a `quic.Config` option to configure the handshake timeout - Add a `quic.Config` option to configure keep-alive +- Implement `net.Conn`-style deadlines for streams - Remove the `tls.Config` from the `quic.Config`. The `tls.Config` must now be passed to the `Dial` and `Listen` functions as a separate parameter. See the [Godoc](https://godoc.org/github.com/lucas-clemente/quic-go) for details. - Changed the log level environment variable to only accept strings ("DEBUG", "INFO", "ERROR"), see [the wiki](https://github.com/lucas-clemente/quic-go/wiki/Logging) for more details. - Rename the `h2quic.QuicRoundTripper` to `h2quic.RoundTripper` diff --git a/h2quic/response_writer_test.go b/h2quic/response_writer_test.go index f8217f316..93ebae162 100644 --- a/h2quic/response_writer_test.go +++ b/h2quic/response_writer_test.go @@ -5,6 +5,7 @@ import ( "io" "net/http" "sync" + "time" "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" @@ -36,6 +37,9 @@ func (s *mockStream) Close() error { s.closed = true; r func (s *mockStream) Reset(error) { s.reset = true } func (s *mockStream) CloseRemote(offset protocol.ByteCount) { s.remoteClosed = true } func (s mockStream) StreamID() protocol.StreamID { return s.id } +func (s *mockStream) SetDeadline(time.Time) error { panic("not implemented") } +func (s *mockStream) SetReadDeadline(time.Time) error { panic("not implemented") } +func (s *mockStream) SetWriteDeadline(time.Time) error { panic("not implemented") } func (s *mockStream) Read(p []byte) (int, error) { n, _ := s.dataToRead.Read(p) diff --git a/interface.go b/interface.go index 4ba4cb42b..685a1ab79 100644 --- a/interface.go +++ b/interface.go @@ -16,6 +16,20 @@ type Stream interface { StreamID() protocol.StreamID // Reset closes the stream with an error. Reset(error) + // SetReadDeadline sets the deadline for future Read calls and + // any currently-blocked Read call. + // A zero value for t means Read will not time out. + SetReadDeadline(t time.Time) error + // SetWriteDeadline sets the deadline for future Write calls + // and any currently-blocked Write call. + // Even if write times out, it may return n > 0, indicating that + // some of the data was successfully written. + // A zero value for t means Write will not time out. + SetWriteDeadline(t time.Time) error + // SetDeadline sets the read and write deadlines associated + // with the connection. It is equivalent to calling both + // SetReadDeadline and SetWriteDeadline. + SetDeadline(t time.Time) error } // A Session is a QUIC connection between two peers. diff --git a/stream.go b/stream.go index 1348eb9cc..f78f815db 100644 --- a/stream.go +++ b/stream.go @@ -304,9 +304,6 @@ func (s *stream) signalWrite() { } } -// SetReadDeadline sets the deadline for future Read calls and -// any currently-blocked Read call. -// A zero value for t means Read will not time out. func (s *stream) SetReadDeadline(t time.Time) error { s.mutex.Lock() oldDeadline := s.readDeadline @@ -319,11 +316,6 @@ func (s *stream) SetReadDeadline(t time.Time) error { return nil } -// SetWriteDeadline sets the deadline for future Write calls -// and any currently-blocked Write call. -// Even if write times out, it may return n > 0, indicating that -// some of the data was successfully written. -// A zero value for t means Write will not time out. func (s *stream) SetWriteDeadline(t time.Time) error { s.mutex.Lock() oldDeadline := s.writeDeadline @@ -335,9 +327,6 @@ func (s *stream) SetWriteDeadline(t time.Time) error { return nil } -// SetDeadline sets the read and write deadlines associated -// with the connection. It is equivalent to calling both -// SetReadDeadline and SetWriteDeadline. func (s *stream) SetDeadline(t time.Time) error { _ = s.SetReadDeadline(t) // SetReadDeadline never errors _ = s.SetWriteDeadline(t) // SetWriteDeadline never errors From cfc7d1604de082c35fd267ba7d4f81db4ad049b4 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 10 Jul 2017 14:33:58 +0800 Subject: [PATCH 6/7] scale all deadline duration in stream tests on the CIs by 20 This allows us to run the tests faster locally. On the CIs, where the timing is less accurate, the tests take longer, but won't be flaky. --- stream_test.go | 53 +++++++++++++++++++++++++++++++------------------- 1 file changed, 33 insertions(+), 20 deletions(-) diff --git a/stream_test.go b/stream_test.go index c2a1bb308..105f15f2c 100644 --- a/stream_test.go +++ b/stream_test.go @@ -6,6 +6,8 @@ import ( "runtime" "time" + "os" + "github.com/lucas-clemente/quic-go/frames" "github.com/lucas-clemente/quic-go/internal/mocks/mocks_fc" "github.com/lucas-clemente/quic-go/protocol" @@ -27,6 +29,17 @@ var _ = Describe("Stream", func() { mockFcm *mocks_fc.MockFlowControlManager ) + // in the tests for the stream deadlines we set a deadline + // and wait to make an assertion when Read / Write was unblocked + // on the CIs, the timing is a lot less precise, so scale every duration by this factor + scaleDuration := func(t time.Duration) time.Duration { + scaleFactor := 1 + if os.Getenv("CI") == "true" { + scaleFactor = 20 + } + return time.Duration(scaleFactor) * t + } + onData := func() { onDataCalled = true } @@ -253,22 +266,22 @@ var _ = Describe("Stream", func() { }) It("unblocks after the deadline", func() { - deadline := time.Now().Add(200 * time.Millisecond) + deadline := time.Now().Add(scaleDuration(50 * time.Millisecond)) str.SetReadDeadline(deadline) b := make([]byte, 6) n, err := str.Read(b) Expect(err).To(MatchError(errDeadline)) Expect(n).To(BeZero()) - Expect(time.Now()).To(BeTemporally("~", deadline, 50*time.Millisecond)) + Expect(time.Now()).To(BeTemporally("~", deadline, scaleDuration(10*time.Millisecond))) }) It("doesn't unblock if the deadline is changed before the first one expires", func() { - deadline1 := time.Now().Add(200 * time.Millisecond) - deadline2 := time.Now().Add(400 * time.Millisecond) + deadline1 := time.Now().Add(scaleDuration(50 * time.Millisecond)) + deadline2 := time.Now().Add(scaleDuration(100 * time.Millisecond)) str.SetReadDeadline(deadline1) go func() { defer GinkgoRecover() - time.Sleep(50 * time.Millisecond) + time.Sleep(scaleDuration(20 * time.Millisecond)) str.SetReadDeadline(deadline2) // make sure that this was actually execute before the deadline expires Expect(time.Now()).To(BeTemporally("<", deadline1)) @@ -278,15 +291,15 @@ var _ = Describe("Stream", func() { n, err := str.Read(b) Expect(err).To(MatchError(errDeadline)) Expect(n).To(BeZero()) - Expect(time.Now()).To(BeTemporally("~", deadline2, 50*time.Millisecond)) + Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(20*time.Millisecond))) }) It("unblocks earlier, when a new deadline is set", func() { - deadline1 := time.Now().Add(1200 * time.Millisecond) - deadline2 := time.Now().Add(300 * time.Millisecond) + deadline1 := time.Now().Add(scaleDuration(200 * time.Millisecond)) + deadline2 := time.Now().Add(scaleDuration(50 * time.Millisecond)) go func() { defer GinkgoRecover() - time.Sleep(50 * time.Millisecond) + time.Sleep(scaleDuration(10 * time.Millisecond)) str.SetReadDeadline(deadline2) // make sure that this was actually execute before the deadline expires Expect(time.Now()).To(BeTemporally("<", deadline2)) @@ -296,7 +309,7 @@ var _ = Describe("Stream", func() { b := make([]byte, 10) _, err := str.Read(b) Expect(err).To(MatchError(errDeadline)) - Expect(time.Now()).To(BeTemporally("~", deadline2, 50*time.Millisecond)) + Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(25*time.Millisecond))) }) It("sets a read deadline, when SetDeadline is called", func() { @@ -784,21 +797,21 @@ var _ = Describe("Stream", func() { }) It("unblocks after the deadline", func() { - deadline := time.Now().Add(200 * time.Millisecond) + deadline := time.Now().Add(scaleDuration(50 * time.Millisecond)) str.SetWriteDeadline(deadline) n, err := str.Write([]byte("foobar")) Expect(err).To(MatchError(errDeadline)) Expect(n).To(BeZero()) - Expect(time.Now()).To(BeTemporally("~", deadline, 50*time.Millisecond)) + Expect(time.Now()).To(BeTemporally("~", deadline, scaleDuration(20*time.Millisecond))) }) It("doesn't unblock if the deadline is changed before the first one expires", func() { - deadline1 := time.Now().Add(200 * time.Millisecond) - deadline2 := time.Now().Add(400 * time.Millisecond) + deadline1 := time.Now().Add(scaleDuration(50 * time.Millisecond)) + deadline2 := time.Now().Add(scaleDuration(100 * time.Millisecond)) str.SetWriteDeadline(deadline1) go func() { defer GinkgoRecover() - time.Sleep(50 * time.Millisecond) + time.Sleep(scaleDuration(20 * time.Millisecond)) str.SetWriteDeadline(deadline2) // make sure that this was actually execute before the deadline expires Expect(time.Now()).To(BeTemporally("<", deadline1)) @@ -807,15 +820,15 @@ var _ = Describe("Stream", func() { n, err := str.Write([]byte("foobar")) Expect(err).To(MatchError(errDeadline)) Expect(n).To(BeZero()) - Expect(time.Now()).To(BeTemporally("~", deadline2, 50*time.Millisecond)) + Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(20*time.Millisecond))) }) It("unblocks earlier, when a new deadline is set", func() { - deadline1 := time.Now().Add(1200 * time.Millisecond) - deadline2 := time.Now().Add(300 * time.Millisecond) + deadline1 := time.Now().Add(scaleDuration(200 * time.Millisecond)) + deadline2 := time.Now().Add(scaleDuration(50 * time.Millisecond)) go func() { defer GinkgoRecover() - time.Sleep(50 * time.Millisecond) + time.Sleep(scaleDuration(10 * time.Millisecond)) str.SetWriteDeadline(deadline2) // make sure that this was actually execute before the deadline expires Expect(time.Now()).To(BeTemporally("<", deadline2)) @@ -824,7 +837,7 @@ var _ = Describe("Stream", func() { runtime.Gosched() _, err := str.Write([]byte("foobar")) Expect(err).To(MatchError(errDeadline)) - Expect(time.Now()).To(BeTemporally("~", deadline2, 50*time.Millisecond)) + Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(20*time.Millisecond))) }) It("sets a read deadline, when SetDeadline is called", func() { From 56155986e94645e398eb86e02150995a5c2c56cc Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 15 Jul 2017 23:48:25 +0700 Subject: [PATCH 7/7] read the CI timescale factor from an environment variable --- .travis.yml | 11 +++++++---- appveyor.yml | 1 + stream_test.go | 6 ++++-- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/.travis.yml b/.travis.yml index 23faf62c3..94db43c8b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -14,10 +14,13 @@ go: # setting the GOARCH directly doesn't work, since the value will be overwritten later # so set it to a temporary environment variable first env: - - TRAVIS_GOARCH=amd64 TESTMODE=unit - - TRAVIS_GOARCH=amd64 TESTMODE=integration - - TRAVIS_GOARCH=386 TESTMODE=unit - - TRAVIS_GOARCH=386 TESTMODE=integration + global: + - TIMESCALE_FACTOR=20 + matrix: + - TRAVIS_GOARCH=amd64 TESTMODE=unit + - TRAVIS_GOARCH=amd64 TESTMODE=integration + - TRAVIS_GOARCH=386 TESTMODE=unit + - TRAVIS_GOARCH=386 TESTMODE=integration # second part of the GOARCH workaround # now actually set the GOARCH env variable to the value of the temporary variable set earlier diff --git a/appveyor.yml b/appveyor.yml index 1de758e0f..a013bb3bc 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -5,6 +5,7 @@ os: Windows Server 2012 R2 environment: GOPATH: c:\gopath CGO_ENABLED: 0 + TIMESCALE_FACTOR: 20 matrix: - GOARCH: 386 - GOARCH: amd64 diff --git a/stream_test.go b/stream_test.go index 105f15f2c..0d1bd6e10 100644 --- a/stream_test.go +++ b/stream_test.go @@ -4,6 +4,7 @@ import ( "errors" "io" "runtime" + "strconv" "time" "os" @@ -34,9 +35,10 @@ var _ = Describe("Stream", func() { // on the CIs, the timing is a lot less precise, so scale every duration by this factor scaleDuration := func(t time.Duration) time.Duration { scaleFactor := 1 - if os.Getenv("CI") == "true" { - scaleFactor = 20 + if f, err := strconv.Atoi(os.Getenv("TIMESCALE_FACTOR")); err == nil { // parsing "" errors, so this works fine if the env is not set + scaleFactor = f } + Expect(scaleFactor).ToNot(BeZero()) return time.Duration(scaleFactor) * t }