forked from quic-go/quic-go
27
session.go
27
session.go
@@ -22,6 +22,10 @@ type receivedPacket struct {
|
||||
r *bytes.Reader
|
||||
}
|
||||
|
||||
var (
|
||||
errRstStreamOnInvalidStream = errors.New("RST_STREAM received for unknown stream")
|
||||
)
|
||||
|
||||
// StreamCallback gets a stream frame and returns a reply frame
|
||||
type StreamCallback func(*Session, utils.Stream)
|
||||
|
||||
@@ -81,9 +85,13 @@ func (s *Session) Run() {
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if err == ackhandler.ErrDuplicateOrOutOfOrderAck {
|
||||
switch err {
|
||||
// Can happen e.g. when packets thought missing arrive late
|
||||
case ackhandler.ErrDuplicateOrOutOfOrderAck:
|
||||
// Can happen when RST_STREAMs arrive early or late (?)
|
||||
case errRstStreamOnInvalidStream:
|
||||
fmt.Printf("Ignoring error in session: %s\n", err.Error())
|
||||
} else {
|
||||
default:
|
||||
s.Close(err)
|
||||
}
|
||||
}
|
||||
@@ -128,7 +136,7 @@ func (s *Session) handlePacket(addr *net.UDPAddr, publicHeader *PublicHeader, r
|
||||
case *frames.StopWaitingFrame:
|
||||
err = s.receivedPacketHandler.ReceivedStopWaiting(frame)
|
||||
case *frames.RstStreamFrame:
|
||||
fmt.Printf("%#v\n", frame)
|
||||
err = s.handleRstStreamFrame(frame)
|
||||
case *frames.WindowUpdateFrame:
|
||||
fmt.Printf("%#v\n", frame)
|
||||
default:
|
||||
@@ -146,6 +154,7 @@ func (s *Session) HandlePacket(addr *net.UDPAddr, publicHeader *PublicHeader, r
|
||||
s.receivedPackets <- receivedPacket{addr: addr, publicHeader: publicHeader, r: r}
|
||||
}
|
||||
|
||||
// TODO: Ignore data for closed streams
|
||||
func (s *Session) handleStreamFrame(frame *frames.StreamFrame) error {
|
||||
fmt.Printf("Got %d bytes for stream %d\n", len(frame.Data), frame.StreamID)
|
||||
if frame.StreamID == 0 {
|
||||
@@ -172,6 +181,18 @@ func (s *Session) handleStreamFrame(frame *frames.StreamFrame) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO: Handle frame.byteOffset
|
||||
func (s *Session) handleRstStreamFrame(frame *frames.RstStreamFrame) error {
|
||||
s.streamsMutex.RLock()
|
||||
str, streamExists := s.streams[frame.StreamID]
|
||||
s.streamsMutex.RUnlock()
|
||||
if !streamExists || str == nil {
|
||||
return errRstStreamOnInvalidStream
|
||||
}
|
||||
str.RegisterError(fmt.Errorf("RST_STREAM received with code %d", frame.ErrorCode))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close the connection by sending a ConnectionClose frame
|
||||
func (s *Session) Close(e error) error {
|
||||
if e == nil {
|
||||
|
||||
@@ -129,4 +129,30 @@ var _ = Describe("Session", func() {
|
||||
Expect(err).To(MatchError("Session: reopening streams is not allowed"))
|
||||
})
|
||||
})
|
||||
|
||||
Context("handling RST_STREAM frames", func() {
|
||||
It("closes the receiving streams for writing and reading", func() {
|
||||
s, err := session.NewStream(5)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = session.handleRstStreamFrame(&frames.RstStreamFrame{
|
||||
StreamID: 5,
|
||||
ErrorCode: 42,
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
n, err := s.Write([]byte{0})
|
||||
Expect(n).To(BeZero())
|
||||
Expect(err).To(MatchError("RST_STREAM received with code 42"))
|
||||
n, err = s.Read([]byte{0})
|
||||
Expect(n).To(BeZero())
|
||||
Expect(err).To(MatchError("RST_STREAM received with code 42"))
|
||||
})
|
||||
|
||||
It("errors when the stream is not known", func() {
|
||||
err := session.handleRstStreamFrame(&frames.RstStreamFrame{
|
||||
StreamID: 5,
|
||||
ErrorCode: 42,
|
||||
})
|
||||
Expect(err).To(MatchError("RST_STREAM received for unknown stream"))
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user