Skip to content

Index

MCFSolverInterface

Bases: Protocol

Interface for an MCF solver implementation.

A solver can be initialized with a planar graph and can be repeatedly invoked with different input. See the documentation of PlanarGraphInterface for restrictions on supported graphs.

Attributes:

Name Type Description
npoints int

Number of points in primal graph.

nedges int

Number of edges in primal graph.

ncycles int

Number of cycles in primal graph.

cycle_length int

Length of cycles in primal graph.

points ArrayLike

2D array of size (npoints, 2) with coordinates.

edges ArrayLike

2D array of size (nedges, 2) where values indicate index into points array. Edges always go from lower index to higher index.

cycles ArrayLike

2D array of size (ncycles, cycle_length) where values indicate index points array. All cycles are oriented in one direction.

dual_edges ArrayLike

2D array of size (nedges, 2) where values indicate 1-index into cycles array. zero indicates a boundary edge in primal graph.

dual_edge_dir ArrayLike

2D array of size (nedges, 2) where values orientation of edge into 1-indexed cycles array. zero indicates a boundary edge in primal graph.

Source code in src/spurt/mcf/_interface.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
@runtime_checkable
class MCFSolverInterface(Protocol):
    """Interface for an MCF solver implementation.

    A solver can be initialized with a planar graph and
    can be repeatedly invoked with different input. See
    the documentation of PlanarGraphInterface for restrictions
    on supported graphs.


    Attributes
    ----------
    npoints: int
        Number of points in primal graph.
    nedges: int
        Number of edges in primal graph.
    ncycles: int
        Number of cycles in primal graph.
    cycle_length: int
        Length of cycles in primal graph.
    points: ArrayLike
        2D array of size (npoints, 2) with coordinates.
    edges: ArrayLike
        2D array of size (nedges, 2) where values indicate index into points
        array. Edges always go from lower index to higher index.
    cycles: ArrayLike
        2D array of size (ncycles, cycle_length) where values indicate index
        points array. All cycles are oriented in one direction.
    dual_edges: ArrayLike
        2D array of size (nedges, 2) where values indicate 1-index into cycles
        array. zero indicates a boundary edge in primal graph.
    dual_edge_dir: ArrayLike
        2D array of size (nedges, 2) where values orientation of edge into
        1-indexed cycles array. zero indicates a boundary edge in primal
        graph.
    """

    def __init__(self, graph: PlanarGraphInterface):
        """Initialize solver with the graph."""

    @property
    def npoints(self) -> int:
        """Number of points in primal graph."""

    @property
    def nedges(self) -> int:
        """Number of links in primal graph."""

    @property
    def ncycles(self) -> int:
        """Number of cycles in primal graph."""

    @property
    def cycle_length(self) -> int:
        """Length of cycles in primal graph."""

    @property
    def points(self) -> int:
        """Points in primal graph."""

    @property
    def edges(self) -> ArrayLike:
        """Edges in primal graph."""

    @property
    def cycles(self) -> ArrayLike:
        """Cycles in primal graph."""

    @property
    def dual_edges(self) -> ArrayLike:
        """Returns dual edges of the graph.

        This is of size (nedges, 2)
        """

    @property
    def dual_edge_dir(self) -> ArrayLike:
        """Array containing orientation of edge in cycle.

        This is of size (nedges, 2).
        """

    def unwrap_one(
        self, wrapdata: ArrayLike, cost: ArrayLike, revcost: ArrayLike | None = None
    ) -> tuple[ArrayLike, ArrayLike]:
        """Solver should return unwrapped phase and flows on the edges.

        Parameters
        ----------
        wrapdata: ArrayLike
            1D array as input in radians or as complex numbers. Same size as
            number of points in the graph.
        cost: ArrayLike
            1D array of nonnegative integer costs. Same size as number of edges in
            the graph. Represents forward directional cost when used in
            combination with revcost.
        revcost: ArrayLike | None
            1D array of nonnegative integer costs on links in reverse direction.
            Same size as number of edges in the graph. cost is used if not
            provided.

        Returns
        -------
        unwdata: ArrayLike
            1D array of unwrapped phase in radians. Same size as number of
            points in the graph.
        flows: ArrayLike
            1D array of integer flows along each of the edges in the graph.
        """

    def residues_to_flows(
        self, residues: ArrayLike, cost: ArrayLike, revcost: ArrayLike | None = None
    ) -> ArrayLike:
        """Solver should return integer flows corresponding to given residues.

        Parameters
        ----------
        residues: ArrayLike
            1D array of integer residues. Same size as number of cycles in
            graph plus one to accommodate the grounding node. Array must sum to
            zero.
        cost: ArrayLike
            1D array of nonnegative integer costs. Same size as number of edges
            in the graph. Represents forward directional cost when used in
            combination with revcost.
        revcost: ArrayLike | None
            1D array of nonnegative integer costs on links in reverse direction.
            Same size as number of edges in the graph. cost is used if not
            provided.

        Returns
        -------
        flows: ArrayLike
            1D array of integer flows along each of the edges in the graph.
        """

    def compute_residues(self, wrapdata: ArrayLike) -> ArrayLike:
        """Solver should return residues corresponding to input wrapped data.

        Parameters
        ----------
        wrapdata: ArrayLike
            1D array as input in radians or complex numbers. Same size as
            number of points in the graph.

        Returns
        -------
        residues: ArrayLike
            1D array of integer residues corresponding to the cycles in the
            graph. Includes the grounding node.
        """

    def compute_residues_from_gradients(self, graddata: ArrayLike) -> ArrayLike:
        """Solver should return residues corresponding to edge gradients.

        Parameters
        ----------
        graddata: ArrayLike
            1D array as input in radians. Same size as number of edges in the
            primal graph.

        Returns
        -------
        residues: ArrayLike
            1D array of integer residues corresponding to the cycles in the
            graph. Includes the grounding node.
        """

    def residues_to_flows_many(
        self,
        residues: ArrayLike,
        cost: ArrayLike,
        revcost: ArrayLike | None = None,
        worker_count: int | None = None,
        chunksize: int | None = 1,
    ) -> ArrayLike:
        """Solver should return integer flows corresponding to given residues.

        Parameters
        ----------
        residues: ArrayLike
            2D array of integer residues of size (nruns, ncycles + 1). Each row
            of the array must sum to zero.
        cost: ArrayLike
            1D array of nonnegative integer costs. Same size as number of edges
            in the graph. Represents forward directional cost when used in
            combination with revcost.
        revcost: ArrayLike | None
            1D array of nonnegative integer costs on links in reverse direction.
            Same size as number of edges in the graph. cost is used if not
            provided.

        Returns
        -------
        flows: ArrayLike
            2D array of integer flows of along each of the edges in the graph.
            Returned array is of size (nruns, nedges).
        """

