use the STREAM frame buffer for receiving data

This commit is contained in:
Marten Seemann
2019-09-04 16:51:58 +07:00
parent 5ea33cd31e
commit ba1bcf6e0c
4 changed files with 394 additions and 221 deletions

View File

@@ -7,8 +7,13 @@ import (
"github.com/lucas-clemente/quic-go/internal/utils"
)
type frameSorterEntry struct {
Data []byte
DoneCb func()
}
type frameSorter struct {
queue map[protocol.ByteCount][]byte
queue map[protocol.ByteCount]frameSorterEntry
readPos protocol.ByteCount
gaps *utils.ByteIntervalList
}
@@ -18,30 +23,38 @@ var errDuplicateStreamData = errors.New("Duplicate Stream Data")
func newFrameSorter() *frameSorter {
s := frameSorter{
gaps: utils.NewByteIntervalList(),
queue: make(map[protocol.ByteCount][]byte),
queue: make(map[protocol.ByteCount]frameSorterEntry),
}
s.gaps.PushFront(utils.ByteInterval{Start: 0, End: protocol.MaxByteCount})
return &s
}
func (s *frameSorter) Push(data []byte, offset protocol.ByteCount) error {
err := s.push(data, offset)
func (s *frameSorter) Push(data []byte, offset protocol.ByteCount, doneCb func()) error {
err := s.push(data, offset, doneCb)
if err == errDuplicateStreamData {
if doneCb != nil {
doneCb()
}
return nil
}
return err
}
func (s *frameSorter) push(data []byte, offset protocol.ByteCount) error {
func (s *frameSorter) push(data []byte, offset protocol.ByteCount, doneCb func()) error {
if len(data) == 0 {
return nil
return errDuplicateStreamData
}
if oldData, ok := s.queue[offset]; ok {
if len(data) <= len(oldData) {
if oldEntry, ok := s.queue[offset]; ok {
if len(data) <= len(oldEntry.Data) {
return errDuplicateStreamData
}
s.queue[offset] = data
// The data we currently have is shorter than the new data.
// Replace it.
if oldEntry.DoneCb != nil {
oldEntry.DoneCb()
}
s.queue[offset] = frameSorterEntry{Data: data, DoneCb: doneCb}
}
start := offset
@@ -82,11 +95,17 @@ func (s *frameSorter) push(data []byte, offset protocol.ByteCount) error {
if endGap != gap {
s.gaps.Remove(endGap)
}
if end <= nextEndGap.Value.Start {
if end < nextEndGap.Value.Start {
break
}
// delete queued frames completely covered by the current frame
delete(s.queue, endGap.Value.End)
end := endGap.Value.End
if end != offset {
if cb := s.queue[end].DoneCb; cb != nil {
cb()
}
delete(s.queue, end)
}
endGap = nextEndGap
}
@@ -130,25 +149,29 @@ func (s *frameSorter) push(data []byte, offset protocol.ByteCount) error {
return errors.New("Too many gaps in received data")
}
if wasCut {
if wasCut && len(data) < protocol.MinStreamFrameBufferSize {
newData := make([]byte, len(data))
copy(newData, data)
data = newData
if doneCb != nil {
doneCb()
doneCb = nil
}
}
s.queue[offset] = data
s.queue[offset] = frameSorterEntry{Data: data, DoneCb: doneCb}
return nil
}
func (s *frameSorter) Pop() (protocol.ByteCount, []byte) {
data, ok := s.queue[s.readPos]
func (s *frameSorter) Pop() (protocol.ByteCount, []byte, func()) {
entry, ok := s.queue[s.readPos]
if !ok {
return s.readPos, nil
return s.readPos, nil, nil
}
delete(s.queue, s.readPos)
offset := s.readPos
s.readPos += protocol.ByteCount(len(data))
return offset, data
s.readPos += protocol.ByteCount(len(entry.Data))
return offset, entry.Data, entry.DoneCb
}
// HasMoreData says if there is any more data queued at *any* offset.