1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
use std::{
    cmp::{Ord, Ordering},
    collections::VecDeque,
    fmt,
    fs::{File, OpenOptions},
    io::BufReader,
    num::NonZeroUsize,
    path::{Path, PathBuf},
};

use anyhow::Result;

use necsim_core::event::PackedEvent;

use crate::event_log::EventLogHeader;

/// A reader over a single event log segment file which hands out the
/// segment's events one at a time through a bounded in-memory buffer.
///
/// Segments compare by their front buffered event in *reversed* order
/// (presumably so that a max-heap of segments yields the earliest event
/// first -- confirm against the caller).
// NOTE(review): the lint is presumably silenced because the type name repeats
// part of the enclosing module's name -- confirm against the module path.
#[allow(clippy::module_name_repetitions)]
pub struct SortedSegment {
    /// Location of the segment file this reader was opened from
    path: PathBuf,
    /// Header deserialised from the start of the segment file
    header: EventLogHeader,
    /// Buffered reader positioned after the events that were already loaded
    reader: BufReader<File>,
    /// Events that were read from `reader` but not yet handed out
    buffer: VecDeque<PackedEvent>,
    /// Upper bound on the number of events loaded per buffer refill
    capacity: NonZeroUsize,
}

impl fmt::Debug for SortedSegment {
    /// Prints only the segment's `path` and `header`; the reader, the event
    /// buffer and the capacity are left out, and the output is marked as
    /// non-exhaustive to signal the omission.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let Self { path, header, .. } = self;

        let mut debug = fmt.debug_struct(stringify!(SortedSegment));
        debug.field("path", path);
        debug.field("header", header);
        debug.finish_non_exhaustive()
    }
}

impl SortedSegment {
    /// Opens the segment file at `path` read-only, deserialises its header
    /// and tries to pre-load the first event into the buffer.
    ///
    /// The buffer is pre-allocated for the smaller of the header's `length`
    /// and `capacity`. If the first event cannot be deserialised, the segment
    /// is still returned, just with an empty buffer.
    ///
    /// # Errors
    ///
    /// Fails if the `path` cannot be read as an event log segment
    pub fn try_new(path: &Path, capacity: NonZeroUsize) -> Result<Self> {
        let mut options = OpenOptions::new();
        options.read(true).write(false);

        let mut reader = BufReader::new(options.open(path)?);

        let header: EventLogHeader = bincode::deserialize_from(&mut reader)?;

        // Never allocate for more events than a single refill may load
        let initial_capacity = usize::min(header.length, capacity.get());
        let mut buffer = VecDeque::with_capacity(initial_capacity);

        // Best effort: a failed read simply leaves the buffer empty
        if let Ok(first_event) = bincode::deserialize_from(&mut reader) {
            buffer.push_back(first_event);
        }

        Ok(Self {
            path: path.to_owned(),
            header,
            reader,
            buffer,
            capacity,
        })
    }

    /// Replaces the refill capacity of this segment.
    ///
    /// When the capacity does not shrink, buffer space for the difference
    /// between the new and the old capacity is reserved up front.
    pub fn set_capacity(&mut self, capacity: NonZeroUsize) {
        let (requested, current) = (capacity.get(), self.capacity.get());

        if requested >= current {
            self.buffer.reserve(requested - current);
        }

        self.capacity = capacity;
    }

    /// Returns the header that was read from the segment file.
    #[must_use]
    pub fn header(&self) -> &EventLogHeader {
        &self.header
    }

    /// Returns the length reported by the segment's header.
    #[must_use]
    pub fn length(&self) -> usize {
        self.header().length()
    }

    /// Returns the maximum number of events loaded per buffer refill.
    #[must_use]
    pub fn capacity(&self) -> NonZeroUsize {
        self.capacity
    }

    /// Returns the path of the segment file.
    #[must_use]
    pub fn path(&self) -> &Path {
        self.path.as_path()
    }
}

impl Iterator for SortedSegment {
    type Item = PackedEvent;

    /// Hands out the front buffered event, or `None` if the buffer is empty.
    ///
    /// Whenever handing out an event drains the buffer, up to `capacity`
    /// further events are loaded from the reader, so that the next front
    /// event is available right away. Loading stops at the first event that
    /// fails to deserialise.
    fn next(&mut self) -> Option<Self::Item> {
        // An empty buffer means there is nothing left to hand out
        let event = self.buffer.pop_front()?;

        if self.buffer.is_empty() {
            for _ in 0..self.capacity.get() {
                match bincode::deserialize_from(&mut self.reader) {
                    Ok(upcoming) => self.buffer.push_back(upcoming),
                    // Any read failure ends the refill
                    Err(_) => break,
                }
            }
        }

        Some(event)
    }
}

impl Ord for SortedSegment {
    /// Orders segments by their front buffered event, in reverse: the segment
    /// whose front event is smaller compares as the greater segment.
    ///
    /// A segment with an empty buffer is less than any segment that still has
    /// a buffered event, and equal to any other segment with an empty buffer.
    fn cmp(&self, other: &Self) -> Ordering {
        match self.buffer.front() {
            None => {
                if other.buffer.front().is_none() {
                    Ordering::Equal
                } else {
                    Ordering::Less
                }
            },
            Some(mine) => other
                .buffer
                .front()
                // Deliberately flipped operands to reverse the event order
                .map_or(Ordering::Greater, |theirs| theirs.cmp(mine)),
        }
    }
}

impl PartialOrd for SortedSegment {
    /// Always returns `Some`, delegating to the total order from `Ord`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let ordering = Ord::cmp(self, other);

        Some(ordering)
    }
}

impl PartialEq for SortedSegment {
    /// Two segments are equal iff their front buffered events are equal,
    /// which includes the case where both buffers are empty.
    fn eq(&self, other: &Self) -> bool {
        let (mine, theirs) = (self.buffer.front(), other.buffer.front());

        mine == theirs
    }
}

// Marker impl: declares the front-event equality of segments to be a full
// equivalence relation, which the `Ord` supertrait bound requires.
impl Eq for SortedSegment {}