forked from quic-go/quic-go
protect maps in Stream with mutexes
This commit is contained in:
20
session.go
20
session.go
@@ -6,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
"github.com/lucas-clemente/quic-go/frames"
|
||||
"github.com/lucas-clemente/quic-go/handshake"
|
||||
@@ -27,14 +28,16 @@ type Session struct {
|
||||
ServerConfig *handshake.ServerConfig
|
||||
cryptoSetup *handshake.CryptoSetup
|
||||
|
||||
EntropyReceived EntropyAccumulator
|
||||
EntropySent EntropyAccumulator
|
||||
EntropyHistory map[protocol.PacketNumber]EntropyAccumulator // ToDo: store this with the packet itself
|
||||
EntropyReceived EntropyAccumulator
|
||||
EntropySent EntropyAccumulator
|
||||
EntropyHistory map[protocol.PacketNumber]EntropyAccumulator // ToDo: store this with the packet itself
|
||||
entropyHistoryMutex sync.Mutex
|
||||
|
||||
lastSentPacketNumber protocol.PacketNumber
|
||||
lastObservedPacketNumber protocol.PacketNumber
|
||||
|
||||
Streams map[protocol.StreamID]*Stream
|
||||
Streams map[protocol.StreamID]*Stream
|
||||
streamsMutex sync.RWMutex
|
||||
|
||||
streamCallback StreamCallback
|
||||
}
|
||||
@@ -147,7 +150,10 @@ func (s *Session) handleStreamFrame(r *bytes.Reader) error {
|
||||
return errors.New("Session: 0 is not a valid Stream ID")
|
||||
}
|
||||
|
||||
s.streamsMutex.RLock()
|
||||
stream, newStream := s.Streams[frame.StreamID]
|
||||
s.streamsMutex.RUnlock()
|
||||
|
||||
if !newStream {
|
||||
stream, _ = s.NewStream(frame.StreamID)
|
||||
}
|
||||
@@ -168,6 +174,8 @@ func (s *Session) handleAckFrame(r *bytes.Reader) error {
|
||||
return err
|
||||
}
|
||||
|
||||
s.entropyHistoryMutex.Lock()
|
||||
defer s.entropyHistoryMutex.Unlock()
|
||||
expectedEntropy, ok := s.EntropyHistory[frame.LargestObserved]
|
||||
if !ok {
|
||||
return errors.New("No entropy value saved for received ACK packet")
|
||||
@@ -257,6 +265,8 @@ func (s *Session) SendFrame(frame frames.Frame) error {
|
||||
return err
|
||||
}
|
||||
s.EntropySent.Add(packetNumber, entropyBit)
|
||||
s.entropyHistoryMutex.Lock()
|
||||
defer s.entropyHistoryMutex.Unlock()
|
||||
s.EntropyHistory[packetNumber] = s.EntropySent
|
||||
|
||||
ciphertext := s.cryptoSetup.Seal(s.lastSentPacketNumber, fullReply.Bytes(), framesData.Bytes())
|
||||
@@ -269,6 +279,8 @@ func (s *Session) SendFrame(frame frames.Frame) error {
|
||||
|
||||
// NewStream creates a new strean open for reading and writing
|
||||
func (s *Session) NewStream(id protocol.StreamID) (*Stream, error) {
|
||||
s.streamsMutex.Lock()
|
||||
defer s.streamsMutex.Unlock()
|
||||
stream := NewStream(s, id)
|
||||
if s.Streams[id] != nil {
|
||||
return nil, fmt.Errorf("Session: stream with ID %d already exists", id)
|
||||
|
||||
Reference in New Issue
Block a user