cycle_length property

Length of cycles in primal graph.

cycles property

Cycles in primal graph.

dual_edge_dir property

Array containing orientation of edge in cycle.

This is of size (nedges, 2).

dual_edges property

Returns dual edges of the graph.

This is of size (nedges, 2)

edges property

Edges in primal graph.

ncycles property

Number of cycles in primal graph.

nedges property

Number of links in primal graph.

npoints property

Number of points in primal graph.

points property

Points in primal graph.

__init__(graph)

Initialize solver with the graph.

Source code in src/spurt/mcf/_interface.py
53
54
def __init__(self, graph: PlanarGraphInterface):
    """Initialize solver with the graph."""

compute_residues(wrapdata)

Solver should return residues corresponding to input wrapped data.

Parameters:

Name Type Description Default
wrapdata ArrayLike

1D array as input in radians or complex numbers. Same size as number of points in the graph.

required

Returns:

Name Type Description
residues ArrayLike

1D array of integer residues corresponding to the cycles in the graph. Includes the grounding node.

Source code in src/spurt/mcf/_interface.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def compute_residues(self, wrapdata: ArrayLike) -> ArrayLike:
    """Solver should return residues corresponding to input wrapped data.

    Parameters
    ----------
    wrapdata: ArrayLike
        1D array as input in radians or complex numbers. Same size as
        number of points in the graph.

    Returns
    -------
    residues: ArrayLike
        1D array of integer residues corresponding to the cycles in the
        graph. Includes the grounding node.
    """

compute_residues_from_gradients(graddata)

Solver should return residues corresponding to edge gradients.

Parameters:

Name Type Description Default
graddata ArrayLike

1D array as input in radians. Same size as number of edges in the primal graph.

required

Returns:

Name Type Description
residues ArrayLike

1D array of integer residues corresponding to the cycles in the graph. Includes the grounding node.

Source code in src/spurt/mcf/_interface.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
def compute_residues_from_gradients(self, graddata: ArrayLike) -> ArrayLike:
    """Solver should return residues corresponding to edge gradients.

    Parameters
    ----------
    graddata: ArrayLike
        1D array as input in radians. Same size as number of edges in the
        primal graph.

    Returns
    -------
    residues: ArrayLike
        1D array of integer residues corresponding to the cycles in the
        graph. Includes the grounding node.
    """

residues_to_flows(residues, cost, revcost=None)

Solver should return integer flows corresponding to given residues.

Parameters:

Name Type Description Default
residues ArrayLike

1D array of integer residues. Same size as number of cycles in graph plus one to accommodate the grounding node. Array must sum to zero.

required
cost ArrayLike

1D array of nonnegative integer costs. Same size as number of edges in the graph. Represents forward directional cost when used in combination with revcost.

required
revcost ArrayLike | None

1D array of nonnegative integer costs on links in reverse direction. Same size as number of edges in the graph. cost is used if not provided.

None

Returns:

Name Type Description
flows ArrayLike

1D array of integer flows along each of the edges in the graph.

Source code in src/spurt/mcf/_interface.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def residues_to_flows(
    self, residues: ArrayLike, cost: ArrayLike, revcost: ArrayLike | None = None
) -> ArrayLike:
    """Solver should return integer flows corresponding to given residues.

    Parameters
    ----------
    residues: ArrayLike
        1D array of integer residues. Same size as number of cycles in
        graph plus one to accommodate the grounding node. Array must sum to
        zero.
    cost: ArrayLike
        1D array of nonnegative integer costs. Same size as number of edges
        in the graph. Represents forward directional cost when used in
        combination with revcost.
    revcost: ArrayLike | None
        1D array of nonnegative integer costs on links in reverse direction.
        Same size as number of edges in the graph. cost is used if not
        provided.

    Returns
    -------
    flows: ArrayLike
        1D array of integer flows along each of the edges in the graph.
    """

residues_to_flows_many(residues, cost, revcost=None, worker_count=None, chunksize=1)

Solver should return integer flows corresponding to given residues.

Parameters:

Name Type Description Default
residues ArrayLike

2D array of integer residues of size (nruns, ncycles + 1). Each row of the array must sum to zero.

required
cost ArrayLike

1D array of nonnegative integer costs. Same size as number of edges in the graph. Represents forward directional cost when used in combination with revcost.

required
revcost ArrayLike | None

1D array of nonnegative integer costs on links in reverse direction. Same size as number of edges in the graph. cost is used if not provided.

None

Returns:

Name Type Description
flows ArrayLike

2D array of integer flows of along each of the edges in the graph. Returned array is of size (nruns, nedges).

Source code in src/spurt/mcf/_interface.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
def residues_to_flows_many(
    self,
    residues: ArrayLike,
    cost: ArrayLike,
    revcost: ArrayLike | None = None,
    worker_count: int | None = None,
    chunksize: int | None = 1,
) -> ArrayLike:
    """Solver should return integer flows corresponding to given residues.

    Parameters
    ----------
    residues: ArrayLike
        2D array of integer residues of size (nruns, ncycles + 1). Each row
        of the array must sum to zero.
    cost: ArrayLike
        1D array of nonnegative integer costs. Same size as number of edges
        in the graph. Represents forward directional cost when used in
        combination with revcost.
    revcost: ArrayLike | None
        1D array of nonnegative integer costs on links in reverse direction.
        Same size as number of edges in the graph. cost is used if not
        provided.

    Returns
    -------
    flows: ArrayLike
        2D array of integer flows of along each of the edges in the graph.
        Returned array is of size (nruns, nedges).
    """

unwrap_one(wrapdata, cost, revcost=None)

Solver should return unwrapped phase and flows on the edges.

Parameters:

Name Type Description Default
wrapdata ArrayLike

1D array as input in radians or as complex numbers. Same size as number of points in the graph.

required
cost ArrayLike

1D array of nonnegative integer costs. Same size as number of edges in the graph. Represents forward directional cost when used in combination with revcost.

required
revcost ArrayLike | None

1D array of nonnegative integer costs on links in reverse direction. Same size as number of edges in the graph. cost is used if not provided.

None

Returns:

Name Type Description
unwdata ArrayLike

1D array of unwrapped phase in radians. Same size as number of points in the graph.

flows ArrayLike

1D array of integer flows along each of the edges in the graph.

Source code in src/spurt/mcf/_interface.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def unwrap_one(
    self, wrapdata: ArrayLike, cost: ArrayLike, revcost: ArrayLike | None = None
) -> tuple[ArrayLike, ArrayLike]:
    """Solver should return unwrapped phase and flows on the edges.

    Parameters
    ----------
    wrapdata: ArrayLike
        1D array as input in radians or as complex numbers. Same size as
        number of points in the graph.
    cost: ArrayLike
        1D array of nonnegative integer costs. Same size as number of edges in
        the graph. Represents forward directional cost when used in
        combination with revcost.
    revcost: ArrayLike | None
        1D array of nonnegative integer costs on links in reverse direction.
        Same size as number of edges in the graph. cost is used if not
        provided.

    Returns
    -------
    unwdata: ArrayLike
        1D array of unwrapped phase in radians. Same size as number of
        points in the graph.
    flows: ArrayLike
        1D array of integer flows along each of the edges in the graph.
    """

ORMCFSolver

Bases: MCFSolverInterface

Minimum cost flow solver interface.

Implementation of MCF solver using OR tools python bindings.

Source code in src/spurt/mcf/_ortools.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
class ORMCFSolver(MCFSolverInterface):
    """Minimum cost flow solver interface.

    Implementation of MCF solver using OR tools python bindings.
    """

    def __init__(self, graph: PlanarGraphInterface):
        """Initialize solver using a planar graph.

        Initializes with information related to the dual graph as
        well to allow us to efficiently reuse the solver with
        multiple inputs
        """
        # We borrow the arrays and avoid copying
        self._graph: PlanarGraphInterface = graph

        # These are needed for MCF
        # Edges represent arcs between cycles
        self._dual_edges: np.ndarray = np.zeros((self.nedges, 2), dtype=np.int32)
        # One-to-one correspondence with _dual_edges and represents
        # relative orientation of an edge within a cycle
        # 1 implies increasing/ fwd direction, -1 implies decreasing/reverse
        # direction and zero denotes an edge to the grounding node
        self._dual_edge_dir: np.ndarray = np.zeros((self.nedges, 2), dtype=np.int8)
        self._prepare_dual()

    @property
    def npoints(self) -> int:
        return self._graph.npoints

    @property
    def points(self) -> np.ndarray:
        return self._graph.points

    @property
    def nedges(self) -> int:
        return len(self._graph.links)

    @property
    def ncycles(self) -> int:
        return len(self._graph.cycles)

    @property
    def edges(self) -> np.ndarray:
        return self._graph.links

    @property
    def cycles(self) -> np.ndarray:
        return self._graph.cycles

    @property
    def cycle_length(self) -> int:
        return len(self.cycles[0])

    def _prepare_dual(self) -> None:
        """Identify edges of the dual graph.

        Since, we build the problem in the dual space we will create edges
        in the dual space by iterating over cycles and it neighbors.
        """
        # Add a grounding node to go along with the residues
        # We use 1-index for other nodes as this allows
        # us to use sign information to indicate direction.
        # ground_node = 0

        # Iterate over the loops
        # Map edges to cycles they show up in
        edge_to_cycles: dict = {tuple(link): [] for link in self.edges}
        for icyc, cycle in enumerate(self.cycles):
            cycsize = len(cycle)
            for ii in range(cycsize):
                jj = (ii + 1) % cycsize
                edge = order_points((cycle[ii], cycle[jj]))

                # Sign indicates if the edge order in forward / reverse
                # direction. We augment icyc by 1 to use the sign infomation.
                edge_to_cycles[edge].append(
                    (icyc + 1, sign_nonzero(cycle[jj] - cycle[ii]))
                )

        # Now build list of dual_edges
        for ii, kk in enumerate(edge_to_cycles):
            vv = edge_to_cycles[kk]
            ncyc = len(vv)

            # TODO: Use switch-case here if Python version > 3.10
            if ncyc == 2:
                self._dual_edges[ii, :] = [vv[0][0], vv[1][0]]
                self._dual_edge_dir[ii, :] = [vv[0][1], vv[1][1]]
            elif ncyc == 1:
                # Arrays are already initialized to zero
                self._dual_edges[ii, 0] = vv[0][0]
                self._dual_edge_dir[ii, 0] = vv[0][1]
            else:
                errmsg = (
                    "Planar graph contains edges that are part of more than 2 cycles"
                )
                raise ValueError(errmsg)

    @property
    def dual_edges(self) -> np.ndarray:
        return self._dual_edges

    @property
    def dual_edge_dir(self) -> np.ndarray:
        return self._dual_edge_dir

    def compute_residues(
        self,
        wrapdata: ArrayLike,
    ) -> ArrayLike:
        """Compute phase residues for one set of input wrapped data."""
        if wrapdata.size != self.npoints:
            errmsg = (
                f"Size mismatch for residue computation."
                f" Received {wrapdata.shape} with {self.npoints} points"
            )
            raise ValueError(errmsg)

        # Residues includes the grounding node at index 0
        residues = np.zeros(self.ncycles + 1)
        ndim = self.cycle_length
        for col in range(ndim):
            nn = (col + 1) % ndim
            residues[1:] += phase_diff(
                wrapdata[self.cycles[:, col]], wrapdata[self.cycles[:, nn]]
            )
        residues = np.rint(residues / (2 * np.pi)).astype(int)
        residues[0] = -np.sum(residues[1:])
        return residues

    def compute_residues_from_gradients(
        self,
        graddata: ArrayLike,
    ) -> ArrayLike:

        if graddata.size != self.nedges:
            errmsg = (
                f"Size mismatch for residue computation."
                f" Received {graddata.shape} with {self.nedges} edges"
            )
            raise ValueError(errmsg)

        cyc0 = self.dual_edges[:, 0]
        cyc1 = self.dual_edges[:, 1]
        cyc0_dir = self.dual_edge_dir[:, 0]
        cyc1_dir = self.dual_edge_dir[:, 1]
        grad_sum = np.zeros(self.ncycles + 1, dtype=np.float32)
        # add.at to handle repeated indices
        np.add.at(grad_sum, cyc0, cyc0_dir * graddata)
        np.add.at(grad_sum, cyc1, cyc1_dir * graddata)

        residues = np.rint(grad_sum / (2 * np.pi)).astype(int)
        # Set supply of groud_node
        residues[0] = -np.sum(residues[1:])

        return residues

    def unwrap_one(
        self,
        wrapdata: ArrayLike,
        cost: ArrayLike,
        revcost: ArrayLike | None = None,
    ) -> tuple[ArrayLike, ArrayLike]:
        """Get the unwrapped phase solution for one set of input wrapped data."""
        if revcost is None:
            revcost = cost

        # Compute residues
        residues = self.compute_residues(wrapdata)

        # Instantiate the MCF solver and supply the data
        flows = self.residues_to_flows(residues, cost, revcost=revcost)

        # Flood fill with the flows
        unw = flood_fill(wrapdata, self.edges, flows, mode="points")

        return unw, flows

    def residues_to_flows(
        self,
        residues: np.ndarray,
        cost: np.ndarray,
        revcost: np.ndarray | None = None,
    ) -> np.ndarray:
        """Return flows on the edges corresponding to given set of residues.

        This is exposed to allow for unwrapping with gradients.
        """
        # Only solve if necessary
        if not np.any(residues != 0):
            return np.zeros(self.nedges, dtype=np.int32)

        if revcost is None:
            revcost = cost
        return solve_mcf(self._dual_edges, self._dual_edge_dir, residues, cost, revcost)

    def residues_to_flows_many(
        self,
        residues: np.ndarray,
        cost: np.ndarray,
        revcost: np.ndarray | None = None,
        worker_count: int | None = None,
        chunksize: int | None = 1,
    ) -> np.ndarray:
        """Parallel version of residues_to_flows.

        The worker_count is set to get_cpu_count() - 1 if not provided.
        Treating costs as 1D arrays for now. Can consider 2D at a later time,
        if necessary.
        """
        if (worker_count is None) or (worker_count <= 0):
            worker_count = max(1, get_cpu_count() - 1)

        if revcost is None:
            revcost = cost

        # Get dimensions of the problem
        nruns, nresidues = residues.shape

        if nresidues != self.ncycles + 1:
            errmsg = (
                f"Number of residues {nresidues} does not match number of"
                f" cycles {self.ncycles}"
            )
            raise ValueError(errmsg)

        # Create flows output variable
        flows = np.zeros((nruns, self.nedges), dtype=np.int32)

        # Only use multiprocessing if needed
        if worker_count == 1:
            for ii, res in enumerate(residues):
                if np.any(res != 0):
                    flows[ii, :] = self.residues_to_flows(res, cost, revcost=revcost)

        else:
            print(f"Processing batch of {nruns} with {worker_count} threads")

            def uw_inputs(idxs):
                for ii in idxs:
                    # Only solve if needed
                    if not np.any(residues[ii] != 0):
                        continue

                    yield (
                        ii,
                        self._dual_edges,
                        self._dual_edge_dir,
                        residues[ii],
                        cost,
                        revcost,
                    )

            # Create a pool and dispatch
            # We explicitly use fork here as osx has switched to using spawn
            # and that really slows down the use of multiprocessing
            with get_context("fork").Pool(processes=worker_count) as p:
                mp_tasks = p.imap_unordered(
                    wrap_solve_mcf, uw_inputs(range(nruns)), chunksize=chunksize
                )

                # Gather results
                for res in mp_tasks:
                    flows[res[0], :] = res[1]

        return flows

__init__(graph)

Initialize solver using a planar graph.

Initializes with information related to the dual graph as well to allow us to efficiently reuse the solver with multiple inputs

Source code in src/spurt/mcf/_ortools.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def __init__(self, graph: PlanarGraphInterface):
    """Initialize solver using a planar graph.

    Initializes with information related to the dual graph as
    well to allow us to efficiently reuse the solver with
    multiple inputs
    """
    # We borrow the arrays and avoid copying
    self._graph: PlanarGraphInterface = graph

    # These are needed for MCF
    # Edges represent arcs between cycles
    self._dual_edges: np.ndarray = np.zeros((self.nedges, 2), dtype=np.int32)
    # One-to-one correspondence with _dual_edges and represents
    # relative orientation of an edge within a cycle
    # 1 implies increasing/ fwd direction, -1 implies decreasing/reverse
    # direction and zero denotes an edge to the grounding node
    self._dual_edge_dir: np.ndarray = np.zeros((self.nedges, 2), dtype=np.int8)
    self._prepare_dual()

compute_residues(wrapdata)

Compute phase residues for one set of input wrapped data.

Source code in src/spurt/mcf/_ortools.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def compute_residues(
    self,
    wrapdata: ArrayLike,
) -> ArrayLike:
    """Compute phase residues for one set of input wrapped data."""
    if wrapdata.size != self.npoints:
        errmsg = (
            f"Size mismatch for residue computation."
            f" Received {wrapdata.shape} with {self.npoints} points"
        )
        raise ValueError(errmsg)

    # Residues includes the grounding node at index 0
    residues = np.zeros(self.ncycles + 1)
    ndim = self.cycle_length
    for col in range(ndim):
        nn = (col + 1) % ndim
        residues[1:] += phase_diff(
            wrapdata[self.cycles[:, col]], wrapdata[self.cycles[:, nn]]
        )
    residues = np.rint(residues / (2 * np.pi)).astype(int)
    residues[0] = -np.sum(residues[1:])
    return residues

