refactor session to support very basic packet pacing

This commit is contained in:
Lucas Clemente
2016-04-25 14:59:18 +02:00
parent c6b69e632e
commit a126e0e606
5 changed files with 70 additions and 50 deletions

View File

@@ -11,6 +11,7 @@ import (
// The PublicHeader of a QUIC packet
type PublicHeader struct {
Raw []byte
VersionFlag bool
ResetFlag bool
ConnectionID protocol.ConnectionID

View File

@@ -13,7 +13,8 @@ import (
)
type PacketHandler interface {
HandlePacket(addr *net.UDPAddr, publicHeaderBinary []byte, publicHeader *PublicHeader, r *bytes.Reader) error
HandlePacket(addr *net.UDPAddr, publicHeader *PublicHeader, r *bytes.Reader)
Run()
}
// A Server of QUIC
@@ -88,6 +89,7 @@ func (s *Server) handlePacket(conn *net.UDPConn, remoteAddr *net.UDPAddr, packet
// ToDo: send errorcodes.QUIC_INVALID_PACKET_HEADER
return errors.New("Could not parse public header")
}
publicHeader.Raw = packet[:len(packet)-r.Len()]
// fmt.Printf("<- Got packet %d (%d bytes) from %v\n", publicHeader.PacketNumber, n, remoteAddr)
@@ -105,12 +107,10 @@ func (s *Server) handlePacket(conn *net.UDPConn, remoteAddr *net.UDPAddr, packet
if !ok {
fmt.Printf("Serving new connection: %d from %v\n", publicHeader.ConnectionID, remoteAddr)
session = s.newSession(conn, publicHeader.VersionNumber, publicHeader.ConnectionID, s.scfg, s.streamCallback)
go session.Run()
s.sessions[publicHeader.ConnectionID] = session
}
err = session.HandlePacket(remoteAddr, packet[0:len(packet)-r.Len()], publicHeader, r)
if err != nil {
return err
}
session.HandlePacket(remoteAddr, publicHeader, r)
return nil
}

View File

@@ -17,9 +17,11 @@ type mockSession struct {
packetCount int
}
func (s *mockSession) HandlePacket(addr *net.UDPAddr, publicHeaderBinary []byte, publicHeader *PublicHeader, r *bytes.Reader) error {
func (s *mockSession) HandlePacket(addr *net.UDPAddr, publicHeader *PublicHeader, r *bytes.Reader) {
s.packetCount++
return nil
}
func (s *mockSession) Run() {
}
func newMockSession(conn *net.UDPConn, v protocol.VersionNumber, connectionID protocol.ConnectionID, sCfg *handshake.ServerConfig, streamCallback StreamCallback) PacketHandler {

View File

@@ -5,7 +5,7 @@ import (
"errors"
"fmt"
"net"
"sync"
"time"
"github.com/lucas-clemente/quic-go/ackhandler"
"github.com/lucas-clemente/quic-go/frames"
@@ -13,6 +13,12 @@ import (
"github.com/lucas-clemente/quic-go/protocol"
)
type receivedPacket struct {
addr *net.UDPAddr
publicHeader *PublicHeader
r *bytes.Reader
}
// StreamCallback gets a stream frame and returns a reply frame
type StreamCallback func(*Session, *Stream)
@@ -23,8 +29,7 @@ type Session struct {
Connection *net.UDPConn
CurrentRemoteAddr *net.UDPAddr
Streams map[protocol.StreamID]*Stream
streamsMutex sync.RWMutex
Streams map[protocol.StreamID]*Stream
outgoingAckHandler ackhandler.OutgoingPacketAckHandler
incomingAckHandler ackhandler.IncomingPacketAckHandler
@@ -32,6 +37,8 @@ type Session struct {
unpacker *packetUnpacker
packer *packetPacker
batchMode bool
receivedPackets chan receivedPacket
}
// NewSession makes a new session
@@ -42,6 +49,7 @@ func NewSession(conn *net.UDPConn, v protocol.VersionNumber, connectionID protoc
Streams: make(map[protocol.StreamID]*Stream),
outgoingAckHandler: ackhandler.NewOutgoingPacketAckHandler(),
incomingAckHandler: ackhandler.NewIncomingPacketAckHandler(),
receivedPackets: make(chan receivedPacket),
}
cryptoStream, _ := session.NewStream(1)
@@ -54,23 +62,37 @@ func NewSession(conn *net.UDPConn, v protocol.VersionNumber, connectionID protoc
return session
}
// HandlePacket handles a packet
func (s *Session) HandlePacket(addr *net.UDPAddr, publicHeaderBinary []byte, publicHeader *PublicHeader, r *bytes.Reader) error {
s.batchMode = true
// Run the session main loop
func (s *Session) Run() {
sendTimeout := 1 * time.Millisecond
for {
var err error
select {
case p := <-s.receivedPackets:
err = s.handlePacket(p.addr, p.publicHeader, p.r)
case <-time.After(sendTimeout):
err = s.sendPacket()
}
if err != nil {
fmt.Printf("Error in session: %s\n", err.Error())
}
}
}
func (s *Session) handlePacket(addr *net.UDPAddr, publicHeader *PublicHeader, r *bytes.Reader) error {
// TODO: Only do this after authenticating
if addr != s.CurrentRemoteAddr {
s.CurrentRemoteAddr = addr
}
packet, err := s.unpacker.Unpack(publicHeaderBinary, publicHeader, r)
packet, err := s.unpacker.Unpack(publicHeader.Raw, publicHeader, r)
if err != nil {
s.Close(err)
return err
}
s.incomingAckHandler.ReceivedPacket(publicHeader.PacketNumber, packet.entropyBit)
s.SendFrame(s.incomingAckHandler.DequeueAckFrame())
s.QueueFrame(s.incomingAckHandler.DequeueAckFrame())
for _, ff := range packet.frames {
var err error
@@ -96,9 +118,12 @@ func (s *Session) HandlePacket(addr *net.UDPAddr, publicHeaderBinary []byte, pub
return err
}
}
return nil
}
s.batchMode = false
return s.sendPackets()
// HandlePacket handles a packet
func (s *Session) HandlePacket(addr *net.UDPAddr, publicHeader *PublicHeader, r *bytes.Reader) {
s.receivedPackets <- receivedPacket{addr: addr, publicHeader: publicHeader, r: r}
}
func (s *Session) handleStreamFrame(frame *frames.StreamFrame) error {
@@ -106,9 +131,7 @@ func (s *Session) handleStreamFrame(frame *frames.StreamFrame) error {
if frame.StreamID == 0 {
return errors.New("Session: 0 is not a valid Stream ID")
}
s.streamsMutex.RLock()
stream, newStream := s.Streams[frame.StreamID]
s.streamsMutex.RUnlock()
if !newStream {
stream, _ = s.NewStream(frame.StreamID)
@@ -132,47 +155,41 @@ func (s *Session) Close(e error) error {
errorCode = quicError.ErrorCode
}
s.batchMode = false
return s.SendFrame(&frames.ConnectionCloseFrame{
return s.QueueFrame(&frames.ConnectionCloseFrame{
ErrorCode: errorCode,
ReasonPhrase: reasonPhrase,
})
}
func (s *Session) sendPackets() error {
for {
packet, err := s.packer.PackPacket()
if err != nil {
return err
}
if packet == nil {
return nil
}
s.outgoingAckHandler.SentPacket(&ackhandler.Packet{
PacketNumber: packet.number,
Plaintext: packet.payload,
EntropyBit: packet.entropyBit,
})
fmt.Printf("-> Sending packet %d (%d bytes)\n", packet.number, len(packet.raw))
_, err = s.Connection.WriteToUDP(packet.raw, s.CurrentRemoteAddr)
if err != nil {
return err
}
func (s *Session) sendPacket() error {
packet, err := s.packer.PackPacket()
if err != nil {
return err
}
}
// SendFrame sends a frame to the client
func (s *Session) SendFrame(frame frames.Frame) error {
s.packer.AddFrame(frame)
if s.batchMode {
if packet == nil {
return nil
}
return s.sendPackets()
s.outgoingAckHandler.SentPacket(&ackhandler.Packet{
PacketNumber: packet.number,
Plaintext: packet.payload,
EntropyBit: packet.entropyBit,
})
fmt.Printf("-> Sending packet %d (%d bytes)\n", packet.number, len(packet.raw))
_, err = s.Connection.WriteToUDP(packet.raw, s.CurrentRemoteAddr)
if err != nil {
return err
}
return nil
}
// QueueFrame queues a frame for sending to the client
func (s *Session) QueueFrame(frame frames.Frame) error {
s.packer.AddFrame(frame)
return nil
}
// NewStream creates a new strean open for reading and writing
func (s *Session) NewStream(id protocol.StreamID) (*Stream, error) {
s.streamsMutex.Lock()
defer s.streamsMutex.Unlock()
stream := NewStream(s, id)
if s.Streams[id] != nil {
return nil, fmt.Errorf("Session: stream with ID %d already exists", id)

View File

@@ -108,7 +108,7 @@ func (s *Stream) ReadByte() (byte, error) {
func (s *Stream) Write(p []byte) (int, error) {
data := make([]byte, len(p))
copy(data, p)
err := s.Session.SendFrame(&frames.StreamFrame{
err := s.Session.QueueFrame(&frames.StreamFrame{
StreamID: s.StreamID,
Offset: s.WriteOffset,
Data: data,
@@ -123,7 +123,7 @@ func (s *Stream) Write(p []byte) (int, error) {
// Close imlpements io.Closer
func (s *Stream) Close() error {
fmt.Printf("Closing stream %d\n", s.StreamID)
return s.Session.SendFrame(&frames.StreamFrame{
return s.Session.QueueFrame(&frames.StreamFrame{
StreamID: s.StreamID,
Offset: s.WriteOffset,
FinBit: true,