queue stream-related control frames in the respective stream (#4610)

* use a separate method for queueing control frames from the streams map

* queue stream-related control frames in the respective stream
This commit is contained in:
Marten Seemann
2024-08-04 15:42:56 -07:00
committed by GitHub
parent d1f9af4cc6
commit af9fa7a555
20 changed files with 526 additions and 553 deletions

View File

@@ -15,14 +15,13 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"go.uber.org/mock/gomock"
)
var _ = Describe("Streams Map (outgoing)", func() {
var (
m *outgoingStreamsMap[*mockGenericStream]
newStr func(num protocol.StreamNum) *mockGenericStream
mockSender *MockStreamSender
m *outgoingStreamsMap[*mockGenericStream]
newStr func(num protocol.StreamNum) *mockGenericStream
queuedControlFrames []wire.Frame
)
const streamType = 42
@@ -37,11 +36,15 @@ var _ = Describe("Streams Map (outgoing)", func() {
}
BeforeEach(func() {
queuedControlFrames = []wire.Frame{}
newStr = func(num protocol.StreamNum) *mockGenericStream {
return &mockGenericStream{num: num}
}
mockSender = NewMockStreamSender(mockCtrl)
m = newOutgoingStreamsMap[*mockGenericStream](streamType, newStr, mockSender.queueControlFrame)
m = newOutgoingStreamsMap[*mockGenericStream](
streamType,
newStr,
func(f wire.Frame) { queuedControlFrames = append(queuedControlFrames, f) },
)
})
Context("no stream ID limit", func() {
@@ -130,7 +133,6 @@ var _ = Describe("Streams Map (outgoing)", func() {
Context("with stream ID limits", func() {
It("errors when no stream can be opened immediately", func() {
mockSender.EXPECT().queueControlFrame(gomock.Any())
_, err := m.OpenStream()
expectTooManyStreamsError(err)
})
@@ -143,7 +145,6 @@ var _ = Describe("Streams Map (outgoing)", func() {
})
It("blocks until a stream can be opened synchronously", func() {
mockSender.EXPECT().queueControlFrame(gomock.Any())
done := make(chan struct{})
go func() {
defer GinkgoRecover()
@@ -159,7 +160,6 @@ var _ = Describe("Streams Map (outgoing)", func() {
})
It("unblocks when the context is canceled", func() {
mockSender.EXPECT().queueControlFrame(gomock.Any())
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
@@ -181,7 +181,6 @@ var _ = Describe("Streams Map (outgoing)", func() {
})
It("opens streams in the right order", func() {
mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes()
done1 := make(chan struct{})
go func() {
defer GinkgoRecover()
@@ -210,7 +209,6 @@ var _ = Describe("Streams Map (outgoing)", func() {
})
It("opens streams in the right order, when one of the contexts is canceled", func() {
mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes()
done1 := make(chan struct{})
go func() {
defer GinkgoRecover()
@@ -249,7 +247,6 @@ var _ = Describe("Streams Map (outgoing)", func() {
})
It("unblocks multiple OpenStreamSync calls at the same time", func() {
mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes()
done := make(chan struct{})
go func() {
defer GinkgoRecover()
@@ -282,7 +279,6 @@ var _ = Describe("Streams Map (outgoing)", func() {
})
It("returns an error for OpenStream while an OpenStreamSync call is blocking", func() {
mockSender.EXPECT().queueControlFrame(gomock.Any()).MaxTimes(2)
openedSync := make(chan struct{})
go func() {
defer GinkgoRecover()
@@ -322,7 +318,6 @@ var _ = Describe("Streams Map (outgoing)", func() {
})
It("stops opening synchronously when it is closed", func() {
mockSender.EXPECT().queueControlFrame(gomock.Any())
testErr := errors.New("test error")
done := make(chan struct{})
go func() {
@@ -355,33 +350,31 @@ var _ = Describe("Streams Map (outgoing)", func() {
Expect(err).ToNot(HaveOccurred())
}
mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) {
bf := f.(*wire.StreamsBlockedFrame)
Expect(bf.Type).To(BeEquivalentTo(streamType))
Expect(bf.StreamLimit).To(BeEquivalentTo(6))
})
Expect(queuedControlFrames).To(BeEmpty())
_, err := m.OpenStream()
Expect(err).To(MatchError(&StreamLimitReachedError{}))
Expect(queuedControlFrames).To(HaveLen(1))
bf := queuedControlFrames[0].(*wire.StreamsBlockedFrame)
Expect(bf.Type).To(BeEquivalentTo(streamType))
Expect(bf.StreamLimit).To(BeEquivalentTo(6))
})
It("only sends one STREAMS_BLOCKED frame for one stream ID", func() {
m.SetMaxStream(1)
mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) {
Expect(f.(*wire.StreamsBlockedFrame).StreamLimit).To(BeEquivalentTo(1))
})
_, err := m.OpenStream()
Expect(err).ToNot(HaveOccurred())
Expect(queuedControlFrames).To(BeEmpty())
// try to open a stream twice, but expect only one STREAMS_BLOCKED to be sent
_, err = m.OpenStream()
expectTooManyStreamsError(err)
Expect(queuedControlFrames).To(HaveLen(1))
Expect(queuedControlFrames[0].(*wire.StreamsBlockedFrame).StreamLimit).To(BeEquivalentTo(1))
_, err = m.OpenStream()
expectTooManyStreamsError(err)
Expect(queuedControlFrames).To(HaveLen(1))
})
It("queues a STREAMS_BLOCKED frame when there more streams waiting for OpenStreamSync than MAX_STREAMS allows", func() {
mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) {
Expect(f.(*wire.StreamsBlockedFrame).StreamLimit).To(BeEquivalentTo(0))
})
done := make(chan struct{}, 2)
go func() {
defer GinkgoRecover()
@@ -396,13 +389,14 @@ var _ = Describe("Streams Map (outgoing)", func() {
done <- struct{}{}
}()
waitForEnqueued(2)
Expect(queuedControlFrames).To(HaveLen(1))
Expect(queuedControlFrames[0].(*wire.StreamsBlockedFrame).StreamLimit).To(BeEquivalentTo(0))
mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) {
Expect(f.(*wire.StreamsBlockedFrame).StreamLimit).To(BeEquivalentTo(1))
})
m.SetMaxStream(1)
Eventually(done).Should(Receive())
Consistently(done).ShouldNot(Receive())
Expect(queuedControlFrames).To(HaveLen(2))
Expect(queuedControlFrames[1].(*wire.StreamsBlockedFrame).StreamLimit).To(BeEquivalentTo(1))
m.SetMaxStream(2)
Eventually(done).Should(Receive())
})
@@ -414,10 +408,6 @@ var _ = Describe("Streams Map (outgoing)", func() {
const n = 100
fmt.Fprintf(GinkgoWriter, "Opening %d streams concurrently.\n", n)
var blockedAt []protocol.StreamNum
mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) {
blockedAt = append(blockedAt, f.(*wire.StreamsBlockedFrame).StreamLimit)
}).AnyTimes()
done := make(map[int]chan struct{})
for i := 1; i <= n; i++ {
c := make(chan struct{})
@@ -456,6 +446,10 @@ var _ = Describe("Streams Map (outgoing)", func() {
Expect(str.num).To(Equal(protocol.StreamNum(n + 1)))
}
}
var blockedAt []protocol.StreamNum
for _, f := range queuedControlFrames {
blockedAt = append(blockedAt, f.(*wire.StreamsBlockedFrame).StreamLimit)
}
Expect(blockedAt).To(Equal(limits))
})
@@ -464,11 +458,6 @@ var _ = Describe("Streams Map (outgoing)", func() {
const n = 100
fmt.Fprintf(GinkgoWriter, "Opening %d streams concurrently.\n", n)
var blockedAt []protocol.StreamNum
mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) {
blockedAt = append(blockedAt, f.(*wire.StreamsBlockedFrame).StreamLimit)
}).AnyTimes()
ctx, cancel := context.WithCancel(context.Background())
streamsToCancel := make(map[protocol.StreamNum]struct{}) // used as a set
for i := 0; i < 10; i++ {
@@ -537,6 +526,10 @@ var _ = Describe("Streams Map (outgoing)", func() {
Expect(streamIDs[i]).To(Equal(i + 1))
}
}
var blockedAt []protocol.StreamNum
for _, f := range queuedControlFrames {
blockedAt = append(blockedAt, f.(*wire.StreamsBlockedFrame).StreamLimit)
}
Expect(blockedAt).To(Equal(limits))
})
})