diff --git a/session.go b/session.go index a1eb3a707..349050372 100644 --- a/session.go +++ b/session.go @@ -6,6 +6,7 @@ import ( "fmt" "io/ioutil" "net" + "sync" "github.com/lucas-clemente/quic-go/frames" "github.com/lucas-clemente/quic-go/handshake" @@ -27,14 +28,16 @@ type Session struct { ServerConfig *handshake.ServerConfig cryptoSetup *handshake.CryptoSetup - EntropyReceived EntropyAccumulator - EntropySent EntropyAccumulator - EntropyHistory map[protocol.PacketNumber]EntropyAccumulator // ToDo: store this with the packet itself + EntropyReceived EntropyAccumulator + EntropySent EntropyAccumulator + EntropyHistory map[protocol.PacketNumber]EntropyAccumulator // ToDo: store this with the packet itself + entropyHistoryMutex sync.Mutex lastSentPacketNumber protocol.PacketNumber lastObservedPacketNumber protocol.PacketNumber - Streams map[protocol.StreamID]*Stream + Streams map[protocol.StreamID]*Stream + streamsMutex sync.RWMutex streamCallback StreamCallback } @@ -147,7 +150,10 @@ func (s *Session) handleStreamFrame(r *bytes.Reader) error { return errors.New("Session: 0 is not a valid Stream ID") } + s.streamsMutex.RLock() stream, newStream := s.Streams[frame.StreamID] + s.streamsMutex.RUnlock() + if !newStream { stream, _ = s.NewStream(frame.StreamID) } @@ -168,6 +174,8 @@ func (s *Session) handleAckFrame(r *bytes.Reader) error { return err } + s.entropyHistoryMutex.Lock() + defer s.entropyHistoryMutex.Unlock() expectedEntropy, ok := s.EntropyHistory[frame.LargestObserved] if !ok { return errors.New("No entropy value saved for received ACK packet") @@ -257,6 +265,8 @@ func (s *Session) SendFrame(frame frames.Frame) error { return err } s.EntropySent.Add(packetNumber, entropyBit) + s.entropyHistoryMutex.Lock() + defer s.entropyHistoryMutex.Unlock() s.EntropyHistory[packetNumber] = s.EntropySent ciphertext := s.cryptoSetup.Seal(s.lastSentPacketNumber, fullReply.Bytes(), framesData.Bytes()) @@ -269,6 +279,8 @@ func (s *Session) SendFrame(frame frames.Frame) error { // NewStream creates a new strean open for reading and writing func (s *Session) NewStream(id protocol.StreamID) (*Stream, error) { + s.streamsMutex.Lock() + defer s.streamsMutex.Unlock() stream := NewStream(s, id) if s.Streams[id] != nil { return nil, fmt.Errorf("Session: stream with ID %d already exists", id)