forked from quic-go/quic-go
http3: add support for HTTP Datagrams (RFC 9297) (#4452)
* http3: add support for HTTP Datagrams (RFC 9297) * README: reference HTTP Datagrams (RFC 9297)
This commit is contained in:
100
http3/datagram.go
Normal file
100
http3/datagram.go
Normal file
@@ -0,0 +1,100 @@
|
||||
package http3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const maxQuarterStreamID = 1<<60 - 1
|
||||
|
||||
const streamDatagramQueueLen = 32
|
||||
|
||||
type datagrammer struct {
|
||||
sendDatagram func([]byte) error
|
||||
|
||||
hasData chan struct{}
|
||||
queue [][]byte // TODO: use a ring buffer
|
||||
|
||||
mx sync.Mutex
|
||||
sendErr error
|
||||
receiveErr error
|
||||
}
|
||||
|
||||
func newDatagrammer(sendDatagram func([]byte) error) *datagrammer {
|
||||
return &datagrammer{
|
||||
sendDatagram: sendDatagram,
|
||||
hasData: make(chan struct{}, 1),
|
||||
}
|
||||
}
|
||||
|
||||
func (d *datagrammer) SetReceiveError(err error) (isDone bool) {
|
||||
d.mx.Lock()
|
||||
defer d.mx.Unlock()
|
||||
|
||||
d.receiveErr = err
|
||||
d.signalHasData()
|
||||
return d.sendErr != nil
|
||||
}
|
||||
|
||||
func (d *datagrammer) SetSendError(err error) (isDone bool) {
|
||||
d.mx.Lock()
|
||||
defer d.mx.Unlock()
|
||||
|
||||
d.sendErr = err
|
||||
return d.receiveErr != nil
|
||||
}
|
||||
|
||||
func (d *datagrammer) Send(b []byte) error {
|
||||
d.mx.Lock()
|
||||
sendErr := d.sendErr
|
||||
d.mx.Unlock()
|
||||
if sendErr != nil {
|
||||
return sendErr
|
||||
}
|
||||
|
||||
return d.sendDatagram(b)
|
||||
}
|
||||
|
||||
func (d *datagrammer) signalHasData() {
|
||||
select {
|
||||
case d.hasData <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (d *datagrammer) enqueue(data []byte) {
|
||||
d.mx.Lock()
|
||||
defer d.mx.Unlock()
|
||||
|
||||
if d.receiveErr != nil {
|
||||
return
|
||||
}
|
||||
if len(d.queue) >= streamDatagramQueueLen {
|
||||
return
|
||||
}
|
||||
d.queue = append(d.queue, data)
|
||||
d.signalHasData()
|
||||
}
|
||||
|
||||
func (d *datagrammer) Receive(ctx context.Context) ([]byte, error) {
|
||||
start:
|
||||
d.mx.Lock()
|
||||
if len(d.queue) >= 1 {
|
||||
data := d.queue[0]
|
||||
d.queue = d.queue[1:]
|
||||
d.mx.Unlock()
|
||||
return data, nil
|
||||
}
|
||||
if d.receiveErr != nil {
|
||||
d.mx.Unlock()
|
||||
return nil, d.receiveErr
|
||||
}
|
||||
d.mx.Unlock()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, context.Cause(ctx)
|
||||
case <-d.hasData:
|
||||
}
|
||||
goto start
|
||||
}
|
||||
Reference in New Issue
Block a user