Skip to content

utils

centroid_costs(points, cycles, dual_edges, scale=100.0)

Estimate edge costs based on centroid distance in dual graph.

Should probably relocate to a common area where cost functions are maintained at a later date.

Parameters:

Name Type Description Default
points ndarray

Location of points on data/primal grid

required
cycles ndarray | list[list[int]]

Cycles in the primal graph

required
dual_edges ndarray

Array of size (nedges, 2) where each element represents the cycle in which a primal edge participates. 1-index to account for grounding node in the dual graph.

required

Returns:

Name Type Description
cost ndarray

Nonnegative integer cost of the form 1 + distance / scale. Boundary edge costs are set to zero.

Source code in src/spurt/mcf/utils.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
def centroid_costs(
    points: np.ndarray,
    cycles: np.ndarray | list[list[int]],
    dual_edges: np.ndarray,
    scale: float = 100.0,
) -> np.ndarray:
    """Estimate edge costs based on centroid distance in dual graph.

    Should probably relocate to a common area where cost functions are
    maintained at a later date.

    Parameters
    ----------
    points: np.ndarray
        Location of points on data/primal grid
    cycles: np.ndarray
        Cycles in the primal graph
    dual_edges: np.ndarray
        Array of size (nedges, 2) where each element represents the cycle in
        which a primal edge participates. 1-index to account for grounding node
        in the dual graph.

    Returns
    -------
    cost: np.ndarray
        Nonnegative integer cost of the form 1 + distance / scale. Boundary
        edge costs are set to zero.
    """
    cost = np.zeros(dual_edges.shape[0], dtype=int)
    centroids = np.zeros((len(cycles), 2))
    for ii, cycle in enumerate(cycles):
        centroids[ii] = np.mean(points[cycle], axis=0)

    for ii, edge in enumerate(dual_edges):
        # If connected to grounding node
        if edge[1] == 0:
            continue

        d = np.linalg.norm(centroids[abs(edge[0]) - 1] - centroids[abs(edge[1]) - 1])
        cost[ii] = np.rint(1 + scale / d)

    return cost

distance_costs(points, edges, scale=100.0)

Estimate edge costs based on distance between points in primal graph.

Should probably relocate to a common area where cost functions are maintained at a later date.

Parameters:

Name Type Description Default
points ndarray

Locations on points on data/primal grid

required
edges ndarray

Array of size (nedges, 2) containing indices of connected points/nodes.

required

Returns:

Name Type Description
cost ndarray

Nonnegative integer cost of the form 1 + distance / scale

Source code in src/spurt/mcf/utils.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
def distance_costs(
    points: np.ndarray,
    edges: np.ndarray,
    scale: float = 100.0,
) -> np.ndarray:
    """Estimate edge costs based on distance between points in primal graph.

    Should probably relocate to a common area where cost functions are
    maintained at a later date.

    Parameters
    ----------
    points: np.ndarray
        Locations on points on data/primal grid
    edges: np.ndarray
        Array of size (nedges, 2) containing indices of connected points/nodes.

    Returns
    -------
    cost: np.ndarray
        Nonnegative integer cost of the form 1 + distance / scale
    """
    cost = np.zeros(edges.shape[0], dtype=int)
    for ii, edge in enumerate(edges):
        d = np.linalg.norm(points[edge[0]] - points[edge[1]])
        cost[ii] = np.rint(1 + scale / d)

    return cost

flood_fill(indata, links, flows, mode)

Flood fill unwrapping.

Given input data and links for those links start at an arbitrary point and walk along links adding the gradient. When we encounter a cycle, make sure that walking either path around the cycle will result in the same answer. Return the point values. This version has a lot of debugging friendly features. This method assumes that the graph is connected.

Parameters:

Name Type Description Default
indata ndarray

Input wrapped phase/ gradient data as 1D array. Same size as number of points/ edges in graph.

required
links ndarray

Links specifed as tuples of point indices. The links should represented a fully connected graph.

required
flows ndarray

Integer cycles to be added to each link.

required
mode str

Can be one of 'points' or 'gradients'

required

Returns:

Name Type Description
unwrapped ndarray

Unwrapped phase in radians. Same size as indata.

