use alloc::vec::Vec;
use core::ops::ControlFlow;
use necsim_core::{
cogs::{
backup::BackedUp, ActiveLineageSampler, Backup, CoalescenceSampler, DispersalSampler,
EventSampler, Habitat, LocallyCoherentLineageStore, MathsCore, RngCore,
SpeciationProbability, TurnoverRate,
},
lineage::MigratingLineage,
reporter::Reporter,
simulation::Simulation,
};
use necsim_core_bond::{NonNegativeF64, PositiveF64};
use necsim_partitioning_core::{LocalPartition, MigrationMode};
use crate::{
cogs::{
emigration_exit::domain::DomainEmigrationExit,
immigration_entry::buffered::BufferedImmigrationEntry,
},
decomposition::Decomposition,
parallelisation::Status,
};
use super::reporter::BufferingReporterProxy;
#[allow(clippy::type_complexity)]
pub fn simulate<
'p,
M: MathsCore,
H: Habitat<M>,
G: RngCore<M>,
S: LocallyCoherentLineageStore<M, H>,
D: DispersalSampler<M, H, G>,
C: CoalescenceSampler<M, H, S>,
T: TurnoverRate<M, H>,
N: SpeciationProbability<M, H>,
O: Decomposition<M, H>,
E: EventSampler<M, H, G, S, DomainEmigrationExit<M, H, O>, D, C, T, N>,
A: ActiveLineageSampler<
M,
H,
G,
S,
DomainEmigrationExit<M, H, O>,
D,
C,
T,
N,
E,
BufferedImmigrationEntry,
>,
P: Reporter,
L: LocalPartition<'p, P>,
>(
simulation: &mut Simulation<
M,
H,
G,
S,
DomainEmigrationExit<M, H, O>,
D,
C,
T,
N,
E,
BufferedImmigrationEntry,
A,
>,
independent_time_slice: PositiveF64,
local_partition: &mut L,
) -> (Status, NonNegativeF64, u64) {
local_partition.report_progress_sync(simulation.get_balanced_remaining_work().0);
let mut global_safe_time = NonNegativeF64::zero();
let mut simulation_backup = simulation.backup();
let mut last_immigrants: Vec<BackedUp<MigratingLineage>> = Vec::new();
let mut immigrants: Vec<MigratingLineage> = Vec::new();
let mut total_steps = 0_u64;
let mut proxy = BufferingReporterProxy::from(local_partition);
while proxy
.local_partition()
.reduce_vote_any(!simulation.is_done())
{
let next_safe_time = global_safe_time + independent_time_slice;
loop {
let (_, new_steps) = simulation.simulate_incremental_early_stop(
|_, _, next_event_time, _| {
if next_event_time >= next_safe_time {
ControlFlow::Break(())
} else {
ControlFlow::Continue(())
}
},
&mut proxy,
);
total_steps += new_steps;
immigrants.extend(proxy.local_partition().migrate_individuals(
simulation.emigration_exit_mut(),
MigrationMode::Default,
MigrationMode::Default,
));
while proxy.local_partition().wait_for_termination().is_continue() {
immigrants.extend(proxy.local_partition().migrate_individuals(
&mut core::iter::empty(),
MigrationMode::Force,
MigrationMode::Force,
));
}
immigrants.sort_unstable();
if proxy
.local_partition()
.reduce_vote_any(immigrants != last_immigrants)
{
*simulation = simulation_backup.resume();
proxy.clear_events();
last_immigrants.clear();
for immigrant in &immigrants {
last_immigrants.push(immigrant.backup());
}
simulation
.immigration_entry_mut()
.extend(immigrants.drain(..));
} else {
immigrants.clear();
last_immigrants.clear();
break;
}
}
proxy.report_events();
simulation_backup = simulation.backup();
global_safe_time = next_safe_time.into();
}
proxy.local_partition().report_progress_sync(0_u64);
let local_time = simulation.active_lineage_sampler().get_last_event_time();
let local_steps = total_steps;
(Status::Done, local_time, local_steps)
}