residues_to_flows(residues, cost, revcost=None)

Return flows on the edges corresponding to given set of residues.

This is exposed to allow for unwrapping with gradients.

Source code in src/spurt/mcf/_ortools.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
def residues_to_flows(
    self,
    residues: np.ndarray,
    cost: np.ndarray,
    revcost: np.ndarray | None = None,
) -> np.ndarray:
    """Return flows on the edges corresponding to given set of residues.

    This is exposed to allow for unwrapping with gradients.
    """
    # Only solve if necessary
    if not np.any(residues != 0):
        return np.zeros(self.nedges, dtype=np.int32)

    if revcost is None:
        revcost = cost
    return solve_mcf(self._dual_edges, self._dual_edge_dir, residues, cost, revcost)

residues_to_flows_many(residues, cost, revcost=None, worker_count=None, chunksize=1)

Parallel version of residues_to_flows.

The worker_count is set to get_cpu_count() - 1 if not provided. Treating costs as 1D arrays for now. Can consider 2D at a later time, if necessary.

Source code in src/spurt/mcf/_ortools.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
def residues_to_flows_many(
    self,
    residues: np.ndarray,
    cost: np.ndarray,
    revcost: np.ndarray | None = None,
    worker_count: int | None = None,
    chunksize: int | None = 1,
) -> np.ndarray:
    """Parallel version of residues_to_flows.

    The worker_count is set to get_cpu_count() - 1 if not provided.
    Treating costs as 1D arrays for now. Can consider 2D at a later time,
    if necessary.
    """
    if (worker_count is None) or (worker_count <= 0):
        worker_count = max(1, get_cpu_count() - 1)

    if revcost is None:
        revcost = cost

    # Get dimensions of the problem
    nruns, nresidues = residues.shape

    if nresidues != self.ncycles + 1:
        errmsg = (
            f"Number of residues {nresidues} does not match number of"
            f" cycles {self.ncycles}"
        )
        raise ValueError(errmsg)

    # Create flows output variable
    flows = np.zeros((nruns, self.nedges), dtype=np.int32)

    # Only use multiprocessing if needed
    if worker_count == 1:
        for ii, res in enumerate(residues):
            if np.any(res != 0):
                flows[ii, :] = self.residues_to_flows(res, cost, revcost=revcost)

    else:
        print(f"Processing batch of {nruns} with {worker_count} threads")

        def uw_inputs(idxs):
            for ii in idxs:
                # Only solve if needed
                if not np.any(residues[ii] != 0):
                    continue

                yield (
                    ii,
                    self._dual_edges,
                    self._dual_edge_dir,
                    residues[ii],
                    cost,
                    revcost,
                )

        # Create a pool and dispatch
        # We explicitly use fork here as osx has switched to using spawn
        # and that really slows down the use of multiprocessing
        with get_context("fork").Pool(processes=worker_count) as p:
            mp_tasks = p.imap_unordered(
                wrap_solve_mcf, uw_inputs(range(nruns)), chunksize=chunksize
            )

            # Gather results
            for res in mp_tasks:
                flows[res[0], :] = res[1]

    return flows

unwrap_one(wrapdata, cost, revcost=None)

Get the unwrapped phase solution for one set of input wrapped data.

Source code in src/spurt/mcf/_ortools.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def unwrap_one(
    self,
    wrapdata: ArrayLike,
    cost: ArrayLike,
    revcost: ArrayLike | None = None,
) -> tuple[ArrayLike, ArrayLike]:
    """Get the unwrapped phase solution for one set of input wrapped data."""
    if revcost is None:
        revcost = cost

    # Compute residues
    residues = self.compute_residues(wrapdata)

    # Instantiate the MCF solver and supply the data
    flows = self.residues_to_flows(residues, cost, revcost=revcost)

    # Flood fill with the flows
    unw = flood_fill(wrapdata, self.edges, flows, mode="points")

    return unw, flows