Merge pull request #1090 from lucas-clemente/better-h2quic-client-errors

improve error handling in the h2quic client for header stream handling
This commit is contained in:
Marten Seemann
2018-01-03 09:36:53 +07:00
committed by GitHub
2 changed files with 44 additions and 50 deletions

View File

@@ -97,24 +97,23 @@ func (c *client) handleHeaderStream() {
decoder := hpack.NewDecoder(4096, func(hf hpack.HeaderField) {}) decoder := hpack.NewDecoder(4096, func(hf hpack.HeaderField) {})
h2framer := http2.NewFramer(nil, c.headerStream) h2framer := http2.NewFramer(nil, c.headerStream)
var lastStream protocol.StreamID var err error
for { for {
frame, err := h2framer.ReadFrame() var frame http2.Frame
frame, err = h2framer.ReadFrame()
if err != nil { if err != nil {
c.headerErr = qerr.Error(qerr.HeadersStreamDataDecompressFailure, "cannot read frame")
break break
} }
lastStream = protocol.StreamID(frame.Header().StreamID) lastStream := protocol.StreamID(frame.Header().StreamID)
hframe, ok := frame.(*http2.HeadersFrame) hframe, ok := frame.(*http2.HeadersFrame)
if !ok { if !ok {
c.headerErr = qerr.Error(qerr.InvalidHeadersStreamData, "not a headers frame") err = errors.New("not a headers frame")
break break
} }
mhframe := &http2.MetaHeadersFrame{HeadersFrame: hframe} mhframe := &http2.MetaHeadersFrame{HeadersFrame: hframe}
mhframe.Fields, err = decoder.DecodeFull(hframe.HeaderBlockFragment()) mhframe.Fields, err = decoder.DecodeFull(hframe.HeaderBlockFragment())
if err != nil { if err != nil {
c.headerErr = qerr.Error(qerr.InvalidHeadersStreamData, "cannot read header fields") err = fmt.Errorf("cannot read header fields: %s", err.Error())
break break
} }
@@ -122,19 +121,21 @@ func (c *client) handleHeaderStream() {
responseChan, ok := c.responses[protocol.StreamID(hframe.StreamID)] responseChan, ok := c.responses[protocol.StreamID(hframe.StreamID)]
c.mutex.RUnlock() c.mutex.RUnlock()
if !ok { if !ok {
c.headerErr = qerr.Error(qerr.InternalError, fmt.Sprintf("h2client BUG: response channel for stream %d not found", lastStream)) err = fmt.Errorf("response channel for stream %d not found", lastStream)
break break
} }
rsp, err := responseFromHeaders(mhframe) var rsp *http.Response
rsp, err = responseFromHeaders(mhframe)
if err != nil { if err != nil {
c.headerErr = qerr.Error(qerr.InternalError, err.Error()) break
} }
responseChan <- rsp responseChan <- rsp
} }
utils.Debugf("Error handling header stream: %s", err)
c.headerErr = qerr.Error(qerr.InvalidHeadersStreamData, err.Error())
// stop all running request // stop all running request
utils.Debugf("Error handling header stream %d: %s", lastStream, c.headerErr.Error())
close(c.headerErrored) close(c.headerErrored)
} }

View File

