catlog/stdlib/analyses/
reachability.rs

1//! Reachability analyses of models.
2
3use itertools::Itertools;
4use std::collections::HashMap;
5
6use crate::dbl::modal::model::{ModalDblModel, ModalOb};
7use crate::one::category::FgCategory;
8use crate::zero::QualifiedName;
9
10#[cfg(feature = "serde")]
11use serde::{Deserialize, Serialize};
12#[cfg(feature = "serde-wasm")]
13use tsify::Tsify;
14
15/// Data defining a reachability problem for a Petri net.
16#[derive(Clone)]
17#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
18#[cfg_attr(feature = "serde-wasm", derive(Tsify))]
19#[cfg_attr(
20    feature = "serde-wasm",
21    tsify(into_wasm_abi, from_wasm_abi, hashmap_as_object)
22)]
23pub struct ReachabilityProblemData {
24    /// Map from place IDs to number of initial token of that type.
25    pub tokens: HashMap<QualifiedName, i32>,
26
27    /// Map from place IDs to number of forbidden tokens of that type.
28    pub forbidden: HashMap<QualifiedName, i32>,
29}
30
31/// The "Region Algebra for Petri Nets" algorithm from Ch 31 of
32/// ([Clarke et al 2018](crate::refs::HandbookModelChecking)):
33/// "Symbolic Model Checking in Non Boolean Domains".
34pub fn subreachability(m: &ModalDblModel, data: ReachabilityProblemData) -> bool {
35    // Convert model into a pair of matrices
36    //--------------------------------------
37
38    // Get a canonical ordering of the objects
39    let ob_vec: Vec<_> = m.ob_generators().sorted().collect();
40    let ob_inv: HashMap<_, _> = ob_vec.iter().enumerate().map(|(x, y)| (y.clone(), x)).collect();
41    let n_p = ob_vec.len();
42    if n_p == 0 {
43        return true;
44    }
45
46    // Get a canonical ordering of the homs
47    let hom_vec: Vec<_> = m.mor_generators().sorted().collect();
48
49    let hom_inv: HashMap<_, _> = hom_vec.iter().enumerate().map(|(x, y)| (y.clone(), x)).collect();
50    let n_t = hom_vec.len();
51
52    // Populate the I/O matrices from the hom src/tgt data
53    let mut i_mat = vec![vec![0; n_t]; n_p];
54    let mut o_mat = vec![vec![0; n_t]; n_p];
55
56    for e in m.mor_generators() {
57        let e_idx = *hom_inv.get(&e).unwrap();
58        if let Some(vs) = m.mor_generator_dom(&e).collect_product(None) {
59            for v in vs.iter() {
60                if let ModalOb::Generator(u) = v {
61                    i_mat[*ob_inv.get(u).unwrap()][e_idx] += 1;
62                }
63            }
64        }
65
66        if let Some(vs) = m.mor_generator_cod(&e).collect_product(None) {
67            for v in vs.iter() {
68                if let ModalOb::Generator(u) = v {
69                    o_mat[*ob_inv.get(u).unwrap()][e_idx] += 1;
70                }
71            }
72        }
73    }
74    let (i_mat_, o_mat_) = (&i_mat, &o_mat);
75
76    // Parse input data
77    //-----------------
78    let mut f: Vec<Vec<_>> =
79        vec![ob_vec.iter().map(|u| *data.forbidden.get(u).unwrap_or(&0)).collect()];
80    let init: Vec<_> = ob_vec.iter().map(|u| *data.tokens.get(u).unwrap_or(&0)).collect();
81
82    // Apply recursive algorithm until fix point
83    //------------------------------------------
84    loop {
85        // For each transition + region (in `f`) pair `(t,v)`, compute the
86        // region that accesses `v` via firing `t`.
87        let pre: Vec<Vec<_>> = (0..n_t)
88            .flat_map(|t| {
89                f.iter().map(move |v| {
90                    (0..n_p).map(move |p| {
91                        std::cmp::max(i_mat_[p][t], v[p] - (o_mat_[p][t] - i_mat_[p][t]))
92                    })
93                })
94            })
95            .map(|z| z.collect())
96            .collect();
97
98        // Filter `pre` for regions which are not already in `f`.
99        let newstuff: Vec<Vec<_>> = pre
100            .into_iter()
101            .filter(|v| f.iter().all(|old| (0..n_p).any(|p| v[p] < old[p])))
102            .unique()
103            .collect();
104
105        // We have terminated when there is nothing new generated by `pre`
106        if newstuff.is_empty() {
107            break;
108        }
109
110        // Update f with new stuff and remove extraneous old stuff
111        f.retain(|v| newstuff.iter().all(|n| (0..n_p).any(|p| v[p] < n[p])));
112        f.extend(newstuff);
113    }
114
115    // Check whether input tokening lies within the region which can access
116    // the forbidden state, `init`.
117    let init_in_forbbiden = f.iter().any(|v| (0..n_p).all(|p| v[p] <= init[p]));
118    !init_in_forbbiden
119}
120
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dbl::{model::*, theory::*};
    use crate::stdlib::th_sym_monoidal_category;
    use crate::zero::name;
    use std::rc::Rc;

    /// Runs the analysis on a three-place, three-transition Petri net.
    ///
    /// The net has the following incidence matrices (rows are places,
    /// columns are transitions):
    ///
    /// ```text
    ///          t1 t2 t3
    /// i_mat = [[0, 1, 0],   p1
    ///          [0, 1, 0],   p2
    ///          [0, 0, 1]]   p3
    ///
    /// o_mat = [[1, 0, 0],   p1
    ///          [0, 0, 1],   p2
    ///          [0, 1, 0]]   p3
    /// ```
    ///
    /// (WARNING: the Petri net is drawn incorrectly in the Handbook of Model
    /// Checking.)
    ///
    /// The forbidden state is (0,0,2). The basis of the unsafe region is
    /// updated four times before it stabilizes:
    ///
    /// ```text
    /// [(0,0,2)]
    /// -> [(0,0,2),(1,1,1)]
    /// -> [(0,0,2),(0,1,1),(2,2,0)]
    /// -> [(0,0,2),(0,1,1),(1,2,0)]
    /// -> [(0,0,2),(0,1,1),(0,2,0)]
    /// ```
    ///
    /// So the forbidden state can be reached in exactly three ways:
    /// 1. starting in the forbidden state (or any superset of it),
    /// 2. having two tokens in p2,
    /// 3. having one token in each of p2 and p3.
    ///
    /// Consider using the algorithm from "Minimal Coverability Tree
    /// Construction Made Complete and Efficient" for a more efficient
    /// implementation which also allows "inf" as a possible specification of
    /// an invalid state.
    #[test]
    fn validate_subreachability() {
        // Build the Petri net as a model of the theory of symmetric monoidal
        // categories.
        let th = Rc::new(th_sym_monoidal_category());
        let ob_type = ModalObType::new(name("Object"));
        let op = name("tensor");
        let mut model = ModalDblModel::new(th);

        let places = [name("p1"), name("p2"), name("p3")];
        for place in &places {
            model.add_ob(place.clone(), ob_type.clone());
        }
        let [x, y, z] = places.clone().map(ModalOb::from);

        // Tensor product of a list of places.
        let tensor = |factors: Vec<ModalOb>| {
            ModalOb::App(ModalOb::List(List::Symmetric, factors).into(), op.clone())
        };

        // Each entry is (transition name, input places, output places).
        let transitions = [
            ("t1", vec![], vec![x.clone()]),
            ("t2", vec![x, y.clone()], vec![z.clone()]),
            ("t3", vec![z], vec![y]),
        ];
        for (id, inputs, outputs) in transitions {
            model.add_mor(
                name(id),
                tensor(inputs),
                tensor(outputs),
                ModalMorType::Zero(ob_type.clone()),
            );
        }

        // Turns token counts for (p1, p2, p3) into a marking keyed by place.
        let marking = |counts: [i32; 3]| -> HashMap<_, _> {
            places.iter().cloned().zip(counts).collect()
        };

        // Initial markings paired with whether (0,0,2) is expected to be
        // unreachable as a sub-tokening from them.
        let cases = [
            ([0, 0, 2], false),
            ([0, 1, 1], false),
            ([0, 2, 0], false),
            ([1, 0, 1], true),
            ([1, 1, 0], true),
        ];
        for (tokens, expect) in cases {
            let data = ReachabilityProblemData {
                tokens: marking(tokens),
                forbidden: marking([0, 0, 2]),
            };
            assert_eq!(subreachability(&model, data), expect, "initial marking {tokens:?}");
        }
    }
}