1#![expect(clippy::multiple_crate_versions)] use std::{borrow::Cow, fmt, num::NonZeroUsize};
23
24use ndarray::{Array, Array1, ArrayBase, ArrayViewMut, Data, Dimension, ShapeError};
25use numcodecs::{
26 AnyArray, AnyArrayAssignError, AnyArrayDType, AnyArrayView, AnyArrayViewMut, AnyCowArray,
27 Codec, StaticCodec, StaticCodecConfig, StaticCodecVersion,
28};
29use schemars::{JsonSchema, JsonSchema_repr};
30use serde::{Deserialize, Serialize};
31use serde_repr::{Deserialize_repr, Serialize_repr};
32use thiserror::Error;
33
34#[cfg(test)]
35use ::serde_json as _;
36
/// Version marker type of the [`Pcodec`] format.
///
/// It is stored both in the codec configuration (as `_version`) and in the
/// header that precedes every encoded byte stream.
// NOTE(review): the const parameters presumably spell the semantic version
// `0.1.0` (major, minor, patch) — confirm against `StaticCodecVersion`.
type PcodecVersion = StaticCodecVersion<0, 1, 0>;
38
/// Codec that compresses numeric arrays with pco.
///
/// The configuration consists of the compression `level` and the flattened
/// `mode`, `delta`, and `paging` specifications, all of which are forwarded
/// to [`compress`] when encoding.
#[derive(Clone, Serialize, Deserialize, JsonSchema)]
// Unknown fields are only rejected by the generated JSON schema; there is no
// matching `#[serde(deny_unknown_fields)]`.
// NOTE(review): presumably because serde does not support that attribute in
// combination with `#[serde(flatten)]` — confirm.
#[schemars(deny_unknown_fields)]
pub struct Pcodec {
    /// Compression level that is passed on to pco.
    pub level: PcoCompressionLevel,
    /// Mode specification, whose `mode` tag and fields are flattened into the
    /// codec configuration.
    #[serde(flatten)]
    pub mode: PcoModeSpec,
    /// Delta specification, whose `delta` tag and fields are flattened into
    /// the codec configuration.
    #[serde(flatten)]
    pub delta: PcoDeltaSpec,
    /// Paging specification, whose `paging` tag and fields are flattened into
    /// the codec configuration.
    #[serde(flatten)]
    pub paging: PcoPagingSpec,
    /// Version of the codec format, (de)serialized as `_version`.
    ///
    /// The default version is used when the field is absent.
    #[serde(default, rename = "_version")]
    pub version: PcodecVersion,
}
59
/// Pco compression level, from 0 to 12.
///
/// The level is (de)serialized as its integer value, defaults to level 8, and
/// is handed to pco as an integer when compressing.
#[derive(
    Copy, Clone, Debug, Default, PartialEq, Eq, Serialize_repr, Deserialize_repr, JsonSchema_repr,
)]
#[repr(u8)]
// The variants are self-describing and stay undocumented, which is what keeps
// this lint expectation fulfilled.
#[expect(missing_docs)]
pub enum PcoCompressionLevel {
    Level0 = 0,
    Level1 = 1,
    Level2 = 2,
    Level3 = 3,
    Level4 = 4,
    Level5 = 5,
    Level6 = 6,
    Level7 = 7,
    #[default]
    Level8 = 8,
    Level9 = 9,
    Level10 = 10,
    Level11 = 11,
    Level12 = 12,
}
87
/// Specification of the pco mode that is used when compressing.
///
/// The specification is (de)serialized as an internally tagged enum: the
/// `mode` field holds the kebab-case variant name and any variant fields sit
/// next to it. Every variant is translated into the equally named
/// `pco::ModeSpec` variant when compressing.
// `Eq` is not derived since `TryFloatMult` carries an `f64`.
#[derive(Copy, Clone, Debug, Default, PartialEq, Serialize, Deserialize, JsonSchema)]
#[schemars(deny_unknown_fields)]
#[serde(tag = "mode", rename_all = "kebab-case")]
pub enum PcoModeSpec {
    /// `"auto"` (the default): maps to `pco::ModeSpec::Auto`.
    #[default]
    Auto,
    /// `"classic"`: maps to `pco::ModeSpec::Classic`.
    Classic,
    /// `"try-float-mult"`: maps to `pco::ModeSpec::TryFloatMult`.
    TryFloatMult {
        /// Base that is forwarded to `pco::ModeSpec::TryFloatMult`.
        float_mult_base: f64,
    },
    /// `"try-float-quant"`: maps to `pco::ModeSpec::TryFloatQuant`.
    TryFloatQuant {
        /// Number of bits that is forwarded to `pco::ModeSpec::TryFloatQuant`.
        float_quant_bits: u32,
    },
    /// `"try-int-mult"`: maps to `pco::ModeSpec::TryIntMult`.
    TryIntMult {
        /// Base that is forwarded to `pco::ModeSpec::TryIntMult`.
        int_mult_base: u64,
    },
}
124
/// Specification of the pco delta encoding that is used when compressing.
///
/// The specification is (de)serialized as an internally tagged enum: the
/// `delta` field holds the kebab-case variant name and any variant fields sit
/// next to it. Every variant is translated into the equally named
/// `pco::DeltaSpec` variant when compressing.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[schemars(deny_unknown_fields)]
#[serde(tag = "delta", rename_all = "kebab-case")]
pub enum PcoDeltaSpec {
    /// `"auto"` (the default): maps to `pco::DeltaSpec::Auto`.
    #[default]
    Auto,
    /// `"none"`: maps to `pco::DeltaSpec::None`.
    None,
    /// `"try-consecutive"`: maps to `pco::DeltaSpec::TryConsecutive`.
    TryConsecutive {
        /// Delta encoding order, which is forwarded to pco as an integer.
        delta_encoding_order: PcoDeltaEncodingOrder,
    },
    /// `"try-lookback"`: maps to `pco::DeltaSpec::TryLookback`.
    TryLookback,
}
157
/// Delta encoding order, from 0 to 7, for [`PcoDeltaSpec::TryConsecutive`].
///
/// The order is (de)serialized as its integer value and handed to pco as an
/// integer when compressing.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize_repr, Deserialize_repr, JsonSchema_repr)]
#[repr(u8)]
// The variants are self-describing and stay undocumented, which is what keeps
// this lint expectation fulfilled.
#[expect(missing_docs)]
pub enum PcoDeltaEncodingOrder {
    Order0 = 0,
    Order1 = 1,
    Order2 = 2,
    Order3 = 3,
    Order4 = 4,
    Order5 = 5,
    Order6 = 6,
    Order7 = 7,
}
174
/// Specification of the pco paging that is used when compressing.
///
/// The specification is (de)serialized as an internally tagged enum: the
/// `paging` field holds the kebab-case variant name and the variant fields
/// sit next to it.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[schemars(deny_unknown_fields)]
#[serde(tag = "paging", rename_all = "kebab-case")]
pub enum PcoPagingSpec {
    /// `"equal-pages-up-to"`: maps to `pco::PagingSpec::EqualPagesUpTo`.
    EqualPagesUpTo {
        /// Non-zero value that is forwarded to
        /// `pco::PagingSpec::EqualPagesUpTo`.
        ///
        /// When the field is omitted during deserialization, the result of
        /// `default_equal_pages_up_to` is used instead.
        #[serde(default = "default_equal_pages_up_to")]
        equal_pages_up_to: NonZeroUsize,
    },
}
190
191impl Default for PcoPagingSpec {
192 fn default() -> Self {
193 Self::EqualPagesUpTo {
194 equal_pages_up_to: default_equal_pages_up_to(),
195 }
196 }
197}
198
199const fn default_equal_pages_up_to() -> NonZeroUsize {
200 NonZeroUsize::MIN.saturating_add(pco::DEFAULT_MAX_PAGE_N.saturating_sub(1))
201}
202
203impl Codec for Pcodec {
204 type Error = PcodecError;
205
206 fn encode(&self, data: AnyCowArray) -> Result<AnyArray, Self::Error> {
207 match data {
208 AnyCowArray::U16(data) => Ok(AnyArray::U8(
209 Array1::from(compress(
210 data,
211 self.level,
212 self.mode,
213 self.delta,
214 self.paging,
215 )?)
216 .into_dyn(),
217 )),
218 AnyCowArray::U32(data) => Ok(AnyArray::U8(
219 Array1::from(compress(
220 data,
221 self.level,
222 self.mode,
223 self.delta,
224 self.paging,
225 )?)
226 .into_dyn(),
227 )),
228 AnyCowArray::U64(data) => Ok(AnyArray::U8(
229 Array1::from(compress(
230 data,
231 self.level,
232 self.mode,
233 self.delta,
234 self.paging,
235 )?)
236 .into_dyn(),
237 )),
238 AnyCowArray::I16(data) => Ok(AnyArray::U8(
239 Array1::from(compress(
240 data,
241 self.level,
242 self.mode,
243 self.delta,
244 self.paging,
245 )?)
246 .into_dyn(),
247 )),
248 AnyCowArray::I32(data) => Ok(AnyArray::U8(
249 Array1::from(compress(
250 data,
251 self.level,
252 self.mode,
253 self.delta,
254 self.paging,
255 )?)
256 .into_dyn(),
257 )),
258 AnyCowArray::I64(data) => Ok(AnyArray::U8(
259 Array1::from(compress(
260 data,
261 self.level,
262 self.mode,
263 self.delta,
264 self.paging,
265 )?)
266 .into_dyn(),
267 )),
268 AnyCowArray::F32(data) => Ok(AnyArray::U8(
269 Array1::from(compress(
270 data,
271 self.level,
272 self.mode,
273 self.delta,
274 self.paging,
275 )?)
276 .into_dyn(),
277 )),
278 AnyCowArray::F64(data) => Ok(AnyArray::U8(
279 Array1::from(compress(
280 data,
281 self.level,
282 self.mode,
283 self.delta,
284 self.paging,
285 )?)
286 .into_dyn(),
287 )),
288 encoded => Err(PcodecError::UnsupportedDtype(encoded.dtype())),
289 }
290 }
291
292 fn decode(&self, encoded: AnyCowArray) -> Result<AnyArray, Self::Error> {
293 let AnyCowArray::U8(encoded) = encoded else {
294 return Err(PcodecError::EncodedDataNotBytes {
295 dtype: encoded.dtype(),
296 });
297 };
298
299 if !matches!(encoded.shape(), [_]) {
300 return Err(PcodecError::EncodedDataNotOneDimensional {
301 shape: encoded.shape().to_vec(),
302 });
303 }
304
305 decompress(&AnyCowArray::U8(encoded).as_bytes())
306 }
307
308 fn decode_into(
309 &self,
310 encoded: AnyArrayView,
311 decoded: AnyArrayViewMut,
312 ) -> Result<(), Self::Error> {
313 let AnyArrayView::U8(encoded) = encoded else {
314 return Err(PcodecError::EncodedDataNotBytes {
315 dtype: encoded.dtype(),
316 });
317 };
318
319 if !matches!(encoded.shape(), [_]) {
320 return Err(PcodecError::EncodedDataNotOneDimensional {
321 shape: encoded.shape().to_vec(),
322 });
323 }
324
325 let encoded = AnyArrayView::U8(encoded);
326 let encoded = encoded.as_bytes();
327
328 match decoded {
329 AnyArrayViewMut::U16(decoded) => decompress_into(&encoded, decoded),
330 AnyArrayViewMut::U32(decoded) => decompress_into(&encoded, decoded),
331 AnyArrayViewMut::U64(decoded) => decompress_into(&encoded, decoded),
332 AnyArrayViewMut::I16(decoded) => decompress_into(&encoded, decoded),
333 AnyArrayViewMut::I32(decoded) => decompress_into(&encoded, decoded),
334 AnyArrayViewMut::I64(decoded) => decompress_into(&encoded, decoded),
335 AnyArrayViewMut::F32(decoded) => decompress_into(&encoded, decoded),
336 AnyArrayViewMut::F64(decoded) => decompress_into(&encoded, decoded),
337 decoded => Err(PcodecError::UnsupportedDtype(decoded.dtype())),
338 }
339 }
340}
341
impl StaticCodec for Pcodec {
    /// Identifier of this codec.
    const CODEC_ID: &'static str = "pco.rs";