@@ -188,41 +188,31 @@ var _ = Describe("Client", func() {
close(done) close(done)
}) })
It("closes the quic client when encountering an error on the header stream", func(done Done) { It("closes the quic client when encountering an error on the header stream", func() {
headerStream.dataToRead.Write(bytes.Repeat([]byte{0}, 100)) headerStream.dataToRead.Write(bytes.Repeat([]byte{0}, 100))
var doReturned bool done := make(chan struct{})
go func() { go func() {
defer GinkgoRecover() defer GinkgoRecover()
var err error
rsp, err := client.RoundTrip(request) rsp, err := client.RoundTrip(request)
Expect(err).To(MatchError(client.headerErr)) Expect(err).To(MatchError(client.headerErr))
Expect(rsp).To(BeNil()) Expect(rsp).To(BeNil())
doReturned = true close(done)
}() }()
Eventually(func() bool { return doReturned }).Should(BeTrue()) Eventually(done).Should(BeClosed())
Expect(client.headerErr).To(MatchError(qerr.Error(qerr.HeadersStreamDataDecompressFailure, "cannot read frame"))) Expect(client.headerErr.ErrorCode).To(Equal(qerr.InvalidHeadersStreamData))
Expect(client.session.(*mockSession).closedWithError).To(MatchError(client.headerErr)) Expect(client.session.(*mockSession).closedWithError).To(MatchError(client.headerErr))
close(done) })
}, 2)
It("returns subsequent request if there was an error on the header stream before", func(done Done) { It("returns subsequent request if there was an error on the header stream before", func() {
expectedErr := qerr.Error(qerr.HeadersStreamDataDecompressFailure, "cannot read frame")
session.streamsToOpen = []quic.Stream{headerStream, dataStream, newMockStream(7)} session.streamsToOpen = []quic.Stream{headerStream, dataStream, newMockStream(7)}
headerStream.dataToRead.Write(bytes.Repeat([]byte{0}, 100)) headerStream.dataToRead.Write(bytes.Repeat([]byte{0}, 100))
var firstReqReturned bool
go func() {
defer GinkgoRecover()
_, err := client.RoundTrip(request)
Expect(err).To(MatchError(expectedErr))
firstReqReturned = true
}()
Eventually(func() bool { return firstReqReturned }).Should(BeTrue())
// now that the first request failed due to an error on the header stream, try another request
_, err := client.RoundTrip(request) _, err := client.RoundTrip(request)
Expect(err).To(MatchError(expectedErr)) Expect(err).To(BeAssignableToTypeOf(&qerr.QuicError{}))
close(done) Expect(err.(*qerr.QuicError).ErrorCode).To(Equal(qerr.InvalidHeadersStreamData))
// now that the first request failed due to an error on the header stream, try another request
_, nextErr := client.RoundTrip(request)
Expect(nextErr).To(MatchError(err))
}) })
It("blocks if no stream is available", func() { It("blocks if no stream is available", func() {
@@ -479,16 +469,9 @@ var _ = Describe("Client", func() {
It("errors if the H2 frame is not a HeadersFrame", func() { It("errors if the H2 frame is not a HeadersFrame", func() {
h2framer.WritePing(true, [8]byte{0, 0, 0, 0, 0, 0, 0, 0}) h2framer.WritePing(true, [8]byte{0, 0, 0, 0, 0, 0, 0, 0})
client.handleHeaderStream()
var handlerReturned bool
go func() {
client.handleHeaderStream()
handlerReturned = true
}()
Eventually(client.headerErrored).Should(BeClosed()) Eventually(client.headerErrored).Should(BeClosed())
Expect(client.headerErr).To(MatchError(qerr.Error(qerr.InvalidHeadersStreamData, "not a headers frame"))) Expect(client.headerErr).To(MatchError(qerr.Error(qerr.InvalidHeadersStreamData, "not a headers frame")))
Eventually(func() bool { return handlerReturned }).Should(BeTrue())
}) })
It("errors if it can't read the HPACK encoded header fields", func() { It("errors if it can't read the HPACK encoded header fields", func() {
@@ -497,16 +480,26 @@ var _ = Describe("Client", func() {
EndHeaders: true, EndHeaders: true,
BlockFragment: []byte("invalid HPACK data"), BlockFragment: []byte("invalid HPACK data"),
}) })
client.handleHeaderStream()
var handlerReturned bool
go func() {
client.handleHeaderStream()
handlerReturned = true
}()
Eventually(client.headerErrored).Should(BeClosed()) Eventually(client.headerErrored).Should(BeClosed())
Expect(client.headerErr).To(MatchError(qerr.Error(qerr.InvalidHeadersStreamData, "cannot read header fields"))) Expect(client.headerErr.ErrorCode).To(Equal(qerr.InvalidHeadersStreamData))
Eventually(func() bool { return handlerReturned }).Should(BeTrue()) Expect(client.headerErr.ErrorMessage).To(ContainSubstring("cannot read header fields"))
})
It("errors if the stream cannot be found", func() {
var headers bytes.Buffer
enc := hpack.NewEncoder(&headers)
enc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
err := h2framer.WriteHeaders(http2.HeadersFrameParam{
StreamID: 1337,
EndHeaders: true,
BlockFragment: headers.Bytes(),
})
Expect(err).ToNot(HaveOccurred())
client.handleHeaderStream()
Eventually(client.headerErrored).Should(BeClosed())
Expect(client.headerErr.ErrorCode).To(Equal(qerr.InvalidHeadersStreamData))
Expect(client.headerErr.ErrorMessage).To(ContainSubstring("response channel for stream 1337 not found"))
}) })
}) })
}) })