Return an Iterator from expired method

Return an `Expired` struct that implements Iterator from `expired()`
instead of a Vec to avoid an allocation.
This commit is contained in:
Andrew J. Stone 2017-12-09 01:17:51 -05:00
parent 290756f7d9
commit f375216d39

View File

@ -46,6 +46,42 @@ pub enum TimerType {
Recurring Recurring
} }
/// An Iterator over expired timers
pub struct Expired<'a, T> where T: 'a {
now: Instant,
heap: &'a mut TimerHeap<T>
}
impl<'a, T> Iterator for Expired<'a, T> where T: Eq + Clone + Hash {
type Item = T;
fn next(&mut self) -> Option<T> {
while let Some(mut popped) = self.heap.timers.pop() {
if popped.expires_at <= self.now {
if self.heap.active.get(&popped.key) != Some(&popped.counter) {
// Drop an old deleted timer
continue;
}
if popped.recurring {
let key = popped.key.clone();
// We use the expired_at time so we don't keep skewing later and later
// by adding the duration to the current time.
popped.expires_at += popped.duration;
self.heap.timers.push(popped);
return Some(key);
} else {
let _ = self.heap.active.remove(&popped.key);
return Some(popped.key);
}
} else {
self.heap.timers.push(popped);
return None;
}
}
None
}
}
/// Store timers in a binary heap. Keep them sorted by which timer is going to expire first. /// Store timers in a binary heap. Keep them sorted by which timer is going to expire first.
pub struct TimerHeap<T> { pub struct TimerHeap<T> {
timers: BinaryHeap<TimerEntry<T>>, timers: BinaryHeap<TimerEntry<T>>,
@ -146,35 +182,16 @@ impl<T: Eq + Clone + Hash> TimerHeap<T> {
/// Return all expired keys /// Return all expired keys
/// ///
/// Any recurring timers will be re-added to the heap in the correct spot /// Any recurring timers will be re-added to the heap in the correct spot
pub fn expired(&mut self) -> Vec<T> { pub fn expired(&mut self) -> Expired<T> {
self._expired(Instant::now()) self._expired(Instant::now())
} }
/// A deterministically testable version of `expired()` /// A deterministically testable version of `expired()`
fn _expired(&mut self, now: Instant) -> Vec<T> { fn _expired(&mut self, now: Instant) -> Expired<T> {
let mut expired = Vec::new(); Expired {
while let Some(mut popped) = self.timers.pop() { now: now,
if popped.expires_at <= now { heap: self
if self.active.get(&popped.key) != Some(&popped.counter) {
// Drop an old deleted timer
continue;
} }
if popped.recurring {
expired.push(popped.key.clone());
// We use the expired_at time so we don't keep skewing later and later
// by adding the duration to the current time.
popped.expires_at += popped.duration;
self.timers.push(popped);
} else {
let _ = self.active.remove(&popped.key);
expired.push(popped.key)
}
} else {
self.timers.push(popped);
return expired;
}
}
expired
} }
} }
@ -258,12 +275,12 @@ mod tests {
let now = Instant::now(); let now = Instant::now();
let duration = Duration::from_millis(500); let duration = Duration::from_millis(500);
heap._insert(1u64, duration, TimerType::Oneshot, now).unwrap(); heap._insert(1u64, duration, TimerType::Oneshot, now).unwrap();
assert_eq!(heap._expired(now), vec![]); assert_eq!(heap._expired(now).count(), 0);
let v = heap._expired(now + duration); let count = heap._expired(now + duration).count();
assert_eq!(heap.active.len(), 0); assert_eq!(heap.active.len(), 0);
assert_eq!(v.len(), 1); assert_eq!(count, 1);
assert_eq!(heap.len(), 0); assert_eq!(heap.len(), 0);
assert_eq!(heap._expired(now + duration), vec![]); assert_eq!(heap._expired(now + duration).next(), None);
} }
#[test] #[test]
@ -272,15 +289,15 @@ mod tests {
let now = Instant::now(); let now = Instant::now();
let duration = Duration::from_millis(500); let duration = Duration::from_millis(500);
heap._insert(1u64, duration, TimerType::Recurring, now).unwrap(); heap._insert(1u64, duration, TimerType::Recurring, now).unwrap();
assert_eq!(heap._expired(now), vec![]); assert_eq!(heap._expired(now).count(), 0);
let v = heap._expired(now + duration); let count = heap._expired(now + duration).count();
assert_eq!(v.len(), 1); assert_eq!(count, 1);
assert_eq!(heap.len(), 1); assert_eq!(heap.len(), 1);
assert_eq!(heap._expired(now + duration + Duration::from_millis(1)), vec![]); assert_eq!(heap._expired(now + duration + Duration::from_millis(1)).count(), 0);
let v = heap._expired(now + duration + duration); let count = heap._expired(now + duration + duration).count();
assert_eq!(v.len(), 1); assert_eq!(count, 1);
assert_eq!(heap.len(), 1); assert_eq!(heap.len(), 1);
assert_eq!(heap._expired(now + duration + duration), vec![]); assert_eq!(heap._expired(now + duration + duration).count(), 0);
} }
#[test] #[test]
@ -298,7 +315,7 @@ mod tests {
let duration = Duration::from_millis(500); let duration = Duration::from_millis(500);
heap._insert(1u64, duration, TimerType::Recurring, now).unwrap(); heap._insert(1u64, duration, TimerType::Recurring, now).unwrap();
assert_eq!(heap.remove(1u64), true); assert_eq!(heap.remove(1u64), true);
assert_eq!(heap._expired(now + duration), vec![]); assert_eq!(heap._expired(now + duration).count(), 0);
assert_eq!(heap.len(), 0); assert_eq!(heap.len(), 0);
} }
@ -310,8 +327,7 @@ mod tests {
heap._insert(1u64, duration, TimerType::Oneshot, now).unwrap(); heap._insert(1u64, duration, TimerType::Oneshot, now).unwrap();
assert_eq!(heap.remove(1u64), true); assert_eq!(heap.remove(1u64), true);
heap._insert(1u64, duration, TimerType::Oneshot, now + duration).unwrap(); heap._insert(1u64, duration, TimerType::Oneshot, now + duration).unwrap();
let v = heap._expired(now + duration + duration); assert_eq!(heap._expired(now + duration + duration).count(), 1);
assert_eq!(v.len(), 1);
assert_eq!(heap.active.len(), 0); assert_eq!(heap.active.len(), 0);
assert_eq!(heap.len(), 0); assert_eq!(heap.len(), 0);
} }