    /// The codec serves as its own configuration type.
    type Config<'de> = Self;

    /// Builds the codec from its configuration, which already is the codec.
    fn from_config(config: Self::Config<'_>) -> Self {
        config
    }

    /// Returns the configuration of this codec, created from a reference to
    /// `self`.
    fn get_config(&self) -> StaticCodecConfig<Self> {
        StaticCodecConfig::from(self)
    }
}
355
/// Errors that may occur when encoding or decoding with the [`Pcodec`].
#[derive(Debug, Error)]
pub enum PcodecError {
    /// The array has a dtype other than `u16`, `u32`, `u64`, `i16`, `i32`,
    /// `i64`, `f32`, or `f64`.
    #[error("Pco does not support the dtype {0}")]
    UnsupportedDtype(AnyArrayDType),
    /// The header, which stores the dtype, shape, and version, could not be
    /// serialized.
    #[error("Pco failed to encode the header")]
    HeaderEncodeFailed {
        /// Opaque source error
        source: PcoHeaderError,
    },
    /// Pco failed to compress the data.
    #[error("Pco failed to encode the data")]
    PcoEncodeFailed {
        /// Opaque source error
        source: PcoCodingError,
    },
    /// The encoded array that was passed for decoding is not a `u8` array.
    #[error(
        "Pco can only decode one-dimensional byte arrays but received an array of dtype {dtype}"
    )]
    EncodedDataNotBytes {
        /// The dtype of the array that was received instead
        dtype: AnyArrayDType,
    },
    /// The encoded byte array that was passed for decoding is not
    /// one-dimensional.
    #[error("Pco can only decode one-dimensional byte arrays but received a byte array of shape {shape:?}")]
    EncodedDataNotOneDimensional {
        /// The shape of the byte array that was received instead
        shape: Vec<usize>,
    },
    /// The header at the start of the encoded bytes could not be
    /// deserialized.
    #[error("Pco failed to decode the header")]
    HeaderDecodeFailed {
        /// Opaque source error
        source: PcoHeaderError,
    },
    /// Pco failed to decompress the data.
    #[error("Pco failed to decode the data")]
    PcoDecodeFailed {
        /// Opaque source error
        source: PcoCodingError,
    },
    /// The shape from the decoded header does not fit the number of elements
    /// that were decoded.
    #[error("Pco decoded an invalid array shape header which does not fit the decoded data")]
    DecodeInvalidShapeHeader {
        /// The shape error reported by `ndarray`
        #[from]
        source: ShapeError,
    },
    /// The dtype or shape of the output array does not match the dtype or
    /// shape stored in the header of the encoded data.
    #[error("Pco cannot decode into the provided array")]
    MismatchedDecodeIntoArray {
        /// The dtype or shape mismatch
        #[from]
        source: AnyArrayAssignError,
    },
}
418
/// Opaque error for when encoding or decoding the header fails.
///
/// Wraps the underlying `postcard::Error` in a private field and forwards
/// its display output and source transparently.
#[derive(Debug, Error)]
#[error(transparent)]
pub struct PcoHeaderError(postcard::Error);
423
/// Opaque error for when encoding or decoding with pco fails.
///
/// Wraps the underlying `pco::errors::PcoError` in a private field and
/// forwards its display output and source transparently.
#[derive(Debug, Error)]
#[error(transparent)]
pub struct PcoCodingError(pco::errors::PcoError);
428
429#[expect(clippy::needless_pass_by_value)]
430pub fn compress<T: PcoElement, S: Data<Elem = T>, D: Dimension>(
439 data: ArrayBase<S, D>,
440 level: PcoCompressionLevel,
441 mode: PcoModeSpec,
442 delta: PcoDeltaSpec,
443 paging: PcoPagingSpec,
444) -> Result<Vec<u8>, PcodecError> {
445 let mut encoded_bytes = postcard::to_extend(
446 &CompressionHeader {
447 dtype: <T as PcoElement>::DTYPE,
448 shape: Cow::Borrowed(data.shape()),
449 version: StaticCodecVersion,
450 },
451 Vec::new(),
452 )
453 .map_err(|err| PcodecError::HeaderEncodeFailed {
454 source: PcoHeaderError(err),
455 })?;
456
457 let data_owned;
458 #[expect(clippy::option_if_let_else)]
459 let data = if let Some(slice) = data.as_slice() {
460 slice
461 } else {
462 data_owned = data.into_iter().copied().collect::<Vec<T>>();
463 data_owned.as_slice()
464 };
465
466 let config = pco::ChunkConfig::default()
467 .with_compression_level(level as usize)
468 .with_mode_spec(match mode {
469 PcoModeSpec::Auto => pco::ModeSpec::Auto,
470 PcoModeSpec::Classic => pco::ModeSpec::Classic,
471 PcoModeSpec::TryFloatMult { float_mult_base } => {
472 pco::ModeSpec::TryFloatMult(float_mult_base)
473 }
474 PcoModeSpec::TryFloatQuant { float_quant_bits } => {
475 pco::ModeSpec::TryFloatQuant(float_quant_bits)
476 }
477 PcoModeSpec::TryIntMult { int_mult_base } => pco::ModeSpec::TryIntMult(int_mult_base),
478 })
479 .with_delta_spec(match delta {
480 PcoDeltaSpec::Auto => pco::DeltaSpec::Auto,
481 PcoDeltaSpec::None => pco::DeltaSpec::None,
482 PcoDeltaSpec::TryConsecutive {
483 delta_encoding_order,
484 } => pco::DeltaSpec::TryConsecutive(delta_encoding_order as usize),
485 PcoDeltaSpec::TryLookback => pco::DeltaSpec::TryLookback,
486 })
487 .with_paging_spec(match paging {
488 PcoPagingSpec::EqualPagesUpTo { equal_pages_up_to } => {
489 pco::PagingSpec::EqualPagesUpTo(equal_pages_up_to.get())
490 }
491 });
492
493 let encoded = pco::standalone::simple_compress(data, &config).map_err(|err| {
494 PcodecError::PcoEncodeFailed {
495 source: PcoCodingError(err),
496 }
497 })?;
498 encoded_bytes.extend_from_slice(&encoded);
499
500 Ok(encoded_bytes)
501}
502
503pub fn decompress(encoded: &[u8]) -> Result<AnyArray, PcodecError> {
511 let (header, data) =
512 postcard::take_from_bytes::<CompressionHeader>(encoded).map_err(|err| {
513 PcodecError::HeaderDecodeFailed {
514 source: PcoHeaderError(err),
515 }
516 })?;
517
518 let decoded = match header.dtype {
519 PcoDType::U16 => AnyArray::U16(Array::from_shape_vec(
520 &*header.shape,
521 pco::standalone::simple_decompress(data).map_err(|err| {
522 PcodecError::PcoDecodeFailed {
523 source: PcoCodingError(err),
524 }
525 })?,
526 )?),
527 PcoDType::U32 => AnyArray::U32(Array::from_shape_vec(
528 &*header.shape,
529 pco::standalone::simple_decompress(data).map_err(|err| {
530 PcodecError::PcoDecodeFailed {
531 source: PcoCodingError(err),
532 }
533 })?,
534 )?),
535 PcoDType::U64 => AnyArray::U64(Array::from_shape_vec(
536 &*header.shape,
537 pco::standalone::simple_decompress(data).map_err(|err| {
538 PcodecError::PcoDecodeFailed {
539 source: PcoCodingError(err),
540 }
541 })?,
542 )?),
543 PcoDType::I16 => AnyArray::I16(Array::from_shape_vec(
544 &*header.shape,
545 pco::standalone::simple_decompress(data).map_err(|err| {
546 PcodecError::PcoDecodeFailed {
547 source: PcoCodingError(err),
548 }
549 })?,
550 )?),
551 PcoDType::I32 => AnyArray::I32(Array::from_shape_vec(
552 &*header.shape,
553 pco::standalone::simple_decompress(data).map_err(|err| {
554 PcodecError::PcoDecodeFailed {
555 source: PcoCodingError(err),
556 }
557 })?,
558 )?),
559 PcoDType::I64 => AnyArray::I64(Array::from_shape_vec(
560 &*header.shape,
561 pco::standalone::simple_decompress(data).map_err(|err| {
562 PcodecError::PcoDecodeFailed {
563 source: PcoCodingError(err),
564 }
565 })?,
566 )?),
567 PcoDType::F32 => AnyArray::F32(Array::from_shape_vec(
568 &*header.shape,
569 pco::standalone::simple_decompress(data).map_err(|err| {
570 PcodecError::PcoDecodeFailed {
571 source: PcoCodingError(err),
572 }
573 })?,
574 )?),
575 PcoDType::F64 => AnyArray::F64(Array::from_shape_vec(
576 &*header.shape,
577 pco::standalone::simple_decompress(data).map_err(|err| {
578 PcodecError::PcoDecodeFailed {
579 source: PcoCodingError(err),
580 }
581 })?,
582 )?),
583 };
584
585 Ok(decoded)
586}
587
588pub fn decompress_into<T: PcoElement, D: Dimension>(
600 encoded: &[u8],
601 mut decoded: ArrayViewMut<T, D>,
602) -> Result<(), PcodecError> {
603 let (header, data) =
604 postcard::take_from_bytes::<CompressionHeader>(encoded).map_err(|err| {
605 PcodecError::HeaderDecodeFailed {
606 source: PcoHeaderError(err),
607 }
608 })?;
609
610 if T::DTYPE != header.dtype {
611 return Err(PcodecError::MismatchedDecodeIntoArray {
612 source: AnyArrayAssignError::DTypeMismatch {
613 src: header.dtype.into_dtype(),
614 dst: T::DTYPE.into_dtype(),
615 },
616 });
617 }
618
619 if decoded.shape() != &*header.shape {
620 return Err(PcodecError::MismatchedDecodeIntoArray {
621 source: AnyArrayAssignError::ShapeMismatch {
622 src: header.shape.into_owned(),
623 dst: decoded.shape().to_vec(),
624 },
625 });
626 }
627
628 if let Some(slice) = decoded.as_slice_mut() {
629 pco::standalone::simple_decompress_into(data, slice).map_err(|err| {
630 PcodecError::PcoDecodeFailed {
631 source: PcoCodingError(err),
632 }
633 })?;
634 return Ok(());
635 }
636
637 let dec =
638 pco::standalone::simple_decompress(data).map_err(|err| PcodecError::PcoDecodeFailed {
639 source: PcoCodingError(err),
640 })?;
641
642 if dec.len() != decoded.len() {
643 return Err(PcodecError::DecodeInvalidShapeHeader {
644 source: ShapeError::from_kind(ndarray::ErrorKind::IncompatibleShape),
645 });
646 }
647
648 decoded.iter_mut().zip(dec).for_each(|(o, d)| *o = d);
649
650 Ok(())
651}
652
/// Element types that can be compressed with pco.
///
/// Implementors must be `Copy` and pco numbers.
pub trait PcoElement: Copy + pco::data_types::Number {
    /// The dtype that identifies this element type in the header of the
    /// encoded data.
    const DTYPE: PcoDType;
}
658
659impl PcoElement for u16 {
660 const DTYPE: PcoDType = PcoDType::U16;
661}
662
663impl PcoElement for u32 {
664 const DTYPE: PcoDType = PcoDType::U32;
665}
666
667impl PcoElement for u64 {
668 const DTYPE: PcoDType = PcoDType::U64;
669}
670
671impl PcoElement for i16 {
672 const DTYPE: PcoDType = PcoDType::I16;
673}
674
675impl PcoElement for i32 {
676 const DTYPE: PcoDType = PcoDType::I32;
677}
678
679impl PcoElement for i64 {
680 const DTYPE: PcoDType = PcoDType::I64;
681}
682
683impl PcoElement for f32 {
684 const DTYPE: PcoDType = PcoDType::F32;
685}
686
687impl PcoElement for f64 {
688 const DTYPE: PcoDType = PcoDType::F64;
689}
690
/// Header that is serialized with postcard in front of the pco-compressed
/// elements.
#[derive(Serialize, Deserialize)]
struct CompressionHeader<'a> {
    /// Dtype of the compressed elements
    dtype: PcoDType,
    /// Shape of the compressed array, which may be borrowed from the array
    /// (when encoding) or from the encoded bytes (when decoding)
    #[serde(borrow)]
    shape: Cow<'a, [usize]>,
    /// Version of the codec format that produced the encoded data
    version: PcodecVersion,
}
698
/// Dtypes of the elements that pco can compress.
///
/// Each dtype is (de)serialized under its short name, e.g. `u16`, and
/// additionally accepts the long alias, e.g. `uint16`, when deserializing.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
// The variants are self-describing and stay undocumented, which is what keeps
// this lint expectation fulfilled.
#[expect(missing_docs)]
pub enum PcoDType {
    #[serde(rename = "u16", alias = "uint16")]
    U16,
    #[serde(rename = "u32", alias = "uint32")]
    U32,
    #[serde(rename = "u64", alias = "uint64")]
    U64,
    #[serde(rename = "i16", alias = "int16")]
    I16,
    #[serde(rename = "i32", alias = "int32")]
    I32,
    #[serde(rename = "i64", alias = "int64")]
    I64,
    #[serde(rename = "f32", alias = "float32")]
    F32,
    #[serde(rename = "f64", alias = "float64")]
    F64,
}
720
impl PcoDType {
    /// Converts the [`PcoDType`] into the [`AnyArrayDType`] variant of the
    /// same name.
    #[must_use]
    pub const fn into_dtype(self) -> AnyArrayDType {
        match self {
            Self::U16 => AnyArrayDType::U16,
            Self::U32 => AnyArrayDType::U32,
            Self::U64 => AnyArrayDType::U64,
            Self::I16 => AnyArrayDType::I16,
            Self::I32 => AnyArrayDType::I32,
            Self::I64 => AnyArrayDType::I64,
            Self::F32 => AnyArrayDType::F32,
            Self::F64 => AnyArrayDType::F64,
        }
    }
}
737
738impl fmt::Display for PcoDType {
739 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
740 fmt.write_str(match self {
741 Self::U16 => "u16",
742 Self::U32 => "u32",
743 Self::U64 => "u64",
744 Self::I16 => "i16",
745 Self::I32 => "i32",
746 Self::I64 => "i64",
747 Self::F32 => "f32",
748 Self::F64 => "f64",
749 })
750 }
751}