use std::{collections::BTreeMap, fs::File, io::BufWriter};
use arrow2::{
array::{FixedSizeBinaryArray, PrimitiveArray},
bitmap::MutableBitmap,
buffer::Buffer,
chunk::Chunk,
datatypes::{DataType, Field, Schema},
io::ipc::write::{FileWriter, WriteOptions},
};
use necsim_core::{landscape::IndexedLocation, lineage::GlobalLineageReference};
use necsim_core_bond::PositiveF64;
use crate::{LastEventState, SpeciesIdentity};
use super::IndividualSpeciesFeatherReporter;
impl IndividualSpeciesFeatherReporter {
pub(super) fn store_individual_origin(
&mut self,
lineage: &GlobalLineageReference,
origin: &IndexedLocation,
) {
self.origins.insert(lineage.clone(), origin.clone());
}
pub(super) fn store_individual_speciation(
&mut self,
lineage: &GlobalLineageReference,
origin: &IndexedLocation,
time: PositiveF64,
) {
let mut parent = lineage;
while let Some(parent_parent) = self.parents.get(parent) {
parent = parent_parent;
}
self.species.insert(
parent.clone(),
SpeciesIdentity::from_speciation(origin, time),
);
}
pub(super) fn store_individual_coalescence(
&mut self,
child: &GlobalLineageReference,
parent: &GlobalLineageReference,
) {
let mut child = child;
while let Some(child_parent) = self.parents.get(child) {
child = child_parent;
}
let child = child.clone();
let mut parent = parent;
while let Some(parent_parent) = self.parents.get(parent) {
parent = parent_parent;
}
let parent = parent.clone();
if child != parent {
self.parents.insert(child, parent);
}
}
pub(super) fn output_to_dataframe(mut self) -> arrow2::error::Result<()> {
let file = File::options()
.write(true)
.truncate(true)
.open(&self.output)?;
let writer = BufWriter::new(file);
let expected_fields = vec![
Field::new("id", DataType::UInt64, false),
Field::new("x", DataType::UInt32, false),
Field::new("y", DataType::UInt32, false),
Field::new("i", DataType::UInt32, false),
Field::new("parent", DataType::UInt64, false),
Field::new("species", DataType::FixedSizeBinary(24), true),
];
let mut metadata = BTreeMap::new();
metadata.insert(
String::from("last-event"),
LastEventState {
last_parent_prior_time: self.last_parent_prior_time.clone(),
last_speciation_event: self.last_speciation_event.clone(),
last_dispersal_event: self.last_dispersal_event.clone(),
}
.into_string()
.map_err(|()| {
std::io::Error::new(
std::io::ErrorKind::Other,
"failed to write metadata to species dataframe",
)
})?,
);
let mut writer = FileWriter::new(
writer,
Schema {
fields: expected_fields,
metadata,
},
None,
WriteOptions { compression: None },
);
writer.start()?;
let mut ids = Vec::with_capacity(self.origins.len());
let mut xs = Vec::with_capacity(self.origins.len());
let mut ys = Vec::with_capacity(self.origins.len());
let mut is = Vec::with_capacity(self.origins.len());
let mut parents = Vec::with_capacity(self.origins.len());
for (lineage, origin) in &self.origins {
ids.push(unsafe { lineage.clone().into_inner() });
xs.push(origin.location().x());
ys.push(origin.location().y());
is.push(origin.index());
parents.push(unsafe {
self.parents
.get(lineage)
.unwrap_or(lineage)
.clone()
.into_inner()
});
}
let mut species = Vec::with_capacity(self.origins.len() * 24);
let mut has_speciated = MutableBitmap::from_len_zeroed(self.origins.len());
let mut family = Vec::new();
for (i, lineage) in self.origins.keys().enumerate() {
let mut ancestor = lineage.clone();
while let Some(ancestor_parent) = self.parents.get(&ancestor) {
family.push(ancestor.clone());
ancestor = ancestor_parent.clone();
}
for child in family.drain(..) {
self.parents.insert(child, ancestor.clone());
}
if let Some(identity) = self.species.get(&ancestor) {
species.extend_from_slice(&**identity);
has_speciated.set(i, true);
} else {
species.extend_from_slice(&[0; 24]);
}
}
let ids = PrimitiveArray::from_vec(ids);
let xs = PrimitiveArray::from_vec(xs);
let ys = PrimitiveArray::from_vec(ys);
let is = PrimitiveArray::from_vec(is);
let parents = PrimitiveArray::from_vec(parents);
let species = FixedSizeBinaryArray::try_new(
DataType::FixedSizeBinary(24),
Buffer::from(species),
Some(has_speciated.into()),
)?;
let chunk = Chunk::try_new(vec![
ids.boxed(),
xs.boxed(),
ys.boxed(),
is.boxed(),
parents.boxed(),
species.boxed(),
])?;
writer.write(&chunk, None)?;
writer.finish()
}
}