From 8c5741ae79723009ff27541346b2ae51618ea567 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Wed, 13 Dec 2017 16:32:32 +0700 Subject: [PATCH] rename stream.Cancel to stream.CloseForShutdown --- crypto_stream.go | 2 +- internal/mocks/stream.go | 20 ++++++++++---------- session.go | 2 +- session_test.go | 4 ++-- stream.go | 21 +++++++++++---------- stream_test.go | 16 ++++++++-------- streams_map.go | 2 +- streams_map_test.go | 2 +- 8 files changed, 35 insertions(+), 34 deletions(-) diff --git a/crypto_stream.go b/crypto_stream.go index b9263da6f..50d775e4d 100644 --- a/crypto_stream.go +++ b/crypto_stream.go @@ -13,7 +13,7 @@ type cryptoStreamI interface { io.Writer HandleStreamFrame(*wire.StreamFrame) error PopStreamFrame(protocol.ByteCount) *wire.StreamFrame - Cancel(error) + CloseForShutdown(error) HasDataForWriting() bool SetReadOffset(protocol.ByteCount) // methods needed for flow control diff --git a/internal/mocks/stream.go b/internal/mocks/stream.go index 107d9177a..0ebe91949 100644 --- a/internal/mocks/stream.go +++ b/internal/mocks/stream.go @@ -36,16 +36,6 @@ func (_m *MockStreamI) EXPECT() *MockStreamIMockRecorder { return _m.recorder } -// Cancel mocks base method -func (_m *MockStreamI) Cancel(_param0 error) { - _m.ctrl.Call(_m, "Cancel", _param0) -} - -// Cancel indicates an expected call of Cancel -func (_mr *MockStreamIMockRecorder) Cancel(arg0 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "Cancel", reflect.TypeOf((*MockStreamI)(nil).Cancel), arg0) -} - // Close mocks base method func (_m *MockStreamI) Close() error { ret := _m.ctrl.Call(_m, "Close") @@ -58,6 +48,16 @@ func (_mr *MockStreamIMockRecorder) Close() *gomock.Call { return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "Close", reflect.TypeOf((*MockStreamI)(nil).Close)) } +// CloseForShutdown mocks base method +func (_m *MockStreamI) CloseForShutdown(_param0 error) { + _m.ctrl.Call(_m, "CloseForShutdown", _param0) +} + +// CloseForShutdown indicates an expected call of CloseForShutdown +func (_mr *MockStreamIMockRecorder) CloseForShutdown(arg0 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCallWithMethodType(_mr.mock, "CloseForShutdown", reflect.TypeOf((*MockStreamI)(nil).CloseForShutdown), arg0) +} + // Context mocks base method func (_m *MockStreamI) Context() context.Context { ret := _m.ctrl.Call(_m, "Context") diff --git a/session.go b/session.go index 8b6892904..316eafba9 100644 --- a/session.go +++ b/session.go @@ -656,7 +656,7 @@ func (s *session) handleCloseError(closeErr closeError) error { utils.Errorf("Closing session with error: %s", closeErr.err.Error()) } - s.cryptoStream.Cancel(quicErr) + s.cryptoStream.CloseForShutdown(quicErr) s.streamsMap.CloseWithError(quicErr) if closeErr.err == errCloseSessionForNewVersion || closeErr.err == handshake.ErrCloseSessionForRetry { diff --git a/session_test.go b/session_test.go index 62a9e89af..cf1768eaa 100644 --- a/session_test.go +++ b/session_test.go @@ -498,7 +498,7 @@ var _ = Describe("Session", func() { _, err := sess.GetOrOpenStream(5) Expect(err).ToNot(HaveOccurred()) sess.streamsMap.Range(func(s streamI) { - s.(*mocks.MockStreamI).EXPECT().Cancel(gomock.Any()) + s.(*mocks.MockStreamI).EXPECT().CloseForShutdown(gomock.Any()) }) err = sess.handleFrames([]wire.Frame{&wire.ConnectionCloseFrame{ErrorCode: qerr.ProofInvalid, ReasonPhrase: "foobar"}}, protocol.EncryptionUnspecified) Expect(err).NotTo(HaveOccurred()) @@ -1397,7 +1397,7 @@ var _ = Describe("Session", func() { str, err := sess.GetOrOpenStream(9) Expect(err).ToNot(HaveOccurred()) str.Close() - str.(*stream).Cancel(nil) + str.(*stream).CloseForShutdown(nil) Expect(str.(*stream).Finished()).To(BeTrue()) err = sess.streamsMap.DeleteClosedStreams() Expect(err).ToNot(HaveOccurred()) diff --git a/stream.go b/stream.go index 6dacfd3f8..2118a7f1e 100644 --- a/stream.go +++ b/stream.go @@ -21,7 +21,7 @@ type streamI interface { HandleRstStreamFrame(*wire.RstStreamFrame) error PopStreamFrame(maxBytes protocol.ByteCount) *wire.StreamFrame Finished() bool - Cancel(error) + CloseForShutdown(error) // methods needed for flow control GetWindowUpdate() protocol.ByteCount HandleMaxStreamDataFrame(*wire.MaxStreamDataFrame) @@ -51,8 +51,8 @@ type stream struct { // Once set, the errors must not be changed! err error - // cancelled is set when Cancel() is called - cancelled utils.AtomicBool + // closedForShutdown is set when Cancel() is called + closedForShutdown utils.AtomicBool // finishedReading is set once we read a frame with a FinBit finishedReading utils.AtomicBool // finisedWriting is set once Close() is called @@ -113,7 +113,7 @@ func (s *stream) Read(p []byte) (int, error) { s.mutex.Lock() err := s.err s.mutex.Unlock() - if s.cancelled.Get() || s.resetLocally.Get() { + if s.closedForShutdown.Get() || s.resetLocally.Get() { return 0, err } if s.finishedReading.Get() { @@ -133,7 +133,7 @@ func (s *stream) Read(p []byte) (int, error) { var err error for { // Stop waiting on errors - if s.resetLocally.Get() || s.cancelled.Get() { + if s.resetLocally.Get() || s.closedForShutdown.Get() { err = s.err break } @@ -393,11 +393,12 @@ func (s *stream) CloseRemote(offset protocol.ByteCount) { s.HandleStreamFrame(&wire.StreamFrame{FinBit: true, Offset: offset}) } -// Cancel is called by session to indicate that an error occurred -// The stream should will be closed immediately -func (s *stream) Cancel(err error) { +// CloseForShutdown closes a stream abruptly. +// It makes Read and Write unblock (and return the error) immediately. +// The peer will NOT be informed about this: the stream is closed without sending a FIN or RST. +func (s *stream) CloseForShutdown(err error) { s.mutex.Lock() - s.cancelled.Set(true) + s.closedForShutdown.Set(true) s.ctxCancel() // errors must not be changed! if s.err == nil { @@ -465,7 +466,7 @@ func (s *stream) finishedWriteAndSentFin() bool { } func (s *stream) Finished() bool { - return s.cancelled.Get() || + return s.closedForShutdown.Get() || (s.finishedReading.Get() && s.finishedWriteAndSentFin()) || (s.resetRemotely.Get() && s.rstSent.Get()) || (s.finishedReading.Get() && s.rstSent.Get()) || diff --git a/stream_test.go b/stream_test.go index 5bc197cb6..cadd86722 100644 --- a/stream_test.go +++ b/stream_test.go @@ -457,12 +457,12 @@ var _ = Describe("Stream", func() { close(done) }() Consistently(done).ShouldNot(BeClosed()) - str.Cancel(testErr) + str.CloseForShutdown(testErr) Eventually(done).Should(BeClosed()) }) It("errors for all following reads", func() { - str.Cancel(testErr) + str.CloseForShutdown(testErr) b := make([]byte, 1) n, err := strWithTimeout.Read(b) Expect(n).To(BeZero()) @@ -471,7 +471,7 @@ var _ = Describe("Stream", func() { It("cancels the context", func() { Expect(str.Context().Done()).ToNot(BeClosed()) - str.Cancel(testErr) + str.CloseForShutdown(testErr) Expect(str.Context().Done()).To(BeClosed()) }) }) @@ -1001,7 +1001,7 @@ var _ = Describe("Stream", func() { }) It("doesn't allow FIN after an error", func() { - str.Cancel(errors.New("test")) + str.CloseForShutdown(errors.New("test")) f := str.PopStreamFrame(1000) Expect(f).To(BeNil()) }) @@ -1016,11 +1016,11 @@ var _ = Describe("Stream", func() { }) }) - Context("cancelling", func() { + Context("closing abruptly", func() { testErr := errors.New("test") It("returns errors when the stream is cancelled", func() { - str.Cancel(testErr) + str.CloseForShutdown(testErr) n, err := strWithTimeout.Write([]byte("foo")) Expect(n).To(BeZero()) Expect(err).To(MatchError(testErr)) @@ -1037,7 +1037,7 @@ var _ = Describe("Stream", func() { close(done) }() Eventually(func() *wire.StreamFrame { return str.PopStreamFrame(50) }).ShouldNot(BeNil()) // get a STREAM frame containing some data, but not all - str.Cancel(testErr) + str.CloseForShutdown(testErr) Expect(str.PopStreamFrame(1000)).To(BeNil()) Eventually(done).Should(BeClosed()) }) @@ -1068,7 +1068,7 @@ var _ = Describe("Stream", func() { } It("is finished after it is canceled", func() { - str.Cancel(testErr) + str.CloseForShutdown(testErr) Expect(str.Finished()).To(BeTrue()) }) diff --git a/streams_map.go b/streams_map.go index e162205c6..a039e0e41 100644 --- a/streams_map.go +++ b/streams_map.go @@ -317,7 +317,7 @@ func (m *streamsMap) CloseWithError(err error) { m.nextStreamOrErrCond.Broadcast() m.openStreamOrErrCond.Broadcast() for _, s := range m.openStreams { - m.streams[s].Cancel(err) + m.streams[s].CloseForShutdown(err) } } diff --git a/streams_map_test.go b/streams_map_test.go index 99936c998..6abf9a441 100644 --- a/streams_map_test.go +++ b/streams_map_test.go @@ -245,7 +245,7 @@ var _ = Describe("Streams Map", func() { testErr := errors.New("test error") openMaxNumStreams() for _, str := range m.streams { - str.(*mocks.MockStreamI).EXPECT().Cancel(testErr) + str.(*mocks.MockStreamI).EXPECT().CloseForShutdown(testErr) } done := make(chan struct{})