use std::{
    fmt,
    marker::PhantomData,
    ops::ControlFlow,
    sync::{
        mpsc::{Receiver, SendError, SyncSender, TrySendError},
        Arc, Barrier,
    },
    task::Poll,
    time::{Duration, Instant},
};
use necsim_core::{
    impl_report,
    lineage::MigratingLineage,
    reporter::{
        boolean::{Boolean, False},
        Reporter,
    },
};
use necsim_core_bond::PositiveF64;
use necsim_impls_std::event_log::recorder::EventLogRecorder;
use necsim_partitioning_core::{partition::Partition, LocalPartition, MigrationMode};
use crate::vote::{AsyncVote, Vote};
#[allow(clippy::module_name_repetitions)]
pub struct ThreadsLocalPartition<R: Reporter> {
    partition: Partition,
    vote_any: Vote<bool>,
    vote_min_time: Vote<(PositiveF64, u32)>,
    vote_termination: AsyncVote<ControlFlow<(), ()>>,
    emigration_buffers: Box<[Vec<MigratingLineage>]>,
    emigration_channels: Box<[SyncSender<Vec<MigratingLineage>>]>,
    immigration_buffers: Vec<Vec<MigratingLineage>>,
    immigration_channel: Receiver<Vec<MigratingLineage>>,
    last_migration_times: Box<[Instant]>,
    communicated_since_last_termination_vote: bool,
    migration_interval: Duration,
    recorder: EventLogRecorder,
    local_remaining: u64,
    progress_channel: SyncSender<(u64, u32)>,
    last_report_time: Instant,
    progress_interval: Duration,
    sync_barrier: Arc<Barrier>,
    _marker: PhantomData<R>,
}
impl<R: Reporter> fmt::Debug for ThreadsLocalPartition<R> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct(stringify!(ThreadsLocalPartition)).finish()
    }
}
impl<R: Reporter> ThreadsLocalPartition<R> {
    #[allow(clippy::too_many_arguments)]
    #[must_use]
    pub(crate) fn new(
        partition: Partition,
        vote_any: &Vote<bool>,
        vote_min_time: &Vote<(PositiveF64, u32)>,
        vote_termination: &AsyncVote<ControlFlow<(), ()>>,
        emigration_channels: &[SyncSender<Vec<MigratingLineage>>],
        immigration_channel: Receiver<Vec<MigratingLineage>>,
        migration_interval: Duration,
        mut recorder: EventLogRecorder,
        progress_channel: SyncSender<(u64, u32)>,
        progress_interval: Duration,
        sync_barrier: &Arc<Barrier>,
    ) -> Self {
        recorder.set_event_filter(R::ReportSpeciation::VALUE, R::ReportDispersal::VALUE);
        let partition_size = partition.size().get() as usize;
        let mut emigration_buffers = Vec::with_capacity(partition_size);
        emigration_buffers.resize_with(partition_size, Vec::new);
        let now = Instant::now();
        Self {
            partition,
            vote_any: vote_any.clone(),
            vote_min_time: vote_min_time.clone(),
            vote_termination: vote_termination.clone(),
            emigration_buffers: emigration_buffers.into_boxed_slice(),
            emigration_channels: Vec::from(emigration_channels).into_boxed_slice(),
            immigration_buffers: Vec::new(),
            immigration_channel,
            last_migration_times: vec![
                now.checked_sub(migration_interval).unwrap_or(now);
                partition_size
            ]
            .into_boxed_slice(),
            communicated_since_last_termination_vote: false,
            migration_interval,
            recorder,
            local_remaining: 0,
            progress_channel,
            last_report_time: now.checked_sub(progress_interval).unwrap_or(now),
            progress_interval,
            sync_barrier: sync_barrier.clone(),
            _marker: PhantomData::<R>,
        }
    }
}
impl<'p, R: Reporter> LocalPartition<'p, R> for ThreadsLocalPartition<R> {
    type ImmigrantIterator<'a> = ImmigrantPopIterator<'a> where 'p: 'a, R: 'a;
    type IsLive = False;
    type Reporter = Self;
    fn get_reporter(&mut self) -> &mut Self::Reporter {
        self
    }
    fn get_partition(&self) -> Partition {
        self.partition
    }
    fn migrate_individuals<'a, E: Iterator<Item = (u32, MigratingLineage)>>(
        &'a mut self,
        emigrants: &mut E,
        emigration_mode: MigrationMode,
        immigration_mode: MigrationMode,
    ) -> Self::ImmigrantIterator<'a>
    where
        'p: 'a,
    {
        for (partition, emigrant) in emigrants {
            self.emigration_buffers[partition as usize].push(emigrant);
        }
        let self_rank_index = self.get_partition().rank() as usize;
        let now = Instant::now();
        if match immigration_mode {
            MigrationMode::Force => true,
            MigrationMode::Default => {
                now.duration_since(self.last_migration_times[self_rank_index])
                    >= self.migration_interval
            },
            MigrationMode::Hold => false,
        } {
            self.last_migration_times[self_rank_index] = now;
            self.immigration_buffers
                .extend(self.immigration_channel.try_iter());
        }
        for partition in self.partition.size().partitions() {
            let rank_index = partition.rank() as usize;
            if rank_index != self_rank_index
                && match emigration_mode {
                    MigrationMode::Force => true,
                    MigrationMode::Default => {
                        now.duration_since(self.last_migration_times[rank_index])
                            >= self.migration_interval
                    },
                    MigrationMode::Hold => false,
                }
            {
                let emigration_buffer = &mut self.emigration_buffers[rank_index];
                if !emigration_buffer.is_empty() {
                    let emigration_buffer_message = std::mem::take(emigration_buffer);
                    match self.emigration_channels[rank_index].try_send(emigration_buffer_message) {
                        Ok(()) => {
                            self.last_migration_times[rank_index] = now;
                            self.communicated_since_last_termination_vote = true;
                        },
                        Err(TrySendError::Full(emigration_buffer_message)) => {
                            *emigration_buffer = emigration_buffer_message;
                        },
                        Err(TrySendError::Disconnected(_)) => {
                            panic!("threads partitioning migration channel disconnected")
                        },
                    }
                }
            }
        }
        ImmigrantPopIterator::new(&mut self.immigration_buffers)
    }
    fn reduce_vote_any(&mut self, vote: bool) -> bool {
        self.vote_any.vote(|acc| match acc {
            None => vote,
            Some(acc) => *acc || vote,
        })
    }
    fn reduce_vote_min_time(
        &mut self,
        local_time: PositiveF64,
    ) -> Result<PositiveF64, PositiveF64> {
        let vote = (local_time, self.partition.rank());
        let result = self.vote_min_time.vote(|acc| match acc {
            None => vote,
            Some(acc) => vote.min(*acc),
        });
        if result.1 == self.partition.rank() {
            Ok(result.0)
        } else {
            Err(result.0)
        }
    }
    fn wait_for_termination(&mut self) -> ControlFlow<(), ()> {
        let mut local_wait = ControlFlow::Break(());
        for buffer in self.emigration_buffers.iter() {
            if !buffer.is_empty() {
                local_wait = ControlFlow::Continue(());
                break;
            }
        }
        if !self.immigration_buffers.is_empty() {
            local_wait = ControlFlow::Continue(());
        }
        if local_wait.is_continue() && !self.vote_termination.is_ongoing() {
            return ControlFlow::Continue(());
        }
        if self.communicated_since_last_termination_vote {
            local_wait = ControlFlow::Continue(());
        }
        let async_vote = self.vote_termination.vote(
            |global_wait| {
                self.communicated_since_last_termination_vote = false;
                match global_wait {
                    Some(ControlFlow::Continue(())) => ControlFlow::Continue(()),
                    Some(ControlFlow::Break(())) | None => local_wait,
                }
            },
            self.partition.rank(),
        );
        match async_vote {
            Poll::Pending => {
                std::thread::yield_now();
                ControlFlow::Continue(())
            },
            Poll::Ready(result) => result,
        }
    }
    fn report_progress_sync(&mut self, remaining: u64) {
        if let Err(SendError(_)) = self
            .progress_channel
            .send((remaining, self.partition.rank()))
        {
            panic!("threads partitioning sync progress channel disconnected");
        }
        self.sync_barrier.wait();
    }
}
impl<R: Reporter> Reporter for ThreadsLocalPartition<R> {
    impl_report!(speciation(&mut self, speciation: MaybeUsed<R::ReportSpeciation>) {
        self.recorder.record_speciation(speciation);
    });
    impl_report!(dispersal(&mut self, dispersal: MaybeUsed<R::ReportDispersal>) {
        self.recorder.record_dispersal(dispersal);
    });
    impl_report!(progress(&mut self, remaining: MaybeUsed<R::ReportProgress>) {
        if self.local_remaining == *remaining {
            return;
        }
        if !self.vote_termination.is_ongoing() {
            let now = Instant::now();
            if now.duration_since(self.last_report_time) >= self.progress_interval {
                match self.progress_channel.try_send((*remaining, self.partition.rank())) {
                    Ok(()) => {
                        self.last_report_time = now;
                        self.local_remaining = *remaining;
                    },
                    Err(TrySendError::Full(_)) => (),
                    Err(TrySendError::Disconnected(_)) =>  {
                        panic!("threads partitioning progress channel disconnected")
                    },
                }
            }
        }
    });
}
pub struct ImmigrantPopIterator<'i> {
    immigrants: &'i mut Vec<Vec<MigratingLineage>>,
}
impl<'i> ImmigrantPopIterator<'i> {
    fn new(immigrants: &'i mut Vec<Vec<MigratingLineage>>) -> Self {
        Self { immigrants }
    }
}
impl<'i> Iterator for ImmigrantPopIterator<'i> {
    type Item = MigratingLineage;
    fn next(&mut self) -> Option<Self::Item> {
        let mut next_immigrants = self.immigrants.last_mut()?;
        loop {
            if let Some(next) = next_immigrants.pop() {
                return Some(next);
            }
            self.immigrants.pop();
            next_immigrants = self.immigrants.last_mut()?;
        }
    }
    fn size_hint(&self) -> (usize, Option<usize>) {
        let len = self.immigrants.iter().map(Vec::len).sum();
        (len, Some(len))
    }
}