diff --git a/session.go b/session.go index f113dd0b..26072f66 100644 --- a/session.go +++ b/session.go @@ -98,14 +98,8 @@ func newSession(conn connection, v protocol.VersionNumber, connectionID protocol var sentPacketHandler ackhandler.SentPacketHandler var receivedPacketHandler ackhandler.ReceivedPacketHandler - if v <= protocol.Version33 { - stopWaitingManager = ackhandlerlegacy.NewStopWaitingManager().(ackhandler.StopWaitingManager) - sentPacketHandler = ackhandlerlegacy.NewSentPacketHandler(stopWaitingManager).(ackhandler.SentPacketHandler) - receivedPacketHandler = ackhandlerlegacy.NewReceivedPacketHandler().(ackhandler.ReceivedPacketHandler) - } else { - sentPacketHandler = ackhandler.NewSentPacketHandler() - receivedPacketHandler = ackhandler.NewReceivedPacketHandler() - } + sentPacketHandler = ackhandler.NewSentPacketHandler() + receivedPacketHandler = ackhandler.NewReceivedPacketHandler() session := &Session{ connectionID: connectionID, diff --git a/session_test.go b/session_test.go index a456daab..42db7670 100644 --- a/session_test.go +++ b/session_test.go @@ -3,7 +3,6 @@ package quic import ( "bytes" "errors" - "fmt" "io" "net" "reflect" @@ -98,812 +97,794 @@ var _ = Describe("Session", func() { conn *mockConnection ) - for _, versionLoop := range []protocol.VersionNumber{protocol.Version33, protocol.Version34} { - version := versionLoop + BeforeEach(func() { + conn = &mockConnection{} + streamCallbackCalled = false + closeCallbackCalled = false - Context(fmt.Sprintf("with quic version %d", version), func() { + signer, err := crypto.NewProofSource(testdata.GetTLSConfig()) + Expect(err).ToNot(HaveOccurred()) + kex, err := crypto.NewCurve25519KEX() + Expect(err).NotTo(HaveOccurred()) + scfg, err := handshake.NewServerConfig(kex, signer) + Expect(err).NotTo(HaveOccurred()) + pSession, err := newSession( + conn, + protocol.Version35, + 0, + scfg, + func(*Session, utils.Stream) { streamCallbackCalled = true }, + func(protocol.ConnectionID) { closeCallbackCalled = true }, + ) + Expect(err).NotTo(HaveOccurred()) + session = pSession.(*Session) + Expect(session.streamsMap.NumberOfStreams()).To(Equal(1)) // Crypto stream + }) - BeforeEach(func() { - conn = &mockConnection{} - streamCallbackCalled = false - closeCallbackCalled = false - - signer, err := crypto.NewProofSource(testdata.GetTLSConfig()) - Expect(err).ToNot(HaveOccurred()) - kex, err := crypto.NewCurve25519KEX() - Expect(err).NotTo(HaveOccurred()) - scfg, err := handshake.NewServerConfig(kex, signer) - Expect(err).NotTo(HaveOccurred()) - pSession, err := newSession( - conn, - version, - 0, - scfg, - func(*Session, utils.Stream) { streamCallbackCalled = true }, - func(protocol.ConnectionID) { closeCallbackCalled = true }, - ) - Expect(err).NotTo(HaveOccurred()) - session = pSession.(*Session) - Expect(session.streamsMap.NumberOfStreams()).To(Equal(1)) // Crypto stream + Context("when handling stream frames", func() { + It("makes new streams", func() { + session.handleStreamFrame(&frames.StreamFrame{ + StreamID: 5, + Data: []byte{0xde, 0xca, 0xfb, 0xad}, }) + Expect(session.streamsMap.NumberOfStreams()).To(Equal(2)) + Expect(streamCallbackCalled).To(BeTrue()) + p := make([]byte, 4) + str, _ := session.streamsMap.GetOrOpenStream(5) + Expect(str).ToNot(BeNil()) + _, err := str.Read(p) + Expect(err).ToNot(HaveOccurred()) + Expect(p).To(Equal([]byte{0xde, 0xca, 0xfb, 0xad})) + }) - Context("when handling stream frames", func() { - It("makes new streams", func() { - session.handleStreamFrame(&frames.StreamFrame{ - StreamID: 5, - Data: []byte{0xde, 0xca, 0xfb, 0xad}, - }) - Expect(session.streamsMap.NumberOfStreams()).To(Equal(2)) - Expect(streamCallbackCalled).To(BeTrue()) - p := make([]byte, 4) - str, _ := session.streamsMap.GetOrOpenStream(5) - Expect(str).ToNot(BeNil()) - _, err := str.Read(p) - Expect(err).ToNot(HaveOccurred()) - Expect(p).To(Equal([]byte{0xde, 0xca, 0xfb, 0xad})) - }) - - It("does not reject existing streams with even StreamIDs", func() { - _, err := session.GetOrOpenStream(5) - Expect(err).ToNot(HaveOccurred()) - err = session.handleStreamFrame(&frames.StreamFrame{ - StreamID: 5, - Data: []byte{0xde, 0xca, 0xfb, 0xad}, - }) - Expect(err).ToNot(HaveOccurred()) - }) - - It("handles existing streams", func() { - session.handleStreamFrame(&frames.StreamFrame{ - StreamID: 5, - Data: []byte{0xde, 0xca}, - }) - Expect(session.streamsMap.NumberOfStreams()).To(Equal(2)) - Expect(streamCallbackCalled).To(BeTrue()) - session.handleStreamFrame(&frames.StreamFrame{ - StreamID: 5, - Offset: 2, - Data: []byte{0xfb, 0xad}, - }) - Expect(session.streamsMap.NumberOfStreams()).To(Equal(2)) - p := make([]byte, 4) - str, _ := session.streamsMap.GetOrOpenStream(5) - Expect(str).ToNot(BeNil()) - _, err := str.Read(p) - Expect(err).ToNot(HaveOccurred()) - Expect(p).To(Equal([]byte{0xde, 0xca, 0xfb, 0xad})) - }) - - It("does not delete streams with Close()", func() { - str, err := session.GetOrOpenStream(5) - Expect(err).ToNot(HaveOccurred()) - str.Close() - session.garbageCollectStreams() - Expect(session.streamsMap.NumberOfStreams()).To(Equal(2)) - str, _ = session.streamsMap.GetOrOpenStream(5) - Expect(str).ToNot(BeNil()) - }) - - It("does not delete streams with FIN bit", func() { - session.handleStreamFrame(&frames.StreamFrame{ - StreamID: 5, - Data: []byte{0xde, 0xca, 0xfb, 0xad}, - FinBit: true, - }) - Expect(session.streamsMap.NumberOfStreams()).To(Equal(2)) - str, _ := session.streamsMap.GetOrOpenStream(5) - Expect(str).ToNot(BeNil()) - Expect(streamCallbackCalled).To(BeTrue()) - p := make([]byte, 4) - _, err := str.Read(p) - Expect(err).To(MatchError(io.EOF)) - Expect(p).To(Equal([]byte{0xde, 0xca, 0xfb, 0xad})) - session.garbageCollectStreams() - Expect(session.streamsMap.NumberOfStreams()).To(Equal(2)) - str, _ = session.streamsMap.GetOrOpenStream(5) - Expect(str).ToNot(BeNil()) - }) - - It("deletes streams with FIN bit & close", func() { - session.handleStreamFrame(&frames.StreamFrame{ - StreamID: 5, - Data: []byte{0xde, 0xca, 0xfb, 0xad}, - FinBit: true, - }) - Expect(session.streamsMap.NumberOfStreams()).To(Equal(2)) - str, _ := session.streamsMap.GetOrOpenStream(5) - Expect(str).ToNot(BeNil()) - Expect(streamCallbackCalled).To(BeTrue()) - p := make([]byte, 4) - _, err := str.Read(p) - Expect(err).To(MatchError(io.EOF)) - Expect(p).To(Equal([]byte{0xde, 0xca, 0xfb, 0xad})) - session.garbageCollectStreams() - Expect(session.streamsMap.NumberOfStreams()).To(Equal(2)) - str, _ = session.streamsMap.GetOrOpenStream(5) - Expect(str).ToNot(BeNil()) - // We still need to close the stream locally - str.Close() - // ... and simulate that we actually the FIN - str.sentFin() - session.garbageCollectStreams() - Expect(session.streamsMap.NumberOfStreams()).To(Equal(1)) - str, err = session.streamsMap.GetOrOpenStream(5) - Expect(err).NotTo(HaveOccurred()) - Expect(str).To(BeNil()) - // flow controller should have been notified - _, err = session.flowControlManager.SendWindowSize(5) - Expect(err).To(MatchError("Error accessing the flowController map.")) - }) - - It("closes streams with error", func() { - testErr := errors.New("test") - session.handleStreamFrame(&frames.StreamFrame{ - StreamID: 5, - Data: []byte{0xde, 0xca, 0xfb, 0xad}, - }) - Expect(session.streamsMap.NumberOfStreams()).To(Equal(2)) - str, _ := session.streamsMap.GetOrOpenStream(5) - Expect(str).ToNot(BeNil()) - Expect(streamCallbackCalled).To(BeTrue()) - p := make([]byte, 4) - _, err := str.Read(p) - session.closeStreamsWithError(testErr) - _, err = str.Read(p) - Expect(err).To(MatchError(testErr)) - session.garbageCollectStreams() - Expect(session.streamsMap.NumberOfStreams()).To(Equal(1)) - str, err = session.streamsMap.GetOrOpenStream(5) - Expect(err).NotTo(HaveOccurred()) - Expect(str).To(BeNil()) - }) - - It("closes empty streams with error", func() { - testErr := errors.New("test") - session.newStreamImpl(5) - Expect(session.streamsMap.NumberOfStreams()).To(Equal(2)) - str, _ := session.streamsMap.GetOrOpenStream(5) - Expect(str).ToNot(BeNil()) - session.closeStreamsWithError(testErr) - _, err := str.Read([]byte{0}) - Expect(err).To(MatchError(testErr)) - session.garbageCollectStreams() - str, err = session.streamsMap.GetOrOpenStream(5) - Expect(err).NotTo(HaveOccurred()) - Expect(str).To(BeNil()) - }) - - It("informs the FlowControlManager about new streams", func() { - // since the stream doesn't yet exist, this will throw an error - err := session.flowControlManager.UpdateHighestReceived(5, 1000) - Expect(err).To(HaveOccurred()) - session.newStreamImpl(5) - err = session.flowControlManager.UpdateHighestReceived(5, 2000) - Expect(err).ToNot(HaveOccurred()) - }) - - It("ignores streams that existed previously", func() { - session.handleStreamFrame(&frames.StreamFrame{ - StreamID: 5, - Data: []byte{}, - FinBit: true, - }) - str, _ := session.streamsMap.GetOrOpenStream(5) - Expect(str).ToNot(BeNil()) - _, err := str.Read([]byte{0}) - Expect(err).To(MatchError(io.EOF)) - str.Close() - str.sentFin() - session.garbageCollectStreams() - err = session.handleStreamFrame(&frames.StreamFrame{ - StreamID: 5, - Data: []byte{}, - }) - Expect(err).To(BeNil()) - }) + It("does not reject existing streams with even StreamIDs", func() { + _, err := session.GetOrOpenStream(5) + Expect(err).ToNot(HaveOccurred()) + err = session.handleStreamFrame(&frames.StreamFrame{ + StreamID: 5, + Data: []byte{0xde, 0xca, 0xfb, 0xad}, }) + Expect(err).ToNot(HaveOccurred()) + }) - Context("handling RST_STREAM frames", func() { - It("closes the receiving streams for writing and reading", func() { - s, err := session.GetOrOpenStream(5) - Expect(err).ToNot(HaveOccurred()) - err = session.handleRstStreamFrame(&frames.RstStreamFrame{ - StreamID: 5, - ErrorCode: 42, - }) - Expect(err).ToNot(HaveOccurred()) - n, err := s.Write([]byte{0}) - Expect(n).To(BeZero()) - Expect(err).To(MatchError("RST_STREAM received with code 42")) - n, err = s.Read([]byte{0}) - Expect(n).To(BeZero()) - Expect(err).To(MatchError("RST_STREAM received with code 42")) - }) - - It("ignores the error when the stream is not known", func() { - err := session.handleFrames([]frames.Frame{&frames.RstStreamFrame{ - StreamID: 5, - ErrorCode: 42, - }}) - Expect(err).NotTo(HaveOccurred()) - }) + It("handles existing streams", func() { + session.handleStreamFrame(&frames.StreamFrame{ + StreamID: 5, + Data: []byte{0xde, 0xca}, }) - - Context("handling WINDOW_UPDATE frames", func() { - It("updates the Flow Control Window of a stream", func() { - _, err := session.GetOrOpenStream(5) - Expect(err).ToNot(HaveOccurred()) - err = session.handleWindowUpdateFrame(&frames.WindowUpdateFrame{ - StreamID: 5, - ByteOffset: 100, - }) - Expect(err).ToNot(HaveOccurred()) - Expect(session.flowControlManager.SendWindowSize(5)).To(Equal(protocol.ByteCount(100))) - }) - - It("updates the Flow Control Window of the connection", func() { - err := session.handleWindowUpdateFrame(&frames.WindowUpdateFrame{ - StreamID: 0, - ByteOffset: 0x800000, - }) - Expect(err).ToNot(HaveOccurred()) - }) - - It("opens a new stream when receiving a WINDOW_UPDATE for an unknown stream", func() { - err := session.handleWindowUpdateFrame(&frames.WindowUpdateFrame{ - StreamID: 5, - ByteOffset: 1337, - }) - Expect(err).ToNot(HaveOccurred()) - str, err := session.streamsMap.GetOrOpenStream(5) - Expect(err).NotTo(HaveOccurred()) - Expect(str).ToNot(BeNil()) - }) - - It("errors when receiving a WindowUpdateFrame for a closed stream", func() { - session.handleStreamFrame(&frames.StreamFrame{StreamID: 5}) - err := session.streamsMap.RemoveStream(5) - Expect(err).ToNot(HaveOccurred()) - session.garbageCollectStreams() - err = session.handleWindowUpdateFrame(&frames.WindowUpdateFrame{ - StreamID: 5, - ByteOffset: 1337, - }) - Expect(err).To(MatchError(errWindowUpdateOnClosedStream)) - }) - - It("ignores errors when receiving a WindowUpdateFrame for a closed stream", func() { - session.handleStreamFrame(&frames.StreamFrame{StreamID: 5}) - err := session.streamsMap.RemoveStream(5) - Expect(err).ToNot(HaveOccurred()) - session.garbageCollectStreams() - err = session.handleFrames([]frames.Frame{&frames.WindowUpdateFrame{ - StreamID: 5, - ByteOffset: 1337, - }}) - Expect(err).NotTo(HaveOccurred()) - }) + Expect(session.streamsMap.NumberOfStreams()).To(Equal(2)) + Expect(streamCallbackCalled).To(BeTrue()) + session.handleStreamFrame(&frames.StreamFrame{ + StreamID: 5, + Offset: 2, + Data: []byte{0xfb, 0xad}, }) + Expect(session.streamsMap.NumberOfStreams()).To(Equal(2)) + p := make([]byte, 4) + str, _ := session.streamsMap.GetOrOpenStream(5) + Expect(str).ToNot(BeNil()) + _, err := str.Read(p) + Expect(err).ToNot(HaveOccurred()) + Expect(p).To(Equal([]byte{0xde, 0xca, 0xfb, 0xad})) + }) - It("handles PING frames", func() { - err := session.handleFrames([]frames.Frame{&frames.PingFrame{}}) - Expect(err).NotTo(HaveOccurred()) + It("does not delete streams with Close()", func() { + str, err := session.GetOrOpenStream(5) + Expect(err).ToNot(HaveOccurred()) + str.Close() + session.garbageCollectStreams() + Expect(session.streamsMap.NumberOfStreams()).To(Equal(2)) + str, _ = session.streamsMap.GetOrOpenStream(5) + Expect(str).ToNot(BeNil()) + }) + + It("does not delete streams with FIN bit", func() { + session.handleStreamFrame(&frames.StreamFrame{ + StreamID: 5, + Data: []byte{0xde, 0xca, 0xfb, 0xad}, + FinBit: true, }) + Expect(session.streamsMap.NumberOfStreams()).To(Equal(2)) + str, _ := session.streamsMap.GetOrOpenStream(5) + Expect(str).ToNot(BeNil()) + Expect(streamCallbackCalled).To(BeTrue()) + p := make([]byte, 4) + _, err := str.Read(p) + Expect(err).To(MatchError(io.EOF)) + Expect(p).To(Equal([]byte{0xde, 0xca, 0xfb, 0xad})) + session.garbageCollectStreams() + Expect(session.streamsMap.NumberOfStreams()).To(Equal(2)) + str, _ = session.streamsMap.GetOrOpenStream(5) + Expect(str).ToNot(BeNil()) + }) - It("handles BLOCKED frames", func() { - err := session.handleFrames([]frames.Frame{&frames.BlockedFrame{}}) - Expect(err).NotTo(HaveOccurred()) + It("deletes streams with FIN bit & close", func() { + session.handleStreamFrame(&frames.StreamFrame{ + StreamID: 5, + Data: []byte{0xde, 0xca, 0xfb, 0xad}, + FinBit: true, }) + Expect(session.streamsMap.NumberOfStreams()).To(Equal(2)) + str, _ := session.streamsMap.GetOrOpenStream(5) + Expect(str).ToNot(BeNil()) + Expect(streamCallbackCalled).To(BeTrue()) + p := make([]byte, 4) + _, err := str.Read(p) + Expect(err).To(MatchError(io.EOF)) + Expect(p).To(Equal([]byte{0xde, 0xca, 0xfb, 0xad})) + session.garbageCollectStreams() + Expect(session.streamsMap.NumberOfStreams()).To(Equal(2)) + str, _ = session.streamsMap.GetOrOpenStream(5) + Expect(str).ToNot(BeNil()) + // We still need to close the stream locally + str.Close() + // ... and simulate that we actually the FIN + str.sentFin() + session.garbageCollectStreams() + Expect(session.streamsMap.NumberOfStreams()).To(Equal(1)) + str, err = session.streamsMap.GetOrOpenStream(5) + Expect(err).NotTo(HaveOccurred()) + Expect(str).To(BeNil()) + // flow controller should have been notified + _, err = session.flowControlManager.SendWindowSize(5) + Expect(err).To(MatchError("Error accessing the flowController map.")) + }) - It("errors on GOAWAY frames", func() { - err := session.handleFrames([]frames.Frame{&frames.GoawayFrame{}}) - Expect(err).To(MatchError("unimplemented: handling GOAWAY frames")) + It("closes streams with error", func() { + testErr := errors.New("test") + session.handleStreamFrame(&frames.StreamFrame{ + StreamID: 5, + Data: []byte{0xde, 0xca, 0xfb, 0xad}, }) + Expect(session.streamsMap.NumberOfStreams()).To(Equal(2)) + str, _ := session.streamsMap.GetOrOpenStream(5) + Expect(str).ToNot(BeNil()) + Expect(streamCallbackCalled).To(BeTrue()) + p := make([]byte, 4) + _, err := str.Read(p) + session.closeStreamsWithError(testErr) + _, err = str.Read(p) + Expect(err).To(MatchError(testErr)) + session.garbageCollectStreams() + Expect(session.streamsMap.NumberOfStreams()).To(Equal(1)) + str, err = session.streamsMap.GetOrOpenStream(5) + Expect(err).NotTo(HaveOccurred()) + Expect(str).To(BeNil()) + }) - It("handles STOP_WAITING frames", func() { - err := session.handleFrames([]frames.Frame{&frames.StopWaitingFrame{LeastUnacked: 10}}) - Expect(err).NotTo(HaveOccurred()) + It("closes empty streams with error", func() { + testErr := errors.New("test") + session.newStreamImpl(5) + Expect(session.streamsMap.NumberOfStreams()).To(Equal(2)) + str, _ := session.streamsMap.GetOrOpenStream(5) + Expect(str).ToNot(BeNil()) + session.closeStreamsWithError(testErr) + _, err := str.Read([]byte{0}) + Expect(err).To(MatchError(testErr)) + session.garbageCollectStreams() + str, err = session.streamsMap.GetOrOpenStream(5) + Expect(err).NotTo(HaveOccurred()) + Expect(str).To(BeNil()) + }) + + It("informs the FlowControlManager about new streams", func() { + // since the stream doesn't yet exist, this will throw an error + err := session.flowControlManager.UpdateHighestReceived(5, 1000) + Expect(err).To(HaveOccurred()) + session.newStreamImpl(5) + err = session.flowControlManager.UpdateHighestReceived(5, 2000) + Expect(err).ToNot(HaveOccurred()) + }) + + It("ignores streams that existed previously", func() { + session.handleStreamFrame(&frames.StreamFrame{ + StreamID: 5, + Data: []byte{}, + FinBit: true, }) - - It("handles CONNECTION_CLOSE frames", func() { - str, _ := session.GetOrOpenStream(5) - err := session.handleFrames([]frames.Frame{&frames.ConnectionCloseFrame{ErrorCode: 42, ReasonPhrase: "foobar"}}) - Expect(err).NotTo(HaveOccurred()) - _, err = str.Read([]byte{0}) - Expect(err).To(MatchError(qerr.Error(42, "foobar"))) + str, _ := session.streamsMap.GetOrOpenStream(5) + Expect(str).ToNot(BeNil()) + _, err := str.Read([]byte{0}) + Expect(err).To(MatchError(io.EOF)) + str.Close() + str.sentFin() + session.garbageCollectStreams() + err = session.handleStreamFrame(&frames.StreamFrame{ + StreamID: 5, + Data: []byte{}, }) + Expect(err).To(BeNil()) + }) + }) - Context("closing", func() { - var ( - nGoRoutinesBefore int - ) - - BeforeEach(func() { - time.Sleep(10 * time.Millisecond) // Wait for old goroutines to finish - nGoRoutinesBefore = runtime.NumGoroutine() - go session.run() - Eventually(func() int { return runtime.NumGoroutine() }).Should(Equal(nGoRoutinesBefore + 2)) - }) - - It("shuts down without error", func() { - session.Close(nil) - Expect(closeCallbackCalled).To(BeTrue()) - Eventually(func() int { return runtime.NumGoroutine() }).Should(Equal(nGoRoutinesBefore)) - Expect(conn.written).To(HaveLen(1)) - Expect(conn.written[0][len(conn.written[0])-7:]).To(Equal([]byte{0x02, byte(qerr.PeerGoingAway), 0, 0, 0, 0, 0})) - }) - - It("only closes once", func() { - session.Close(nil) - session.Close(nil) - Eventually(func() int { return runtime.NumGoroutine() }).Should(Equal(nGoRoutinesBefore)) - Expect(conn.written).To(HaveLen(1)) - }) - - It("closes streams with proper error", func() { - testErr := errors.New("test error") - s, err := session.GetOrOpenStream(5) - Expect(err).NotTo(HaveOccurred()) - session.Close(testErr) - Expect(closeCallbackCalled).To(BeTrue()) - Eventually(func() int { return runtime.NumGoroutine() }).Should(Equal(nGoRoutinesBefore)) - n, err := s.Read([]byte{0}) - Expect(n).To(BeZero()) - Expect(err.Error()).To(ContainSubstring(testErr.Error())) - n, err = s.Write([]byte{0}) - Expect(n).To(BeZero()) - Expect(err.Error()).To(ContainSubstring(testErr.Error())) - }) + Context("handling RST_STREAM frames", func() { + It("closes the receiving streams for writing and reading", func() { + s, err := session.GetOrOpenStream(5) + Expect(err).ToNot(HaveOccurred()) + err = session.handleRstStreamFrame(&frames.RstStreamFrame{ + StreamID: 5, + ErrorCode: 42, }) + Expect(err).ToNot(HaveOccurred()) + n, err := s.Write([]byte{0}) + Expect(n).To(BeZero()) + Expect(err).To(MatchError("RST_STREAM received with code 42")) + n, err = s.Read([]byte{0}) + Expect(n).To(BeZero()) + Expect(err).To(MatchError("RST_STREAM received with code 42")) + }) - Context("receiving packets", func() { - var hdr *PublicHeader + It("ignores the error when the stream is not known", func() { + err := session.handleFrames([]frames.Frame{&frames.RstStreamFrame{ + StreamID: 5, + ErrorCode: 42, + }}) + Expect(err).NotTo(HaveOccurred()) + }) + }) - BeforeEach(func() { - session.unpacker = &mockUnpacker{} - hdr = &PublicHeader{PacketNumberLen: protocol.PacketNumberLen6} - }) - - It("sets the {last,largest}RcvdPacketNumber", func() { - hdr.PacketNumber = 5 - err := session.handlePacketImpl(nil, hdr, nil) - Expect(err).ToNot(HaveOccurred()) - Expect(session.lastRcvdPacketNumber).To(Equal(protocol.PacketNumber(5))) - Expect(session.largestRcvdPacketNumber).To(Equal(protocol.PacketNumber(5))) - }) - - It("sets the {last,largest}RcvdPacketNumber, for an out-of-order packet", func() { - hdr.PacketNumber = 5 - err := session.handlePacketImpl(nil, hdr, nil) - Expect(err).ToNot(HaveOccurred()) - Expect(session.lastRcvdPacketNumber).To(Equal(protocol.PacketNumber(5))) - Expect(session.largestRcvdPacketNumber).To(Equal(protocol.PacketNumber(5))) - hdr.PacketNumber = 3 - err = session.handlePacketImpl(nil, hdr, nil) - Expect(err).ToNot(HaveOccurred()) - Expect(session.lastRcvdPacketNumber).To(Equal(protocol.PacketNumber(3))) - Expect(session.largestRcvdPacketNumber).To(Equal(protocol.PacketNumber(5))) - }) - - It("ignores duplicate packets", func() { - hdr.PacketNumber = 5 - err := session.handlePacketImpl(nil, hdr, nil) - Expect(err).ToNot(HaveOccurred()) - err = session.handlePacketImpl(nil, hdr, nil) - Expect(err).ToNot(HaveOccurred()) - }) - - It("ignores packets smaller than the highest LeastUnacked of a StopWaiting", func() { - err := session.receivedPacketHandler.ReceivedStopWaiting(&frames.StopWaitingFrame{LeastUnacked: 10}) - Expect(err).ToNot(HaveOccurred()) - hdr.PacketNumber = 5 - err = session.handlePacketImpl(nil, hdr, nil) - Expect(err).ToNot(HaveOccurred()) - }) + Context("handling WINDOW_UPDATE frames", func() { + It("updates the Flow Control Window of a stream", func() { + _, err := session.GetOrOpenStream(5) + Expect(err).ToNot(HaveOccurred()) + err = session.handleWindowUpdateFrame(&frames.WindowUpdateFrame{ + StreamID: 5, + ByteOffset: 100, }) + Expect(err).ToNot(HaveOccurred()) + Expect(session.flowControlManager.SendWindowSize(5)).To(Equal(protocol.ByteCount(100))) + }) - Context("sending packets", func() { - It("sends ack frames", func() { - packetNumber := protocol.PacketNumber(0x035E) - session.receivedPacketHandler.ReceivedPacket(packetNumber, true) - err := session.sendPacket() - Expect(err).NotTo(HaveOccurred()) - Expect(conn.written).To(HaveLen(1)) - // test for the beginning of an ACK frame: Entropy until LargestObserved - Expect(conn.written[0]).To(ContainSubstring(string([]byte{0x5E, 0x03}))) - }) - - It("sends two WindowUpdate frames", func() { - _, err := session.GetOrOpenStream(5) - Expect(err).ToNot(HaveOccurred()) - session.flowControlManager.AddBytesRead(5, protocol.ReceiveStreamFlowControlWindow) - err = session.sendPacket() - Expect(err).NotTo(HaveOccurred()) - err = session.sendPacket() - Expect(err).NotTo(HaveOccurred()) - err = session.sendPacket() - Expect(err).NotTo(HaveOccurred()) - Expect(conn.written).To(HaveLen(2)) - Expect(conn.written[0]).To(ContainSubstring(string([]byte{0x04, 0x05, 0, 0, 0}))) - Expect(conn.written[1]).To(ContainSubstring(string([]byte{0x04, 0x05, 0, 0, 0}))) - }) - - It("sends public reset", func() { - err := session.sendPublicReset(1) - Expect(err).NotTo(HaveOccurred()) - Expect(conn.written).To(HaveLen(1)) - Expect(conn.written[0]).To(ContainSubstring(string([]byte("PRST")))) - }) + It("updates the Flow Control Window of the connection", func() { + err := session.handleWindowUpdateFrame(&frames.WindowUpdateFrame{ + StreamID: 0, + ByteOffset: 0x800000, }) + Expect(err).ToNot(HaveOccurred()) + }) - Context("retransmissions", func() { - It("sends a StreamFrame from a packet queued for retransmission", func() { - // a StopWaitingFrame is added, so make sure the packet number of the new package is higher than the packet number of the retransmitted packet - session.packer.lastPacketNumber = 0x1337 + 10 - if session.version > protocol.Version33 { - session.packer.packetNumberGenerator.next = 0x1337 + 9 - } - - f := frames.StreamFrame{ - StreamID: 0x5, - Data: []byte("foobar1234567"), - } - p := ackhandlerlegacy.Packet{ - PacketNumber: 0x1337, - Frames: []frames.Frame{&f}, - } - sph := newMockSentPacketHandler() - sph.(*mockSentPacketHandler).retransmissionQueue = []*ackhandlerlegacy.Packet{&p} - session.sentPacketHandler = sph - - err := session.sendPacket() - Expect(err).NotTo(HaveOccurred()) - Expect(conn.written).To(HaveLen(1)) - if session.version > protocol.Version33 { // before version 34, this was handled by the StopWaitingManager - Expect(sph.(*mockSentPacketHandler).requestedStopWaiting).To(BeTrue()) - } - Expect(conn.written[0]).To(ContainSubstring("foobar1234567")) - }) - - It("sends a StreamFrame from a packet queued for retransmission", func() { - // a StopWaitingFrame is added, so make sure the packet number of the new package is higher than the packet number of the retransmitted packet - session.packer.lastPacketNumber = 0x1337 + 10 - if session.version > protocol.Version33 { - session.packer.packetNumberGenerator.next = 0x1337 + 9 - } - - f1 := frames.StreamFrame{ - StreamID: 0x5, - Data: []byte("foobar"), - } - f2 := frames.StreamFrame{ - StreamID: 0x7, - Data: []byte("loremipsum"), - } - p1 := ackhandlerlegacy.Packet{ - PacketNumber: 0x1337, - Frames: []frames.Frame{&f1}, - } - p2 := ackhandlerlegacy.Packet{ - PacketNumber: 0x1338, - Frames: []frames.Frame{&f2}, - } - sph := newMockSentPacketHandler() - sph.(*mockSentPacketHandler).retransmissionQueue = []*ackhandlerlegacy.Packet{&p1, &p2} - session.sentPacketHandler = sph - - err := session.sendPacket() - Expect(err).NotTo(HaveOccurred()) - Expect(conn.written).To(HaveLen(1)) - Expect(conn.written[0]).To(ContainSubstring("foobar")) - Expect(conn.written[0]).To(ContainSubstring("loremipsum")) - }) - - // this test is not necessary before QUIC 34, since the legacy StopWaitingManager repeats StopWaitingFrames with every packet until the client ACKs the receipt of any of them - if version > protocol.Version33 { - It("always attaches a StopWaiting to a packet that contains a retransmission", func() { - // make sure the packet number of the new package is higher than the packet number of the retransmitted packet - session.packer.packetNumberGenerator.next = 0x1337 + 9 - - f := &frames.StreamFrame{ - StreamID: 0x5, - Data: bytes.Repeat([]byte{'f'}, int(1.5*float32(protocol.MaxPacketSize))), - } - session.streamFramer.AddFrameForRetransmission(f) - - sph := newMockSentPacketHandler() - session.sentPacketHandler = sph - - err := session.sendPacket() - Expect(err).NotTo(HaveOccurred()) - Expect(conn.written).To(HaveLen(2)) - sentPackets := sph.(*mockSentPacketHandler).sentPackets - Expect(sentPackets).To(HaveLen(2)) - _, ok := sentPackets[0].Frames[0].(*frames.StopWaitingFrame) - Expect(ok).To(BeTrue()) - _, ok = sentPackets[1].Frames[0].(*frames.StopWaitingFrame) - Expect(ok).To(BeTrue()) - }) - } - - It("calls MaybeQueueRTOs even if congestion blocked, so that bytesInFlight is updated", func() { - sph := newMockSentPacketHandler() - sph.(*mockSentPacketHandler).congestionLimited = true - session.sentPacketHandler = sph - err := session.sendPacket() - Expect(err).NotTo(HaveOccurred()) - Expect(sph.(*mockSentPacketHandler).maybeQueueRTOsCalled).To(BeTrue()) - }) + It("opens a new stream when receiving a WINDOW_UPDATE for an unknown stream", func() { + err := session.handleWindowUpdateFrame(&frames.WindowUpdateFrame{ + StreamID: 5, + ByteOffset: 1337, }) + Expect(err).ToNot(HaveOccurred()) + str, err := session.streamsMap.GetOrOpenStream(5) + Expect(err).NotTo(HaveOccurred()) + Expect(str).ToNot(BeNil()) + }) - Context("scheduling sending", func() { - It("sends after writing to a stream", func(done Done) { - Expect(session.sendingScheduled).NotTo(Receive()) - s, err := session.GetOrOpenStream(3) - Expect(err).NotTo(HaveOccurred()) - go func() { - s.Write([]byte("foobar")) - close(done) - }() - Eventually(session.sendingScheduled).Should(Receive()) - s.(*stream).getDataForWriting(1000) // unblock - }) - - Context("bundling of small packets", func() { - It("bundles two small frames of different streams into one packet", func() { - s1, err := session.GetOrOpenStream(5) - Expect(err).NotTo(HaveOccurred()) - s2, err := session.GetOrOpenStream(7) - Expect(err).NotTo(HaveOccurred()) - - // Put data directly into the streams - s1.(*stream).dataForWriting = []byte("foobar1") - s2.(*stream).dataForWriting = []byte("foobar2") - - session.scheduleSending() - go session.run() - - Eventually(func() [][]byte { return conn.written }).Should(HaveLen(1)) - Expect(conn.written[0]).To(ContainSubstring("foobar1")) - Expect(conn.written[0]).To(ContainSubstring("foobar2")) - }) - - It("sends out two big frames in two packets", func() { - s1, err := session.GetOrOpenStream(5) - Expect(err).NotTo(HaveOccurred()) - s2, err := session.GetOrOpenStream(7) - Expect(err).NotTo(HaveOccurred()) - go session.run() - go func() { - defer GinkgoRecover() - _, err2 := s1.Write(bytes.Repeat([]byte{'e'}, 1000)) - Expect(err2).ToNot(HaveOccurred()) - }() - _, err = s2.Write(bytes.Repeat([]byte{'e'}, 1000)) - Expect(err).ToNot(HaveOccurred()) - Eventually(func() [][]byte { return conn.written }).Should(HaveLen(2)) - }) - - It("sends out two small frames that are written to long after one another into two packets", func() { - s, err := session.GetOrOpenStream(5) - Expect(err).NotTo(HaveOccurred()) - go session.run() - _, err = s.Write([]byte("foobar1")) - Expect(err).NotTo(HaveOccurred()) - Eventually(func() [][]byte { return conn.written }).Should(HaveLen(1)) - _, err = s.Write([]byte("foobar2")) - Expect(err).NotTo(HaveOccurred()) - Eventually(func() [][]byte { return conn.written }).Should(HaveLen(2)) - }) - - It("sends a queued ACK frame only once", func() { - packetNumber := protocol.PacketNumber(0x1337) - session.receivedPacketHandler.ReceivedPacket(packetNumber, true) - - s, err := session.GetOrOpenStream(5) - Expect(err).NotTo(HaveOccurred()) - go session.run() - _, err = s.Write([]byte("foobar1")) - Expect(err).NotTo(HaveOccurred()) - Eventually(func() [][]byte { return conn.written }).Should(HaveLen(1)) - _, err = s.Write([]byte("foobar2")) - Expect(err).NotTo(HaveOccurred()) - - Eventually(func() [][]byte { return conn.written }).Should(HaveLen(2)) - Expect(conn.written[0]).To(ContainSubstring(string([]byte{0x37, 0x13}))) - Expect(conn.written[1]).ToNot(ContainSubstring(string([]byte{0x37, 0x13}))) - }) - }) + It("errors when receiving a WindowUpdateFrame for a closed stream", func() { + session.handleStreamFrame(&frames.StreamFrame{StreamID: 5}) + err := session.streamsMap.RemoveStream(5) + Expect(err).ToNot(HaveOccurred()) + session.garbageCollectStreams() + err = session.handleWindowUpdateFrame(&frames.WindowUpdateFrame{ + StreamID: 5, + ByteOffset: 1337, }) + Expect(err).To(MatchError(errWindowUpdateOnClosedStream)) + }) - It("closes when crypto stream errors", func() { - go session.run() - s, err := session.GetOrOpenStream(3) - Expect(err).NotTo(HaveOccurred()) - err = session.handleStreamFrame(&frames.StreamFrame{ - StreamID: 1, - Data: []byte("4242\x00\x00\x00\x00"), - }) - Expect(err).NotTo(HaveOccurred()) - Eventually(func() bool { return atomic.LoadUint32(&session.closed) != 0 }).Should(BeTrue()) - _, err = s.Write([]byte{}) - Expect(err.(*qerr.QuicError).ErrorCode).To(Equal(qerr.InvalidCryptoMessageType)) - }) + It("ignores errors when receiving a WindowUpdateFrame for a closed stream", func() { + session.handleStreamFrame(&frames.StreamFrame{StreamID: 5}) + err := session.streamsMap.RemoveStream(5) + Expect(err).ToNot(HaveOccurred()) + session.garbageCollectStreams() + err = session.handleFrames([]frames.Frame{&frames.WindowUpdateFrame{ + StreamID: 5, + ByteOffset: 1337, + }}) + Expect(err).NotTo(HaveOccurred()) + }) + }) - It("sends public reset after too many undecryptable packets", func() { - // Write protocol.MaxUndecryptablePackets and expect a public reset to happen - for i := 0; i < protocol.MaxUndecryptablePackets; i++ { - hdr := &PublicHeader{ - PacketNumber: protocol.PacketNumber(i + 1), - } - session.handlePacket(nil, hdr, []byte("foobar")) - } - session.run() + It("handles PING frames", func() { + err := session.handleFrames([]frames.Frame{&frames.PingFrame{}}) + Expect(err).NotTo(HaveOccurred()) + }) - Expect(conn.written).To(HaveLen(1)) - Expect(conn.written[0]).To(ContainSubstring(string([]byte("PRST")))) - }) + It("handles BLOCKED frames", func() { + err := session.handleFrames([]frames.Frame{&frames.BlockedFrame{}}) + Expect(err).NotTo(HaveOccurred()) + }) - It("ignores undecryptable packets after the handshake is complete", func() { - *(*bool)(unsafe.Pointer(reflect.ValueOf(session.cryptoSetup).Elem().FieldByName("receivedForwardSecurePacket").UnsafeAddr())) = true - for i := 0; i < protocol.MaxUndecryptablePackets; i++ { - hdr := &PublicHeader{ - PacketNumber: protocol.PacketNumber(i + 1), - } - session.handlePacket(nil, hdr, []byte("foobar")) - } - go session.run() - Consistently(session.undecryptablePackets).Should(HaveLen(0)) - session.closeImpl(nil, true) - }) + It("errors on GOAWAY frames", func() { + err := session.handleFrames([]frames.Frame{&frames.GoawayFrame{}}) + Expect(err).To(MatchError("unimplemented: handling GOAWAY frames")) + }) - It("unqueues undecryptable packets for later decryption", func() { - session.undecryptablePackets = []receivedPacket{{ - nil, - &PublicHeader{PacketNumber: protocol.PacketNumber(42)}, - nil, - }} - Expect(session.receivedPackets).NotTo(Receive()) - session.tryDecryptingQueuedPackets() - Expect(session.undecryptablePackets).To(BeEmpty()) - Expect(session.receivedPackets).To(Receive()) - }) + It("handles STOP_WAITING frames", func() { + err := session.handleFrames([]frames.Frame{&frames.StopWaitingFrame{LeastUnacked: 10}}) + Expect(err).NotTo(HaveOccurred()) + }) - It("times out", func(done Done) { - session.connectionParametersManager.SetFromMap(map[handshake.Tag][]byte{ - handshake.TagICSL: {0, 0, 0, 0}, - }) - session.packer.connectionParametersManager = session.connectionParametersManager - session.run() // Would normally not return - Expect(conn.written[0]).To(ContainSubstring("No recent network activity.")) + It("handles CONNECTION_CLOSE frames", func() { + str, _ := session.GetOrOpenStream(5) + err := session.handleFrames([]frames.Frame{&frames.ConnectionCloseFrame{ErrorCode: 42, ReasonPhrase: "foobar"}}) + Expect(err).NotTo(HaveOccurred()) + _, err = str.Read([]byte{0}) + Expect(err).To(MatchError(qerr.Error(42, "foobar"))) + }) + + Context("closing", func() { + var ( + nGoRoutinesBefore int + ) + + BeforeEach(func() { + time.Sleep(10 * time.Millisecond) // Wait for old goroutines to finish + nGoRoutinesBefore = runtime.NumGoroutine() + go session.run() + Eventually(func() int { return runtime.NumGoroutine() }).Should(Equal(nGoRoutinesBefore + 2)) + }) + + It("shuts down without error", func() { + session.Close(nil) + Expect(closeCallbackCalled).To(BeTrue()) + Eventually(func() int { return runtime.NumGoroutine() }).Should(Equal(nGoRoutinesBefore)) + Expect(conn.written).To(HaveLen(1)) + Expect(conn.written[0][len(conn.written[0])-7:]).To(Equal([]byte{0x02, byte(qerr.PeerGoingAway), 0, 0, 0, 0, 0})) + }) + + It("only closes once", func() { + session.Close(nil) + session.Close(nil) + Eventually(func() int { return runtime.NumGoroutine() }).Should(Equal(nGoRoutinesBefore)) + Expect(conn.written).To(HaveLen(1)) + }) + + It("closes streams with proper error", func() { + testErr := errors.New("test error") + s, err := session.GetOrOpenStream(5) + Expect(err).NotTo(HaveOccurred()) + session.Close(testErr) + Expect(closeCallbackCalled).To(BeTrue()) + Eventually(func() int { return runtime.NumGoroutine() }).Should(Equal(nGoRoutinesBefore)) + n, err := s.Read([]byte{0}) + Expect(n).To(BeZero()) + Expect(err.Error()).To(ContainSubstring(testErr.Error())) + n, err = s.Write([]byte{0}) + Expect(n).To(BeZero()) + Expect(err.Error()).To(ContainSubstring(testErr.Error())) + }) + }) + + Context("receiving packets", func() { + var hdr *PublicHeader + + BeforeEach(func() { + session.unpacker = &mockUnpacker{} + hdr = &PublicHeader{PacketNumberLen: protocol.PacketNumberLen6} + }) + + It("sets the {last,largest}RcvdPacketNumber", func() { + hdr.PacketNumber = 5 + err := session.handlePacketImpl(nil, hdr, nil) + Expect(err).ToNot(HaveOccurred()) + Expect(session.lastRcvdPacketNumber).To(Equal(protocol.PacketNumber(5))) + Expect(session.largestRcvdPacketNumber).To(Equal(protocol.PacketNumber(5))) + }) + + It("sets the {last,largest}RcvdPacketNumber, for an out-of-order packet", func() { + hdr.PacketNumber = 5 + err := session.handlePacketImpl(nil, hdr, nil) + Expect(err).ToNot(HaveOccurred()) + Expect(session.lastRcvdPacketNumber).To(Equal(protocol.PacketNumber(5))) + Expect(session.largestRcvdPacketNumber).To(Equal(protocol.PacketNumber(5))) + hdr.PacketNumber = 3 + err = session.handlePacketImpl(nil, hdr, nil) + Expect(err).ToNot(HaveOccurred()) + Expect(session.lastRcvdPacketNumber).To(Equal(protocol.PacketNumber(3))) + Expect(session.largestRcvdPacketNumber).To(Equal(protocol.PacketNumber(5))) + }) + + It("ignores duplicate packets", func() { + hdr.PacketNumber = 5 + err := session.handlePacketImpl(nil, hdr, nil) + Expect(err).ToNot(HaveOccurred()) + err = session.handlePacketImpl(nil, hdr, nil) + Expect(err).ToNot(HaveOccurred()) + }) + + It("ignores packets smaller than the highest LeastUnacked of a StopWaiting", func() { + err := session.receivedPacketHandler.ReceivedStopWaiting(&frames.StopWaitingFrame{LeastUnacked: 10}) + Expect(err).ToNot(HaveOccurred()) + hdr.PacketNumber = 5 + err = session.handlePacketImpl(nil, hdr, nil) + Expect(err).ToNot(HaveOccurred()) + }) + }) + + Context("sending packets", func() { + It("sends ack frames", func() { + packetNumber := protocol.PacketNumber(0x035E) + session.receivedPacketHandler.ReceivedPacket(packetNumber, true) + err := session.sendPacket() + Expect(err).NotTo(HaveOccurred()) + Expect(conn.written).To(HaveLen(1)) + // test for the beginning of an ACK frame: Entropy until LargestObserved + Expect(conn.written[0]).To(ContainSubstring(string([]byte{0x5E, 0x03}))) + }) + + It("sends two WindowUpdate frames", func() { + _, err := session.GetOrOpenStream(5) + Expect(err).ToNot(HaveOccurred()) + session.flowControlManager.AddBytesRead(5, protocol.ReceiveStreamFlowControlWindow) + err = session.sendPacket() + Expect(err).NotTo(HaveOccurred()) + err = session.sendPacket() + Expect(err).NotTo(HaveOccurred()) + err = session.sendPacket() + Expect(err).NotTo(HaveOccurred()) + Expect(conn.written).To(HaveLen(2)) + Expect(conn.written[0]).To(ContainSubstring(string([]byte{0x04, 0x05, 0, 0, 0}))) + Expect(conn.written[1]).To(ContainSubstring(string([]byte{0x04, 0x05, 0, 0, 0}))) + }) + + It("sends public reset", func() { + err := session.sendPublicReset(1) + Expect(err).NotTo(HaveOccurred()) + Expect(conn.written).To(HaveLen(1)) + Expect(conn.written[0]).To(ContainSubstring(string([]byte("PRST")))) + }) + }) + + Context("retransmissions", func() { + It("sends a StreamFrame from a packet queued for retransmission", func() { + // a StopWaitingFrame is added, so make sure the packet number of the new package is higher than the packet number of the retransmitted packet + session.packer.lastPacketNumber = 0x1337 + 10 + if session.version > protocol.Version33 { + session.packer.packetNumberGenerator.next = 0x1337 + 9 + } + + f := frames.StreamFrame{ + StreamID: 0x5, + Data: []byte("foobar1234567"), + } + p := ackhandlerlegacy.Packet{ + PacketNumber: 0x1337, + Frames: []frames.Frame{&f}, + } + sph := newMockSentPacketHandler() + sph.(*mockSentPacketHandler).retransmissionQueue = []*ackhandlerlegacy.Packet{&p} + session.sentPacketHandler = sph + + err := session.sendPacket() + Expect(err).NotTo(HaveOccurred()) + Expect(conn.written).To(HaveLen(1)) + if session.version > protocol.Version33 { // before version 34, this was handled by the StopWaitingManager + Expect(sph.(*mockSentPacketHandler).requestedStopWaiting).To(BeTrue()) + } + Expect(conn.written[0]).To(ContainSubstring("foobar1234567")) + }) + + It("sends a StreamFrame from a packet queued for retransmission", func() { + // a StopWaitingFrame is added, so make sure the packet number of the new package is higher than the packet number of the retransmitted packet + session.packer.lastPacketNumber = 0x1337 + 10 + if session.version > protocol.Version33 { + session.packer.packetNumberGenerator.next = 0x1337 + 9 + } + + f1 := frames.StreamFrame{ + StreamID: 0x5, + Data: []byte("foobar"), + } + f2 := frames.StreamFrame{ + StreamID: 0x7, + Data: []byte("loremipsum"), + } + p1 := ackhandlerlegacy.Packet{ + PacketNumber: 0x1337, + Frames: []frames.Frame{&f1}, + } + p2 := ackhandlerlegacy.Packet{ + PacketNumber: 0x1338, + Frames: []frames.Frame{&f2}, + } + sph := newMockSentPacketHandler() + sph.(*mockSentPacketHandler).retransmissionQueue = []*ackhandlerlegacy.Packet{&p1, &p2} + session.sentPacketHandler = sph + + err := session.sendPacket() + Expect(err).NotTo(HaveOccurred()) + Expect(conn.written).To(HaveLen(1)) + Expect(conn.written[0]).To(ContainSubstring("foobar")) + Expect(conn.written[0]).To(ContainSubstring("loremipsum")) + }) + + It("always attaches a StopWaiting to a packet that contains a retransmission", func() { + // make sure the packet number of the new package is higher than the packet number of the retransmitted packet + session.packer.packetNumberGenerator.next = 0x1337 + 9 + + f := &frames.StreamFrame{ + StreamID: 0x5, + Data: bytes.Repeat([]byte{'f'}, int(1.5*float32(protocol.MaxPacketSize))), + } + session.streamFramer.AddFrameForRetransmission(f) + + sph := newMockSentPacketHandler() + session.sentPacketHandler = sph + + err := session.sendPacket() + Expect(err).NotTo(HaveOccurred()) + Expect(conn.written).To(HaveLen(2)) + sentPackets := sph.(*mockSentPacketHandler).sentPackets + Expect(sentPackets).To(HaveLen(2)) + _, ok := sentPackets[0].Frames[0].(*frames.StopWaitingFrame) + Expect(ok).To(BeTrue()) + _, ok = sentPackets[1].Frames[0].(*frames.StopWaitingFrame) + Expect(ok).To(BeTrue()) + }) + + It("calls MaybeQueueRTOs even if congestion blocked, so that bytesInFlight is updated", func() { + sph := newMockSentPacketHandler() + sph.(*mockSentPacketHandler).congestionLimited = true + session.sentPacketHandler = sph + err := session.sendPacket() + Expect(err).NotTo(HaveOccurred()) + Expect(sph.(*mockSentPacketHandler).maybeQueueRTOsCalled).To(BeTrue()) + }) + }) + + Context("scheduling sending", func() { + It("sends after writing to a stream", func(done Done) { + Expect(session.sendingScheduled).NotTo(Receive()) + s, err := session.GetOrOpenStream(3) + Expect(err).NotTo(HaveOccurred()) + go func() { + s.Write([]byte("foobar")) close(done) - }, 3) + }() + Eventually(session.sendingScheduled).Should(Receive()) + s.(*stream).getDataForWriting(1000) // unblock + }) - It("errors when the SentPacketHandler has too many packets tracked", func() { - streamFrame := frames.StreamFrame{StreamID: 5, Data: []byte("foobar")} - for i := uint32(1); i < protocol.MaxTrackedSentPackets+10; i++ { - packet := ackhandlerlegacy.Packet{PacketNumber: protocol.PacketNumber(i), Frames: []frames.Frame{&streamFrame}, Length: 1} - err := session.sentPacketHandler.SentPacket(&packet) - Expect(err).ToNot(HaveOccurred()) - } - // now session.sentPacketHandler.CheckForError will return an error - err := session.sendPacket() - Expect(err).To(MatchError(ackhandlerlegacy.ErrTooManyTrackedSentPackets)) - }) - - It("stores up to MaxSessionUnprocessedPackets packets", func(done Done) { - // Nothing here should block - for i := 0; i < protocol.MaxSessionUnprocessedPackets+10; i++ { - session.handlePacket(nil, nil, nil) - } - close(done) - }, 0.5) - - It("retransmits RTO packets", func() { - // We simulate consistently low RTTs, so that the test works faster - n := protocol.PacketNumber(10) - for p := protocol.PacketNumber(1); p < n; p++ { - err := session.sentPacketHandler.SentPacket(&ackhandlerlegacy.Packet{PacketNumber: p, Length: 1}) - Expect(err).NotTo(HaveOccurred()) - time.Sleep(time.Microsecond) - ack := &frames.AckFrame{} - if version == protocol.Version33 { - ack.AckFrameLegacy = &frames.AckFrameLegacy{LargestObserved: p} - } else { - ack.LargestAcked = p - } - err = session.sentPacketHandler.ReceivedAck(ack, p) - Expect(err).NotTo(HaveOccurred()) - } - if version == protocol.Version33 { - session.packer.lastPacketNumber = n - } else { - session.packer.packetNumberGenerator.next = n + 1 - } - // Now, we send a single packet, and expect that it was retransmitted later - err := session.sentPacketHandler.SentPacket(&ackhandlerlegacy.Packet{ - PacketNumber: n, - Length: 1, - Frames: []frames.Frame{&frames.StreamFrame{ - Data: []byte("foobar"), - }}, - }) + Context("bundling of small packets", func() { + It("bundles two small frames of different streams into one packet", func() { + s1, err := session.GetOrOpenStream(5) Expect(err).NotTo(HaveOccurred()) - go session.run() + s2, err := session.GetOrOpenStream(7) + Expect(err).NotTo(HaveOccurred()) + + // Put data directly into the streams + s1.(*stream).dataForWriting = []byte("foobar1") + s2.(*stream).dataForWriting = []byte("foobar2") + session.scheduleSending() - Eventually(func() [][]byte { return conn.written }).ShouldNot(BeEmpty()) - Expect(conn.written[0]).To(ContainSubstring("foobar")) + go session.run() + + Eventually(func() [][]byte { return conn.written }).Should(HaveLen(1)) + Expect(conn.written[0]).To(ContainSubstring("foobar1")) + Expect(conn.written[0]).To(ContainSubstring("foobar2")) }) - Context("counting streams", func() { - It("errors when too many streams are opened", func() { - for i := 2; i <= 110; i++ { - _, err := session.GetOrOpenStream(protocol.StreamID(i*2 + 1)) - Expect(err).NotTo(HaveOccurred()) - } - _, err := session.GetOrOpenStream(protocol.StreamID(301)) - Expect(err).To(MatchError(qerr.TooManyOpenStreams)) - }) - - It("does not error when many streams are opened and closed", func() { - for i := 2; i <= 1000; i++ { - s, err := session.GetOrOpenStream(protocol.StreamID(i*2 + 1)) - Expect(err).NotTo(HaveOccurred()) - err = s.Close() - Expect(err).NotTo(HaveOccurred()) - s.(*stream).sentFin() - s.CloseRemote(0) - _, err = s.Read([]byte("a")) - Expect(err).To(MatchError(io.EOF)) - session.garbageCollectStreams() - } - }) + It("sends out two big frames in two packets", func() { + s1, err := session.GetOrOpenStream(5) + Expect(err).NotTo(HaveOccurred()) + s2, err := session.GetOrOpenStream(7) + Expect(err).NotTo(HaveOccurred()) + go session.run() + go func() { + defer GinkgoRecover() + _, err2 := s1.Write(bytes.Repeat([]byte{'e'}, 1000)) + Expect(err2).ToNot(HaveOccurred()) + }() + _, err = s2.Write(bytes.Repeat([]byte{'e'}, 1000)) + Expect(err).ToNot(HaveOccurred()) + Eventually(func() [][]byte { return conn.written }).Should(HaveLen(2)) }) - Context("ignoring errors", func() { - It("ignores duplicate acks", func() { - session.sentPacketHandler.SentPacket(&ackhandlerlegacy.Packet{ - PacketNumber: 1, - Length: 1, - }) - err := session.handleFrames([]frames.Frame{&frames.AckFrame{ - AckFrameLegacy: &frames.AckFrameLegacy{ - LargestObserved: 1, - }, - }}) - Expect(err).NotTo(HaveOccurred()) - err = session.handleFrames([]frames.Frame{&frames.AckFrame{ - AckFrameLegacy: &frames.AckFrameLegacy{ - LargestObserved: 1, - }, - }}) - Expect(err).NotTo(HaveOccurred()) - }) + It("sends out two small frames that are written to long after one another into two packets", func() { + s, err := session.GetOrOpenStream(5) + Expect(err).NotTo(HaveOccurred()) + go session.run() + _, err = s.Write([]byte("foobar1")) + Expect(err).NotTo(HaveOccurred()) + Eventually(func() [][]byte { return conn.written }).Should(HaveLen(1)) + _, err = s.Write([]byte("foobar2")) + Expect(err).NotTo(HaveOccurred()) + Eventually(func() [][]byte { return conn.written }).Should(HaveLen(2)) }) - Context("window updates", func() { - It("gets stream level window updates", func() { - err := session.flowControlManager.AddBytesRead(1, protocol.ReceiveStreamFlowControlWindow) - Expect(err).NotTo(HaveOccurred()) - frames, err := session.getWindowUpdateFrames() - Expect(err).NotTo(HaveOccurred()) - Expect(frames).To(HaveLen(1)) - Expect(frames[0].StreamID).To(Equal(protocol.StreamID(1))) - Expect(frames[0].ByteOffset).To(Equal(protocol.ReceiveStreamFlowControlWindow * 2)) - }) + It("sends a queued ACK frame only once", func() { + packetNumber := protocol.PacketNumber(0x1337) + session.receivedPacketHandler.ReceivedPacket(packetNumber, true) - It("gets connection level window updates", func() { - _, err := session.GetOrOpenStream(5) - Expect(err).NotTo(HaveOccurred()) - err = session.flowControlManager.AddBytesRead(5, protocol.ReceiveConnectionFlowControlWindow) - Expect(err).NotTo(HaveOccurred()) - frames, err := session.getWindowUpdateFrames() - Expect(err).NotTo(HaveOccurred()) - Expect(frames).To(HaveLen(1)) - Expect(frames[0].StreamID).To(Equal(protocol.StreamID(0))) - Expect(frames[0].ByteOffset).To(Equal(protocol.ReceiveConnectionFlowControlWindow * 2)) - }) + s, err := session.GetOrOpenStream(5) + Expect(err).NotTo(HaveOccurred()) + go session.run() + _, err = s.Write([]byte("foobar1")) + Expect(err).NotTo(HaveOccurred()) + Eventually(func() [][]byte { return conn.written }).Should(HaveLen(1)) + _, err = s.Write([]byte("foobar2")) + Expect(err).NotTo(HaveOccurred()) + + Eventually(func() [][]byte { return conn.written }).Should(HaveLen(2)) + Expect(conn.written[0]).To(ContainSubstring(string([]byte{0x37, 0x13}))) + Expect(conn.written[1]).ToNot(ContainSubstring(string([]byte{0x37, 0x13}))) }) }) - } + }) + + It("closes when crypto stream errors", func() { + go session.run() + s, err := session.GetOrOpenStream(3) + Expect(err).NotTo(HaveOccurred()) + err = session.handleStreamFrame(&frames.StreamFrame{ + StreamID: 1, + Data: []byte("4242\x00\x00\x00\x00"), + }) + Expect(err).NotTo(HaveOccurred()) + Eventually(func() bool { return atomic.LoadUint32(&session.closed) != 0 }).Should(BeTrue()) + _, err = s.Write([]byte{}) + Expect(err.(*qerr.QuicError).ErrorCode).To(Equal(qerr.InvalidCryptoMessageType)) + }) + + It("sends public reset after too many undecryptable packets", func() { + // Write protocol.MaxUndecryptablePackets and expect a public reset to happen + for i := 0; i < protocol.MaxUndecryptablePackets; i++ { + hdr := &PublicHeader{ + PacketNumber: protocol.PacketNumber(i + 1), + } + session.handlePacket(nil, hdr, []byte("foobar")) + } + session.run() + + Expect(conn.written).To(HaveLen(1)) + Expect(conn.written[0]).To(ContainSubstring(string([]byte("PRST")))) + }) + + It("ignores undecryptable packets after the handshake is complete", func() { + *(*bool)(unsafe.Pointer(reflect.ValueOf(session.cryptoSetup).Elem().FieldByName("receivedForwardSecurePacket").UnsafeAddr())) = true + for i := 0; i < protocol.MaxUndecryptablePackets; i++ { + hdr := &PublicHeader{ + PacketNumber: protocol.PacketNumber(i + 1), + } + session.handlePacket(nil, hdr, []byte("foobar")) + } + go session.run() + Consistently(session.undecryptablePackets).Should(HaveLen(0)) + session.closeImpl(nil, true) + }) + + It("unqueues undecryptable packets for later decryption", func() { + session.undecryptablePackets = []receivedPacket{{ + nil, + &PublicHeader{PacketNumber: protocol.PacketNumber(42)}, + nil, + }} + Expect(session.receivedPackets).NotTo(Receive()) + session.tryDecryptingQueuedPackets() + Expect(session.undecryptablePackets).To(BeEmpty()) + Expect(session.receivedPackets).To(Receive()) + }) + + It("times out", func(done Done) { + session.connectionParametersManager.SetFromMap(map[handshake.Tag][]byte{ + handshake.TagICSL: {0, 0, 0, 0}, + }) + session.packer.connectionParametersManager = session.connectionParametersManager + session.run() // Would normally not return + Expect(conn.written[0]).To(ContainSubstring("No recent network activity.")) + close(done) + }, 3) + + It("errors when the SentPacketHandler has too many packets tracked", func() { + streamFrame := frames.StreamFrame{StreamID: 5, Data: []byte("foobar")} + for i := uint32(1); i < protocol.MaxTrackedSentPackets+10; i++ { + packet := ackhandlerlegacy.Packet{PacketNumber: protocol.PacketNumber(i), Frames: []frames.Frame{&streamFrame}, Length: 1} + err := session.sentPacketHandler.SentPacket(&packet) + Expect(err).ToNot(HaveOccurred()) + } + // now session.sentPacketHandler.CheckForError will return an error + err := session.sendPacket() + Expect(err).To(MatchError(ackhandlerlegacy.ErrTooManyTrackedSentPackets)) + }) + + It("stores up to MaxSessionUnprocessedPackets packets", func(done Done) { + // Nothing here should block + for i := 0; i < protocol.MaxSessionUnprocessedPackets+10; i++ { + session.handlePacket(nil, nil, nil) + } + close(done) + }, 0.5) + + It("retransmits RTO packets", func() { + // We simulate consistently low RTTs, so that the test works faster + n := protocol.PacketNumber(10) + for p := protocol.PacketNumber(1); p < n; p++ { + err := session.sentPacketHandler.SentPacket(&ackhandlerlegacy.Packet{PacketNumber: p, Length: 1}) + Expect(err).NotTo(HaveOccurred()) + time.Sleep(time.Microsecond) + ack := &frames.AckFrame{} + ack.LargestAcked = p + err = session.sentPacketHandler.ReceivedAck(ack, p) + Expect(err).NotTo(HaveOccurred()) + } + session.packer.packetNumberGenerator.next = n + 1 + // Now, we send a single packet, and expect that it was retransmitted later + err := session.sentPacketHandler.SentPacket(&ackhandlerlegacy.Packet{ + PacketNumber: n, + Length: 1, + Frames: []frames.Frame{&frames.StreamFrame{ + Data: []byte("foobar"), + }}, + }) + Expect(err).NotTo(HaveOccurred()) + go session.run() + session.scheduleSending() + Eventually(func() [][]byte { return conn.written }).ShouldNot(BeEmpty()) + Expect(conn.written[0]).To(ContainSubstring("foobar")) + }) + + Context("counting streams", func() { + It("errors when too many streams are opened", func() { + for i := 2; i <= 110; i++ { + _, err := session.GetOrOpenStream(protocol.StreamID(i*2 + 1)) + Expect(err).NotTo(HaveOccurred()) + } + _, err := session.GetOrOpenStream(protocol.StreamID(301)) + Expect(err).To(MatchError(qerr.TooManyOpenStreams)) + }) + + It("does not error when many streams are opened and closed", func() { + for i := 2; i <= 1000; i++ { + s, err := session.GetOrOpenStream(protocol.StreamID(i*2 + 1)) + Expect(err).NotTo(HaveOccurred()) + err = s.Close() + Expect(err).NotTo(HaveOccurred()) + s.(*stream).sentFin() + s.CloseRemote(0) + _, err = s.Read([]byte("a")) + Expect(err).To(MatchError(io.EOF)) + session.garbageCollectStreams() + } + }) + }) + + Context("ignoring errors", func() { + It("ignores duplicate acks", func() { + session.sentPacketHandler.SentPacket(&ackhandlerlegacy.Packet{ + PacketNumber: 1, + Length: 1, + }) + err := session.handleFrames([]frames.Frame{&frames.AckFrame{ + AckFrameLegacy: &frames.AckFrameLegacy{ + LargestObserved: 1, + }, + }}) + Expect(err).NotTo(HaveOccurred()) + err = session.handleFrames([]frames.Frame{&frames.AckFrame{ + AckFrameLegacy: &frames.AckFrameLegacy{ + LargestObserved: 1, + }, + }}) + Expect(err).NotTo(HaveOccurred()) + }) + }) + + Context("window updates", func() { + It("gets stream level window updates", func() { + err := session.flowControlManager.AddBytesRead(1, protocol.ReceiveStreamFlowControlWindow) + Expect(err).NotTo(HaveOccurred()) + frames, err := session.getWindowUpdateFrames() + Expect(err).NotTo(HaveOccurred()) + Expect(frames).To(HaveLen(1)) + Expect(frames[0].StreamID).To(Equal(protocol.StreamID(1))) + Expect(frames[0].ByteOffset).To(Equal(protocol.ReceiveStreamFlowControlWindow * 2)) + }) + + It("gets connection level window updates", func() { + _, err := session.GetOrOpenStream(5) + Expect(err).NotTo(HaveOccurred()) + err = session.flowControlManager.AddBytesRead(5, protocol.ReceiveConnectionFlowControlWindow) + Expect(err).NotTo(HaveOccurred()) + frames, err := session.getWindowUpdateFrames() + Expect(err).NotTo(HaveOccurred()) + Expect(frames).To(HaveLen(1)) + Expect(frames[0].StreamID).To(Equal(protocol.StreamID(0))) + Expect(frames[0].ByteOffset).To(Equal(protocol.ReceiveConnectionFlowControlWindow * 2)) + }) + }) })