Compare commits
11 Commits
290756f7d9
...
2992284f37
Author | SHA1 | Date |
---|---|---|
|
2992284f37 | |
|
968cc4204b | |
|
5e45b94a6a | |
|
1a00a785eb | |
|
4b3316a19e | |
|
5afae607ab | |
|
f3e973070d | |
|
09689c5700 | |
|
f375216d39 | |
|
85e9d0dd9e | |
|
f48b5b63b8 |
|
@ -1,8 +1,9 @@
|
||||||
[package]
|
[package]
|
||||||
name = "timer_heap"
|
name = "timer_heap"
|
||||||
version = "0.1.1"
|
version = "0.3.0"
|
||||||
authors = ["Andrew J. Stone <andrew.j.stone.1@gmail.com>"]
|
authors = ["Andrew J. Stone <andrew.j.stone.1@gmail.com>"]
|
||||||
description = "A binary heap based timer management system"
|
description = "A binary heap based timer management system"
|
||||||
|
repository = "https://github.com/andrewjstone/timer_heap"
|
||||||
keywords = ["timer", "heap"]
|
keywords = ["timer", "heap"]
|
||||||
license = "Apache-2.0"
|
license = "Apache-2.0"
|
||||||
|
|
||||||
|
|
230
src/lib.rs
230
src/lib.rs
|
@ -30,6 +30,8 @@ use std::collections::{BinaryHeap, HashMap};
|
||||||
use std::cmp::{Ordering, Ord, PartialOrd, PartialEq};
|
use std::cmp::{Ordering, Ord, PartialOrd, PartialEq};
|
||||||
use std::time::{Instant, Duration};
|
use std::time::{Instant, Duration};
|
||||||
use std::hash::Hash;
|
use std::hash::Hash;
|
||||||
|
use std::fmt::{self, Debug};
|
||||||
|
use std::convert::From;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
|
@ -46,6 +48,42 @@ pub enum TimerType {
|
||||||
Recurring
|
Recurring
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// An Iterator over expired timers
|
||||||
|
pub struct Expired<'a, T> where T: 'a {
|
||||||
|
now: Instant,
|
||||||
|
heap: &'a mut TimerHeap<T>
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, T> Iterator for Expired<'a, T> where T: Eq + Clone + Hash {
|
||||||
|
type Item = T;
|
||||||
|
|
||||||
|
fn next(&mut self) -> Option<T> {
|
||||||
|
while let Some(mut popped) = self.heap.timers.pop() {
|
||||||
|
if popped.expires_at <= self.now {
|
||||||
|
if self.heap.active.get(&popped.key) != Some(&popped.counter) {
|
||||||
|
// Drop an old deleted timer
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if popped.recurring {
|
||||||
|
let key = popped.key.clone();
|
||||||
|
// We use the expires_at time so we don't keep skewing later and later
|
||||||
|
// by adding the duration to the current time.
|
||||||
|
popped.expires_at += popped.duration;
|
||||||
|
self.heap.timers.push(popped);
|
||||||
|
return Some(key);
|
||||||
|
} else {
|
||||||
|
let _ = self.heap.active.remove(&popped.key);
|
||||||
|
return Some(popped.key);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
self.heap.timers.push(popped);
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Store timers in a binary heap. Keep them sorted by which timer is going to expire first.
|
/// Store timers in a binary heap. Keep them sorted by which timer is going to expire first.
|
||||||
pub struct TimerHeap<T> {
|
pub struct TimerHeap<T> {
|
||||||
timers: BinaryHeap<TimerEntry<T>>,
|
timers: BinaryHeap<TimerEntry<T>>,
|
||||||
|
@ -62,6 +100,17 @@ pub struct TimerHeap<T> {
|
||||||
counter: u64
|
counter: u64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<T:Debug + Eq + Clone + Hash + Ord> Debug for TimerHeap<T> {
|
||||||
|
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
fmt.debug_map()
|
||||||
|
.entries(self.timers
|
||||||
|
.iter()
|
||||||
|
.filter(|e| self.is_active(e))
|
||||||
|
.map(|e| (&e.key, DebugEntry::from(e))))
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl<T: Eq + Clone + Hash> TimerHeap<T> {
|
impl<T: Eq + Clone + Hash> TimerHeap<T> {
|
||||||
/// Create a new TimerHeap
|
/// Create a new TimerHeap
|
||||||
pub fn new() -> TimerHeap<T> {
|
pub fn new() -> TimerHeap<T> {
|
||||||
|
@ -77,7 +126,9 @@ impl<T: Eq + Clone + Hash> TimerHeap<T> {
|
||||||
self.timers.len()
|
self.timers.len()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Insert a TimerEntry into the heap
|
/// Insert a timer into the heap
|
||||||
|
///
|
||||||
|
/// Return an error if the key already exists.
|
||||||
pub fn insert(&mut self, key: T, duration: Duration, ty: TimerType) -> Result<(), Error> {
|
pub fn insert(&mut self, key: T, duration: Duration, ty: TimerType) -> Result<(), Error> {
|
||||||
self._insert(key, duration, ty, Instant::now())
|
self._insert(key, duration, ty, Instant::now())
|
||||||
}
|
}
|
||||||
|
@ -94,6 +145,17 @@ impl<T: Eq + Clone + Hash> TimerHeap<T> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Insert a timer into the heap, replacing any existing timer if one exists
|
||||||
|
///
|
||||||
|
/// Return true if a timer already existed in the heap, false otherwise
|
||||||
|
pub fn upsert(&mut self, key: T, duration: Duration, ty: TimerType) -> bool {
|
||||||
|
let entry = TimerEntry::new(key.clone(), duration, ty, Instant::now(), self.counter);
|
||||||
|
self.timers.push(entry);
|
||||||
|
let existed = self.active.insert(key, self.counter).is_some();
|
||||||
|
self.counter += 1;
|
||||||
|
existed
|
||||||
|
}
|
||||||
|
|
||||||
/// Remove a TimerEnry by Id
|
/// Remove a TimerEnry by Id
|
||||||
///
|
///
|
||||||
/// Return true if it exists, false otherwise
|
/// Return true if it exists, false otherwise
|
||||||
|
@ -101,80 +163,57 @@ impl<T: Eq + Clone + Hash> TimerHeap<T> {
|
||||||
self.active.remove(&key).is_some()
|
self.active.remove(&key).is_some()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return the amount of time remaining (in ms) for the earliest expiring timer
|
/// Return the amount of time remaining for the earliest expiring timer.
|
||||||
/// Return `None` if there are no timers in the heap
|
/// Return `None` if there are no timers in the heap.
|
||||||
pub fn time_remaining(&self) -> Option<u64> {
|
pub fn time_remaining(&self) -> Option<Duration> {
|
||||||
self._time_remaining(Instant::now())
|
self._time_remaining(Instant::now())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A deterministically testable version of `time_remaining()`
|
/// A deterministically testable version of `time_remaining()`
|
||||||
fn _time_remaining(&self, now: Instant) -> Option<u64> {
|
fn _time_remaining(&self, now: Instant) -> Option<Duration> {
|
||||||
self.timers.iter().find(|e| {
|
self.timers
|
||||||
self.active.get(&e.key) == Some(&e.counter)
|
.iter()
|
||||||
}).map(|e| {
|
.find(|e| self.is_active(e))
|
||||||
if now > e.expires_at {
|
.map(|e| {
|
||||||
return 0;
|
if now > e.expires_at {
|
||||||
}
|
return Duration::new(0, 0);
|
||||||
let duration = e.expires_at - now;
|
}
|
||||||
// We add a millisecond if there is a fractional ms milliseconds in
|
e.expires_at - now
|
||||||
// duration.subsec_nanos() / 1000000 so that we never fire early.
|
})
|
||||||
let nanos = duration.subsec_nanos() as u64;
|
|
||||||
// TODO: This can almost certainly be done faster
|
|
||||||
let subsec_ms = nanos / 1000000;
|
|
||||||
let mut remaining = duration.as_secs()*1000 + subsec_ms;
|
|
||||||
if subsec_ms * 1000000 < nanos {
|
|
||||||
remaining += 1;
|
|
||||||
}
|
|
||||||
remaining
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return the earliest timeout based on a user timeout and the least remaining time in the
|
/// Return the earliest timeout based on a user timeout and the least remaining time in the
|
||||||
/// next timer to fire.
|
/// next timer to fire.
|
||||||
pub fn earliest_timeout(&self, user_timeout_ms: usize) -> usize {
|
pub fn earliest_timeout(&self, user_timeout: Duration) -> Duration {
|
||||||
if let Some(remaining) = self.time_remaining() {
|
if let Some(remaining) = self.time_remaining() {
|
||||||
if user_timeout_ms < remaining as usize {
|
if user_timeout < remaining {
|
||||||
user_timeout_ms
|
user_timeout
|
||||||
} else {
|
} else {
|
||||||
remaining as usize
|
remaining
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
user_timeout_ms
|
user_timeout
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return all expired keys
|
/// Return all expired keys
|
||||||
///
|
///
|
||||||
/// Any recurring timers will be re-added to the heap in the correct spot
|
/// Any recurring timers will be re-added to the heap in the correct spot
|
||||||
pub fn expired(&mut self) -> Vec<T> {
|
pub fn expired(&mut self) -> Expired<T> {
|
||||||
self._expired(Instant::now())
|
self._expired(Instant::now())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A deterministically testable version of `expired()`
|
/// A deterministically testable version of `expired()`
|
||||||
fn _expired(&mut self, now: Instant) -> Vec<T> {
|
fn _expired(&mut self, now: Instant) -> Expired<T> {
|
||||||
let mut expired = Vec::new();
|
Expired {
|
||||||
while let Some(mut popped) = self.timers.pop() {
|
now: now,
|
||||||
if popped.expires_at <= now {
|
heap: self
|
||||||
if self.active.get(&popped.key) != Some(&popped.counter) {
|
|
||||||
// Drop an old deleted timer
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if popped.recurring {
|
|
||||||
expired.push(popped.key.clone());
|
|
||||||
// We use the expired_at time so we don't keep skewing later and later
|
|
||||||
// by adding the duration to the current time.
|
|
||||||
popped.expires_at += popped.duration;
|
|
||||||
self.timers.push(popped);
|
|
||||||
} else {
|
|
||||||
let _ = self.active.remove(&popped.key);
|
|
||||||
expired.push(popped.key)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
self.timers.push(popped);
|
|
||||||
return expired;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
expired
|
}
|
||||||
|
|
||||||
|
/// Is a given entry still active ?
|
||||||
|
fn is_active(&self, entry: &TimerEntry<T>) -> bool {
|
||||||
|
self.active.get(&entry.key) == Some(&entry.counter)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -187,6 +226,33 @@ struct TimerEntry<T> {
|
||||||
counter: u64
|
counter: u64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A timer entry used only when debug formatting a TimerHeap
|
||||||
|
struct DebugEntry {
|
||||||
|
recurring: bool,
|
||||||
|
expires_at: Instant,
|
||||||
|
duration: Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Debug for DebugEntry {
|
||||||
|
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
fmt.debug_struct("Timer")
|
||||||
|
.field("recurring", &self.recurring)
|
||||||
|
.field("expires_at", &self.expires_at)
|
||||||
|
.field("duration", &self.duration)
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, T> From<&'a TimerEntry<T>> for DebugEntry {
|
||||||
|
fn from(e: &TimerEntry<T>) -> Self {
|
||||||
|
DebugEntry {
|
||||||
|
recurring: e.recurring,
|
||||||
|
expires_at: e.expires_at,
|
||||||
|
duration: e.duration
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl<T> TimerEntry<T> {
|
impl<T> TimerEntry<T> {
|
||||||
pub fn new(key: T,
|
pub fn new(key: T,
|
||||||
duration: Duration,
|
duration: Duration,
|
||||||
|
@ -237,19 +303,29 @@ mod tests {
|
||||||
use super::{TimerHeap, TimerType, Error};
|
use super::{TimerHeap, TimerType, Error};
|
||||||
use std::time::{Instant, Duration};
|
use std::time::{Instant, Duration};
|
||||||
|
|
||||||
|
// Run this test with `cargo test -- --nocapture` to see the debug output
|
||||||
#[test]
|
#[test]
|
||||||
fn time_remaining() {
|
fn time_remaining() {
|
||||||
let mut heap = TimerHeap::new();
|
let mut heap = TimerHeap::new();
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
let duration = Duration::from_millis(500);
|
let duration = Duration::from_millis(500);
|
||||||
heap._insert(1u64, duration, TimerType::Oneshot, now).unwrap();
|
heap._insert(1u64, duration, TimerType::Oneshot, now)
|
||||||
assert_matches!(heap._time_remaining(now), Some(500));
|
.unwrap();
|
||||||
assert_matches!(heap._time_remaining(now + duration), Some(0));
|
println!("Active Oneshot Timer: {:?}", heap);
|
||||||
assert_matches!(heap._time_remaining(now + duration + Duration::from_millis(100)),
|
assert_eq!(heap._time_remaining(now), Some(Duration::from_millis(500)));
|
||||||
Some(0));
|
assert_eq!(
|
||||||
|
heap._time_remaining(now + duration),
|
||||||
|
Some(Duration::new(0, 0))
|
||||||
|
);
|
||||||
|
println!("Expired Oneshot Timer: {:?}", heap);
|
||||||
|
assert_eq!(
|
||||||
|
heap._time_remaining(now + duration + Duration::from_millis(100)),
|
||||||
|
Some(Duration::new(0, 0))
|
||||||
|
);
|
||||||
assert_eq!(heap.remove(2), false);
|
assert_eq!(heap.remove(2), false);
|
||||||
assert!(heap.remove(1));
|
assert!(heap.remove(1));
|
||||||
assert_matches!(heap._time_remaining(now), None);
|
println!("Empty heap: {:?}", heap);
|
||||||
|
assert_eq!(heap._time_remaining(now), None);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -258,12 +334,12 @@ mod tests {
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
let duration = Duration::from_millis(500);
|
let duration = Duration::from_millis(500);
|
||||||
heap._insert(1u64, duration, TimerType::Oneshot, now).unwrap();
|
heap._insert(1u64, duration, TimerType::Oneshot, now).unwrap();
|
||||||
assert_eq!(heap._expired(now), vec![]);
|
assert_eq!(heap._expired(now).count(), 0);
|
||||||
let v = heap._expired(now + duration);
|
let count = heap._expired(now + duration).count();
|
||||||
assert_eq!(heap.active.len(), 0);
|
assert_eq!(heap.active.len(), 0);
|
||||||
assert_eq!(v.len(), 1);
|
assert_eq!(count, 1);
|
||||||
assert_eq!(heap.len(), 0);
|
assert_eq!(heap.len(), 0);
|
||||||
assert_eq!(heap._expired(now + duration), vec![]);
|
assert_eq!(heap._expired(now + duration).next(), None);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -272,15 +348,15 @@ mod tests {
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
let duration = Duration::from_millis(500);
|
let duration = Duration::from_millis(500);
|
||||||
heap._insert(1u64, duration, TimerType::Recurring, now).unwrap();
|
heap._insert(1u64, duration, TimerType::Recurring, now).unwrap();
|
||||||
assert_eq!(heap._expired(now), vec![]);
|
assert_eq!(heap._expired(now).count(), 0);
|
||||||
let v = heap._expired(now + duration);
|
let count = heap._expired(now + duration).count();
|
||||||
assert_eq!(v.len(), 1);
|
assert_eq!(count, 1);
|
||||||
assert_eq!(heap.len(), 1);
|
assert_eq!(heap.len(), 1);
|
||||||
assert_eq!(heap._expired(now + duration + Duration::from_millis(1)), vec![]);
|
assert_eq!(heap._expired(now + duration + Duration::from_millis(1)).count(), 0);
|
||||||
let v = heap._expired(now + duration + duration);
|
let count = heap._expired(now + duration + duration).count();
|
||||||
assert_eq!(v.len(), 1);
|
assert_eq!(count, 1);
|
||||||
assert_eq!(heap.len(), 1);
|
assert_eq!(heap.len(), 1);
|
||||||
assert_eq!(heap._expired(now + duration + duration), vec![]);
|
assert_eq!(heap._expired(now + duration + duration).count(), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -298,7 +374,7 @@ mod tests {
|
||||||
let duration = Duration::from_millis(500);
|
let duration = Duration::from_millis(500);
|
||||||
heap._insert(1u64, duration, TimerType::Recurring, now).unwrap();
|
heap._insert(1u64, duration, TimerType::Recurring, now).unwrap();
|
||||||
assert_eq!(heap.remove(1u64), true);
|
assert_eq!(heap.remove(1u64), true);
|
||||||
assert_eq!(heap._expired(now + duration), vec![]);
|
assert_eq!(heap._expired(now + duration).count(), 0);
|
||||||
assert_eq!(heap.len(), 0);
|
assert_eq!(heap.len(), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -310,9 +386,19 @@ mod tests {
|
||||||
heap._insert(1u64, duration, TimerType::Oneshot, now).unwrap();
|
heap._insert(1u64, duration, TimerType::Oneshot, now).unwrap();
|
||||||
assert_eq!(heap.remove(1u64), true);
|
assert_eq!(heap.remove(1u64), true);
|
||||||
heap._insert(1u64, duration, TimerType::Oneshot, now + duration).unwrap();
|
heap._insert(1u64, duration, TimerType::Oneshot, now + duration).unwrap();
|
||||||
let v = heap._expired(now + duration + duration);
|
assert_eq!(heap._expired(now + duration + duration).count(), 1);
|
||||||
assert_eq!(v.len(), 1);
|
|
||||||
assert_eq!(heap.active.len(), 0);
|
assert_eq!(heap.active.len(), 0);
|
||||||
assert_eq!(heap.len(), 0);
|
assert_eq!(heap.len(), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn upsert() {
|
||||||
|
let mut heap = TimerHeap::new();
|
||||||
|
let duration = Duration::from_millis(500);
|
||||||
|
heap.insert(1u64, duration, TimerType::Oneshot).unwrap();
|
||||||
|
assert_eq!(heap.upsert(1u64, duration, TimerType::Oneshot), true);
|
||||||
|
assert_eq!(heap.remove(1u64), true);
|
||||||
|
assert_eq!(heap.upsert(1u64, duration, TimerType::Oneshot), false);
|
||||||
|
assert_eq!(heap.upsert(1u64, duration, TimerType::Oneshot), true);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue