Compare commits
11 Commits
290756f7d9
...
2992284f37
Author | SHA1 | Date |
---|---|---|
Andrew J. Stone | 2992284f37 | |
Andrew J. Stone | 968cc4204b | |
Andrew J. Stone | 5e45b94a6a | |
Andrew J. Stone | 1a00a785eb | |
Jon Gjengset | 4b3316a19e | |
Andrew J. Stone | 5afae607ab | |
Andrew J. Stone | f3e973070d | |
Andrew J. Stone | 09689c5700 | |
Andrew J. Stone | f375216d39 | |
Andrew J. Stone | 85e9d0dd9e | |
Jon Gjengset | f48b5b63b8 |
|
@ -1,8 +1,9 @@
|
|||
[package]
|
||||
name = "timer_heap"
|
||||
version = "0.1.1"
|
||||
version = "0.3.0"
|
||||
authors = ["Andrew J. Stone <andrew.j.stone.1@gmail.com>"]
|
||||
description = "A binary heap based timer management system"
|
||||
repository = "https://github.com/andrewjstone/timer_heap"
|
||||
keywords = ["timer", "heap"]
|
||||
license = "Apache-2.0"
|
||||
|
||||
|
|
230
src/lib.rs
230
src/lib.rs
|
@ -30,6 +30,8 @@ use std::collections::{BinaryHeap, HashMap};
|
|||
use std::cmp::{Ordering, Ord, PartialOrd, PartialEq};
|
||||
use std::time::{Instant, Duration};
|
||||
use std::hash::Hash;
|
||||
use std::fmt::{self, Debug};
|
||||
use std::convert::From;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
|
@ -46,6 +48,42 @@ pub enum TimerType {
|
|||
Recurring
|
||||
}
|
||||
|
||||
/// An Iterator over expired timers
|
||||
pub struct Expired<'a, T> where T: 'a {
|
||||
now: Instant,
|
||||
heap: &'a mut TimerHeap<T>
|
||||
}
|
||||
|
||||
impl<'a, T> Iterator for Expired<'a, T> where T: Eq + Clone + Hash {
|
||||
type Item = T;
|
||||
|
||||
fn next(&mut self) -> Option<T> {
|
||||
while let Some(mut popped) = self.heap.timers.pop() {
|
||||
if popped.expires_at <= self.now {
|
||||
if self.heap.active.get(&popped.key) != Some(&popped.counter) {
|
||||
// Drop an old deleted timer
|
||||
continue;
|
||||
}
|
||||
if popped.recurring {
|
||||
let key = popped.key.clone();
|
||||
// We use the expires_at time so we don't keep skewing later and later
|
||||
// by adding the duration to the current time.
|
||||
popped.expires_at += popped.duration;
|
||||
self.heap.timers.push(popped);
|
||||
return Some(key);
|
||||
} else {
|
||||
let _ = self.heap.active.remove(&popped.key);
|
||||
return Some(popped.key);
|
||||
}
|
||||
} else {
|
||||
self.heap.timers.push(popped);
|
||||
return None;
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Store timers in a binary heap. Keep them sorted by which timer is going to expire first.
|
||||
pub struct TimerHeap<T> {
|
||||
timers: BinaryHeap<TimerEntry<T>>,
|
||||
|
@ -62,6 +100,17 @@ pub struct TimerHeap<T> {
|
|||
counter: u64
|
||||
}
|
||||
|
||||
impl<T:Debug + Eq + Clone + Hash + Ord> Debug for TimerHeap<T> {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
fmt.debug_map()
|
||||
.entries(self.timers
|
||||
.iter()
|
||||
.filter(|e| self.is_active(e))
|
||||
.map(|e| (&e.key, DebugEntry::from(e))))
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Eq + Clone + Hash> TimerHeap<T> {
|
||||
/// Create a new TimerHeap
|
||||
pub fn new() -> TimerHeap<T> {
|
||||
|
@ -77,7 +126,9 @@ impl<T: Eq + Clone + Hash> TimerHeap<T> {
|
|||
self.timers.len()
|
||||
}
|
||||
|
||||
/// Insert a TimerEntry into the heap
|
||||
/// Insert a timer into the heap
|
||||
///
|
||||
/// Return an error if the key already exists.
|
||||
pub fn insert(&mut self, key: T, duration: Duration, ty: TimerType) -> Result<(), Error> {
|
||||
self._insert(key, duration, ty, Instant::now())
|
||||
}
|
||||
|
@ -94,6 +145,17 @@ impl<T: Eq + Clone + Hash> TimerHeap<T> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Insert a timer into the heap, replacing any existing timer if one exists
|
||||
///
|
||||
/// Return true if a timer already existed in the heap, false otherwise
|
||||
pub fn upsert(&mut self, key: T, duration: Duration, ty: TimerType) -> bool {
|
||||
let entry = TimerEntry::new(key.clone(), duration, ty, Instant::now(), self.counter);
|
||||
self.timers.push(entry);
|
||||
let existed = self.active.insert(key, self.counter).is_some();
|
||||
self.counter += 1;
|
||||
existed
|
||||
}
|
||||
|
||||
/// Remove a TimerEnry by Id
|
||||
///
|
||||
/// Return true if it exists, false otherwise
|
||||
|
@ -101,80 +163,57 @@ impl<T: Eq + Clone + Hash> TimerHeap<T> {
|
|||
self.active.remove(&key).is_some()
|
||||
}
|
||||
|
||||
/// Return the amount of time remaining (in ms) for the earliest expiring timer
|
||||
/// Return `None` if there are no timers in the heap
|
||||
pub fn time_remaining(&self) -> Option<u64> {
|
||||
/// Return the amount of time remaining for the earliest expiring timer.
|
||||
/// Return `None` if there are no timers in the heap.
|
||||
pub fn time_remaining(&self) -> Option<Duration> {
|
||||
self._time_remaining(Instant::now())
|
||||
}
|
||||
|
||||
/// A deterministically testable version of `time_remaining()`
|
||||
fn _time_remaining(&self, now: Instant) -> Option<u64> {
|
||||
self.timers.iter().find(|e| {
|
||||
self.active.get(&e.key) == Some(&e.counter)
|
||||
}).map(|e| {
|
||||
if now > e.expires_at {
|
||||
return 0;
|
||||
}
|
||||
let duration = e.expires_at - now;
|
||||
// We add a millisecond if there is a fractional ms milliseconds in
|
||||
// duration.subsec_nanos() / 1000000 so that we never fire early.
|
||||
let nanos = duration.subsec_nanos() as u64;
|
||||
// TODO: This can almost certainly be done faster
|
||||
let subsec_ms = nanos / 1000000;
|
||||
let mut remaining = duration.as_secs()*1000 + subsec_ms;
|
||||
if subsec_ms * 1000000 < nanos {
|
||||
remaining += 1;
|
||||
}
|
||||
remaining
|
||||
})
|
||||
fn _time_remaining(&self, now: Instant) -> Option<Duration> {
|
||||
self.timers
|
||||
.iter()
|
||||
.find(|e| self.is_active(e))
|
||||
.map(|e| {
|
||||
if now > e.expires_at {
|
||||
return Duration::new(0, 0);
|
||||
}
|
||||
e.expires_at - now
|
||||
})
|
||||
}
|
||||
|
||||
/// Return the earliest timeout based on a user timeout and the least remaining time in the
|
||||
/// next timer to fire.
|
||||
pub fn earliest_timeout(&self, user_timeout_ms: usize) -> usize {
|
||||
pub fn earliest_timeout(&self, user_timeout: Duration) -> Duration {
|
||||
if let Some(remaining) = self.time_remaining() {
|
||||
if user_timeout_ms < remaining as usize {
|
||||
user_timeout_ms
|
||||
if user_timeout < remaining {
|
||||
user_timeout
|
||||
} else {
|
||||
remaining as usize
|
||||
remaining
|
||||
}
|
||||
} else {
|
||||
user_timeout_ms
|
||||
user_timeout
|
||||
}
|
||||
}
|
||||
|
||||
/// Return all expired keys
|
||||
///
|
||||
/// Any recurring timers will be re-added to the heap in the correct spot
|
||||
pub fn expired(&mut self) -> Vec<T> {
|
||||
pub fn expired(&mut self) -> Expired<T> {
|
||||
self._expired(Instant::now())
|
||||
}
|
||||
|
||||
/// A deterministically testable version of `expired()`
|
||||
fn _expired(&mut self, now: Instant) -> Vec<T> {
|
||||
let mut expired = Vec::new();
|
||||
while let Some(mut popped) = self.timers.pop() {
|
||||
if popped.expires_at <= now {
|
||||
if self.active.get(&popped.key) != Some(&popped.counter) {
|
||||
// Drop an old deleted timer
|
||||
continue;
|
||||
}
|
||||
if popped.recurring {
|
||||
expired.push(popped.key.clone());
|
||||
// We use the expired_at time so we don't keep skewing later and later
|
||||
// by adding the duration to the current time.
|
||||
popped.expires_at += popped.duration;
|
||||
self.timers.push(popped);
|
||||
} else {
|
||||
let _ = self.active.remove(&popped.key);
|
||||
expired.push(popped.key)
|
||||
}
|
||||
} else {
|
||||
self.timers.push(popped);
|
||||
return expired;
|
||||
}
|
||||
fn _expired(&mut self, now: Instant) -> Expired<T> {
|
||||
Expired {
|
||||
now: now,
|
||||
heap: self
|
||||
}
|
||||
expired
|
||||
}
|
||||
|
||||
/// Is a given entry still active ?
|
||||
fn is_active(&self, entry: &TimerEntry<T>) -> bool {
|
||||
self.active.get(&entry.key) == Some(&entry.counter)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -187,6 +226,33 @@ struct TimerEntry<T> {
|
|||
counter: u64
|
||||
}
|
||||
|
||||
/// A timer entry used only when debug formatting a TimerHeap
|
||||
struct DebugEntry {
|
||||
recurring: bool,
|
||||
expires_at: Instant,
|
||||
duration: Duration,
|
||||
}
|
||||
|
||||
impl Debug for DebugEntry {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
fmt.debug_struct("Timer")
|
||||
.field("recurring", &self.recurring)
|
||||
.field("expires_at", &self.expires_at)
|
||||
.field("duration", &self.duration)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T> From<&'a TimerEntry<T>> for DebugEntry {
|
||||
fn from(e: &TimerEntry<T>) -> Self {
|
||||
DebugEntry {
|
||||
recurring: e.recurring,
|
||||
expires_at: e.expires_at,
|
||||
duration: e.duration
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> TimerEntry<T> {
|
||||
pub fn new(key: T,
|
||||
duration: Duration,
|
||||
|
@ -237,19 +303,29 @@ mod tests {
|
|||
use super::{TimerHeap, TimerType, Error};
|
||||
use std::time::{Instant, Duration};
|
||||
|
||||
// Run this test with `cargo test -- --nocapture` to see the debug output
|
||||
#[test]
|
||||
fn time_remaining() {
|
||||
let mut heap = TimerHeap::new();
|
||||
let now = Instant::now();
|
||||
let duration = Duration::from_millis(500);
|
||||
heap._insert(1u64, duration, TimerType::Oneshot, now).unwrap();
|
||||
assert_matches!(heap._time_remaining(now), Some(500));
|
||||
assert_matches!(heap._time_remaining(now + duration), Some(0));
|
||||
assert_matches!(heap._time_remaining(now + duration + Duration::from_millis(100)),
|
||||
Some(0));
|
||||
heap._insert(1u64, duration, TimerType::Oneshot, now)
|
||||
.unwrap();
|
||||
println!("Active Oneshot Timer: {:?}", heap);
|
||||
assert_eq!(heap._time_remaining(now), Some(Duration::from_millis(500)));
|
||||
assert_eq!(
|
||||
heap._time_remaining(now + duration),
|
||||
Some(Duration::new(0, 0))
|
||||
);
|
||||
println!("Expired Oneshot Timer: {:?}", heap);
|
||||
assert_eq!(
|
||||
heap._time_remaining(now + duration + Duration::from_millis(100)),
|
||||
Some(Duration::new(0, 0))
|
||||
);
|
||||
assert_eq!(heap.remove(2), false);
|
||||
assert!(heap.remove(1));
|
||||
assert_matches!(heap._time_remaining(now), None);
|
||||
println!("Empty heap: {:?}", heap);
|
||||
assert_eq!(heap._time_remaining(now), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -258,12 +334,12 @@ mod tests {
|
|||
let now = Instant::now();
|
||||
let duration = Duration::from_millis(500);
|
||||
heap._insert(1u64, duration, TimerType::Oneshot, now).unwrap();
|
||||
assert_eq!(heap._expired(now), vec![]);
|
||||
let v = heap._expired(now + duration);
|
||||
assert_eq!(heap._expired(now).count(), 0);
|
||||
let count = heap._expired(now + duration).count();
|
||||
assert_eq!(heap.active.len(), 0);
|
||||
assert_eq!(v.len(), 1);
|
||||
assert_eq!(count, 1);
|
||||
assert_eq!(heap.len(), 0);
|
||||
assert_eq!(heap._expired(now + duration), vec![]);
|
||||
assert_eq!(heap._expired(now + duration).next(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -272,15 +348,15 @@ mod tests {
|
|||
let now = Instant::now();
|
||||
let duration = Duration::from_millis(500);
|
||||
heap._insert(1u64, duration, TimerType::Recurring, now).unwrap();
|
||||
assert_eq!(heap._expired(now), vec![]);
|
||||
let v = heap._expired(now + duration);
|
||||
assert_eq!(v.len(), 1);
|
||||
assert_eq!(heap._expired(now).count(), 0);
|
||||
let count = heap._expired(now + duration).count();
|
||||
assert_eq!(count, 1);
|
||||
assert_eq!(heap.len(), 1);
|
||||
assert_eq!(heap._expired(now + duration + Duration::from_millis(1)), vec![]);
|
||||
let v = heap._expired(now + duration + duration);
|
||||
assert_eq!(v.len(), 1);
|
||||
assert_eq!(heap._expired(now + duration + Duration::from_millis(1)).count(), 0);
|
||||
let count = heap._expired(now + duration + duration).count();
|
||||
assert_eq!(count, 1);
|
||||
assert_eq!(heap.len(), 1);
|
||||
assert_eq!(heap._expired(now + duration + duration), vec![]);
|
||||
assert_eq!(heap._expired(now + duration + duration).count(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -298,7 +374,7 @@ mod tests {
|
|||
let duration = Duration::from_millis(500);
|
||||
heap._insert(1u64, duration, TimerType::Recurring, now).unwrap();
|
||||
assert_eq!(heap.remove(1u64), true);
|
||||
assert_eq!(heap._expired(now + duration), vec![]);
|
||||
assert_eq!(heap._expired(now + duration).count(), 0);
|
||||
assert_eq!(heap.len(), 0);
|
||||
}
|
||||
|
||||
|
@ -310,9 +386,19 @@ mod tests {
|
|||
heap._insert(1u64, duration, TimerType::Oneshot, now).unwrap();
|
||||
assert_eq!(heap.remove(1u64), true);
|
||||
heap._insert(1u64, duration, TimerType::Oneshot, now + duration).unwrap();
|
||||
let v = heap._expired(now + duration + duration);
|
||||
assert_eq!(v.len(), 1);
|
||||
assert_eq!(heap._expired(now + duration + duration).count(), 1);
|
||||
assert_eq!(heap.active.len(), 0);
|
||||
assert_eq!(heap.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn upsert() {
|
||||
let mut heap = TimerHeap::new();
|
||||
let duration = Duration::from_millis(500);
|
||||
heap.insert(1u64, duration, TimerType::Oneshot).unwrap();
|
||||
assert_eq!(heap.upsert(1u64, duration, TimerType::Oneshot), true);
|
||||
assert_eq!(heap.remove(1u64), true);
|
||||
assert_eq!(heap.upsert(1u64, duration, TimerType::Oneshot), false);
|
||||
assert_eq!(heap.upsert(1u64, duration, TimerType::Oneshot), true);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue