ndgrid/grid/parallel_builder.rs

//! Parallel grid builder

use super::ParallelGridImpl;
use crate::{
    traits::{
        Builder, Entity, GeometryBuilder, Grid, GridBuilder, ParallelBuilder, Topology,
        TopologyBuilder,
    },
    types::{GraphPartitioner, Ownership},
};
use itertools::{Itertools, izip};
use mpi::{
    collective::SystemOperation,
    traits::{Buffer, Communicator, CommunicatorCollectives, Equivalence, Root},
};
use std::collections::{HashMap, HashSet};

#[cfg(feature = "coupe")]
use coupe::{KMeans, Partition, Point3D};

#[cfg(feature = "scotch")]
use scotch::{Architecture, Graph, Strategy, graph};

// A simple struct to hold chunked data. `data` is a long array and `idx_bounds` is a
// vector such that `data[idx_bounds[i]..idx_bounds[i + 1]]` is one chunk. The last
// element of `idx_bounds` is the length of the data array.
struct ChunkedData<T: Equivalence + Copy> {
    data: Vec<T>,
    idx_bounds: Vec<usize>,
}

impl<T: Equivalence + Copy> ChunkedData<T> {
    // Return a vector with the number of elements per chunk.
    fn counts(&self) -> Vec<usize> {
        self.idx_bounds
            .iter()
            .tuple_windows()
            .map(|(a, b)| b - a)
            .collect()
    }
}
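
// Illustration (hypothetical values, not used anywhere in this module): with
// `data = [10, 11, 12, 13, 14]` and `idx_bounds = [0, 2, 2, 5]`, chunk 0 is `[10, 11]`,
// chunk 1 is empty and chunk 2 is `[12, 13, 14]`; `counts()` returns `[2, 0, 3]`.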

impl<B: Builder + GeometryBuilder + TopologyBuilder + GridBuilder> ParallelBuilder for B
where
    Vec<B::T>: Buffer,
    B::T: Equivalence,
    Vec<B::EntityDescriptor>: Buffer,
    B::EntityDescriptor: Equivalence,
    B::Grid: Sync,
{
    type ParallelGrid<'a, C: Communicator + 'a>
        = ParallelGridImpl<'a, C, B::Grid>
    where
        Self: 'a;
    fn create_parallel_grid_root<'a, C: Communicator>(
        &self,
        comm: &'a C,
        partitioner: GraphPartitioner,
    ) -> ParallelGridImpl<'a, C, B::Grid> {
        // If we have only a single process we can just create a serial grid.
        if comm.size() == 1 {
            let serial_grid = self.create_grid();

            // Need to fill ownership and global indices now.

            let mut owners = HashMap::new();
            let mut global_indices = HashMap::new();

            for dim in 0..=self.tdim() {
                for t in serial_grid.entity_types(dim) {
                    let entity_count = serial_grid.entity_iter(*t).count();
                    owners.insert(*t, vec![Ownership::Owned; entity_count]);
                    global_indices.insert(*t, (0..entity_count).collect_vec());
                }
            }

            return ParallelGridImpl::new(comm, serial_grid, owners, global_indices);
        }
        // Partition the cells.
        let cell_owners = self.partition_cells(comm.size() as usize, partitioner);

        // Each vertex is assigned to the minimum-rank process that has a cell containing it.
        // Note that the array has the size of the number of points in the grid and contains
        // garbage for points that are not vertices.
        let vertex_owners = self.assign_vertex_owners(comm.size() as usize, &cell_owners);

        // This distributes cells, vertices and points to processes.
        // Each process now gets not only the cells that it owns via `cell_owners` but also their neighbours.
        // All the corresponding vertices and points are then also assigned to each process.
        // Each return value is a `ChunkedData` struct whose `idx_bounds` delimit the chunk of
        // indices assigned to each process and whose `data` holds the actual indices.
        let (vertices_per_proc, points_per_proc, cells_per_proc) =
            self.get_vertices_points_and_cells(comm.size() as usize, &cell_owners);
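
        // Illustration (hypothetical two-process example): if `cell_owners = [0, 0, 1, 1]` and
        // cell 1 shares a vertex with cell 2, then the chunk of `cells_per_proc` for process 0
        // contains cells 0, 1 and 2, and its chunks of `vertices_per_proc` and `points_per_proc`
        // contain every vertex and point of those three cells.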

        // Compute the chunks array for the coordinates associated with the points.
        // The idx array for the coords is simply `self.gdim()` times the idx array for the points.
        let coords_per_proc = ChunkedData {
            data: {
                let mut tmp =
                    Vec::with_capacity(self.gdim() * points_per_proc.idx_bounds.last().unwrap());
                points_per_proc
                    .data
                    .iter()
                    .for_each(|point| tmp.extend(self.point(*point).iter()));
                tmp
            },
            idx_bounds: points_per_proc
                .idx_bounds
                .iter()
                .map(|x| self.gdim() * x)
                .collect(),
        };
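
        // Illustration (hypothetical): with `self.gdim() == 3` and point chunks delimited by
        // `idx_bounds = [0, 2, 5]`, the coordinate chunks are delimited by `[0, 6, 15]`, i.e.
        // three coordinates per point.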

        // This computes for each process the owners of its vertices.
        let vertex_owners_per_proc = ChunkedData::<usize> {
            data: {
                let mut tmp = Vec::with_capacity(*vertices_per_proc.idx_bounds.last().unwrap());
                vertices_per_proc
                    .data
                    .iter()
                    .for_each(|v| tmp.push(vertex_owners[*v]));
                tmp
            },
            idx_bounds: vertices_per_proc.idx_bounds.clone(),
        };

        // We now compute the cell information for each process.

        // First we need the cell types.
        let cell_types_per_proc = ChunkedData {
            data: {
                let mut tmp = Vec::with_capacity(*cells_per_proc.idx_bounds.last().unwrap());
                cells_per_proc
                    .data
                    .iter()
                    .for_each(|c| tmp.push(self.cell_type(*c)));
                tmp
            },
            idx_bounds: cells_per_proc.idx_bounds.clone(),
        };

        // Now we need the cell degrees.
        let cell_degrees_per_proc = ChunkedData {
            data: {
                let mut tmp = Vec::with_capacity(*cells_per_proc.idx_bounds.last().unwrap());
                cells_per_proc
                    .data
                    .iter()
                    .for_each(|c| tmp.push(self.cell_degree(*c)));
                tmp
            },
            idx_bounds: cells_per_proc.idx_bounds.clone(),
        };

        // Now we need the cell owners.
        let cell_owners_per_proc = ChunkedData {
            data: {
                let mut tmp = Vec::with_capacity(*cells_per_proc.idx_bounds.last().unwrap());
                cells_per_proc
                    .data
                    .iter()
                    .for_each(|c| tmp.push(cell_owners[*c]));
                tmp
            },
            idx_bounds: cells_per_proc.idx_bounds.clone(),
        };

        // Finally the cell points. These are messier since each cell might have a different
        // number of points. Hence, we need to compute a new counts array.
        let cell_points_per_proc = {
            // First compute the total number of points needed so that we can pre-allocate the array.
            let total_points = cells_per_proc
                .data
                .iter()
                .map(|c| self.npts(self.cell_type(*c), self.cell_degree(*c)))
                .sum();
            let mut data = Vec::with_capacity(total_points);
            let mut counts = vec![0; comm.size() as usize];
            for (proc, (&chunk_ind_start, &chunk_ind_end)) in
                cells_per_proc.idx_bounds.iter().tuple_windows().enumerate()
            {
                for c in cells_per_proc.data[chunk_ind_start..chunk_ind_end].iter() {
                    let npts = self.npts(self.cell_type(*c), self.cell_degree(*c));
                    counts[proc] += npts;
                    data.extend(self.cell_points(*c).iter());
                }
            }
            // Now turn the counts into an `idx_bounds` array.
            let mut idx_bounds = counts
                .iter()
                .scan(0, |state, &x| {
                    let old = *state;
                    *state += x;
                    Some(old)
                })
                .collect_vec();
            idx_bounds.push(idx_bounds.last().unwrap() + counts.last().unwrap());

            // Finally return the chunks.
            ChunkedData { data, idx_bounds }
        };
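
        // Illustration (hypothetical): `counts = [3, 0, 4]` yields `[0, 3, 3]` from the exclusive
        // scan and, after pushing the total, `idx_bounds = [0, 3, 3, 7]`.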

        // Now we send everything across the processes. Every `_per_proc` variable is scattered across
        // the processes. After the scatter operation each process holds the following data.
        // - `point_indices` contains the indices of the points that are assigned to the current process.
        // - `coordinates` contains the coordinates of the points that are assigned to the current process.
        // - `vertex_indices` contains the indices of the vertices that are assigned to the current process.
        // - `vertex_owners` contains the owners of the vertices that are assigned to the current process.
        // - `cell_indices` contains the indices of the cells that are assigned to the current process.
        // - `cell_points` contains the point indices of the cells that are assigned to the current process.
        // - `cell_types` contains the types of the cells that are assigned to the current process.
        // - `cell_degrees` contains the degrees of the cells that are assigned to the current process.
        // - `cell_owners` contains the owners of the cells that are assigned to the current process.
        let point_indices = scatterv_root(comm, &points_per_proc);
        let coordinates = scatterv_root(comm, &coords_per_proc);
        let vertex_indices = scatterv_root(comm, &vertices_per_proc);
        let vertex_owners = scatterv_root(comm, &vertex_owners_per_proc);
        let cell_indices = scatterv_root(comm, &cells_per_proc);
        let cell_points = scatterv_root(comm, &cell_points_per_proc);
        let cell_types = scatterv_root(comm, &cell_types_per_proc);
        let cell_degrees = scatterv_root(comm, &cell_degrees_per_proc);
        let cell_owners = scatterv_root(comm, &cell_owners_per_proc);

        // This is the common code path that every rank executes to create its local grid.
        self.create_parallel_grid_internal(
            comm,
            point_indices,
            coordinates,
            vertex_indices,
            vertex_owners,
            cell_indices,
            cell_points,
            cell_types,
            cell_degrees,
            cell_owners,
        )
    }
    fn create_parallel_grid<'a, C: Communicator>(
        &self,
        comm: &'a C,
        root_rank: i32,
    ) -> ParallelGridImpl<'a, C, B::Grid> {
        // First we receive all the data.
        let root_rank = root_rank as usize;
        let point_indices = scatterv(comm, root_rank);
        let coordinates = scatterv(comm, root_rank);
        let vertex_indices = scatterv(comm, root_rank);
        let vertex_owners = scatterv(comm, root_rank);
        let cell_indices = scatterv(comm, root_rank);
        let cell_points = scatterv(comm, root_rank);
        let cell_types = scatterv(comm, root_rank);
        let cell_degrees = scatterv(comm, root_rank);
        let cell_owners = scatterv(comm, root_rank);

        // Now we reassemble the grid.

        self.create_parallel_grid_internal(
            comm,
            point_indices,
            coordinates,
            vertex_indices,
            vertex_owners,
            cell_indices,
            cell_points,
            cell_types,
            cell_degrees,
            cell_owners,
        )
    }
}

trait ParallelBuilderFunctions: Builder + GeometryBuilder + TopologyBuilder + GridBuilder {
    //! Parallel builder functions
    //!
    //! These functions are included in a trait so that they can be implemented for an arbitrary builder B below.

    /// Internal function to create a parallel grid
    #[allow(clippy::too_many_arguments)]
    fn create_parallel_grid_internal<'a, C: Communicator>(
        &self,
        comm: &'a C,
        point_indices: Vec<usize>,
        coordinates: Vec<Self::T>,
        vertex_indices: Vec<usize>,
        vertex_owners: Vec<usize>,
        cell_indices: Vec<usize>,
        cell_points: Vec<usize>,
        cell_types: Vec<Self::EntityDescriptor>,
        cell_degrees: Vec<usize>,
        cell_owners: Vec<usize>,
    ) -> ParallelGridImpl<'a, C, Self::Grid>
    where
        Self::Grid: Sync,
    {
        let rank = comm.rank() as usize;
        // First we want to reorder everything so that owned data comes first in the arrays.

        // Reorder the vertices.
        let (vertex_indices, vertex_owners) = {
            let mut new_indices = Vec::<usize>::with_capacity(vertex_indices.len());
            let mut new_owners = Vec::<usize>::with_capacity(vertex_owners.len());

            for (&v, &o) in izip!(&vertex_indices, &vertex_owners).filter(|&(_, &o)| o == rank) {
                new_indices.push(v);
                new_owners.push(o);
            }

            for (&v, &o) in izip!(&vertex_indices, &vertex_owners).filter(|&(_, &o)| o != rank) {
                new_indices.push(v);
                new_owners.push(o);
            }

            (new_indices, new_owners)
        };
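
        // Illustration (hypothetical, on rank 0): `vertex_indices = [5, 7, 9]` with
        // `vertex_owners = [1, 0, 1]` becomes `[7, 5, 9]` with owners `[0, 1, 1]`, i.e. the
        // locally owned vertex 7 moves to the front.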

        // Reorder the cell information. However, things are a bit messy with the cell points as each
        // cell may have a different number of points attached. Hence, we first expand the cell owners
        // into one owner per cell point.

        let cell_point_owners = {
            let mut new_owners = Vec::<usize>::with_capacity(cell_points.len());
            // We now iterate through the cells and their owners and assign owners accordingly for the points.
            for (o, ct, cd) in izip!(cell_owners.iter(), cell_types.iter(), cell_degrees.iter()) {
                let npts = self.npts(*ct, *cd);
                for _ in 0..npts {
                    new_owners.push(*o);
                }
            }
            new_owners
        };

        // First we reorder the cells, degrees, types and owners.

        let (cell_indices, cell_types, cell_degrees, cell_owners) = {
            let mut new_indices = Vec::<usize>::with_capacity(cell_indices.len());
            let mut new_types = Vec::<Self::EntityDescriptor>::with_capacity(cell_types.len());
            let mut new_degrees = Vec::<usize>::with_capacity(cell_degrees.len());
            let mut new_owners = Vec::<usize>::with_capacity(cell_owners.len());

            for (cell_index, cell_type, cell_degree, cell_owner) in
                izip!(&cell_indices, &cell_types, &cell_degrees, &cell_owners,)
                    .filter(|&(_, _, _, &o)| o == rank)
            {
                new_indices.push(*cell_index);
                new_types.push(*cell_type);
                new_degrees.push(*cell_degree);
                new_owners.push(*cell_owner);
            }

            for (cell_index, cell_type, cell_degree, cell_owner) in
                izip!(&cell_indices, &cell_types, &cell_degrees, &cell_owners,)
                    .filter(|&(_, _, _, &o)| o != rank)
            {
                new_indices.push(*cell_index);
                new_types.push(*cell_type);
                new_degrees.push(*cell_degree);
                new_owners.push(*cell_owner);
            }

            (new_indices, new_types, new_degrees, new_owners)
        };

        // Finally, we reorder the cell points.
        let cell_points = {
            let mut new_points = Vec::<usize>::with_capacity(cell_points.len());
            for &p in izip!(&cell_points, &cell_point_owners)
                .filter(|&(_, &o)| o == rank)
                .map(|(p, _)| p)
            {
                new_points.push(p);
            }

            for &p in izip!(&cell_points, &cell_point_owners)
                .filter(|&(_, &o)| o != rank)
                .map(|(p, _)| p)
            {
                new_points.push(p);
            }
            new_points
        };

        // Everything is properly sorted now, so we can generate the local grid.
        // Need to check those routines. The implementation is not efficient right now.
        let geometry = self.create_geometry(
            &point_indices,
            &coordinates,
            &cell_points,
            &cell_types,
            &cell_degrees,
        );
        let topology = self.create_topology(
            vertex_indices.clone(),
            cell_indices.clone(),
            &cell_points,
            &cell_types,
        );

        let serial_grid = self.create_grid_from_topology_geometry(topology, geometry);

        // Create global indices and proper ownership information, including the owning process
        // and the local index on the owning process for ghost entities.
        // After execution, `vertex_global_indices` and `cell_global_indices` hold the global indices
        // of the `vertex_indices` and `cell_indices` arrays.
        // The global indices are not necessarily identical to the original grid indices.
        let (vertex_global_indices, vertex_owners) =
            synchronize_entities(comm, 1, &vertex_indices, &vertex_owners);
        let (cell_global_indices, cell_owners) =
            synchronize_entities(comm, 1, &cell_indices, &cell_owners);

        // For later we need a map from vertex global indices to vertex owners.
        let mut vertex_global_indices_to_owners = HashMap::<usize, usize>::new();
        for (global_index, owner) in izip!(&vertex_global_indices, &vertex_owners) {
            let owner_rank = match *owner {
                Ownership::Owned => rank,
                Ownership::Ghost(p, _) => p,
                _ => panic!("Unsupported ownership: {owner:?}"),
            };
            vertex_global_indices_to_owners.insert(*global_index, owner_rank);
        }

        let point_type = serial_grid.entity_types(0)[0];
        let mut owners = HashMap::new();
        owners.insert(point_type, vertex_owners);
        let mut global_indices = HashMap::new();
        global_indices.insert(point_type, vertex_global_indices.clone());
        for dim in 1..self.tdim() {
            for t in serial_grid.entity_types(dim) {
                // We now iterate through the entities of intermediate dimensions and assign global indices and ownership.
                // First we need to get the vertices associated with each entity and figure out the ownership of the entity.
                // Ownership is determined by the minimum process that owns one of the vertices of the entity.

                // We get the chunk length by probing the first entity. This only works because the grid has a single
                // element type.
                let chunk_length = serial_grid
                    .entity(*t, 0)
                    .unwrap()
                    .topology()
                    .sub_entity_iter(point_type)
                    .count();

                // We have the chunk length. So iterate through to get all the vertices.
                let number_of_entities = serial_grid.entity_iter(*t).count();
                let mut entities = Vec::with_capacity(number_of_entities * chunk_length);
                let mut entity_ranks = Vec::with_capacity(number_of_entities * chunk_length);
                for entity in serial_grid.entity_iter(*t) {
                    // We iterate through the vertices of the entity and get the global indices of the vertices.
                    // This works because the topology returns positions into the vertex array.
                    // Hence, we can use those indices to map to the global indices.
                    // We also sort everything since later we use the sorted global indices as keys in a hash map
                    // within the `synchronize_entities` function.
                    let vertices = entity
                        .topology()
                        .sub_entity_iter(point_type)
                        .map(|v| vertex_global_indices[v])
                        .sorted()
                        .collect_vec();
                    let owner = *vertices
                        .iter()
                        .map(|v| vertex_global_indices_to_owners.get(v).unwrap())
                        .min()
                        .unwrap();
                    entities.extend(vertices);
                    entity_ranks.push(owner);
                }

                // We now have the entities and their owners. We can synchronize the entities to get the global
                // indices and ownership information.

                let (entity_global_indices, entity_owners) =
                    synchronize_entities(comm, chunk_length, &entities, &entity_ranks);

                owners.insert(*t, entity_owners);
                global_indices.insert(*t, entity_global_indices);
            }
        }
        for (cell_type, owner, index) in izip!(cell_types, cell_owners, cell_global_indices) {
            owners.entry(cell_type).or_insert(vec![]).push(owner);
            global_indices
                .entry(cell_type)
                .or_insert(vec![])
                .push(index);
        }

        ParallelGridImpl::new(comm, serial_grid, owners, global_indices)
    }

    /// Partition the cells
    fn partition_cells(&self, nprocesses: usize, partitioner: GraphPartitioner) -> Vec<usize> {
        // Create an initial partitioning that roughly assigns the same
        // number of cells to each process.
        let mut partition = vec![];
        for i in 0..nprocesses {
            while partition.len() < self.cell_count() * (i + 1) / nprocesses {
                partition.push(i);
            }
        }
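
        // Illustration (hypothetical): with 10 cells and 3 processes the initial partitioning is
        // `[0, 0, 0, 1, 1, 1, 2, 2, 2, 2]`.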

        match partitioner {
            GraphPartitioner::None => partition,
            GraphPartitioner::Manual(p) => p,
            #[cfg(feature = "coupe")]
            GraphPartitioner::Coupe => {
                // Compute the midpoints of each cell. If the geometric dimension is smaller than 3
                // then the remaining dimensions are just set to 0.
                let midpoints = (0..self.cell_count())
                    .map(|i| {
                        let v = self.cell_points(i);
                        Point3D::new(
                            if self.gdim() > 0 {
                                num::cast::<Self::T, f64>(
                                    v.iter().map(|j| self.point(*j)[0]).sum::<Self::T>(),
                                )
                                .unwrap()
                                    / v.len() as f64
                            } else {
                                0.0
                            },
                            if self.gdim() > 1 {
                                num::cast::<Self::T, f64>(
                                    v.iter().map(|j| self.point(*j)[1]).sum::<Self::T>(),
                                )
                                .unwrap()
                                    / v.len() as f64
                            } else {
                                0.0
                            },
                            if self.gdim() > 2 {
                                num::cast::<Self::T, f64>(
                                    v.iter().map(|j| self.point(*j)[2]).sum::<Self::T>(),
                                )
                                .unwrap()
                                    / v.len() as f64
                            } else {
                                0.0
                            },
                        )
                    })
                    .collect::<Vec<_>>();

                // Assign equal weights to each cell.
                let weights = vec![1.0; self.cell_count()];

                // Run the Coupe KMeans algorithm to create the partitioning. Maybe replace
                // by Metis at some point.
                KMeans {
                    delta_threshold: 0.0,
                    ..Default::default()
                }
                .partition(&mut partition, (&midpoints, &weights))
                .unwrap();
                // Return the new partitioning.
                partition
            }
            #[cfg(feature = "scotch")]
            GraphPartitioner::Scotch => {
                let mut vertex_to_cells = HashMap::<usize, HashSet<i32>>::new();
                for i in 0..self.cell_count() {
                    for v in self.cell_vertices(i) {
                        vertex_to_cells.entry(*v).or_default().insert(i as i32);
                    }
                }

                let mut endpoints = vec![0];
                let mut connectivity = vec![];
                for i in 0..self.cell_count() {
                    let end = endpoints[endpoints.len() - 1] as usize;
                    for v in self.cell_vertices(i) {
                        for c in vertex_to_cells.get(v).unwrap() {
                            if !connectivity[end..].contains(c) {
                                connectivity.push(*c)
                            }
                        }
                    }
                    endpoints.push(connectivity.len() as i32);
                }

                let mut strategy = Strategy::new(); // Use the default strategy.
                let arch = Architecture::complete(nprocesses as i32);

                let data = graph::Data::new(
                    0,
                    &endpoints[..endpoints.len() - 1],
                    &endpoints[1..],
                    &[],
                    &[],
                    &connectivity,
                    &[],
                );
                let mut graph = Graph::build(&data).expect("Could not build graph");
                let (vertnbr, _edgenbr) = graph.size();
                let mut parttab = vec![0; vertnbr as usize];
                let _ = graph
                    .mapping(&arch, &mut parttab)
                    .compute(&mut strategy)
                    .expect("Could not compute the mapping");
                parttab.iter().map(|i| *i as usize).collect::<Vec<_>>()
            }
        }
    }

    /// Assign vertex owners
    fn assign_vertex_owners(&self, nprocesses: usize, cell_owners: &[usize]) -> Vec<usize> {
        // Initialise an array of length `point_count` filled with `nprocesses`, which is not a valid rank.
        let mut vertex_owners = vec![nprocesses; self.point_count()];

        // Each vertex is owned by the minimum process that owns a cell that contains the vertex.
        for (i, owner) in cell_owners.iter().enumerate() {
            for v in self.cell_vertices(i) {
                vertex_owners[*v] = std::cmp::min(vertex_owners[*v], *owner);
            }
        }

        // Return the vertex owners. Note that the array contains garbage information
        // for the points of the grid that are not vertices.
        vertex_owners
    }

    /// Compute the vertices, points and cells that each process needs access to.
    /// The function returns three `ChunkedData` structs: vertices, points and cells.
    /// In each struct, `data` is a flattened array of entity indices and `idx_bounds`
    /// delimits the chunk of indices assigned to each of the `nprocesses` processes.
    #[allow(clippy::type_complexity)]
    fn get_vertices_points_and_cells(
        &self,
        nprocesses: usize,
        cell_owners: &[usize],
    ) -> (ChunkedData<usize>, ChunkedData<usize>, ChunkedData<usize>) {
        // The following maps each vertex index to the set of cells that contain it.
        let mut vertex_to_cells = HashMap::<usize, HashSet<usize>>::new();

        // For each cell, go through its vertices and add the cell to the cell list of the vertex.
        for i in 0..self.cell_count() {
            for v in self.cell_vertices(i) {
                vertex_to_cells.entry(*v).or_default().insert(i);
            }
        }

        // This instantiates a vector with `nprocesses` empty sets.
        let map_creator =
            || -> Vec<HashSet<usize>> { (0..nprocesses).map(|_| HashSet::new()).collect_vec() };

        // Each of these is a map from process to a set of indices.
        let mut vertices = map_creator();
        let mut points = map_creator();
        let mut cells = map_creator();

        // This assigns cells to processes. For each cell we associate not only the cell itself with
        // the process that owns it but also all of its neighbouring cells. This means that processes
        // get not only the original cells from `cell_owners` but also all the neighbouring cells.
        // It also means that a cell can live on multiple processes.
        for (i, owner) in cell_owners.iter().enumerate() {
            for v in self.cell_vertices(i) {
                // Add all cells that are connected to the vertex to the same process.
                cells[*owner].extend(vertex_to_cells.get(v).expect("Vertex not found."));
            }
        }

        // Now go through and add the points and vertices to the same processes that the cells are
        // assigned to.
        for (proc, cells) in cells.iter().enumerate() {
            for cell in cells {
                for v in self.cell_vertices(*cell) {
                    vertices[proc].insert(*v);
                }
                for p in self.cell_points(*cell) {
                    points[proc].insert(*p);
                }
            }
        }

        // The following closure flattens a vector of sets and returns the flattened array together
        // with the chunk bounds as a `ChunkedData` struct.
        let flatten = |data: Vec<HashSet<usize>>| -> ChunkedData<usize> {
            let mut idx_bounds = data
                .iter()
                .scan(0, |acc, x| {
                    let old = *acc;
                    *acc += x.len();
                    Some(old)
                })
                .collect_vec();
            idx_bounds.push(idx_bounds.last().unwrap() + data.last().unwrap().len());
            let mut tmp = Vec::<usize>::with_capacity(*idx_bounds.last().unwrap());
            for s in data {
                // Each set has a random iteration order, which would make runs of this method
                // non-reproducible. The easiest fix is to sort each set before adding it to `tmp`.
                let mut to_sort = Vec::<usize>::with_capacity(s.len());
                to_sort.extend(s.iter());
                to_sort.sort();
                tmp.extend(to_sort);
            }
            ChunkedData {
                data: tmp,
                idx_bounds,
            }
        };

        (flatten(vertices), flatten(points), flatten(cells))
    }
}

impl<B: Builder + GeometryBuilder + TopologyBuilder + GridBuilder> ParallelBuilderFunctions for B
where
    Vec<B::T>: Buffer,
    B::T: Equivalence,
    Vec<B::EntityDescriptor>: Buffer,
    B::EntityDescriptor: Equivalence,
{
}

// Scatter the chunks in `chunks_per_proc` from the current process (the root) to all processes.
// Returns the chunk that is destined for the root itself.
fn scatterv_root<T: Equivalence + Copy>(
    comm: &impl Communicator,
    chunks_per_proc: &ChunkedData<T>,
) -> Vec<T> {
    let rank = comm.rank() as usize;
    let size = comm.size() as usize;

    let send_counts = chunks_per_proc
        .counts()
        .iter()
        .map(|&x| x as i32)
        .collect_vec();

    let mut recv_count: i32 = 0;
    let mut recvbuf: Vec<T> = Vec::<T>::with_capacity(send_counts[rank] as usize);
    // This avoids having to pre-initialise the array. We simply transmute the spare capacity
    // into a valid reference and later manually set the length of the array to the full capacity.
    let recvbuf_ref: &mut [T] = unsafe { std::mem::transmute(recvbuf.spare_capacity_mut()) };

    // The `idx_bounds` array is one element too long since its last entry is the total number of
    // elements. We don't want that entry for the displacements.
    let displacements = chunks_per_proc.idx_bounds[0..size]
        .iter()
        .map(|&x| x as i32)
        .collect_vec();

    // Now scatter the counts to each process.
    comm.this_process()
        .scatter_into_root(&send_counts, &mut recv_count);

    // We now prepare the send partition of the variable length data.
    let send_partition =
        mpi::datatype::Partition::new(&chunks_per_proc.data, &send_counts[..], &displacements[..]);

    // And now we scatter the partition.
    comm.this_process()
        .scatter_varcount_into_root(&send_partition, recvbuf_ref);

    unsafe { recvbuf.set_len(send_counts[rank] as usize) };

    recvbuf
}

// Receive the data scattered from `root`.
fn scatterv<T: Equivalence + Copy>(comm: &impl Communicator, root: usize) -> Vec<T> {
    let mut recv_count: i32 = 0;

    // First we need to receive the number of elements that we are about to get.
    comm.process_at_rank(root as i32)
        .scatter_into(&mut recv_count);

    // We prepare an uninitialized buffer to receive the data.
    let mut recvbuf: Vec<T> = Vec::<T>::with_capacity(recv_count as usize);
    // This avoids having to pre-initialise the array. We simply transmute the spare capacity
    // into a valid reference and later manually set the length of the array to the full capacity.
    let recvbuf_ref: &mut [T] = unsafe { std::mem::transmute(recvbuf.spare_capacity_mut()) };

    // And finally we receive the data.
    comm.process_at_rank(root as i32)
        .scatter_varcount_into(recvbuf_ref);

    // Don't forget to manually set the length of the vector to the correct value.
    unsafe { recvbuf.set_len(recv_count as usize) };
    recvbuf
}
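
// Usage sketch (hypothetical, for illustration only): the two scatter helpers are collective and
// must be paired, with one matching call per scattered array, e.g.
//
//     let chunk = if comm.rank() as usize == root {
//         scatterv_root(comm, &chunks_per_proc)
//     } else {
//         scatterv(comm, root)
//     };
//
// This mirrors how `create_parallel_grid_root` (called on the root) and `create_parallel_grid`
// (called on all other ranks) pair up their scatter calls.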

// This routine synchronizes entity indices across processes.
// - `chunk_length`: an entity can be described by more than one index. This is the number of
//   indices per entity.
// - `entities`: the flattened entity data, `chunk_length` indices per entity.
// - `owners`: the owning rank of each entity.
// The output is a tuple consisting of the associated global indices and the ownership information.
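// Illustration (hypothetical): for edges described by the global indices of their two endpoint
// vertices, `chunk_length = 2` and `entities = [3, 7, 2, 9]` encodes the edges `{3, 7}` and
// `{2, 9}`, and `owners` has one entry per edge.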
fn synchronize_entities(
    comm: &impl Communicator,
    chunk_length: usize,
    entities: &[usize],
    owners: &[usize],
) -> (Vec<usize>, Vec<Ownership>) {
    let rank = comm.rank() as usize;
    let size = comm.size() as usize;

    // First count the locally owned entities.
    let number_of_local_entities = owners.iter().filter(|&&o| o == rank).count();
    let mut current_global_index: usize = 0;

    // We now do an exclusive scan to get the starting offset for this process's global indices.

    comm.exclusive_scan_into(
        &number_of_local_entities,
        &mut current_global_index,
        SystemOperation::sum(),
    );

    // Each process now has the start offset for its global indices. We now iterate through
    // the entities. We assign a global index if the entity is owned by the current process.
    // If not, we save the entity, its owning process and its local position.
    let mut global_indices = vec![0; entities.len() / chunk_length];

    let mut ghosts = Vec::<Vec<usize>>::default();
    let mut local_indices_of_ghosts = Vec::<usize>::default();
    let mut owners_of_ghosts = Vec::<usize>::default();

    for (pos, (chunk, owner)) in izip!(&entities.iter().chunks(chunk_length), owners).enumerate() {
        let c = chunk.copied().collect_vec();
        if *owner == rank {
            global_indices[pos] = current_global_index;
            current_global_index += 1;
        } else {
            ghosts.push(c);
            local_indices_of_ghosts.push(pos);
            owners_of_ghosts.push(*owner);
        }
    }
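
    // Illustration (hypothetical): if ranks 0, 1 and 2 own 4, 3 and 5 entities respectively, the
    // exclusive scan gives the offsets 0, 4 and 7, so rank 1 hands out the global indices 4, 5
    // and 6 to its owned entities in local order.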

    // We now re-sort the ghosts by owning process. This makes the communication a lot easier.
    let sort_indices = (0..ghosts.len())
        .sorted_by_key(|&i| owners_of_ghosts[i])
        .collect_vec();
    let ghosts = sort_indices
        .iter()
        .map(|&i| ghosts[i].clone())
        .collect::<Vec<_>>();
    let local_indices_of_ghosts = sort_indices
        .iter()
        .map(|&i| local_indices_of_ghosts[i])
        .collect::<Vec<_>>();
    let owners_of_ghosts = sort_indices
        .iter()
        .map(|&i| owners_of_ghosts[i])
        .collect::<Vec<_>>();

    // Now we have to send the ghost indices to the owning processes. For this,
    // each process first needs to know how many indices it will receive.

    let mut counts = vec![0; size];
    for o in owners_of_ghosts.iter() {
        counts[*o] += chunk_length;
    }

    // Now send the counts around via an all-to-all communication.

    let (recv_counts, recv_data) = all_to_all_varcount(
        comm,
        &counts,
        &ghosts.iter().flatten().copied().collect_vec(),
    );

    // Turn `recv_data` back into a chunked vector.
    let recv_data = recv_data
        .iter()
        .chunks(chunk_length)
        .into_iter()
        .map(|c| c.copied().collect_vec())
        .collect_vec();

    // We now have the ghosts on the owning processes. We iterate through, look up the corresponding
    // local indices and send those back to the original processes.
    // The difficulty is to assign the local index to each received ghost entity. We need a map from
    // entity data to local indices. That is best done with a HashMap.
    let send_back_local_indices = {
        let mut map = HashMap::<Vec<usize>, usize>::new();
        for (pos, chunk) in entities.iter().chunks(chunk_length).into_iter().enumerate() {
            let c = chunk.copied().collect_vec();
            map.insert(c, pos);
        }

        recv_data.iter().map(|i| *map.get(i).unwrap()).collect_vec()
    };

    // Now we can actually send the local indices back. We can almost use `recv_counts` as the new
    // send counts. The issue is that the `recv_counts` are multiplied by the chunk length; we have
    // to reverse this.

    let recv_counts = recv_counts.iter().map(|i| *i / chunk_length).collect_vec();

    let (_, remote_local_indices_of_ghosts) =
        all_to_all_varcount(comm, &recv_counts, &send_back_local_indices);

    // We now have to do exactly the same thing with the global indices so that the original process
    // knows the global indices of its ghosts.

    let send_back_global_indices = {
        let mut map = HashMap::<Vec<usize>, usize>::new();
        for (pos, chunk) in entities.iter().chunks(chunk_length).into_iter().enumerate() {
            map.insert(chunk.copied().collect_vec(), global_indices[pos]);
        }

        recv_data.iter().map(|i| *map.get(i).unwrap()).collect_vec()
    };

    let (_, remote_global_indices_of_ghosts) =
        all_to_all_varcount(comm, &recv_counts, &send_back_global_indices);

    // We can now iterate through the ghosts and assign the correct global indices.
    for (pos, ghost_local_index) in local_indices_of_ghosts.iter().enumerate() {
        global_indices[*ghost_local_index] = remote_global_indices_of_ghosts[pos];
    }

    // Finally, we set up the ownership information.

    let mut ownership = vec![Ownership::Owned; entities.len() / chunk_length];

    for (pos, ghost_local_index) in local_indices_of_ghosts.iter().enumerate() {
        ownership[*ghost_local_index] =
            Ownership::Ghost(owners_of_ghosts[pos], remote_local_indices_of_ghosts[pos]);
    }

    (global_indices, ownership)
}

// Performs an all-to-all communication with variable counts.
// Returns the receive counts from each process
// and the received data.
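// Illustration (hypothetical, two processes): if rank 0 calls this with `counts = [1, 2]` and
// `data = [a0, a1, a2]` while rank 1 calls it with `counts = [2, 1]` and `data = [b0, b1, b2]`,
// then rank 0 receives `recv_counts = [1, 2]` with data `[a0, b0, b1]` and rank 1 receives
// `recv_counts = [2, 1]` with data `[a1, a2, b2]`.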
fn all_to_all_varcount<T: Equivalence>(
    comm: &impl Communicator,
    counts: &[usize],
    data: &[T],
) -> (Vec<usize>, Vec<T>) {
    // We need the counts as i32 values.

    let counts = counts.iter().map(|&x| x as i32).collect_vec();

    // First send around the counts via an all-to-all.
    let mut recv_counts = vec![0; comm.size() as usize];
    comm.all_to_all_into(&counts, &mut recv_counts);

    // Now we can prepare the actual data. We have to allocate the receive buffer and compute the
    // send and receive partitions.

    let mut receive_data = Vec::<T>::with_capacity(recv_counts.iter().sum::<i32>() as usize);
    let receive_buf: &mut [T] = unsafe { std::mem::transmute(receive_data.spare_capacity_mut()) };

    let send_displacements = counts
        .iter()
        .scan(0, |acc, &x| {
            let old = *acc;
            *acc += x;
            Some(old)
        })
        .collect_vec();

    let receive_displacements = recv_counts
        .iter()
        .scan(0, |acc, &x| {
            let old = *acc;
            *acc += x;
            Some(old)
        })
        .collect_vec();

    let send_partition = mpi::datatype::Partition::new(data, counts, send_displacements);
    let mut receive_partition =
        mpi::datatype::PartitionMut::new(receive_buf, &recv_counts[..], receive_displacements);

    comm.all_to_all_varcount_into(&send_partition, &mut receive_partition);

    unsafe { receive_data.set_len(recv_counts.iter().sum::<i32>() as usize) };

    (
        recv_counts.iter().map(|i| *i as usize).collect_vec(),
        receive_data,
    )