diff --git a/datagram_queue.go b/datagram_queue.go new file mode 100644 index 000000000..eb32fe7cf --- /dev/null +++ b/datagram_queue.go @@ -0,0 +1,51 @@ +package quic + +import ( + "sync" + + "github.com/lucas-clemente/quic-go/internal/wire" +) + +type datagramQueue struct { + mutex sync.Mutex + queue chan *wire.DatagramFrame + + closeErr error + closed chan struct{} + + hasData func() +} + +func newDatagramQueue(hasData func()) *datagramQueue { + return &datagramQueue{ + queue: make(chan *wire.DatagramFrame), + hasData: hasData, + closed: make(chan struct{}), + } +} + +// AddAndWait queues a new DATAGRAM frame. +// It blocks until the frame has been dequeued. +func (h *datagramQueue) AddAndWait(f *wire.DatagramFrame) error { + h.hasData() + select { + case h.queue <- f: + return nil + case <-h.closed: + return h.closeErr + } +} + +func (h *datagramQueue) Get() *wire.DatagramFrame { + select { + case f := <-h.queue: + return f + default: + return nil + } +} + +func (h *datagramQueue) CloseWithError(e error) { + h.closeErr = e + close(h.closed) +} diff --git a/datagram_queue_test.go b/datagram_queue_test.go new file mode 100644 index 000000000..36e35a7e3 --- /dev/null +++ b/datagram_queue_test.go @@ -0,0 +1,54 @@ +package quic + +import ( + "errors" + + "github.com/lucas-clemente/quic-go/internal/wire" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Datagram Queue", func() { + var queue *datagramQueue + var queued chan struct{} + + BeforeEach(func() { + queued = make(chan struct{}, 100) + queue = newDatagramQueue(func() { + queued <- struct{}{} + }) + }) + + It("returns nil when there's no datagram to send", func() { + Expect(queue.Get()).To(BeNil()) + }) + + It("queues a datagram", func() { + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + defer close(done) + Expect(queue.AddAndWait(&wire.DatagramFrame{Data: []byte("foobar")})).To(Succeed()) + }() + + Eventually(queued).Should(HaveLen(1)) + Consistently(done).ShouldNot(BeClosed()) + f := queue.Get() + Expect(f).ToNot(BeNil()) + Expect(f.Data).To(Equal([]byte("foobar"))) + Eventually(done).Should(BeClosed()) + Expect(queue.Get()).To(BeNil()) + }) + + It("closes", func() { + errChan := make(chan error, 1) + go func() { + defer GinkgoRecover() + errChan <- queue.AddAndWait(&wire.DatagramFrame{Data: []byte("foobar")}) + }() + + Consistently(errChan).ShouldNot(Receive()) + queue.CloseWithError(errors.New("test error")) + Eventually(errChan).Should(Receive(MatchError("test error"))) + }) +})