numcodecs_wasm_host_reproducible/
codec.rs1use std::num::Wrapping;
2use std::sync::{Arc, Mutex};
3
4use numcodecs::{
5 AnyArray, AnyArrayView, AnyArrayViewMut, AnyCowArray, Codec, DynCodec, DynCodecType,
6};
7use numcodecs_wasm_host::{CodecError, RuntimeError, WasmCodec, WasmCodecComponent};
8use schemars::Schema;
9use serde::Serializer;
10use wasm_component_layer::{AsContextMut, Component, Instance, Linker, Store, TypedFunc};
11use wasm_runtime_layer::{backend::WasmEngine, Engine};
12
13use crate::transform::instcnt::PerfWitInterfaces;
14use crate::transform::transform_wasm_component;
15use crate::{engine::ReproducibleEngine, logging, stdio};
16
17#[derive(Debug, thiserror::Error)]
18pub enum ReproducibleWasmCodecError {
20 #[error("{codec_id} codec's lock was poisoned")]
22 Poisoned {
23 codec_id: Arc<str>,
25 },
26 #[error("{codec_id} codec's WebAssembly runtime raised an error")]
28 Runtime {
29 codec_id: Arc<str>,
31 source: RuntimeError,
33 },
34 #[error("{codec_id} codec's implementation raised an error")]
36 Codec {
37 codec_id: Arc<str>,
39 source: CodecError,
41 },
42}
43
44pub struct ReproducibleWasmCodec<E: WasmEngine>
50where
51 Store<(), ReproducibleEngine<E>>: Send,
52{
53 store: Mutex<Store<(), ReproducibleEngine<E>>>,
54 instance: Instance,
55 codec: WasmCodec,
56 ty: ReproducibleWasmCodecType<E>,
57 instruction_counter: TypedFunc<(), u64>,
58}
59
60impl<E: WasmEngine> ReproducibleWasmCodec<E>
61where
62 Store<(), ReproducibleEngine<E>>: Send,
63{
64 pub fn try_clone(&self) -> Result<Self, serde_json::Error> {
74 let mut config = self.get_config(serde_json::value::Serializer)?;
75
76 if let Some(config) = config.as_object_mut() {
77 config.remove("id");
78 }
79
80 let codec: Self = self.ty.codec_from_config(config)?;
81
82 Ok(codec)
83 }
84
85 pub fn try_drop(mut self) -> Result<(), ReproducibleWasmCodecError> {
96 let mut store = self
98 .store
99 .get_mut()
100 .map_err(|_| ReproducibleWasmCodecError::Poisoned {
101 codec_id: self.ty.codec_id.clone(),
102 })?;
103
104 let result =
105 self.codec
106 .try_drop(&mut store)
107 .map_err(|source| ReproducibleWasmCodecError::Runtime {
108 codec_id: self.ty.codec_id.clone(),
109 source,
110 });
111 let results = try_drop_instance(store, &self.instance, &self.ty.codec_id);
112
113 result.and(results)
114 }
115
116 #[expect(clippy::significant_drop_tightening)]
117 pub fn instruction_counter(&self) -> Result<Wrapping<u64>, ReproducibleWasmCodecError> {
128 let mut store = self
129 .store
130 .lock()
131 .map_err(|_| ReproducibleWasmCodecError::Poisoned {
132 codec_id: self.ty.codec_id.clone(),
133 })?;
134
135 let cnt = self
136 .instruction_counter
137 .call(store.as_context_mut(), ())
138 .map_err(|err| ReproducibleWasmCodecError::Runtime {
139 codec_id: self.ty.codec_id.clone(),
140 source: RuntimeError::from(err),
141 })?;
142
143 Ok(Wrapping(cnt))
144 }
145}
146
147impl<E: WasmEngine> Clone for ReproducibleWasmCodec<E>
148where
149 Store<(), ReproducibleEngine<E>>: Send,
150{
151 fn clone(&self) -> Self {
152 #[expect(clippy::expect_used)]
153 self.try_clone()
154 .expect("cloning a wasm codec should not fail")
155 }
156}
157
158impl<E: WasmEngine> Drop for ReproducibleWasmCodec<E>
159where
160 Store<(), ReproducibleEngine<E>>: Send,
161{
162 fn drop(&mut self) {
163 let Ok(mut store) = self.store.get_mut() else {
165 return;
166 };
167
168 let result = self.codec.try_drop(&mut store);
169 std::mem::drop(result);
170
171 let results = self.instance.drop(store);
172 std::mem::drop(results);
173 }
174}
175
176impl<E: WasmEngine> Codec for ReproducibleWasmCodec<E>
177where
178 Store<(), ReproducibleEngine<E>>: Send,
179{
180 type Error = ReproducibleWasmCodecError;
181
182 #[expect(clippy::significant_drop_tightening)]
183 fn encode(&self, data: AnyCowArray) -> Result<AnyArray, Self::Error> {
184 let mut store = self
185 .store
186 .lock()
187 .map_err(|_| ReproducibleWasmCodecError::Poisoned {
188 codec_id: self.ty.codec_id.clone(),
189 })?;
190
191 let encoded = self
192 .codec
193 .encode(store.as_context_mut(), data)
194 .map_err(|err| ReproducibleWasmCodecError::Runtime {
195 codec_id: self.ty.codec_id.clone(),
196 source: err,
197 })?
198 .map_err(|err| ReproducibleWasmCodecError::Codec {
199 codec_id: self.ty.codec_id.clone(),
200 source: err,
201 })?;
202
203 Ok(encoded)
204 }
205
206 #[expect(clippy::significant_drop_tightening)]
207 fn decode(&self, encoded: AnyCowArray) -> Result<AnyArray, Self::Error> {
208 let mut store = self
209 .store
210 .lock()
211 .map_err(|_| ReproducibleWasmCodecError::Poisoned {
212 codec_id: self.ty.codec_id.clone(),
213 })?;
214
215 let decoded = self
216 .codec
217 .decode(store.as_context_mut(), encoded)
218 .map_err(|err| ReproducibleWasmCodecError::Runtime {
219 codec_id: self.ty.codec_id.clone(),
220 source: err,
221 })?
222 .map_err(|err| ReproducibleWasmCodecError::Codec {
223 codec_id: self.ty.codec_id.clone(),
224 source: err,
225 })?;
226
227 Ok(decoded)
228 }
229
230 #[expect(clippy::significant_drop_tightening)]
231 fn decode_into(
232 &self,
233 encoded: AnyArrayView,
234 decoded: AnyArrayViewMut,
235 ) -> Result<(), Self::Error> {
236 let mut store = self
237 .store
238 .lock()
239 .map_err(|_| ReproducibleWasmCodecError::Poisoned {
240 codec_id: self.ty.codec_id.clone(),
241 })?;
242
243 self.codec
244 .decode_into(store.as_context_mut(), encoded, decoded)
245 .map_err(|err| ReproducibleWasmCodecError::Runtime {
246 codec_id: self.ty.codec_id.clone(),
247 source: err,
248 })?
249 .map_err(|err| ReproducibleWasmCodecError::Codec {
250 codec_id: self.ty.codec_id.clone(),
251 source: err,
252 })?;
253
254 Ok(())
255 }
256}
257
258impl<E: WasmEngine> DynCodec for ReproducibleWasmCodec<E>
259where
260 Store<(), ReproducibleEngine<E>>: Send,
261{
262 type Type = ReproducibleWasmCodecType<E>;
263
264 fn ty(&self) -> Self::Type {
265 ReproducibleWasmCodecType {
266 codec_id: self.ty.codec_id.clone(),
267 codec_config_schema: self.ty.codec_config_schema.clone(),
268 component: self.ty.component.clone(),
269 component_instantiater: self.ty.component_instantiater.clone(),
270 }
271 }
272
273 fn get_config<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
274 let mut store = self
275 .store
276 .lock()
277 .map_err(|_| ReproducibleWasmCodecError::Poisoned {
278 codec_id: self.ty.codec_id.clone(),
279 })
280 .map_err(serde::ser::Error::custom)?;
281
282 self.codec.get_config(store.as_context_mut(), serializer)
283 }
284}
285
286pub struct ReproducibleWasmCodecType<E: WasmEngine>
288where
289 Store<(), ReproducibleEngine<E>>: Send,
290{
291 pub(super) codec_id: Arc<str>,
292 pub(super) codec_config_schema: Arc<Schema>,
293 pub(super) component: Component,
294 #[expect(clippy::type_complexity)]
295 pub(super) component_instantiater: Arc<
296 dyn Send
297 + Sync
298 + Fn(
299 &Component,
300 &str,
301 ) -> Result<
302 (
303 Store<(), ReproducibleEngine<E>>,
304 Instance,
305 WasmCodecComponent,
306 ),
307 ReproducibleWasmCodecError,
308 >,
309 >,
310}
311
312impl<E: WasmEngine> ReproducibleWasmCodecType<E>
313where
314 Store<(), ReproducibleEngine<E>>: Send,
315{
316 pub fn new(
324 engine: E,
325 wasm_component: impl Into<Vec<u8>>,
326 ) -> Result<Self, ReproducibleWasmCodecError>
327 where
328 E: Send + Sync,
329 Store<(), ReproducibleEngine<E>>: Send + Sync,
330 {
331 let wasm_component = transform_wasm_component(wasm_component).map_err(|err| {
332 ReproducibleWasmCodecError::Runtime {
333 codec_id: Arc::from("<unknown>"),
334 source: RuntimeError::from(err),
335 }
336 })?;
337
338 let engine = Engine::new(ReproducibleEngine::new(engine));
339 let component = Component::new(&engine, &wasm_component).map_err(|err| {
340 ReproducibleWasmCodecError::Runtime {
341 codec_id: Arc::from("<unknown>"),
342 source: RuntimeError::from(err),
343 }
344 })?;
345
346 let component_instantiater = Arc::new(move |component: &Component, codec_id: &str| {
347 let mut store = Store::new(&engine, ());
348
349 let mut linker = Linker::default();
350 stdio::add_to_linker(&mut linker, &mut store).map_err(|err| {
351 ReproducibleWasmCodecError::Runtime {
352 codec_id: Arc::from(codec_id),
353 source: RuntimeError::from(err),
354 }
355 })?;
356 logging::add_to_linker(&mut linker, &mut store).map_err(|err| {
357 ReproducibleWasmCodecError::Runtime {
358 codec_id: Arc::from(codec_id),
359 source: RuntimeError::from(err),
360 }
361 })?;
362
363 let instance = linker.instantiate(&mut store, component).map_err(|err| {
364 ReproducibleWasmCodecError::Runtime {
365 codec_id: Arc::from(codec_id),
366 source: RuntimeError::from(err),
367 }
368 })?;
369
370 let component =
371 WasmCodecComponent::new(&mut store, instance.clone()).map_err(|source| {
372 ReproducibleWasmCodecError::Runtime {
373 codec_id: Arc::from(codec_id),
374 source,
375 }
376 })?;
377
378 Ok((store, instance, component))
379 });
380
381 let (codec_id, codec_config_schema) = {
382 let (mut store, instance, ty): (_, _, WasmCodecComponent) =
383 (component_instantiater)(&component, "<unknown>")?;
384
385 let codec_id = Arc::from(ty.codec_id());
386 let codec_config_schema = Arc::from(ty.codec_config_schema().clone());
387
388 try_drop_instance(&mut store, &instance, &codec_id)?;
389
390 (codec_id, codec_config_schema)
391 };
392
393 Ok(Self {
394 codec_id,
395 codec_config_schema,
396 component,
397 component_instantiater,
398 })
399 }
400}
401
402impl<E: WasmEngine> DynCodecType for ReproducibleWasmCodecType<E>
403where
404 Store<(), ReproducibleEngine<E>>: Send,
405{
406 type Codec = ReproducibleWasmCodec<E>;
407
408 fn codec_id(&self) -> &str {
409 &self.codec_id
410 }
411
412 fn codec_from_config<'de, D: serde::Deserializer<'de>>(
413 &self,
414 config: D,
415 ) -> Result<Self::Codec, D::Error> {
416 let (mut store, instance, component) =
417 (self.component_instantiater)(&self.component, &self.codec_id)
418 .map_err(serde::de::Error::custom)?;
419 let codec = component.codec_from_config(store.as_context_mut(), config)?;
420
421 let PerfWitInterfaces {
422 perf: perf_interface,
423 instruction_counter,
424 } = PerfWitInterfaces::get();
425 let Some(perf_interface) = instance.exports().instance(perf_interface) else {
426 return Err(serde::de::Error::custom(
427 "WASM component does not contain an interface to read the instruction counter",
428 ));
429 };
430 let Some(instruction_counter) = perf_interface.func(instruction_counter) else {
431 return Err(serde::de::Error::custom(
432 "WASM component interface does not contain a function to read the instruction counter"
433 ));
434 };
435 let instruction_counter = instruction_counter.typed().map_err(|err| {
436 serde::de::Error::custom(format!(
437 "WASM component instruction counter function has the wrong signature: {err}"
438 ))
439 })?;
440
441 Ok(ReproducibleWasmCodec {
442 store: Mutex::new(store),
443 instance,
444 codec,
445 ty: Self {
446 codec_id: self.codec_id.clone(),
447 codec_config_schema: self.codec_config_schema.clone(),
448 component: self.component.clone(),
449 component_instantiater: self.component_instantiater.clone(),
450 },
451 instruction_counter,
452 })
453 }
454
455 fn codec_config_schema(&self) -> Schema {
456 (*self.codec_config_schema).clone()
457 }
458}
459
460fn try_drop_instance<T, E: WasmEngine>(
461 store: &mut Store<T, E>,
462 instance: &Instance,
463 codec_id: &str,
464) -> Result<(), ReproducibleWasmCodecError> {
465 let mut errors = instance
466 .drop(store)
467 .map_err(|err| ReproducibleWasmCodecError::Runtime {
468 codec_id: Arc::from(codec_id),
469 source: RuntimeError::from(err),
470 })?;
471
472 let Some(mut err) = errors.pop() else {
473 return Ok(());
474 };
475
476 if !errors.is_empty() {
477 err = err.context(format!("showing one of {} errors", errors.len() + 1));
478 }
479
480 Err(ReproducibleWasmCodecError::Runtime {
481 codec_id: Arc::from(codec_id),
482 source: RuntimeError::from(
483 err.context("dropping instance and all of its resources failed"),
484 ),
485 })
486}