numcodecs_wasm_host_reproducible/
codec.rs

1use std::num::Wrapping;
2use std::sync::{Arc, Mutex};
3
4use numcodecs::{
5    AnyArray, AnyArrayView, AnyArrayViewMut, AnyCowArray, Codec, DynCodec, DynCodecType,
6};
7use numcodecs_wasm_host::{CodecError, RuntimeError, WasmCodec, WasmCodecComponent};
8use schemars::Schema;
9use serde::Serializer;
10use wasm_component_layer::{AsContextMut, Component, Instance, Linker, Store, TypedFunc};
11use wasm_runtime_layer::{backend::WasmEngine, Engine};
12
13use crate::transform::instcnt::PerfWitInterfaces;
14use crate::transform::transform_wasm_component;
15use crate::{engine::ReproducibleEngine, logging, stdio};
16
17#[derive(Debug, thiserror::Error)]
18/// Errors that can occur when using the [`ReproducibleWasmCodec`]
19pub enum ReproducibleWasmCodecError {
20    /// The codec's lock was poisoned
21    #[error("{codec_id} codec's lock was poisoned")]
22    Poisoned {
23        /// The codec's id
24        codec_id: Arc<str>,
25    },
26    /// The codec's WebAssembly runtime raised an error
27    #[error("{codec_id} codec's WebAssembly runtime raised an error")]
28    Runtime {
29        /// The codec's id
30        codec_id: Arc<str>,
31        /// The runtime error
32        source: RuntimeError,
33    },
34    /// The codec's implementation raised an error
35    #[error("{codec_id} codec's implementation raised an error")]
36    Codec {
37        /// The codec's id
38        codec_id: Arc<str>,
39        /// The codec error
40        source: CodecError,
41    },
42}
43
44/// Codec instantiated inside a WebAssembly component.
45///
46/// The codec is loaded such that its execution is reproducible across any
47/// platform. Importantly, each codec owns its own component instance such that
48/// two codecs cannot interfere.
49pub struct ReproducibleWasmCodec<E: WasmEngine>
50where
51    Store<(), ReproducibleEngine<E>>: Send,
52{
53    store: Mutex<Store<(), ReproducibleEngine<E>>>,
54    instance: Instance,
55    codec: WasmCodec,
56    ty: ReproducibleWasmCodecType<E>,
57    instruction_counter: TypedFunc<(), u64>,
58}
59
60impl<E: WasmEngine> ReproducibleWasmCodec<E>
61where
62    Store<(), ReproducibleEngine<E>>: Send,
63{
64    /// Try cloning the codec by recreating it from its configuration.
65    ///
66    /// `ReproducibleWasmCodec` implements [`Clone`] by calling this method and
67    /// panicking if it fails.
68    ///
69    /// # Errors
70    ///
71    /// Errors if serializing the codec configuration, constructing the new
72    /// codec, or interacting with the component fails.
73    pub fn try_clone(&self) -> Result<Self, serde_json::Error> {
74        let mut config = self.get_config(serde_json::value::Serializer)?;
75
76        if let Some(config) = config.as_object_mut() {
77            config.remove("id");
78        }
79
80        let codec: Self = self.ty.codec_from_config(config)?;
81
82        Ok(codec)
83    }
84
85    /// Try dropping the codec.
86    ///
87    /// `ReproducibleWasmCodec` implements [`Drop`] by calling this method and
88    /// ignoring any errors.
89    // That's not quite true but the effect is the same
90    ///
91    /// # Errors
92    ///
93    /// Errors if dropping the codec's resource or component, or interacting
94    /// with the component fails.
95    pub fn try_drop(mut self) -> Result<(), ReproducibleWasmCodecError> {
96        // keep in sync with drop
97        let mut store = self
98            .store
99            .get_mut()
100            .map_err(|_| ReproducibleWasmCodecError::Poisoned {
101                codec_id: self.ty.codec_id.clone(),
102            })?;
103
104        let result =
105            self.codec
106                .try_drop(&mut store)
107                .map_err(|source| ReproducibleWasmCodecError::Runtime {
108                    codec_id: self.ty.codec_id.clone(),
109                    source,
110                });
111        let results = try_drop_instance(store, &self.instance, &self.ty.codec_id);
112
113        result.and(results)
114    }
115
116    #[expect(clippy::significant_drop_tightening)]
117    /// Read the codec's instruction counter, which is based on the number of
118    /// WebAssembly bytecode instructions executed.
119    ///
120    /// The instruction counter is never reset and wraps around. Comparisons of
121    /// the counter values before and after, e.g. a call to [`Self::encode`],
122    /// should thus use wrapping arithmetic.
123    ///
124    /// # Errors
125    ///
126    /// Errors if interacting with the component fails.
127    pub fn instruction_counter(&self) -> Result<Wrapping<u64>, ReproducibleWasmCodecError> {
128        let mut store = self
129            .store
130            .lock()
131            .map_err(|_| ReproducibleWasmCodecError::Poisoned {
132                codec_id: self.ty.codec_id.clone(),
133            })?;
134
135        let cnt = self
136            .instruction_counter
137            .call(store.as_context_mut(), ())
138            .map_err(|err| ReproducibleWasmCodecError::Runtime {
139                codec_id: self.ty.codec_id.clone(),
140                source: RuntimeError::from(err),
141            })?;
142
143        Ok(Wrapping(cnt))
144    }
145}
146
147impl<E: WasmEngine> Clone for ReproducibleWasmCodec<E>
148where
149    Store<(), ReproducibleEngine<E>>: Send,
150{
151    fn clone(&self) -> Self {
152        #[expect(clippy::expect_used)]
153        self.try_clone()
154            .expect("cloning a wasm codec should not fail")
155    }
156}
157
158impl<E: WasmEngine> Drop for ReproducibleWasmCodec<E>
159where
160    Store<(), ReproducibleEngine<E>>: Send,
161{
162    fn drop(&mut self) {
163        // keep in sync with try_drop
164        let Ok(mut store) = self.store.get_mut() else {
165            return;
166        };
167
168        let result = self.codec.try_drop(&mut store);
169        std::mem::drop(result);
170
171        let results = self.instance.drop(store);
172        std::mem::drop(results);
173    }
174}
175
176impl<E: WasmEngine> Codec for ReproducibleWasmCodec<E>
177where
178    Store<(), ReproducibleEngine<E>>: Send,
179{
180    type Error = ReproducibleWasmCodecError;
181
182    #[expect(clippy::significant_drop_tightening)]
183    fn encode(&self, data: AnyCowArray) -> Result<AnyArray, Self::Error> {
184        let mut store = self
185            .store
186            .lock()
187            .map_err(|_| ReproducibleWasmCodecError::Poisoned {
188                codec_id: self.ty.codec_id.clone(),
189            })?;
190
191        let encoded = self
192            .codec
193            .encode(store.as_context_mut(), data)
194            .map_err(|err| ReproducibleWasmCodecError::Runtime {
195                codec_id: self.ty.codec_id.clone(),
196                source: err,
197            })?
198            .map_err(|err| ReproducibleWasmCodecError::Codec {
199                codec_id: self.ty.codec_id.clone(),
200                source: err,
201            })?;
202
203        Ok(encoded)
204    }
205
206    #[expect(clippy::significant_drop_tightening)]
207    fn decode(&self, encoded: AnyCowArray) -> Result<AnyArray, Self::Error> {
208        let mut store = self
209            .store
210            .lock()
211            .map_err(|_| ReproducibleWasmCodecError::Poisoned {
212                codec_id: self.ty.codec_id.clone(),
213            })?;
214
215        let decoded = self
216            .codec
217            .decode(store.as_context_mut(), encoded)
218            .map_err(|err| ReproducibleWasmCodecError::Runtime {
219                codec_id: self.ty.codec_id.clone(),
220                source: err,
221            })?
222            .map_err(|err| ReproducibleWasmCodecError::Codec {
223                codec_id: self.ty.codec_id.clone(),
224                source: err,
225            })?;
226
227        Ok(decoded)
228    }
229
230    #[expect(clippy::significant_drop_tightening)]
231    fn decode_into(
232        &self,
233        encoded: AnyArrayView,
234        decoded: AnyArrayViewMut,
235    ) -> Result<(), Self::Error> {
236        let mut store = self
237            .store
238            .lock()
239            .map_err(|_| ReproducibleWasmCodecError::Poisoned {
240                codec_id: self.ty.codec_id.clone(),
241            })?;
242
243        self.codec
244            .decode_into(store.as_context_mut(), encoded, decoded)
245            .map_err(|err| ReproducibleWasmCodecError::Runtime {
246                codec_id: self.ty.codec_id.clone(),
247                source: err,
248            })?
249            .map_err(|err| ReproducibleWasmCodecError::Codec {
250                codec_id: self.ty.codec_id.clone(),
251                source: err,
252            })?;
253
254        Ok(())
255    }
256}
257
258impl<E: WasmEngine> DynCodec for ReproducibleWasmCodec<E>
259where
260    Store<(), ReproducibleEngine<E>>: Send,
261{
262    type Type = ReproducibleWasmCodecType<E>;
263
264    fn ty(&self) -> Self::Type {
265        ReproducibleWasmCodecType {
266            codec_id: self.ty.codec_id.clone(),
267            codec_config_schema: self.ty.codec_config_schema.clone(),
268            component: self.ty.component.clone(),
269            component_instantiater: self.ty.component_instantiater.clone(),
270        }
271    }
272
273    fn get_config<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
274        let mut store = self
275            .store
276            .lock()
277            .map_err(|_| ReproducibleWasmCodecError::Poisoned {
278                codec_id: self.ty.codec_id.clone(),
279            })
280            .map_err(serde::ser::Error::custom)?;
281
282        self.codec.get_config(store.as_context_mut(), serializer)
283    }
284}
285
286/// Type object for a codec instantiated inside a WebAssembly component.
287pub struct ReproducibleWasmCodecType<E: WasmEngine>
288where
289    Store<(), ReproducibleEngine<E>>: Send,
290{
291    pub(super) codec_id: Arc<str>,
292    pub(super) codec_config_schema: Arc<Schema>,
293    pub(super) component: Component,
294    #[expect(clippy::type_complexity)]
295    pub(super) component_instantiater: Arc<
296        dyn Send
297            + Sync
298            + Fn(
299                &Component,
300                &str,
301            ) -> Result<
302                (
303                    Store<(), ReproducibleEngine<E>>,
304                    Instance,
305                    WasmCodecComponent,
306                ),
307                ReproducibleWasmCodecError,
308            >,
309    >,
310}
311
312impl<E: WasmEngine> ReproducibleWasmCodecType<E>
313where
314    Store<(), ReproducibleEngine<E>>: Send,
315{
316    /// Load a [`DynCodecType`] from a binary `wasm_component`, which will be
317    /// executed in the provided core WebAssembly `engine`.
318    ///
319    /// # Errors
320    ///
321    /// Errors if the `wasm_component` does not export the `numcodecs:abc/codec`
322    /// interface or if interacting with the component fails.
323    pub fn new(
324        engine: E,
325        wasm_component: impl Into<Vec<u8>>,
326    ) -> Result<Self, ReproducibleWasmCodecError>
327    where
328        E: Send + Sync,
329        Store<(), ReproducibleEngine<E>>: Send + Sync,
330    {
331        let wasm_component = transform_wasm_component(wasm_component).map_err(|err| {
332            ReproducibleWasmCodecError::Runtime {
333                codec_id: Arc::from("<unknown>"),
334                source: RuntimeError::from(err),
335            }
336        })?;
337
338        let engine = Engine::new(ReproducibleEngine::new(engine));
339        let component = Component::new(&engine, &wasm_component).map_err(|err| {
340            ReproducibleWasmCodecError::Runtime {
341                codec_id: Arc::from("<unknown>"),
342                source: RuntimeError::from(err),
343            }
344        })?;
345
346        let component_instantiater = Arc::new(move |component: &Component, codec_id: &str| {
347            let mut store = Store::new(&engine, ());
348
349            let mut linker = Linker::default();
350            stdio::add_to_linker(&mut linker, &mut store).map_err(|err| {
351                ReproducibleWasmCodecError::Runtime {
352                    codec_id: Arc::from(codec_id),
353                    source: RuntimeError::from(err),
354                }
355            })?;
356            logging::add_to_linker(&mut linker, &mut store).map_err(|err| {
357                ReproducibleWasmCodecError::Runtime {
358                    codec_id: Arc::from(codec_id),
359                    source: RuntimeError::from(err),
360                }
361            })?;
362
363            let instance = linker.instantiate(&mut store, component).map_err(|err| {
364                ReproducibleWasmCodecError::Runtime {
365                    codec_id: Arc::from(codec_id),
366                    source: RuntimeError::from(err),
367                }
368            })?;
369
370            let component =
371                WasmCodecComponent::new(&mut store, instance.clone()).map_err(|source| {
372                    ReproducibleWasmCodecError::Runtime {
373                        codec_id: Arc::from(codec_id),
374                        source,
375                    }
376                })?;
377
378            Ok((store, instance, component))
379        });
380
381        let (codec_id, codec_config_schema) = {
382            let (mut store, instance, ty): (_, _, WasmCodecComponent) =
383                (component_instantiater)(&component, "<unknown>")?;
384
385            let codec_id = Arc::from(ty.codec_id());
386            let codec_config_schema = Arc::from(ty.codec_config_schema().clone());
387
388            try_drop_instance(&mut store, &instance, &codec_id)?;
389
390            (codec_id, codec_config_schema)
391        };
392
393        Ok(Self {
394            codec_id,
395            codec_config_schema,
396            component,
397            component_instantiater,
398        })
399    }
400}
401
402impl<E: WasmEngine> DynCodecType for ReproducibleWasmCodecType<E>
403where
404    Store<(), ReproducibleEngine<E>>: Send,
405{
406    type Codec = ReproducibleWasmCodec<E>;
407
408    fn codec_id(&self) -> &str {
409        &self.codec_id
410    }
411
412    fn codec_from_config<'de, D: serde::Deserializer<'de>>(
413        &self,
414        config: D,
415    ) -> Result<Self::Codec, D::Error> {
416        let (mut store, instance, component) =
417            (self.component_instantiater)(&self.component, &self.codec_id)
418                .map_err(serde::de::Error::custom)?;
419        let codec = component.codec_from_config(store.as_context_mut(), config)?;
420
421        let PerfWitInterfaces {
422            perf: perf_interface,
423            instruction_counter,
424        } = PerfWitInterfaces::get();
425        let Some(perf_interface) = instance.exports().instance(perf_interface) else {
426            return Err(serde::de::Error::custom(
427                "WASM component does not contain an interface to read the instruction counter",
428            ));
429        };
430        let Some(instruction_counter) = perf_interface.func(instruction_counter) else {
431            return Err(serde::de::Error::custom(
432                "WASM component interface does not contain a function to read the instruction counter"
433            ));
434        };
435        let instruction_counter = instruction_counter.typed().map_err(|err| {
436            serde::de::Error::custom(format!(
437                "WASM component instruction counter function has the wrong signature: {err}"
438            ))
439        })?;
440
441        Ok(ReproducibleWasmCodec {
442            store: Mutex::new(store),
443            instance,
444            codec,
445            ty: Self {
446                codec_id: self.codec_id.clone(),
447                codec_config_schema: self.codec_config_schema.clone(),
448                component: self.component.clone(),
449                component_instantiater: self.component_instantiater.clone(),
450            },
451            instruction_counter,
452        })
453    }
454
455    fn codec_config_schema(&self) -> Schema {
456        (*self.codec_config_schema).clone()
457    }
458}
459
460fn try_drop_instance<T, E: WasmEngine>(
461    store: &mut Store<T, E>,
462    instance: &Instance,
463    codec_id: &str,
464) -> Result<(), ReproducibleWasmCodecError> {
465    let mut errors = instance
466        .drop(store)
467        .map_err(|err| ReproducibleWasmCodecError::Runtime {
468            codec_id: Arc::from(codec_id),
469            source: RuntimeError::from(err),
470        })?;
471
472    let Some(mut err) = errors.pop() else {
473        return Ok(());
474    };
475
476    if !errors.is_empty() {
477        err = err.context(format!("showing one of {} errors", errors.len() + 1));
478    }
479
480    Err(ReproducibleWasmCodecError::Runtime {
481        codec_id: Arc::from(codec_id),
482        source: RuntimeError::from(
483            err.context("dropping instance and all of its resources failed"),
484        ),
485    })
486}