From 82acc7f2b7620c2419368cdc454de63bfd7f12b5 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 30 May 2019 03:16:45 +0800 Subject: [PATCH] introduce invalid stream ID to simplify the outgoing streams map --- internal/protocol/stream_id.go | 6 +++++- internal/protocol/stream_id_test.go | 4 ++++ internal/wire/log_test.go | 4 ++-- streams_map_outgoing_bidi.go | 15 +++++++-------- streams_map_outgoing_generic.go | 15 +++++++-------- streams_map_outgoing_uni.go | 15 +++++++-------- 6 files changed, 32 insertions(+), 27 deletions(-) diff --git a/internal/protocol/stream_id.go b/internal/protocol/stream_id.go index b96e0c2b..ba26eeb1 100644 --- a/internal/protocol/stream_id.go +++ b/internal/protocol/stream_id.go @@ -1,7 +1,11 @@ package protocol // A StreamID in QUIC -type StreamID uint64 +type StreamID int64 + +// InvalidPacketNumber is a stream ID that is invalid. +// The first valid stream ID in QUIC is 0. +const InvalidStreamID = -1 // StreamType encodes if this is a unidirectional or bidirectional stream type StreamType uint8 diff --git a/internal/protocol/stream_id_test.go b/internal/protocol/stream_id_test.go index 8c992484..c6c21132 100644 --- a/internal/protocol/stream_id_test.go +++ b/internal/protocol/stream_id_test.go @@ -6,6 +6,10 @@ import ( ) var _ = Describe("Stream ID", func() { + It("InvalidStreamID is smaller than all valid stream IDs", func() { + Expect(InvalidStreamID).To(BeNumerically("<", 0)) + }) + It("says who initiated a stream", func() { Expect(StreamID(4).InitiatedBy()).To(Equal(PerspectiveClient)) Expect(StreamID(5).InitiatedBy()).To(Equal(PerspectiveServer)) diff --git a/internal/wire/log_test.go b/internal/wire/log_test.go index f611e5f4..7177da48 100644 --- a/internal/wire/log_test.go +++ b/internal/wire/log_test.go @@ -38,12 +38,12 @@ var _ = Describe("Frame logging", func() { It("logs sent frames", func() { LogFrame(logger, &ResetStreamFrame{}, true) - Expect(buf.Bytes()).To(ContainSubstring("\t-> &wire.ResetStreamFrame{StreamID:0x0, ErrorCode:0x0, ByteOffset:0x0}\n")) + Expect(buf.Bytes()).To(ContainSubstring("\t-> &wire.ResetStreamFrame{StreamID:0, ErrorCode:0x0, ByteOffset:0x0}\n")) }) It("logs received frames", func() { LogFrame(logger, &ResetStreamFrame{}, false) - Expect(buf.Bytes()).To(ContainSubstring("\t<- &wire.ResetStreamFrame{StreamID:0x0, ErrorCode:0x0, ByteOffset:0x0}\n")) + Expect(buf.Bytes()).To(ContainSubstring("\t<- &wire.ResetStreamFrame{StreamID:0, ErrorCode:0x0, ByteOffset:0x0}\n")) }) It("logs CRYPTO frames", func() { diff --git a/streams_map_outgoing_bidi.go b/streams_map_outgoing_bidi.go index a4457775..1c174f45 100644 --- a/streams_map_outgoing_bidi.go +++ b/streams_map_outgoing_bidi.go @@ -19,10 +19,9 @@ type outgoingBidiStreamsMap struct { streams map[protocol.StreamID]streamI - nextStream protocol.StreamID // stream ID of the stream returned by OpenStream(Sync) - maxStream protocol.StreamID // the maximum stream ID we're allowed to open - maxStreamSet bool // was maxStream set. If not, it's not possible to any stream (also works for stream 0) - blockedSent bool // was a STREAMS_BLOCKED sent for the current maxStream + nextStream protocol.StreamID // stream ID of the stream returned by OpenStream(Sync) + maxStream protocol.StreamID // the maximum stream ID we're allowed to open + blockedSent bool // was a STREAMS_BLOCKED sent for the current maxStream newStream func(protocol.StreamID) streamI queueStreamIDBlocked func(*wire.StreamsBlockedFrame) @@ -38,6 +37,7 @@ func newOutgoingBidiStreamsMap( m := &outgoingBidiStreamsMap{ streams: make(map[protocol.StreamID]streamI), nextStream: nextStream, + maxStream: protocol.InvalidStreamID, newStream: newStream, queueStreamIDBlocked: func(f *wire.StreamsBlockedFrame) { queueControlFrame(f) }, } @@ -80,9 +80,9 @@ func (m *outgoingBidiStreamsMap) OpenStreamSync() (streamI, error) { } func (m *outgoingBidiStreamsMap) openStreamImpl() (streamI, error) { - if !m.maxStreamSet || m.nextStream > m.maxStream { + if m.nextStream > m.maxStream { if !m.blockedSent { - if m.maxStreamSet { + if m.maxStream != protocol.InvalidStreamID { m.queueStreamIDBlocked(&wire.StreamsBlockedFrame{ Type: protocol.StreamTypeBidi, StreamLimit: m.maxStream.StreamNum(), @@ -127,9 +127,8 @@ func (m *outgoingBidiStreamsMap) DeleteStream(id protocol.StreamID) error { func (m *outgoingBidiStreamsMap) SetMaxStream(id protocol.StreamID) { m.mutex.Lock() - if !m.maxStreamSet || id > m.maxStream { + if id > m.maxStream { m.maxStream = id - m.maxStreamSet = true m.blockedSent = false m.cond.Broadcast() } diff --git a/streams_map_outgoing_generic.go b/streams_map_outgoing_generic.go index c0657b90..ea84421d 100644 --- a/streams_map_outgoing_generic.go +++ b/streams_map_outgoing_generic.go @@ -17,10 +17,9 @@ type outgoingItemsMap struct { streams map[protocol.StreamID]item - nextStream protocol.StreamID // stream ID of the stream returned by OpenStream(Sync) - maxStream protocol.StreamID // the maximum stream ID we're allowed to open - maxStreamSet bool // was maxStream set. If not, it's not possible to any stream (also works for stream 0) - blockedSent bool // was a STREAMS_BLOCKED sent for the current maxStream + nextStream protocol.StreamID // stream ID of the stream returned by OpenStream(Sync) + maxStream protocol.StreamID // the maximum stream ID we're allowed to open + blockedSent bool // was a STREAMS_BLOCKED sent for the current maxStream newStream func(protocol.StreamID) item queueStreamIDBlocked func(*wire.StreamsBlockedFrame) @@ -36,6 +35,7 @@ func newOutgoingItemsMap( m := &outgoingItemsMap{ streams: make(map[protocol.StreamID]item), nextStream: nextStream, + maxStream: protocol.InvalidStreamID, newStream: newStream, queueStreamIDBlocked: func(f *wire.StreamsBlockedFrame) { queueControlFrame(f) }, } @@ -78,9 +78,9 @@ func (m *outgoingItemsMap) OpenStreamSync() (item, error) { } func (m *outgoingItemsMap) openStreamImpl() (item, error) { - if !m.maxStreamSet || m.nextStream > m.maxStream { + if m.nextStream > m.maxStream { if !m.blockedSent { - if m.maxStreamSet { + if m.maxStream != protocol.InvalidStreamID { m.queueStreamIDBlocked(&wire.StreamsBlockedFrame{ Type: streamTypeGeneric, StreamLimit: m.maxStream.StreamNum(), @@ -125,9 +125,8 @@ func (m *outgoingItemsMap) DeleteStream(id protocol.StreamID) error { func (m *outgoingItemsMap) SetMaxStream(id protocol.StreamID) { m.mutex.Lock() - if !m.maxStreamSet || id > m.maxStream { + if id > m.maxStream { m.maxStream = id - m.maxStreamSet = true m.blockedSent = false m.cond.Broadcast() } diff --git a/streams_map_outgoing_uni.go b/streams_map_outgoing_uni.go index a38240a6..8c856c1e 100644 --- a/streams_map_outgoing_uni.go +++ b/streams_map_outgoing_uni.go @@ -19,10 +19,9 @@ type outgoingUniStreamsMap struct { streams map[protocol.StreamID]sendStreamI - nextStream protocol.StreamID // stream ID of the stream returned by OpenStream(Sync) - maxStream protocol.StreamID // the maximum stream ID we're allowed to open - maxStreamSet bool // was maxStream set. If not, it's not possible to any stream (also works for stream 0) - blockedSent bool // was a STREAMS_BLOCKED sent for the current maxStream + nextStream protocol.StreamID // stream ID of the stream returned by OpenStream(Sync) + maxStream protocol.StreamID // the maximum stream ID we're allowed to open + blockedSent bool // was a STREAMS_BLOCKED sent for the current maxStream newStream func(protocol.StreamID) sendStreamI queueStreamIDBlocked func(*wire.StreamsBlockedFrame) @@ -38,6 +37,7 @@ func newOutgoingUniStreamsMap( m := &outgoingUniStreamsMap{ streams: make(map[protocol.StreamID]sendStreamI), nextStream: nextStream, + maxStream: protocol.InvalidStreamID, newStream: newStream, queueStreamIDBlocked: func(f *wire.StreamsBlockedFrame) { queueControlFrame(f) }, } @@ -80,9 +80,9 @@ func (m *outgoingUniStreamsMap) OpenStreamSync() (sendStreamI, error) { } func (m *outgoingUniStreamsMap) openStreamImpl() (sendStreamI, error) { - if !m.maxStreamSet || m.nextStream > m.maxStream { + if m.nextStream > m.maxStream { if !m.blockedSent { - if m.maxStreamSet { + if m.maxStream != protocol.InvalidStreamID { m.queueStreamIDBlocked(&wire.StreamsBlockedFrame{ Type: protocol.StreamTypeUni, StreamLimit: m.maxStream.StreamNum(), @@ -127,9 +127,8 @@ func (m *outgoingUniStreamsMap) DeleteStream(id protocol.StreamID) error { func (m *outgoingUniStreamsMap) SetMaxStream(id protocol.StreamID) { m.mutex.Lock() - if !m.maxStreamSet || id > m.maxStream { + if id > m.maxStream { m.maxStream = id - m.maxStreamSet = true m.blockedSent = false m.cond.Broadcast() }