ndgrid/grid/parallel_builder.rs

//! Parallel grid builder

use super::ParallelGridImpl;
use crate::{
    traits::{
        Builder, Entity, GeometryBuilder, Grid, GridBuilder, ParallelBuilder, Topology,
        TopologyBuilder,
    },
    types::{GraphPartitioner, Ownership},
};
use itertools::{Itertools, izip};
use mpi::{
    collective::SystemOperation,
    traits::{Buffer, Communicator, CommunicatorCollectives, Equivalence},
};
use rlst::distributed_tools::{all_to_allv, scatterv, scatterv_root};
use std::collections::{HashMap, HashSet};

#[cfg(feature = "coupe")]
use coupe::{KMeans, Partition, Point3D};

#[cfg(feature = "scotch")]
use scotch::{Architecture, Graph, Strategy, graph};

/// A simple struct to hold chunked data. The data is a long
/// array and `idx_bounds` is a vector such that `data[idx_bounds[i]..idx_bounds[i+1]]`
/// is one chunk. The last element of `idx_bounds` is the length of the data array.
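///
/// A minimal illustration with made-up values (not taken from any actual grid):
///
/// ```ignore
/// // Two chunks: `data[0..2] == [10, 11]` and `data[2..5] == [12, 13, 14]`.
/// let chunks = ChunkedData {
///     data: vec![10, 11, 12, 13, 14],
///     idx_bounds: vec![0, 2, 5],
/// };
/// assert_eq!(chunks.counts(), vec![2, 3]);
/// ```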
struct ChunkedData<T: Equivalence + Copy> {
    data: Vec<T>,
    idx_bounds: Vec<usize>,
}

impl<T: Equivalence + Copy> ChunkedData<T> {
    /// Return a vector with the number of elements per chunk.
    fn counts(&self) -> Vec<usize> {
        self.idx_bounds
            .iter()
            .tuple_windows()
            .map(|(a, b)| b - a)
            .collect()
    }

    /// Scatter data across processes
    fn scatterv_root(&self, comm: &impl Communicator) -> Vec<T> {
        scatterv_root(comm, &self.counts(), &self.data)
    }
}

impl<B: Builder + GeometryBuilder + TopologyBuilder + GridBuilder> ParallelBuilder for B
where
    Vec<B::T>: Buffer,
    B::T: Equivalence,
    Vec<B::EntityDescriptor>: Buffer,
    B::EntityDescriptor: Equivalence,
    B::Grid: Sync,
{
    type ParallelGrid<'a, C: Communicator + 'a>
        = ParallelGridImpl<'a, C, B::Grid>
    where
        Self: 'a;
    fn create_parallel_grid_root<'a, C: Communicator>(
        &self,
        comm: &'a C,
        partitioner: GraphPartitioner,
    ) -> ParallelGridImpl<'a, C, B::Grid> {
        // If we have only a single process we can just create a serial grid.
        if comm.size() == 1 {
            let serial_grid = self.create_grid();

            // Need to fill ownership and global indices now.

            let mut owners = HashMap::new();
            let mut global_indices = HashMap::new();

            for dim in 0..self.tdim() {
                for t in serial_grid.entity_types(dim) {
                    let entity_count = serial_grid.entity_iter(*t).count();
                    owners.insert(*t, vec![Ownership::Owned; entity_count]);
                    global_indices.insert(*t, (0..entity_count).collect_vec());
                }
            }

            return ParallelGridImpl::new(comm, serial_grid, owners, global_indices);
        }
        // Partition the cells.
        let cell_owners = self.partition_cells(comm.size() as usize, partitioner);

        // Each vertex is assigned to the minimum process that owns a cell containing it.
        // Note that the array has one entry per point of the grid and contains garbage
        // information for points that are not vertices.
        let vertex_owners = self.assign_vertex_owners(comm.size() as usize, &cell_owners);

        // This distributes cells, vertices and points to processes.
        // Each process now gets not only the cells that it owns via `cell_owners` but also their neighbours.
        // All the vertices and points of those cells are then assigned to the same process.
        // Each return value is a `ChunkedData` struct whose `idx_bounds` array specifies which
        // of the actual indices are assigned to each process.
        let (vertices_per_proc, points_per_proc, cells_per_proc) =
            self.get_vertices_points_and_cells(comm.size() as usize, &cell_owners);

        // Compute the chunks array for the coordinates associated with the points.
        // The idx array for the coords is simply `self.gdim()` times the idx array for the points.
        let coords_per_proc = ChunkedData {
            data: {
                let mut tmp =
                    Vec::with_capacity(self.gdim() * points_per_proc.idx_bounds.last().unwrap());
                points_per_proc
                    .data
                    .iter()
                    .for_each(|point| tmp.extend(self.point(*point).iter()));
                tmp
            },
            idx_bounds: points_per_proc
                .idx_bounds
                .iter()
                .map(|x| self.gdim() * x)
                .collect(),
        };
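
        // Illustration of the scaling above with made-up numbers: if `gdim() == 3` and the point
        // chunks have `idx_bounds == [0, 2, 5]`, then the coordinate chunks have
        // `idx_bounds == [0, 6, 15]`, i.e. three coordinates per point.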

        // This computes for each process the owners of its vertices.
        let vertex_owners_per_proc = ChunkedData::<usize> {
            data: {
                let mut tmp = Vec::with_capacity(*vertices_per_proc.idx_bounds.last().unwrap());
                vertices_per_proc
                    .data
                    .iter()
                    .for_each(|v| tmp.push(vertex_owners[*v]));
                tmp
            },
            idx_bounds: vertices_per_proc.idx_bounds.clone(),
        };

        // We now compute the cell information for each process.

        // First we need the cell types.
        let cell_types_per_proc = ChunkedData {
            data: {
                let mut tmp = Vec::with_capacity(*cells_per_proc.idx_bounds.last().unwrap());
                cells_per_proc
                    .data
                    .iter()
                    .for_each(|c| tmp.push(self.cell_type(*c)));
                tmp
            },
            idx_bounds: cells_per_proc.idx_bounds.clone(),
        };

        // Now we need the cell degrees.
        let cell_degrees_per_proc = ChunkedData {
            data: {
                let mut tmp = Vec::with_capacity(*cells_per_proc.idx_bounds.last().unwrap());
                cells_per_proc
                    .data
                    .iter()
                    .for_each(|c| tmp.push(self.cell_degree(*c)));
                tmp
            },
            idx_bounds: cells_per_proc.idx_bounds.clone(),
        };

        // Now we need the cell owners.
        let cell_owners_per_proc = ChunkedData {
            data: {
                let mut tmp = Vec::with_capacity(*cells_per_proc.idx_bounds.last().unwrap());
                cells_per_proc
                    .data
                    .iter()
                    .for_each(|c| tmp.push(cell_owners[*c]));
                tmp
            },
            idx_bounds: cells_per_proc.idx_bounds.clone(),
        };

        // Finally, the cell points. These are messier since each cell might have a different
        // number of points. Hence, we need to compute a new counts array.
        let cell_points_per_proc = {
            // First compute the total number of points needed so that we can pre-allocate the array.
            let total_points = cells_per_proc
                .data
                .iter()
                .map(|c| self.npts(self.cell_type(*c), self.cell_degree(*c)))
                .sum();
            let mut data = Vec::with_capacity(total_points);
            let mut counts = vec![0; comm.size() as usize];
            for (proc, (&chunk_ind_start, &chunk_ind_end)) in
                cells_per_proc.idx_bounds.iter().tuple_windows().enumerate()
            {
                for c in cells_per_proc.data[chunk_ind_start..chunk_ind_end].iter() {
                    let npts = self.npts(self.cell_type(*c), self.cell_degree(*c));
                    counts[proc] += npts;
                    data.extend(self.cell_points(*c).iter());
                }
            }
            // Now turn the counts into an `idx_bounds` array.
            let mut idx_bounds = counts
                .iter()
                .scan(0, |state, &x| {
                    let old = *state;
                    *state += x;
                    Some(old)
                })
                .collect_vec();
            idx_bounds.push(idx_bounds.last().unwrap() + counts.last().unwrap());

            // Finally return the chunks.
            ChunkedData { data, idx_bounds }
        };
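
        // Illustration of the counts-to-`idx_bounds` conversion above with made-up numbers:
        // `counts == [3, 2, 4]` is turned into `idx_bounds == [0, 3, 5, 9]`, so the chunk for
        // process `p` is `data[idx_bounds[p]..idx_bounds[p + 1]]`.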

        // Now we send everything across the processes. Every `*_per_proc` variable is scattered
        // across the processes. After the scatter operation each process holds the following data.
        // - `point_indices` contains the indices of the points that are assigned to the current process.
        // - `coordinates` contains the coordinates of the points that are assigned to the current process.
        // - `vertex_indices` contains the indices of the vertices that are assigned to the current process.
        // - `vertex_owners` contains the owners of the vertices that are assigned to the current process.
        // - `cell_indices` contains the indices of the cells that are assigned to the current process.
        // - `cell_points` contains the indices of the points of the cells that are assigned to the current process.
        // - `cell_types` contains the types of the cells that are assigned to the current process.
        // - `cell_degrees` contains the degrees of the cells that are assigned to the current process.
        // - `cell_owners` contains the owners of the cells that are assigned to the current process.
        let point_indices = points_per_proc.scatterv_root(comm);
        let coordinates = coords_per_proc.scatterv_root(comm);
        let vertex_indices = vertices_per_proc.scatterv_root(comm);
        let vertex_owners = vertex_owners_per_proc.scatterv_root(comm);
        let cell_indices = cells_per_proc.scatterv_root(comm);
        let cell_points = cell_points_per_proc.scatterv_root(comm);
        let cell_types = cell_types_per_proc.scatterv_root(comm);
        let cell_degrees = cell_degrees_per_proc.scatterv_root(comm);
        let cell_owners = cell_owners_per_proc.scatterv_root(comm);

        // This is executed on all ranks and creates the local grid.
        self.create_parallel_grid_internal(
            comm,
            point_indices,
            coordinates,
            vertex_indices,
            vertex_owners,
            cell_indices,
            cell_points,
            cell_types,
            cell_degrees,
            cell_owners,
        )
    }
    fn create_parallel_grid<'a, C: Communicator>(
        &self,
        comm: &'a C,
        root_rank: i32,
    ) -> ParallelGridImpl<'a, C, B::Grid> {
        // First we receive all the data.
        let root_rank = root_rank as usize;
        let point_indices = scatterv(comm, root_rank);
        let coordinates = scatterv(comm, root_rank);
        let vertex_indices = scatterv(comm, root_rank);
        let vertex_owners = scatterv(comm, root_rank);
        let cell_indices = scatterv(comm, root_rank);
        let cell_points = scatterv(comm, root_rank);
        let cell_types = scatterv(comm, root_rank);
        let cell_degrees = scatterv(comm, root_rank);
        let cell_owners = scatterv(comm, root_rank);

        // Now we reassemble the grid.

        self.create_parallel_grid_internal(
            comm,
            point_indices,
            coordinates,
            vertex_indices,
            vertex_owners,
            cell_indices,
            cell_points,
            cell_types,
            cell_degrees,
            cell_owners,
        )
    }
}

trait ParallelBuilderFunctions: Builder + GeometryBuilder + TopologyBuilder + GridBuilder {
    //! Parallel builder functions
    //!
    //! These functions are included in a trait so that they can be implemented for an arbitrary builder `B` below.

    /// Internal function to create the parallel grid.
    #[allow(clippy::too_many_arguments)]
    fn create_parallel_grid_internal<'a, C: Communicator>(
        &self,
        comm: &'a C,
        point_indices: Vec<usize>,
        coordinates: Vec<Self::T>,
        vertex_indices: Vec<usize>,
        vertex_owners: Vec<usize>,
        cell_indices: Vec<usize>,
        cell_points: Vec<usize>,
        cell_types: Vec<Self::EntityDescriptor>,
        cell_degrees: Vec<usize>,
        cell_owners: Vec<usize>,
    ) -> ParallelGridImpl<'a, C, Self::Grid>
    where
        Self::Grid: Sync,
    {
        let rank = comm.rank() as usize;
        // First we want to reorder everything so that owned data comes first in the arrays.

        // Reorder the vertices.
        let (vertex_indices, vertex_owners) = {
            let mut new_indices = Vec::<usize>::with_capacity(vertex_indices.len());
            let mut new_owners = Vec::<usize>::with_capacity(vertex_owners.len());

            for (&v, &o) in izip!(&vertex_indices, &vertex_owners).filter(|&(_, &o)| o == rank) {
                new_indices.push(v);
                new_owners.push(o);
            }

            for (&v, &o) in izip!(&vertex_indices, &vertex_owners).filter(|&(_, &o)| o != rank) {
                new_indices.push(v);
                new_owners.push(o);
            }

            (new_indices, new_owners)
        };
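
        // Illustration of the reordering above with made-up values: on rank 1,
        // `vertex_indices == [4, 7, 9]` with `vertex_owners == [0, 1, 0]` becomes `[7, 4, 9]`
        // with owners `[1, 0, 0]`, i.e. locally owned vertices come first and ghosts follow.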

        // Reorder the cell information. However, things are a bit messy with the cell points as each cell
        // may have a different number of points attached. Hence, we need to compute an owners array for the cell points.

        let cell_point_owners = {
            let mut new_owners = Vec::<usize>::with_capacity(cell_points.len());
            // We now iterate through the cells and their owners and assign owners accordingly for the points.
            for (o, ct, cd) in izip!(cell_owners.iter(), cell_types.iter(), cell_degrees.iter()) {
                let npts = self.npts(*ct, *cd);
                for _ in 0..npts {
                    new_owners.push(*o);
                }
            }
            new_owners
        };

        // First we reorder the cells, degrees, types and owners.

        let (cell_indices, cell_types, cell_degrees, cell_owners) = {
            let mut new_indices = Vec::<usize>::with_capacity(cell_indices.len());
            let mut new_types = Vec::<Self::EntityDescriptor>::with_capacity(cell_types.len());
            let mut new_degrees = Vec::<usize>::with_capacity(cell_degrees.len());
            let mut new_owners = Vec::<usize>::with_capacity(cell_owners.len());

            for (cell_index, cell_type, cell_degree, cell_owner) in
                izip!(&cell_indices, &cell_types, &cell_degrees, &cell_owners,)
                    .filter(|&(_, _, _, &o)| o == rank)
            {
                new_indices.push(*cell_index);
                new_types.push(*cell_type);
                new_degrees.push(*cell_degree);
                new_owners.push(*cell_owner);
            }

            for (cell_index, cell_type, cell_degree, cell_owner) in
                izip!(&cell_indices, &cell_types, &cell_degrees, &cell_owners,)
                    .filter(|&(_, _, _, &o)| o != rank)
            {
                new_indices.push(*cell_index);
                new_types.push(*cell_type);
                new_degrees.push(*cell_degree);
                new_owners.push(*cell_owner);
            }

            (new_indices, new_types, new_degrees, new_owners)
        };

        // Finally, we reorder the cell points.
        let cell_points = {
            let mut new_points = Vec::<usize>::with_capacity(cell_points.len());
            for &p in izip!(&cell_points, &cell_point_owners)
                .filter(|&(_, &o)| o == rank)
                .map(|(p, _)| p)
            {
                new_points.push(p);
            }

            for &p in izip!(&cell_points, &cell_point_owners)
                .filter(|&(_, &o)| o != rank)
                .map(|(p, _)| p)
            {
                new_points.push(p);
            }
            new_points
        };

        // Everything is properly sorted now, so we can generate the local grids.
        // These routines still need review; the implementation is not efficient right now.
        let geometry = self.create_geometry(
            &point_indices,
            &coordinates,
            &cell_points,
            &cell_types,
            &cell_degrees,
        );
        let topology = self.create_topology(
            vertex_indices.clone(),
            cell_indices.clone(),
            &cell_points,
            &cell_types,
        );

        let serial_grid = self.create_grid_from_topology_geometry(topology, geometry);

        // Create global indices and proper ownership information with ghost process and local index
        // on the ghost process.
        // After execution, `vertex_global_indices` and `cell_global_indices` hold the global indices
        // of the entries of the `vertex_indices` and `cell_indices` arrays.
        // The global indices are not necessarily identical to the original grid indices.
        let (vertex_global_indices, vertex_owners) =
            synchronize_entities(comm, 1, &vertex_indices, &vertex_owners);
        let (cell_global_indices, cell_owners) =
            synchronize_entities(comm, 1, &cell_indices, &cell_owners);

        // For later we need a map from vertex global indices to vertex owners.
        let mut vertex_global_indices_to_owners = HashMap::<usize, usize>::new();
        for (global_index, owner) in izip!(&vertex_global_indices, &vertex_owners) {
            let owner_rank = match *owner {
                Ownership::Owned => rank,
                Ownership::Ghost(p, _) => p,
                _ => panic!("Unsupported ownership: {owner:?}"),
            };
            vertex_global_indices_to_owners.insert(*global_index, owner_rank);
        }

        let point_type = serial_grid.entity_types(0)[0];
        let mut owners = HashMap::new();
        owners.insert(point_type, vertex_owners);
        let mut global_indices = HashMap::new();
        global_indices.insert(point_type, vertex_global_indices.clone());
        for dim in 1..self.tdim() {
            for t in serial_grid.entity_types(dim) {
                // We now iterate through the entities of each intermediate dimension and assign global indices and ownership.
                // First we need the vertices associated with each entity in order to figure out the ownership of the entity.
                // Ownership is determined by the minimum process that owns one of the vertices of the entity.

                // We get the chunk length by probing the first entity. This only works because the grid has a single
                // element type.
                let chunk_length = serial_grid
                    .entity(*t, 0)
                    .unwrap()
                    .topology()
                    .sub_entity_iter(point_type)
                    .count();

                // We have the chunk length, so iterate through the entities to get all their vertices.
                let number_of_entities = serial_grid.entity_iter(*t).count();
                let mut entities = Vec::with_capacity(number_of_entities * chunk_length);
                let mut entity_ranks = Vec::with_capacity(number_of_entities * chunk_length);
                for entity in serial_grid.entity_iter(*t) {
                    // We iterate through the vertices of the entity and get the global indices of the vertices.
                    // This works because the topology returns positions into the vertex array.
                    // Hence, we can use those indices to map to the global indices.
                    // We also sort everything since later we use the sorted global indices as keys in a hash map
                    // within the `synchronize_entities` function.
                    let vertices = entity
                        .topology()
                        .sub_entity_iter(point_type)
                        .map(|v| vertex_global_indices[v])
                        .sorted()
                        .collect_vec();
                    let owner = *vertices
                        .iter()
                        .map(|v| vertex_global_indices_to_owners.get(v).unwrap())
                        .min()
                        .unwrap();
                    entities.extend(vertices);
                    entity_ranks.push(owner);
                }

                // We now have the entities and their owners. We can synchronize the entities to get the global
                // indices and ownership information.

                let (entity_global_indices, entity_owners) =
                    synchronize_entities(comm, chunk_length, &entities, &entity_ranks);

                owners.insert(*t, entity_owners);
                global_indices.insert(*t, entity_global_indices);
            }
        }
        for (cell_type, owner, index) in izip!(cell_types, cell_owners, cell_global_indices) {
            owners.entry(cell_type).or_insert(vec![]).push(owner);
            global_indices
                .entry(cell_type)
                .or_insert(vec![])
                .push(index);
        }

        ParallelGridImpl::new(comm, serial_grid, owners, global_indices)
    }

    /// Partition the cells.
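    ///
    /// The initial partition simply splits the cells into contiguous blocks of roughly equal
    /// size. A made-up illustration (not taken from a real grid): with 10 cells and 3 processes
    /// the initial partition is
    ///
    /// ```text
    /// [0, 0, 0, 1, 1, 1, 2, 2, 2, 2]
    /// ```
    ///
    /// which is returned unchanged for `GraphPartitioner::None`; Coupe and Scotch refine it,
    /// while `GraphPartitioner::Manual` replaces it entirely.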
    fn partition_cells(&self, nprocesses: usize, partitioner: GraphPartitioner) -> Vec<usize> {
        // Create an initial partitioning that roughly assigns the same
        // number of cells to each process.
        let mut partition = vec![];
        for i in 0..nprocesses {
            while partition.len() < self.cell_count() * (i + 1) / nprocesses {
                partition.push(i);
            }
        }

        match partitioner {
            GraphPartitioner::None => partition,
            GraphPartitioner::Manual(p) => p,
            #[cfg(feature = "coupe")]
            GraphPartitioner::Coupe => {
                // Compute the midpoints of each cell. If the geometric dimension is smaller than 3
                // then the remaining dimensions are just set to 0.
                let midpoints = (0..self.cell_count())
                    .map(|i| {
                        let v = self.cell_points(i);
                        Point3D::new(
                            if self.gdim() > 0 {
                                num::cast::<Self::T, f64>(
                                    v.iter().map(|j| self.point(*j)[0]).sum::<Self::T>(),
                                )
                                .unwrap()
                                    / v.len() as f64
                            } else {
                                0.0
                            },
                            if self.gdim() > 1 {
                                num::cast::<Self::T, f64>(
                                    v.iter().map(|j| self.point(*j)[1]).sum::<Self::T>(),
                                )
                                .unwrap()
                                    / v.len() as f64
                            } else {
                                0.0
                            },
                            if self.gdim() > 2 {
                                num::cast::<Self::T, f64>(
                                    v.iter().map(|j| self.point(*j)[2]).sum::<Self::T>(),
                                )
                                .unwrap()
                                    / v.len() as f64
                            } else {
                                0.0
                            },
                        )
                    })
                    .collect::<Vec<_>>();

                // Assign equal weights to each cell.
                let weights = vec![1.0; self.cell_count()];

                // Run the Coupe KMeans algorithm to create the partitioning. Maybe replace
                // by Metis at some point.
                KMeans {
                    delta_threshold: 0.0,
                    ..Default::default()
                }
                .partition(&mut partition, (&midpoints, &weights))
                .unwrap();
                // Return the new partitioning.
                partition
            }
            #[cfg(feature = "scotch")]
            GraphPartitioner::Scotch => {
                let mut vertex_to_cells = HashMap::<usize, HashSet<i32>>::new();
                for i in 0..self.cell_count() {
                    for v in self.cell_vertices(i) {
                        vertex_to_cells.entry(*v).or_default().insert(i as i32);
                    }
                }

                let mut endpoints = vec![0];
                let mut connectivity = vec![];
                for i in 0..self.cell_count() {
                    let end = endpoints[endpoints.len() - 1] as usize;
                    for v in self.cell_vertices(i) {
                        for c in vertex_to_cells.get(v).unwrap() {
                            if !connectivity[end..].contains(c) {
                                connectivity.push(*c)
                            }
                        }
                    }
                    endpoints.push(connectivity.len() as i32);
                }

                let mut strategy = Strategy::new(); // Use the default strategy.
                let arch = Architecture::complete(nprocesses as i32);

                let data = graph::Data::new(
                    0,
                    &endpoints[..endpoints.len() - 1],
                    &endpoints[1..],
                    &[],
                    &[],
                    &connectivity,
                    &[],
                );
                let mut graph = Graph::build(&data).expect("Could not build graph");
                let (vertnbr, _edgenbr) = graph.size();
                let mut parttab = vec![0; vertnbr as usize];
                let _ = graph
                    .mapping(&arch, &mut parttab)
                    .compute(&mut strategy)
                    .expect("Could not compute mapping");
                parttab.iter().map(|i| *i as usize).collect::<Vec<_>>()
            }
        }
    }

    /// Assign vertex owners.
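    ///
    /// A small made-up illustration: with `cell_owners == [0, 1]`, where cell 0 has vertices
    /// `{0, 1, 2}` and cell 1 has vertices `{1, 2, 3}`, the shared vertices 1 and 2 are assigned
    /// to process 0 (the minimum owning process), so the result is
    ///
    /// ```text
    /// vertex_owners == [0, 0, 0, 1]
    /// ```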
    fn assign_vertex_owners(&self, nprocesses: usize, cell_owners: &[usize]) -> Vec<usize> {
        // Initialise an array with one entry per point, each set to the number of processes.
        let mut vertex_owners = vec![nprocesses; self.point_count()];

        // Each vertex is owned by the minimum process that owns a cell that contains the vertex.
        for (i, owner) in cell_owners.iter().enumerate() {
            for v in self.cell_vertices(i) {
                vertex_owners[*v] = std::cmp::min(vertex_owners[*v], *owner);
            }
        }

        // Return the vertex owners. Note that the array contains garbage information
        // for the points of the grid that are not vertices.
        vertex_owners
    }

    /// Compute the vertices, points and cells that each process needs access to.
    /// The function returns three `ChunkedData` structs: vertices, points and cells.
    /// Each contains a flattened array of entity indices together with an `idx_bounds`
    /// vector of length `nprocesses + 1` that delimits the chunk belonging to each process.
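    ///
    /// A made-up illustration for two processes: if process 0 needs cells `{0, 1}` and process 1
    /// needs cells `{1, 2}`, the returned cell chunks are
    ///
    /// ```text
    /// data       == [0, 1, 1, 2]
    /// idx_bounds == [0, 2, 4]
    /// ```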
    #[allow(clippy::type_complexity)]
    fn get_vertices_points_and_cells(
        &self,
        nprocesses: usize,
        cell_owners: &[usize],
    ) -> (ChunkedData<usize>, ChunkedData<usize>, ChunkedData<usize>) {
        // The following maps each vertex index to the set of cells that contain it.
        let mut vertex_to_cells = HashMap::<usize, HashSet<usize>>::new();

        // For each cell, go through its vertices and add the cell to the cell list of the vertex.
        for i in 0..self.cell_count() {
            for v in self.cell_vertices(i) {
                vertex_to_cells.entry(*v).or_default().insert(i);
            }
        }

        // This instantiates a vector with `nprocesses` empty sets.
        let map_creator =
            || -> Vec<HashSet<usize>> { (0..nprocesses).map(|_| HashSet::new()).collect_vec() };

        // Each of these is a map from process to a set of indices.
        let mut vertices = map_creator();
        let mut points = map_creator();
        let mut cells = map_creator();

        // This assigns cells to processes. It iterates through the cells and associates
        // not only each cell itself with the process that owns it but also all of its
        // neighbouring cells. This means that processes receive not only the original cells
        // from `cell_owners` but also all the neighbouring cells, and hence a cell can live
        // on multiple processes.
        for (i, owner) in cell_owners.iter().enumerate() {
            for v in self.cell_vertices(i) {
                // Add all cells that are connected to the vertex to the same process.
                cells[*owner].extend(vertex_to_cells.get(v).expect("Vertex not found."));
            }
        }

        // Now go through the cells and add their points and vertices to the same processes
        // that the cells are assigned to.
        for (proc, cells) in cells.iter().enumerate() {
            for cell in cells {
                for v in self.cell_vertices(*cell) {
                    vertices[proc].insert(*v);
                }
                for p in self.cell_points(*cell) {
                    points[proc].insert(*p);
                }
            }
        }

        // The following closure flattens a vector of sets and returns the flattened array
        // together with its `idx_bounds` as a ChunkedData struct.
        let flatten = |data: Vec<HashSet<usize>>| -> ChunkedData<usize> {
            let mut idx_bounds = data
                .iter()
                .scan(0, |acc, x| {
                    let old = *acc;
                    *acc += x.len();
                    Some(old)
                })
                .collect_vec();
            idx_bounds.push(idx_bounds.last().unwrap() + data.last().unwrap().len());
            let mut tmp = Vec::<usize>::with_capacity(*idx_bounds.last().unwrap());
            for s in data {
                // Each set has a random iteration order, which would make runs of the method
                // non-reproducible. The easiest fix is to sort the set before adding it to `tmp`.
                let mut to_sort = Vec::<usize>::with_capacity(s.len());
                to_sort.extend(s.iter());
                to_sort.sort();
                tmp.extend(to_sort);
            }
            ChunkedData {
                data: tmp,
                idx_bounds,
            }
        };
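
        // Illustration of `flatten` with made-up sets: flattening `[{3, 1}, {2}]` gives
        // `data == [1, 3, 2]` (each chunk sorted) and `idx_bounds == [0, 2, 3]`.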

        (flatten(vertices), flatten(points), flatten(cells))
    }
}

impl<B: Builder + GeometryBuilder + TopologyBuilder + GridBuilder> ParallelBuilderFunctions for B
where
    Vec<B::T>: Buffer,
    B::T: Equivalence,
    Vec<B::EntityDescriptor>: Buffer,
    B::EntityDescriptor: Equivalence,
{
}

/// This routine synchronizes entity indices across processes.
///
/// - `chunk_length`: An entity can be described by more than one index (e.g. the sorted global
///   vertex indices of an edge). This is the number of indices per entity.
/// - `entities`: The flattened array of all entity indices, `chunk_length` indices per entity.
/// - `owners`: The owning rank of each entity.
///
/// The output is a tuple consisting of the associated global indices and the ownership information.
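///
/// A made-up illustration: with `chunk_length == 2`, `entities == [3, 7, 2, 5]` describes two
/// entities, `[3, 7]` and `[2, 5]`. If `owners == [0, 1]` on rank 0, the first entity receives a
/// freshly assigned global index, while the global and local indices of the second are looked up
/// on rank 1 and it is returned as `Ownership::Ghost(1, local_index)`.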
fn synchronize_entities(
    comm: &impl Communicator,
    chunk_length: usize,
    entities: &[usize],
    owners: &[usize],
) -> (Vec<usize>, Vec<Ownership>) {
    let rank = comm.rank() as usize;
    let size = comm.size() as usize;

    // First count the locally owned entities.
    let number_of_local_entities = owners.iter().filter(|&&o| o == rank).count();
    let mut current_global_index: usize = 0;

    // We now do an exclusive scan to get the starting offset for this rank's global indices.

    comm.exclusive_scan_into(
        &number_of_local_entities,
        &mut current_global_index,
        SystemOperation::sum(),
    );
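
    // Illustration of the scan with made-up counts: if ranks 0, 1 and 2 own 4, 3 and 5 entities
    // respectively, the exclusive scan gives starting offsets 0, 4 and 7, so the ranks assign
    // the global indices 0..4, 4..7 and 7..12.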

    // Each process now has the start offset for its global indices. We now iterate through
    // the entities. We assign a global index if the entity is owned by the current process.
    // If not, we save the entity, its owning process and its local position in the `entities` array.
    let mut global_indices = vec![0; entities.len()];

    let mut ghosts = Vec::<Vec<usize>>::default();
    let mut local_indices_of_ghosts = Vec::<usize>::default();
    let mut owners_of_ghosts = Vec::<usize>::default();

    for (pos, (chunk, owner)) in izip!(&entities.iter().chunks(chunk_length), owners).enumerate() {
        let c = chunk.copied().collect_vec();
        if *owner == rank {
            global_indices[pos] = current_global_index;
            current_global_index += 1;
        } else {
            ghosts.push(c);
            local_indices_of_ghosts.push(pos);
            owners_of_ghosts.push(*owner);
        }
    }

    // We want to re-sort the ghosts by process. This makes the communication a lot easier.
    let sort_indices = (0..ghosts.len())
        .sorted_by_key(|&i| owners_of_ghosts[i])
        .collect_vec();
    let ghosts = sort_indices
        .iter()
        .map(|&i| ghosts[i].clone())
        .collect::<Vec<_>>();
    let local_indices_of_ghosts = sort_indices
        .iter()
        .map(|&i| local_indices_of_ghosts[i])
        .collect::<Vec<_>>();
    let owners_of_ghosts = sort_indices
        .iter()
        .map(|&i| owners_of_ghosts[i])
        .collect::<Vec<_>>();

    // Now we have to send the ghost indices to the owning processes. For this
    // each process first needs to know how many index values it will receive.

    let mut counts = vec![0; size];
    for o in owners_of_ghosts.iter() {
        counts[*o] += chunk_length;
    }

    // Now send the ghost entities around via a variable all-to-all communication.

    let (recv_counts, recv_data) = all_to_allv(
        comm,
        &counts,
        &ghosts.iter().flatten().copied().collect_vec(),
    );

    // Turn `recv_data` back into a chunked vector.
    let recv_data = recv_data
        .iter()
        .chunks(chunk_length)
        .into_iter()
        .map(|c| c.copied().collect_vec())
        .collect_vec();

    // We now have the ghosts on the owning processes. We iterate through them, look up the
    // corresponding local indices and send those back to the original processes.
    // The difficulty is to assign the local index to each received ghost entity. We need a map
    // from entity index chunks to local indices. That is best done with a HashMap.
    let send_back_local_indices = {
        let mut map = HashMap::<Vec<usize>, usize>::new();
        for (pos, chunk) in entities.iter().chunks(chunk_length).into_iter().enumerate() {
            let c = chunk.copied().collect_vec();
            map.insert(c, pos);
        }

        recv_data.iter().map(|i| *map.get(i).unwrap()).collect_vec()
    };

    // Now we can actually send the indices back. We can almost use `recv_counts` as the new send
    // counts. The issue is that the received counts are multiplied by the chunk length, so we
    // have to undo this first.

    let recv_counts = recv_counts.iter().map(|i| *i / chunk_length).collect_vec();

    let (_, remote_local_indices_of_ghosts) =
        all_to_allv(comm, &recv_counts, &send_back_local_indices);

    // We now have to do exactly the same thing with the global indices so that the original process knows the
    // global indices of its ghosts.

    let send_back_global_indices = {
        let mut map = HashMap::<Vec<usize>, usize>::new();
        for (pos, chunk) in entities.iter().chunks(chunk_length).into_iter().enumerate() {
            map.insert(chunk.copied().collect_vec(), global_indices[pos]);
        }

        recv_data.iter().map(|i| *map.get(i).unwrap()).collect_vec()
    };

    let (_, remote_global_indices_of_ghosts) =
        all_to_allv(comm, &recv_counts, &send_back_global_indices);

    // We can now iterate through the ghosts and assign the correct global indices.
    for (pos, ghost_local_index) in local_indices_of_ghosts.iter().enumerate() {
        global_indices[*ghost_local_index] = remote_global_indices_of_ghosts[pos];
    }

    // Finally, we set up the ownership information.

    let mut ownership = vec![Ownership::Owned; entities.len()];

    for (pos, ghost_local_index) in local_indices_of_ghosts.iter().enumerate() {
        ownership[*ghost_local_index] =
            Ownership::Ghost(owners_of_ghosts[pos], remote_local_indices_of_ghosts[pos]);
    }

    (global_indices, ownership)
}