1use downcast_rs::{impl_downcast, Downcast};
3use dyn_clone::{clone_trait_object, DynClone};
4use std::sync::atomic::{AtomicU64, Ordering};
5
6pub trait Isolation:
17 seal::Sealed + Downcast + DynClone + std::fmt::Debug + Send + Sync + 'static
18{
19 fn compatible(&self, other: &dyn Isolation) -> bool;
37
38 fn join(&self, other: &dyn Isolation) -> Option<Box<dyn Isolation>>;
57}
58
59mod seal {
61 pub trait Sealed {}
63 impl<T: super::IsolationHelper> Sealed for T {}
64}
65
66impl_downcast!(Isolation);
67clone_trait_object!(Isolation);
68impl<T: Isolation> From<T> for Box<dyn Isolation> {
69 fn from(isolation: T) -> Self {
70 Box::new(isolation)
71 }
72}
73
74impl<T: IsolationHelper + Clone + std::fmt::Debug + Send + Sync + 'static> Isolation for T {
75 fn compatible(&self, other: &dyn Isolation) -> bool {
76 if let Some(other) = other.as_any().downcast_ref() {
77 self.compatible_same_type(other)
78 } else {
79 false
80 }
81 }
82
83 fn join(&self, other: &dyn Isolation) -> Option<Box<dyn Isolation>> {
84 if let Some(other) = other.as_any().downcast_ref() {
85 self.join_same_type(other)
86 .map(|res| Box::new(res) as Box<dyn Isolation>)
87 } else {
88 None
89 }
90 }
91}
92
93pub trait IsolationHelper: Sized {
105 fn compatible_same_type(&self, other: &Self) -> bool;
112
113 fn join_same_type(&self, other: &Self) -> Option<Self>;
117}
118
119#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
168pub struct IsolationToken(u64);
169
170#[allow(clippy::new_without_default)]
171impl IsolationToken {
172 pub fn new() -> Self {
181 static COUNTER: AtomicU64 = AtomicU64::new(1);
183 let token = COUNTER.fetch_add(1, Ordering::Relaxed);
186 assert!(token < u64::MAX);
187 IsolationToken(token)
188 }
189
190 pub fn no_isolation() -> Self {
196 IsolationToken(0)
197 }
198}
199
200impl IsolationHelper for IsolationToken {
201 fn compatible_same_type(&self, other: &Self) -> bool {
202 self == other
203 }
204 fn join_same_type(&self, other: &Self) -> Option<Self> {
205 if self.compatible_same_type(other) {
206 Some(*self)
207 } else {
208 None
209 }
210 }
211}
212
213macro_rules! tuple_impls {
215 ($(
216 $Tuple:ident {
217 $(($idx:tt) -> $T:ident)+
218 }
219 )+) => {
220 $(
221 impl<$($T:IsolationHelper),+> IsolationHelper for ($($T,)+) {
222 fn compatible_same_type(&self, other: &Self) -> bool {
223 $(self.$idx.compatible_same_type(&other.$idx))&&+
224 }
225
226 fn join_same_type(&self, other: &Self) -> Option<Self> {
227 Some((
228 $(self.$idx.join_same_type(&other.$idx)?,)+
229 ))
230 }
231 }
232 )+
233 }
234}
235
236tuple_impls! {
237 Tuple1 {
238 (0) -> A
239 }
240 Tuple2 {
241 (0) -> A
242 (1) -> B
243 }
244 Tuple3 {
245 (0) -> A
246 (1) -> B
247 (2) -> C
248 }
249 Tuple4 {
250 (0) -> A
251 (1) -> B
252 (2) -> C
253 (3) -> D
254 }
255 Tuple5 {
256 (0) -> A
257 (1) -> B
258 (2) -> C
259 (3) -> D
260 (4) -> E
261 }
262 Tuple6 {
263 (0) -> A
264 (1) -> B
265 (2) -> C
266 (3) -> D
267 (4) -> E
268 (5) -> F
269 }
270 Tuple7 {
271 (0) -> A
272 (1) -> B
273 (2) -> C
274 (3) -> D
275 (4) -> E
276 (5) -> F
277 (6) -> G
278 }
279 Tuple8 {
280 (0) -> A
281 (1) -> B
282 (2) -> C
283 (3) -> D
284 (4) -> E
285 (5) -> F
286 (6) -> G
287 (7) -> H
288 }
289 Tuple9 {
290 (0) -> A
291 (1) -> B
292 (2) -> C
293 (3) -> D
294 (4) -> E
295 (5) -> F
296 (6) -> G
297 (7) -> H
298 (8) -> I
299 }
300 Tuple10 {
301 (0) -> A
302 (1) -> B
303 (2) -> C
304 (3) -> D
305 (4) -> E
306 (5) -> F
307 (6) -> G
308 (7) -> H
309 (8) -> I
310 (9) -> J
311 }
312 Tuple11 {
313 (0) -> A
314 (1) -> B
315 (2) -> C
316 (3) -> D
317 (4) -> E
318 (5) -> F
319 (6) -> G
320 (7) -> H
321 (8) -> I
322 (9) -> J
323 (10) -> K
324 }
325 Tuple12 {
326 (0) -> A
327 (1) -> B
328 (2) -> C
329 (3) -> D
330 (4) -> E
331 (5) -> F
332 (6) -> G
333 (7) -> H
334 (8) -> I
335 (9) -> J
336 (10) -> K
337 (11) -> L
338 }
339}
340
341#[derive(Clone, Debug, derive_builder::Builder)]
346pub struct StreamIsolation {
347 #[builder(default = "Box::new(IsolationToken::no_isolation())")]
349 stream_isolation: Box<dyn Isolation>,
350 #[builder(default = "IsolationToken::no_isolation()")]
353 owner_token: IsolationToken,
354}
355
356impl StreamIsolation {
357 pub fn no_isolation() -> Self {
359 StreamIsolationBuilder::new()
360 .build()
361 .expect("Bug constructing StreamIsolation")
362 }
363
364 pub fn builder() -> StreamIsolationBuilder {
367 StreamIsolationBuilder::new()
368 }
369}
370
371impl IsolationHelper for StreamIsolation {
372 fn compatible_same_type(&self, other: &StreamIsolation) -> bool {
373 self.owner_token == other.owner_token
374 && self
375 .stream_isolation
376 .compatible(other.stream_isolation.as_ref())
377 }
378
379 fn join_same_type(&self, other: &StreamIsolation) -> Option<StreamIsolation> {
380 if self.owner_token != other.owner_token {
381 return None;
382 }
383 self.stream_isolation
384 .join(other.stream_isolation.as_ref())
385 .map(|stream_isolation| StreamIsolation {
386 stream_isolation,
387 owner_token: self.owner_token,
388 })
389 }
390}
391
392impl StreamIsolationBuilder {
393 pub fn new() -> Self {
395 StreamIsolationBuilder::default()
396 }
397}
398
399#[cfg(test)]
400pub(crate) mod test {
401 #![allow(clippy::unwrap_used)]
402 use super::*;
403
404 pub(crate) trait IsolationTokenEq {
407 fn isol_eq(&self, other: &Self) -> bool;
410 }
411
412 macro_rules! assert_isoleq {
413 { $arg1:expr, $arg2:expr } => {
414 assert!($arg1.isol_eq(&$arg2))
415 }
416 }
417 pub(crate) use assert_isoleq;
418
419 impl IsolationTokenEq for IsolationToken {
420 fn isol_eq(&self, other: &Self) -> bool {
421 self == other
422 }
423 }
424
425 impl<T: IsolationTokenEq> IsolationTokenEq for Option<T> {
426 fn isol_eq(&self, other: &Self) -> bool {
427 match (self, other) {
428 (Some(this), Some(other)) => this.isol_eq(other),
429 (None, None) => true,
430 _ => false,
431 }
432 }
433 }
434
435 impl<T: IsolationTokenEq + std::fmt::Debug> IsolationTokenEq for Vec<T> {
436 fn isol_eq(&self, other: &Self) -> bool {
437 if self.len() != other.len() {
438 return false;
439 }
440 self.iter()
441 .zip(other.iter())
442 .all(|(this, other)| this.isol_eq(other))
443 }
444 }
445
446 impl IsolationTokenEq for dyn Isolation {
447 fn isol_eq(&self, other: &Self) -> bool {
448 let this = self.as_any().downcast_ref::<IsolationToken>();
449 let other = other.as_any().downcast_ref::<IsolationToken>();
450 match (this, other) {
451 (Some(this), Some(other)) => this == other,
452 _ => false,
453 }
454 }
455 }
456
457 impl IsolationTokenEq for StreamIsolation {
458 fn isol_eq(&self, other: &Self) -> bool {
459 self.stream_isolation
460 .isol_eq(other.stream_isolation.as_ref())
461 && self.owner_token == other.owner_token
462 }
463 }
464
465 #[derive(PartialEq, Clone, Copy, Debug, Eq)]
466 struct OtherIsolation(usize);
467
468 impl IsolationHelper for OtherIsolation {
469 fn compatible_same_type(&self, other: &Self) -> bool {
470 self == other
471 }
472 fn join_same_type(&self, other: &Self) -> Option<Self> {
473 if self.compatible_same_type(other) {
474 Some(*self)
475 } else {
476 None
477 }
478 }
479 }
480
481 #[test]
482 fn isolation_token() {
483 let token_1 = IsolationToken::new();
484 let token_2 = IsolationToken::new();
485
486 assert!(token_1.compatible_same_type(&token_1));
487 assert!(token_2.compatible_same_type(&token_2));
488 assert!(!token_1.compatible_same_type(&token_2));
489
490 assert_eq!(token_1.join_same_type(&token_1), Some(token_1));
491 assert_eq!(token_2.join_same_type(&token_2), Some(token_2));
492 assert_eq!(token_1.join_same_type(&token_2), None);
493 }
494
495 #[test]
496 fn isolation_trait() {
497 let token_1: Box<dyn Isolation> = Box::new(IsolationToken::new());
498 let token_2: Box<dyn Isolation> = Box::new(IsolationToken::new());
499 let other_1: Box<dyn Isolation> = Box::new(OtherIsolation(0));
500 let other_2: Box<dyn Isolation> = Box::new(OtherIsolation(1));
501
502 assert!(token_1.compatible(token_1.as_ref()));
503 assert!(token_2.compatible(token_2.as_ref()));
504 assert!(!token_1.compatible(token_2.as_ref()));
505
506 assert!(other_1.compatible(other_1.as_ref()));
507 assert!(other_2.compatible(other_2.as_ref()));
508 assert!(!other_1.compatible(other_2.as_ref()));
509
510 assert!(!token_1.compatible(other_1.as_ref()));
511 assert!(!other_1.compatible(token_1.as_ref()));
512
513 assert!(token_1.join(token_1.as_ref()).is_some());
514 assert!(token_1.join(token_2.as_ref()).is_none());
515
516 assert!(other_1.join(other_1.as_ref()).is_some());
517 assert!(other_1.join(other_2.as_ref()).is_none());
518
519 assert!(token_1.join(other_1.as_ref()).is_none());
520 assert!(other_1.join(token_1.as_ref()).is_none());
521 }
522
523 #[test]
524 fn isolation_tuple() {
525 let token_1 = IsolationToken::new();
526 let token_2 = IsolationToken::new();
527 let other_1 = OtherIsolation(0);
528 let other_2 = OtherIsolation(1);
529
530 let token_12: Box<dyn Isolation> = Box::new((token_1, token_2));
531 let token_21: Box<dyn Isolation> = Box::new((token_2, token_1));
532 let mix_11: Box<dyn Isolation> = Box::new((token_1, other_1));
533 let mix_12: Box<dyn Isolation> = Box::new((token_1, other_2));
534 let revmix_11: Box<dyn Isolation> = Box::new((other_1, token_1));
535
536 let join_token = token_12.join(token_12.as_ref()).unwrap();
537 assert!(join_token.compatible(token_12.as_ref()));
538 let join_mix = mix_12.join(mix_12.as_ref()).unwrap();
539 assert!(join_mix.compatible(mix_12.as_ref()));
540
541 let isol_list = [token_12, token_21, mix_11, mix_12, revmix_11];
542
543 for (i, isol1) in isol_list.iter().enumerate() {
544 for (j, isol2) in isol_list.iter().enumerate() {
545 assert_eq!(isol1.compatible(isol2.as_ref()), i == j);
546 }
547 }
548 }
549
550 #[test]
551 fn build_isolation() {
552 let no_isolation = StreamIsolation::no_isolation();
553 let no_isolation2 = StreamIsolation::builder()
554 .owner_token(IsolationToken::no_isolation())
555 .stream_isolation(Box::new(IsolationToken::no_isolation()))
556 .build()
557 .unwrap();
558 assert_eq!(no_isolation.owner_token, no_isolation2.owner_token);
559 assert_eq!(
560 no_isolation
561 .stream_isolation
562 .as_ref()
563 .as_any()
564 .downcast_ref::<IsolationToken>(),
565 no_isolation2
566 .stream_isolation
567 .as_ref()
568 .as_any()
569 .downcast_ref::<IsolationToken>()
570 );
571 assert!(no_isolation.compatible(&no_isolation2));
572
573 let tok = IsolationToken::new();
574 let some_isolation = StreamIsolation::builder().owner_token(tok).build().unwrap();
575 let some_isolation2 = StreamIsolation::builder()
576 .stream_isolation(Box::new(tok))
577 .build()
578 .unwrap();
579 assert!(!no_isolation.compatible(&some_isolation));
580 assert!(!no_isolation.compatible(&some_isolation2));
581 assert!(!some_isolation.compatible(&some_isolation2));
582 assert!(some_isolation.compatible(&some_isolation));
583 }
584}