Source code in src/spurt/mcf/utils.py
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
def flood_fill(indata: np.ndarray, links: np.ndarray, flows: np.ndarray, mode: str):
    """Flood fill unwrapping.

    Given input data and links for those links start at an arbitrary point
    and walk along links adding the gradient. When we encounter a cycle,
    make sure that walking either path around the cycle will result in
    the same answer. Return the point values. This version has a lot of
    debugging friendly features. This method assumes that the graph is
    connected.

    Parameters
    ----------
    indata: np.ndarray
        Input wrapped phase/ gradient data as 1D array. Same size as number of
        points/ edges in graph.
    links : np.ndarray
        Links specifed as tuples of point indices. The links should represented
        a fully connected graph.
    flows : np.ndarray
        Integer cycles to be added to each link.
    mode: str
        Can be one of 'points' or 'gradients'

    Returns
    -------
    unwrapped : np.ndarray
        Unwrapped phase in radians. Same size as indata.
    """
    if len(links) != len(flows):
        errmsg = f"Dimension mismatch - {links.shape} vs {flows.shape}"
        raise ValueError(errmsg)

    if mode not in ["points", "gradients"]:
        errmsg = f"mode can be 'points' or 'gradients'. got {mode}"
        raise ValueError(errmsg)

    npts = np.max(links) + 1
    if mode == "gradients":
        if len(indata) != len(links):
            errmsg = f"Shape mismatch in gradients mode. {len(indata)} vs {len(links)}"
            raise ValueError(errmsg)

        input_is_pts = False

    elif mode == "points":
        if len(indata) != npts:
            errmsg = f"Shape mismatch in point mode. {len(indata)} vs {npts}"
            raise ValueError(errmsg)

        input_is_pts = True

    else:
        errmsg = f"Invalid mode: {mode}"
        raise RuntimeError(errmsg)

    # Indices of points
    pts = np.arange(npts)

    # Mapping of points to its immediate neighbors and gradient on the link
    pts_to_nbrs: dict = {pt: [] for pt in pts}

    # Iterate over the links
    for ii, link in enumerate(links):
        # Get the unwrapped phase gradient by adding flows to
        if input_is_pts:
            gradient = (
                phase_diff(indata[link[0]], indata[link[1]]) + 2 * np.pi * flows[ii]
            )
        else:
            gradient = indata[ii] + 2 * np.pi * flows[ii]

        # Add the link in either direction with appropriate gradient sign
        pts_to_nbrs[link[0]].append((link[1], gradient))
        pts_to_nbrs[link[1]].append((link[0], -gradient))

    # To track if a pixel has been unwrapped already
    done = np.zeros(len(pts), dtype=bool)

    # Track indices of point yet to be unwrapped
    to_do = []

    # To store unwrapped value that will be returned
    unwrapped = np.zeros(len(pts))

    # To track the integration path to each point
    pts_to_paths: dict = {pt: [] for pt in pts}

    # Start with point with index 0 - this is the reference
    to_do.append(0)
    done[0] = True
    pts_to_paths[0].append(0)

    # This is for reporting in case things go wrong
    # This should never happen but having this on helps
    # with fast debugging of sign errors
    multi_paths = []

    # Continue till all points are unwrapped
    while to_do:

        # Get the first pixel from the to do list
        i = to_do.pop(0)

        # For each of its neighbors and associated gradient
        for j, g in pts_to_nbrs[i]:

            # Unwrap by adding the gradient
            u = unwrapped[i] + g

            # If not unwrapped, now label as unwrapped
            if not done[j]:
                unwrapped[j] = u
                done[j] = True
                # Track the path to the point
                pts_to_paths[j] = pts_to_paths[i] + [j]
                # Add new point to to do list
                to_do.append(j)

            # If already unwrapped, verify values are numerically compatible
            elif np.abs(u - unwrapped[j]) > 1e-3:
                # If you get here, unwrapping path dependent - track for
                # debugging
                multi_paths.append(
                    (pts_to_paths[j], pts_to_paths[i] + [j], u - unwrapped[j])
                )

    # Report issues if any - this is bad sign if we get here
    if len(multi_paths) > 0:
        errmsg = f"Error: Encountered {len(multi_paths)} closure errors"
        raise ValueError(errmsg)

    # Check that all points were visited
    if not np.all(done):
        errmsg = (
            "Failed to integrate all flows. The input graph must be fully connected."
        )
        raise ValueError(errmsg)

    # Adding the source node value - we started at index 0
    if input_is_pts:
        if np.iscomplexobj(indata):
            unwrapped += np.angle(indata[0])
        else:
            unwrapped += indata[0]

    return unwrapped

phase_diff(z0, z1, model=0.0)

Compute the wrapped phase difference for between two numbers in radians.

If a model is provided, represents phase difference within +/-pi of the model.

Parameters:

Name Type Description Default
z0 ndarray

Can be complex or real

required
z1 ndarray

Same type as z0

required
model float | ndarray

Real array with a model of the phase difference

0.0
Source code in src/spurt/mcf/utils.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def phase_diff(
    z0: np.ndarray, z1: np.ndarray, model: float | np.ndarray = 0.0
) -> np.ndarray:
    """Compute the wrapped phase difference for between two numbers in radians.

    If a model is provided, represents phase difference within +/-pi of the model.

    Parameters
    ----------
    z0 : np.ndarray
        Can be complex or real
    z1 : np.ndarray
        Same type as z0
    model: float | np.ndarray, optional
        Real array with a model of the phase difference
    """
    if np.iscomplexobj(z0):
        p0 = np.angle(z0)
        p1 = np.angle(z1)
        d = p1 - p0 - model
    else:
        d = z1 - z0 - model
    return model + d - np.round(d / (2 * np.pi)) * 2 * np.pi

sign_nonzero(x)

Return +1 if x > 0 and -1 for x < 0.

Non-zero value should not be passed in.

Source code in src/spurt/mcf/utils.py
31
32
33
34
35
36
37
38
def sign_nonzero(x: float) -> int:
    """Return +1 if x > 0 and -1 for x < 0.

    Non-zero value should not be passed in.
    """
    if x > 0:
        return 1
    return -1