Skip to content

emcf

GeneralSettings dataclass

Settings associated with breaking data into tiles.

Parameters:

Name Type Description Default
use_tiles bool

Tile up data spatially.

True
intermediate_folder str

Path to folder where intermediate outputs are created.

'./emcf_tmp'
output_folder str

Path to output folder.

'./emcf'
Source code in src/spurt/workflows/emcf/_settings.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
@dataclass
class GeneralSettings:
    """Settings associated with breaking data into tiles.

    Parameters
    ----------
    use_tiles: bool
        Tile up data spatially.
    intermediate_folder: str
        Path to folder where intermediate outputs are created.
    output_folder: str
        Path to output folder.
    """

    use_tiles: bool = True
    intermediate_folder: str = "./emcf_tmp"
    output_folder: str = "./emcf"

    @property
    def tiles_jsonname(self) -> Path:
        return Path(self.intermediate_folder) / "tiles.json"

    def tile_filename(self, num: int) -> Path:
        """Input index is zero-based."""
        return Path(self.intermediate_folder) / f"uw_tile_{num:02d}.h5"

    @property
    def overlap_filename(self) -> Path:
        return Path(self.intermediate_folder) / "overlaps.h5"

    def overlap_groupname(self, ii: int, jj: int) -> str:
        return f"{ii:02d}_{jj:02d}"

    @property
    def offsets_filename(self) -> Path:
        return Path(self.intermediate_folder) / "bulk_offsets.h5"

    def unw_filename(self, d1: str, d2: str) -> Path:
        return Path(self.output_folder) / f"{d1}_{d2}.unw.tif"

    def __post_init__(self):
        p = Path(self.output_folder)
        if not p.is_dir():
            p.mkdir(exist_ok=True)

        p = Path(self.intermediate_folder)
        if not p.is_dir():
            p.mkdir(exist_ok=True)

tile_filename(num)

Input index is zero-based.

Source code in src/spurt/workflows/emcf/_settings.py
112
113
114
def tile_filename(self, num: int) -> Path:
    """Input index is zero-based."""
    return Path(self.intermediate_folder) / f"uw_tile_{num:02d}.h5"

MergerSettings dataclass

Class for holding tile merging settings.

Parameters:

Name Type Description Default
min_overlap_points int

Minimum number of pixels in overlap region for it to be considered valid.

25
method str

Currently, only "dirichlet" is supported.

'dirichlet'
bulk_method str

Method used to estimate bulk offset between tiles. Supported methods are 'integer' and 'L2'.

'L2'
num_parallel_ifgs int

Number of interferograms to merge in one batch. Use zero to merge all interferograms in a single batch.

1
Source code in src/spurt/workflows/emcf/_settings.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
@dataclass
class MergerSettings:
    """Class for holding tile merging settings.

    Parameters
    ----------
    min_overlap_points: int
        Minimum number of pixels in overlap region for it to be considered
        valid.
    method: str
        Currently, only "dirichlet" is supported.
    bulk_method: str
        Method used to estimate bulk offset between tiles. Supported methods
        are 'integer' and 'L2'.
    num_parallel_ifgs: int
        Number of interferograms to merge in one batch. Use zero to merge all
        interferograms in a single batch.
    """

    min_overlap_points: int = 25
    method: str = "dirichlet"
    bulk_method: str = "L2"
    num_parallel_ifgs: int = 1

    def __post_init__(self):
        bulk_methods = {"integer", "L2"}
        if self.bulk_method not in bulk_methods:
            errmsg = (
                f"bulk_method must be one of {bulk_methods}. got {self.bulk_method}"
            )
            raise ValueError(errmsg)

        if self.method != "dirichlet":
            errmsg = f"'dirichlet' is the only valid option, got {self.method}"
            raise ValueError(errmsg)

Solver

Implementation of the EMCF algorithm.

Implements the Extended Minimum Cost Flow (EMCF) algorithm for phase unwrapping [Pepe and Lanari, 2006]. We only implement the solver framework [Olsen et al., 2023] as the graph generation and cost function implementations are not exactly replicable without more details.

Pepe and Lanari, 2006

Pepe A. and Lanari R., 2006. On the Extension of the Minimum Cost Flow Algorithm for Phase Unwrapping of Multitemporal Differential SAR Interferograms. IEEE Transactions on Geoscience and Remote Sensing. 44, pp.2374--2383. 10.1109/TGRS.2006.873207

Olsen et al., 2023

Olsen K.M., Calef M.T. and Agram P.S., 2023. Contextual uncertainty assessments for InSAR-based deformation retrieval using an ensemble approach. Remote Sensing of Environment. 287, pp.113456.

Source code in src/spurt/workflows/emcf/_solver.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
class EMCFSolver:
    """Implementation of the EMCF algorithm.

    Implements the Extended Minimum Cost Flow (EMCF) algorithm for
    phase unwrapping [@Pepe2006ExtensionMinimumCost]. We only implement the solver
    framework [@Olsen2023ContextualUncertaintyAssessments] as the
    graph generation and cost function implementations are not exactly
    replicable without more details.
    """

    def __init__(
        self,
        solver_space: MCFSolverInterface,
        solver_time: MCFSolverInterface,
        settings: SolverSettings,
        link_model: LinkModelInterface | None = None,
    ):
        """Spatio-temporal unwrapping.

        Parameters
        ----------
        solver_space: MCFSolverInterface
            MCF Solver interface for spatial graph connecting the stable points.
            Typically a Delaunaytriangulation.
        solver_time: MCFSolverInterface
            MCF Solver interface for temporal graph usually representing
            interferograms in time-Bperp space. Typically a Delaunay triangulation.
        settings: SolverSettings
            Settings to be used for setting up the solver like number of
            workers, link batch size etc.
        model: LinkModel | None
            Per-link model in time used to correct the gradients before
            unwrapping.
        """
        self._solver_space = solver_space
        self._solver_time = solver_time
        self._settings = settings
        self._link_model = link_model

        if link_model is not None:
            errmsg = "Not implemented yet."
            raise NotImplementedError(errmsg)

    @property
    def npoints(self) -> int:
        """Number of points in the spatial network."""
        return self._solver_space.npoints

    @property
    def nlinks(self) -> int:
        """Number of links in the spatial network."""
        return self._solver_space.nedges

    @property
    def nepochs(self) -> int:
        """Number of points in the temporal network."""
        return self._solver_time.npoints

    @property
    def nifgs(self) -> int:
        """Number of links in the temporal network."""
        return self._solver_time.nedges

    @property
    def settings(self) -> SolverSettings:
        """Retrieve settings for the workflow."""
        return self._settings

    @property
    def link_model(self) -> LinkModelInterface | None:
        """Retrieve the link model for the workflow."""
        return self._link_model

    def unwrap_cube(self, wrap_data: Irreg3DInput) -> np.ndarray:
        """Unwrap a 3D cube of data.

        Parameters
        ----------
        wrap_data: np.ndarray
            2D array of size (nslc, npoints) or (nifg, npoints).

        Returns
        -------
        uw_data: np.ndarray
            2D float32 array of size (nifg, npoints).
        """
        if wrap_data.ndim != 2:
            errmsg = f"Input data is not a 2D array - {wrap_data.ndim}"
            raise ValueError(errmsg)

        if wrap_data.time_dim != 0:
            errmsg = "Time must be first dimension in input stack."
            raise NotImplementedError(errmsg)

        input_is_ifg: bool = False
        if wrap_data.data.shape[0] == self.nepochs:
            input_is_ifg = False
        elif wrap_data.data.shape[0] == self.nifgs:
            input_is_ifg = True
        else:
            errmsg = (
                f"Input size {wrap_data.data.shape[0]} does not match solver"
                f" for {self.nifgs} Ifgs from {self.nepochs} images"
            )
            raise ValueError(errmsg)

        # First unwrap in time to get spatial gradients
        grad_space: np.ndarray = self.unwrap_gradients_in_time(
            wrap_data.data, input_is_ifg=input_is_ifg
        )

        # Then unwrap spatial gradients
        return self.unwrap_gradients_in_space(grad_space)

    def unwrap_gradients_in_time(
        self, wrap_data: np.ndarray, *, input_is_ifg: bool
    ) -> np.ndarray:
        """Temporally unwrap links in parallel.

        The output of this step is the temporally unwrapped phase gradients on
        each link of the spatial graph.
        """
        # First set up temporal cost
        if self.settings.t_cost_type == "constant":
            cost = np.ones(self.nifgs, dtype=int)
        elif self.settings.t_cost_type == "distance":
            cost = utils.distance_costs(
                self._solver_time.points,
                self._solver_time.edges,
                scale=self.settings.t_cost_scale,
            )
        elif self.settings.t_cost_type == "centroid":
            cost = utils.centroid_costs(
                self._solver_time.points,
                self._solver_time.cycles,
                self._solver_time.dual_edges,
                scale=self.settings.t_cost_scale,
            )
        else:
            errmsg = f"Unknown cost type: {self.settings.t_cost_type}"
            raise ValueError(errmsg)

        # Create output array
        grad_space: np.ndarray = np.zeros((self.nifgs, self.nlinks), dtype=np.float32)

        logger.info(f"Temporal: Number of interferograms: {self.nifgs}")
        logger.info(f"Temporal: Number of links: {self.nlinks}")
        logger.info(f"Temporal: Number of cycles: {self._solver_time.ncycles}")

        # Number of batches to process
        nbatches: int = ((self.nlinks - 1) // self.settings.links_per_batch) + 1

        # Iterate over batches
        for bb in range(nbatches):
            i_start = bb * self.settings.links_per_batch
            i_end = min(i_start + self.settings.links_per_batch, self.nlinks)
            links_in_batch = i_end - i_start
            if links_in_batch == 0:
                continue

            # Get indices of points forming links from spatial graph
            inds = self._solver_space.edges[i_start:i_end, :]

            # TODO: Incorporate link_model here when ready
            # Add self._modeled_phase_diff to replace phase_diff

            # Compute spatial gradients for each link
            # If input data is already interferograms
            if input_is_ifg:
                grad_space[:, i_start:i_end] = utils.phase_diff(
                    wrap_data[:, inds[:, 0]], wrap_data[:, inds[:, 1]]
                )
            else:
                logger.info(f"Temporal: Preparing batch {bb + 1}/{nbatches}")
                self._ifg_spatial_gradients_from_slc(
                    wrap_data, inds, grad_space, np.s_[i_start:i_end]
                )

            # Compute residues for each cycle in temporal graph
            # Easier to loop over interferograms here
            ncycles: int = len(self._solver_time.cycles)
            grad_sum: np.ndarray = np.zeros(
                (links_in_batch, ncycles + 1), dtype=np.float32
            )
            for ii in range(self.nifgs):
                # Cycles that ifg contributes to
                cyc = np.abs(self._solver_time.dual_edges[ii])
                cyc_dir = self._solver_time.dual_edge_dir[ii]
                grad_sum[:, cyc[0]] += cyc_dir[0] * grad_space[ii, i_start:i_end]
                if cyc[1] != 0:
                    grad_sum[:, cyc[1]] += cyc_dir[1] * grad_space[ii, i_start:i_end]

            residues = np.rint(grad_sum / (2 * np.pi)).astype(int)
            # Set grounding node
            residues[:, 0] = -np.sum(residues[:, 1:], axis=1)

            # Unwrap the batch
            logger.info(f"Temporal: Unwrapping batch {bb + 1}/{nbatches}")
            flows = self._solver_time.residues_to_flows_many(
                residues,
                cost,
                worker_count=self.settings.t_worker_count,
                chunksize=50000,
            )

            # Update the spatial gradients with estimated flows
            grad_space[:, i_start:i_end] += 2 * np.pi * flows.T

        return grad_space

    def unwrap_gradients_in_space(self, grad_space: np.ndarray) -> np.ndarray:
        """Spatially unwrap each interferogram sequentially."""
        # First set up spatial cost
        if self.settings.s_cost_type == "constant":
            cost = np.ones(self.nlinks, dtype=int)
        elif self.settings.s_cost_type == "distance":
            cost = utils.distance_costs(
                self._solver_space.points,
                self._solver_space.edges,
                scale=self.settings.s_cost_scale,
            )
        elif self.settings.s_cost_type == "centroid":
            cost = utils.centroid_costs(
                self._solver_space.points,
                self._solver_space.cycles,
                self._solver_space.dual_edges,
                scale=self.settings.s_cost_scale,
            )
        else:
            errmsg = f"Unknown cost type: {self.settings.s_cost_type}"
            raise ValueError(errmsg)

        # Create output array
        uw_data = np.zeros((self.nifgs, self.npoints), dtype=np.float32)

        logger.info(f"Spatial: Number of interferograms: {self.nifgs}")
        logger.info(f"Spatial: Number of links: {self.nlinks}")
        logger.info(f"Spatial: Number of cycles: {self._solver_space.ncycles}")

        nworkers = self.settings.s_worker_count
        if nworkers < 1:
            nworkers = get_cpu_count() - 1

        mp_context = mp.get_context("fork")
        with ProcessPoolExecutor(
            max_workers=nworkers, mp_context=mp_context
        ) as executor:
            futures = {
                executor.submit(
                    _unwrap_ifg_in_space,
                    grad_space[ii, :],
                    self._solver_space,
                    cost,
                    ii,
                ): ii
                for ii in range(self.nifgs)
            }
            for fut in as_completed(futures):
                ii, data = fut.result()
                futures.pop(fut)
                uw_data[ii, :] = data
        return uw_data

    def _ifg_spatial_gradients_from_slc(
        self,
        wrap_data: np.ndarray,
        edges: np.ndarray,
        grad_space: np.ndarray,
        link_slice: slice,
    ) -> None:
        """Compute interferometric spatial gradients from slc data.

        Parameters
        ----------
        wrap_data: np.ndarray
            Wrapped slc data 2D array for whole graph of shape (nslc, npts)
        edges: np.ndarray
            2D array corresponding to edges in spatial graph. These are a
            subset of all links in the graph.
        grad_space: np.ndarray
            Spatial gradient array for the whole graph of shape (nifg, nlinks).
            This array gets updated in place.
        link_slice: slice
            Slice corresponding to edges within the array of all links.
        """
        # Interferogram edges
        ifg_inds = self._solver_time.edges

        # Extract SLC data first
        slc_data0 = wrap_data[:, edges[:, 0]]
        slc_data1 = wrap_data[:, edges[:, 1]]

        # Make interferograms for extracted points
        ifg_data0 = utils.phase_diff(
            slc_data0[ifg_inds[:, 0], :], slc_data0[ifg_inds[:, 1], :]
        )
        ifg_data1 = utils.phase_diff(
            slc_data1[ifg_inds[:, 0], :], slc_data1[ifg_inds[:, 1], :]
        )

        # Update gradient in place
        grad_space[:, link_slice] = utils.phase_diff(ifg_data0, ifg_data1)

Retrieve the link model for the workflow.

nepochs property

Number of points in the temporal network.

nifgs property

Number of links in the temporal network.

Number of links in the spatial network.

npoints property

Number of points in the spatial network.

settings property

Retrieve settings for the workflow.

__init__(solver_space, solver_time, settings, link_model=None)

Spatio-temporal unwrapping.

Parameters:

Name Type Description Default
solver_space MCFSolverInterface

MCF Solver interface for spatial graph connecting the stable points. Typically a Delaunaytriangulation.

required
solver_time MCFSolverInterface

MCF Solver interface for temporal graph usually representing interferograms in time-Bperp space. Typically a Delaunay triangulation.

required
settings SolverSettings

Settings to be used for setting up the solver like number of workers, link batch size etc.

required
model

Per-link model in time used to correct the gradients before unwrapping.

required
Source code in src/spurt/workflows/emcf/_solver.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def __init__(
    self,
    solver_space: MCFSolverInterface,
    solver_time: MCFSolverInterface,
    settings: SolverSettings,
    link_model: LinkModelInterface | None = None,
):
    """Spatio-temporal unwrapping.

    Parameters
    ----------
    solver_space: MCFSolverInterface
        MCF Solver interface for spatial graph connecting the stable points.
        Typically a Delaunaytriangulation.
    solver_time: MCFSolverInterface
        MCF Solver interface for temporal graph usually representing
        interferograms in time-Bperp space. Typically a Delaunay triangulation.
    settings: SolverSettings
        Settings to be used for setting up the solver like number of
        workers, link batch size etc.
    model: LinkModel | None
        Per-link model in time used to correct the gradients before
        unwrapping.
    """
    self._solver_space = solver_space
    self._solver_time = solver_time
    self._settings = settings
    self._link_model = link_model

    if link_model is not None:
        errmsg = "Not implemented yet."
        raise NotImplementedError(errmsg)

unwrap_cube(wrap_data)

Unwrap a 3D cube of data.

Parameters:

Name Type Description Default
wrap_data Irreg3DInput

2D array of size (nslc, npoints) or (nifg, npoints).

required

Returns:

Name Type Description
uw_data ndarray

2D float32 array of size (nifg, npoints).

Source code in src/spurt/workflows/emcf/_solver.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def unwrap_cube(self, wrap_data: Irreg3DInput) -> np.ndarray:
    """Unwrap a 3D cube of data.

    Parameters
    ----------
    wrap_data: np.ndarray
        2D array of size (nslc, npoints) or (nifg, npoints).

    Returns
    -------
    uw_data: np.ndarray
        2D float32 array of size (nifg, npoints).
    """
    if wrap_data.ndim != 2:
        errmsg = f"Input data is not a 2D array - {wrap_data.ndim}"
        raise ValueError(errmsg)

    if wrap_data.time_dim != 0:
        errmsg = "Time must be first dimension in input stack."
        raise NotImplementedError(errmsg)

    input_is_ifg: bool = False
    if wrap_data.data.shape[0] == self.nepochs:
        input_is_ifg = False
    elif wrap_data.data.shape[0] == self.nifgs:
        input_is_ifg = True
    else:
        errmsg = (
            f"Input size {wrap_data.data.shape[0]} does not match solver"
            f" for {self.nifgs} Ifgs from {self.nepochs} images"
        )
        raise ValueError(errmsg)

    # First unwrap in time to get spatial gradients
    grad_space: np.ndarray = self.unwrap_gradients_in_time(
        wrap_data.data, input_is_ifg=input_is_ifg
    )

    # Then unwrap spatial gradients
    return self.unwrap_gradients_in_space(grad_space)

unwrap_gradients_in_space(grad_space)

Spatially unwrap each interferogram sequentially.

Source code in src/spurt/workflows/emcf/_solver.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
def unwrap_gradients_in_space(self, grad_space: np.ndarray) -> np.ndarray:
    """Spatially unwrap each interferogram sequentially."""
    # First set up spatial cost
    if self.settings.s_cost_type == "constant":
        cost = np.ones(self.nlinks, dtype=int)
    elif self.settings.s_cost_type == "distance":
        cost = utils.distance_costs(
            self._solver_space.points,
            self._solver_space.edges,
            scale=self.settings.s_cost_scale,
        )
    elif self.settings.s_cost_type == "centroid":
        cost = utils.centroid_costs(
            self._solver_space.points,
            self._solver_space.cycles,
            self._solver_space.dual_edges,
            scale=self.settings.s_cost_scale,
        )
    else:
        errmsg = f"Unknown cost type: {self.settings.s_cost_type}"
        raise ValueError(errmsg)

    # Create output array
    uw_data = np.zeros((self.nifgs, self.npoints), dtype=np.float32)

    logger.info(f"Spatial: Number of interferograms: {self.nifgs}")
    logger.info(f"Spatial: Number of links: {self.nlinks}")
    logger.info(f"Spatial: Number of cycles: {self._solver_space.ncycles}")

    nworkers = self.settings.s_worker_count
    if nworkers < 1:
        nworkers = get_cpu_count() - 1

    mp_context = mp.get_context("fork")
    with ProcessPoolExecutor(
        max_workers=nworkers, mp_context=mp_context
    ) as executor:
        futures = {
            executor.submit(
                _unwrap_ifg_in_space,
                grad_space[ii, :],
                self._solver_space,
                cost,
                ii,
            ): ii
            for ii in range(self.nifgs)
        }
        for fut in as_completed(futures):
            ii, data = fut.result()
            futures.pop(fut)
            uw_data[ii, :] = data
    return uw_data

unwrap_gradients_in_time(wrap_data, *, input_is_ifg)

Temporally unwrap links in parallel.

The output of this step is the temporally unwrapped phase gradients on each link of the spatial graph.

Source code in src/spurt/workflows/emcf/_solver.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
def unwrap_gradients_in_time(
    self, wrap_data: np.ndarray, *, input_is_ifg: bool
) -> np.ndarray:
    """Temporally unwrap links in parallel.

    The output of this step is the temporally unwrapped phase gradients on
    each link of the spatial graph.
    """
    # First set up temporal cost
    if self.settings.t_cost_type == "constant":
        cost = np.ones(self.nifgs, dtype=int)
    elif self.settings.t_cost_type == "distance":
        cost = utils.distance_costs(
            self._solver_time.points,
            self._solver_time.edges,
            scale=self.settings.t_cost_scale,
        )
    elif self.settings.t_cost_type == "centroid":
        cost = utils.centroid_costs(
            self._solver_time.points,
            self._solver_time.cycles,
            self._solver_time.dual_edges,
            scale=self.settings.t_cost_scale,
        )
    else:
        errmsg = f"Unknown cost type: {self.settings.t_cost_type}"
        raise ValueError(errmsg)

    # Create output array
    grad_space: np.ndarray = np.zeros((self.nifgs, self.nlinks), dtype=np.float32)

    logger.info(f"Temporal: Number of interferograms: {self.nifgs}")
    logger.info(f"Temporal: Number of links: {self.nlinks}")
    logger.info(f"Temporal: Number of cycles: {self._solver_time.ncycles}")

    # Number of batches to process
    nbatches: int = ((self.nlinks - 1) // self.settings.links_per_batch) + 1

    # Iterate over batches
    for bb in range(nbatches):
        i_start = bb * self.settings.links_per_batch
        i_end = min(i_start + self.settings.links_per_batch, self.nlinks)
        links_in_batch = i_end - i_start
        if links_in_batch == 0:
            continue

        # Get indices of points forming links from spatial graph
        inds = self._solver_space.edges[i_start:i_end, :]

        # TODO: Incorporate link_model here when ready
        # Add self._modeled_phase_diff to replace phase_diff

        # Compute spatial gradients for each link
        # If input data is already interferograms
        if input_is_ifg:
            grad_space[:, i_start:i_end] = utils.phase_diff(
                wrap_data[:, inds[:, 0]], wrap_data[:, inds[:, 1]]
            )
        else:
            logger.info(f"Temporal: Preparing batch {bb + 1}/{nbatches}")
            self._ifg_spatial_gradients_from_slc(
                wrap_data, inds, grad_space, np.s_[i_start:i_end]
            )

        # Compute residues for each cycle in temporal graph
        # Easier to loop over interferograms here
        ncycles: int = len(self._solver_time.cycles)
        grad_sum: np.ndarray = np.zeros(
            (links_in_batch, ncycles + 1), dtype=np.float32
        )
        for ii in range(self.nifgs):
            # Cycles that ifg contributes to
            cyc = np.abs(self._solver_time.dual_edges[ii])
            cyc_dir = self._solver_time.dual_edge_dir[ii]
            grad_sum[:, cyc[0]] += cyc_dir[0] * grad_space[ii, i_start:i_end]
            if cyc[1] != 0:
                grad_sum[:, cyc[1]] += cyc_dir[1] * grad_space[ii, i_start:i_end]

        residues = np.rint(grad_sum / (2 * np.pi)).astype(int)
        # Set grounding node
        residues[:, 0] = -np.sum(residues[:, 1:], axis=1)

        # Unwrap the batch
        logger.info(f"Temporal: Unwrapping batch {bb + 1}/{nbatches}")
        flows = self._solver_time.residues_to_flows_many(
            residues,
            cost,
            worker_count=self.settings.t_worker_count,
            chunksize=50000,
        )

        # Update the spatial gradients with estimated flows
        grad_space[:, i_start:i_end] += 2 * np.pi * flows.T

    return grad_space

SolverSettings dataclass

Settings associated with Extended Minimum Cost Flow (EMCF) workflow.

Parameters:

Name Type Description Default
t_worker_count int

Number of workers for temporal unwrapping in parallel. Set value to <=0 to let workflow use default workers (ncpus - 1). Must be set when num_parallel_tiles > 1.

0
s_worker_count int

Number of workers for spatial unwrapping in parallel. Set value to <=0 to let workflow use (ncpus - 1). Must be set when num_parallel_tiles > 1.

1
links_per_batch int

Temporal unwrapping operations over spatial links are performed in batches and each batch is solved in parallel.

10000
t_cost_type str

Temporal unwrapping costs. Can be one of 'constant', 'distance', 'centroid'.

'constant'
t_cost_scale float

Scale factor used in computing edge costs for temporal unwrapping.

100.0
s_cost_type str

Spatial unwrapping costs. Can be one of 'constant', 'distance', 'centroid'.

'constant'
s_cost_scale float

Scale factor used in computing edge costs for spatial unwrapping.

100.0
num_parallel_tiles int

Number of tiles to process in parallel. Set to 0 for all tiles.

1
Source code in src/spurt/workflows/emcf/_settings.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
@dataclass
class SolverSettings:
    """Settings associated with Extended Minimum Cost Flow (EMCF) workflow.

    Parameters
    ----------
    t_worker_count: int
        Number of workers for temporal unwrapping in parallel. Set value to <=0
        to let workflow use default workers (ncpus - 1). Must be set when
        num_parallel_tiles > 1.
    s_worker_count: int
        Number of workers for spatial unwrapping in parallel. Set value to <=0
        to let workflow use (ncpus - 1). Must be set when
        num_parallel_tiles > 1.
    links_per_batch: int
        Temporal unwrapping operations over spatial links are performed in batches
        and each batch is solved in parallel.
    t_cost_type: str
        Temporal unwrapping costs. Can be one of 'constant', 'distance',
        'centroid'.
    t_cost_scale: float
        Scale factor used in computing edge costs for temporal unwrapping.
    s_cost_type: str
        Spatial unwrapping costs. Can be one of 'constant', 'distance',
        'centroid'.
    s_cost_scale: float
        Scale factor used in computing edge costs for spatial unwrapping.
    num_parallel_tiles: int
        Number of tiles to process in parallel. Set to 0 for all tiles.
    """

    t_worker_count: int = 0
    s_worker_count: int = 1
    links_per_batch: int = 10000
    t_cost_type: str = "constant"
    t_cost_scale: float = 100.0
    s_cost_type: str = "constant"
    s_cost_scale: float = 100.0
    num_parallel_tiles: int = 1

    def __post_init__(self):
        cost_types = {"constant", "distance", "centroid"}
        if self.t_cost_type not in cost_types:
            errmsg = f"t_cost_type must be one of {cost_types}, got {self.t_cost_type}"
            raise ValueError(errmsg)
        if self.s_cost_type not in cost_types:
            errmsg = f"s_cost_type must be one of {cost_types}, got {self.s_cost_type}"
            raise ValueError(errmsg)
        if self.links_per_batch <= 0:
            errmsg = f"links_per_batch must be > 0, got {self.links_per_batch}"
            raise ValueError(errmsg)
        if self.t_cost_scale <= 0.0:
            errmsg = f"t_cost_scale must be > 0, got {self.t_cost_scale}"
            raise ValueError(errmsg)
        if self.s_cost_scale <= 0.0:
            errmsg = f"s_cost_scale must be > 0, got {self.s_cost_scale}"
            raise ValueError(errmsg)

        # num_parallel_tiles must be explicit - no system based defaults
        if self.num_parallel_tiles < 1:
            errmsg = f"num_parallel_tiles must be >= 1, got {self.num_parallel_tiles}"
            raise ValueError(errmsg)

        # if more than one tile being processed in parallel,
        # worker counts must be set explicitly
        if self.num_parallel_tiles > 1:
            if self.t_worker_count < 1:
                errmsg = (
                    "t_worker_count must be explicitly set"
                    " when processing tiles in parallel"
                )
                raise ValueError(errmsg)

            if self.s_worker_count < 1:
                errmsg = (
                    "s_worker_count must be explicitly set"
                    " when processing tiles in parallel"
                )
                raise ValueError(errmsg)

TilerSettings dataclass

Class for holding tile generation settings.

Parameters:

Name Type Description Default
target_points_per_tile int

Target points per tile when generating tiles.

800000
max_tiles int

Maximum number of tiles allowed.

16
target_points_for_generation int

Number of points used for determining tiles based on density. If input has a lot of points, tiling can take a really long time. We use this as a guide to downsample inputs to generate tile boundaries. The tile boundaries are then used with full set of inputs.

120000
dilation_factor float

Dilation factor of non-overlapping tiles. 0.05 would lead to 10 percent dilation of the tile.

0.05
Source code in src/spurt/workflows/emcf/_settings.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
@dataclass
class TilerSettings:
    """Class for holding tile generation settings.

    Parameters
    ----------
    target_points_per_tile: int
        Target points per tile when generating tiles.
    max_tiles: int
        Maximum number of tiles allowed.
    target_points_for_generation: int
        Number of points used for determining tiles based on density.
        If input has a lot of points, tiling can take a really long time.
        We use this as a guide to downsample inputs to generate tile
        boundaries. The tile boundaries are then used with full set of inputs.
    dilation_factor: float
        Dilation factor of non-overlapping tiles. 0.05 would lead to
        10 percent dilation of the tile.
    """

    target_points_per_tile: int = 800000
    max_tiles: int = 16
    target_points_for_generation: int = 120000
    dilation_factor: float = 0.05

    def __post_init__(self):
        if self.max_tiles < 1:
            errmsg = f"max_tiles must be at least 1, got {self.max_tiles}"
            raise ValueError(errmsg)
        if self.dilation_factor < 0.0:
            errmsg = f"dilation_factor must be >= 0., got {self.dilation_factor}"
            raise ValueError(errmsg)
        if self.target_points_for_generation <= 0:
            errmsg = (
                "target_points_for_generation must be > 0,"
                f" got {self.target_points_for_generation}"
            )
            raise ValueError(errmsg)
        if self.target_points_per_tile <= 0.0:
            errmsg = (
                f"target_points_per_tile must be > 0, got {self.target_points_per_tile}"
            )
            raise ValueError(errmsg)

compute_phasediff_deciles(gen_settings, mrg_settings)

Compute overlap phase difference stats and save to h5.

We compute histograms of phase difference between overlapping tiles. While we only use a constant bulk offset for reconciling differences, these histograms can be used for debugging and assessing quality of consistency betwen unwrapped results of individual tiles.

Source code in src/spurt/workflows/emcf/_overlap.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def compute_phasediff_deciles(
    gen_settings: GeneralSettings,
    mrg_settings: MergerSettings,
) -> None:
    """Compute overlap phase difference stats and save to h5.

    We compute histograms of phase difference between overlapping tiles.
    While we only use a constant bulk offset for reconciling differences, these
    histograms can be used for debugging and assessing quality of consistency
    betwen unwrapped results of individual tiles.
    """
    # Check if already processed
    if gen_settings.overlap_filename.is_file():
        logger.info("Overlap file already exists. Skipping ...")
        return

    # Load tile info
    tiledata = spurt.utils.TileSet.from_json(gen_settings.tiles_jsonname)

    # If single tile json - just return
    if tiledata.ntiles == 1:
        logger.info("Single tile used. Skipping overlaps ...")
        return

    t1 = -1
    t2 = -1
    ntiles = tiledata.ntiles
    conn_mat = np.zeros((ntiles, ntiles))
    overlaps = []
    overlap_file: str = str(gen_settings.overlap_filename)
    for pair in tiledata.get_overlaps():
        logger.info(f"Processing neighboring pair: {pair}")
        if pair[0] != t1:
            t1 = pair[0]
            tile1 = tiledata.tiles[t1]
            pt1, uw1 = _load_uw_tile(str(gen_settings.tile_filename(t1)), tile1)

        if pair[1] != t2:
            t2 = pair[1]
            tile2 = tiledata.tiles[t2]
            pt2, uw2 = _load_uw_tile(str(gen_settings.tile_filename(t2)), tile2)

        # Find common points
        c1, c2 = spurt.utils.merge.find_common_points(pt1, pt2)

        if len(c1) < mrg_settings.min_overlap_points:
            logger.info("Insufficient overlap. Skipping ...")
            continue

        # Track overlaps
        overlaps.append([t1, t2])
        conn_mat[t1, t2] = 1

        # difference stats
        cuw1 = uw1[:, c1]
        cuw2 = uw2[:, c2]
        stats = spurt.utils.merge.pairwise_unwrapped_diff_deciles(cuw1, cuw2)
        grpname = gen_settings.overlap_groupname(t1, t2)
        with h5py.File(overlap_file, "a") as fid:
            grp = fid.create_group(grpname)
            grp["c1"] = c1.astype(int)
            grp["c2"] = c2.astype(int)
            grp["stats"] = stats.astype(np.int16)

    # Identify connected components
    graph = csr_matrix(conn_mat)
    n_components, labels = connected_components(
        csgraph=graph, directed=False, return_labels=True
    )

    logger.info(f"Number of connected components: {n_components}")

    # Add list of overlaps at the end
    with h5py.File(overlap_file, "a") as fid:
        fid["conn_comp"] = labels
        fid["overlaps"] = np.array(overlaps, dtype=np.int16)

get_bulk_offsets(stack, gen_settings, mrg_settings)

Compute bulk phase offsets between overlapping tiles.

We compute the bulk offset used to adjust individually unwrapped tiles in the merge process. While we computed phase difference deciles, we only use the median phase difference here for adjusting tiles.

Source code in src/spurt/workflows/emcf/_bulk_offset.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def get_bulk_offsets(
    stack: spurt.io.SLCStackReader,
    gen_settings: GeneralSettings,
    mrg_settings: MergerSettings,
) -> None:
    """Compute bulk phase offsets between overlapping tiles.

    We compute the bulk offset used to adjust individually unwrapped tiles in
    the merge process. While we computed phase difference deciles, we only use
    the median phase difference here for adjusting tiles.
    """
    # Check if offsets already computed
    if gen_settings.offsets_filename.is_file():
        logger.info("Offsets file already exists. Skipping ...")
        return

    # Load tile info
    tiledata = spurt.utils.TileSet.from_json(gen_settings.tiles_jsonname)

    # Single tile processing
    if tiledata.ntiles == 1:
        logger.info("Single tile used. Skipping bulk offsets ...")
        return

    # Load offset data for inversion
    offsets = []
    olap_counts = []
    overlap_file = str(gen_settings.overlap_filename)
    with h5py.File(str(overlap_file), "r") as fid:
        labels: np.ndarray = fid["conn_comp"][...]
        overlaps: np.ndarray = fid["overlaps"][...]
        for pair in overlaps:
            t1, t2 = pair
            grp_name = gen_settings.overlap_groupname(t1, t2)
            grp = fid[grp_name]

            # Just use the median for now
            olap_counts.append(grp["c1"].shape[0])
            offsets.append(grp["stats"][:, 5])

    # Get number of bands to merge
    nbands = len(offsets[0])
    ntiles = tiledata.ntiles
    counts = np.zeros(ntiles)

    # Add counts of valid pixels in each tile
    arr = stack.read_temporal_coherence(np.s_[:, :]) > stack.temp_coh_threshold
    for ii, tile in enumerate(tiledata.tiles):
        counts[ii] = np.sum(arr[tile.space])

    # If integer solver requested
    logger.info(f"Solving for bulk offsets with method: {mrg_settings.method}")
    if mrg_settings.bulk_method == "integer":
        bulk_offset: np.ndarray = np.zeros((nbands, ntiles), dtype=np.int32)
        obj: np.ndarray = np.zeros(nbands, dtype=np.int32)

        # Solve band by band
        for ii in range(nbands):
            logger.info(f"Computing bulk offsets for band {ii + 1}")
            off: np.ndarray = np.array([x[ii] for x in offsets])
            bulk_offset[ii, :], obj[ii] = _solve_int_offsets(
                overlaps, labels, off, counts
            )
    # Use L2 method
    elif mrg_settings.bulk_method == "L2":
        # These can be floating point
        off = np.zeros((len(overlaps), nbands))
        for ii in range(len(overlaps)):
            off[ii, :] = offsets[ii]

        bulk_offset, obj = _solve_l2_min(overlaps, off, ntiles, counts)

    else:
        errmsg = f"Unsupported bulk offset method {mrg_settings.bulk_method}"
        raise RuntimeError(errmsg)

    # Write HDF5 file with bulk offsets
    with h5py.File(str(gen_settings.offsets_filename), "w") as fid:
        grp = fid.create_group(mrg_settings.bulk_method)
        grp["offsets"] = bulk_offset
        grp["residuals"] = obj

get_tiles(stack, gen_settings, tile_settings)

Generate tiles based on settings.

Create a JSON file with tile information in the intermediate folder. Tiles are not regenerated if the JSON file is already present.

Source code in src/spurt/workflows/emcf/_tiling.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def get_tiles(
    stack: spurt.io.SLCStackReader,
    gen_settings: GeneralSettings,
    tile_settings: TilerSettings,
) -> None:
    """Generate tiles based on settings.

    Create a JSON file with tile information in the intermediate folder.
    Tiles are not regenerated if the JSON file is already present.
    """
    json_file = gen_settings.tiles_jsonname

    # Generate tiles if file doesn't exist
    if json_file.is_file():
        logger.info(f"Using existing tiles file: {json_file!s}")
        return

    # Actual tile generation
    logger.info("Generating tiles for stack")
    arr = stack.read_temporal_coherence(np.s_[:, :]) > stack.temp_coh_threshold
    logger.info(f"Stack shape for generating tiles: {arr.shape}")

    # No tiles requested
    if not gen_settings.use_tiles:
        tileset = spurt.utils.TileSet.single_tile(arr.shape)

    # Generate tiles and write
    else:
        # Get points
        pts = np.column_stack(np.nonzero(arr))
        logger.info(f"Number of points: {pts.shape[0]}")
        logger.info(f"Fraction good:  {pts.shape[0] / arr.size:.3f}")

        # Skip points as needed for tile generation
        skip = max(1, int(len(pts) / tile_settings.target_points_for_generation))
        logger.info(f"Skipping {skip} pixels for tile generation.")

        # Determine max tiles
        ntiles = int(np.rint(np.sqrt(len(pts) / tile_settings.target_points_per_tile)))
        ntiles = min(max(1, ntiles * ntiles), tile_settings.max_tiles)
        logger.info(f"Generating {ntiles} tiles.")

        # Set up tiles
        tileset = spurt.utils.create_tiles_density(
            pts[::skip, :], shape=arr.shape, max_tiles=ntiles
        )

    # Dilate tiles to create overlaps
    if (tileset.ntiles > 1) and (tile_settings.dilation_factor > 0.0):
        tileset = tileset.dilate(tile_settings.dilation_factor)

    # Write tiles to json file
    logger.info(f"Writing tiles to: {json_file!s}")
    tileset.to_json(json_file)

merge_tiles(stack, g_time, gen_settings, mrg_settings)

Merge the different tiles.

Source code in src/spurt/workflows/emcf/_merge.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def merge_tiles(
    stack: spurt.io.SLCStackReader,
    g_time: spurt.graph.GraphInterface,
    gen_settings: GeneralSettings,
    mrg_settings: MergerSettings,
) -> list[Path]:
    """Merge the different tiles."""
    if mrg_settings.method != "dirichlet":
        errmsg = "dirichlet is the only merge method supported."
        raise NotImplementedError(errmsg)

    # Load tile json
    tiledata = spurt.utils.TileSet.from_json(gen_settings.tiles_jsonname)

    # Tile manager for each tile
    tiles: dict[int, Tile] = {}
    for ii in range(tiledata.ntiles):
        # Use band 0 as place holder
        tiles[ii] = Tile(str(gen_settings.tile_filename(ii)))

    # Preparing file names
    ifgs = g_time.links
    dates = stack.dates
    fnames: list[Path] = []
    for ifg in ifgs:
        fnames.append(gen_settings.unw_filename(dates[ifg[0]], dates[ifg[1]]))

    like_slc_file = stack.slc_files[dates[-1]]
    # If we only have to write one tile
    # Nothing to merge - just write to geotiff
    if len(tiles) == 1:
        logger.info(f"Writing single tile output to {gen_settings.output_folder}")
        write_single_tile(tiles[0], fnames, tiledata.shape, like=like_slc_file)
        return fnames

    # Create overlap map for the graph
    overlap_map = _get_overlap_map(gen_settings)
    max_degree = _get_max_degree(tiles, tiledata.shape)
    logger.info(f"Maximum degree for Dirichlet iterations: {max_degree}")

    # Read bulk offsets from hdf5 file
    with h5py.File(str(gen_settings.offsets_filename), "r") as fid:
        grp = fid[mrg_settings.bulk_method]
        bulk_offsets = grp["offsets"][...]

    # Write batch-by-batch
    nifgs = len(fnames)

    batch_size = min(mrg_settings.num_parallel_ifgs, nifgs)
    if batch_size < 1:
        batch_size = nifgs
    logger.info(f"Merging batches of {batch_size} interferograms")

    batch_start = np.arange(0, nifgs, batch_size, dtype=int)

    for bnum, bstart in enumerate(batch_start):
        bend = min(bstart + batch_size, nifgs)
        if (bend - bstart) <= 0:
            continue

        # Process bstart to bend interferograms
        if batch_size > 1:
            logger.info(f"Merging batch {bnum + 1} from {bstart + 1} to {bend}")

        # Check if batch already processed
        idx = []
        for ii in range(bstart, bend):
            fname = fnames[ii]
            if fname.is_file():
                logger.info(
                    f"{fname!s} already exists. Skipping merging band {ii + 1}."
                )
            else:
                idx.append(ii)

        if not idx:
            logger.info(f"Batch {bnum + 1} already processed. Skipping ..")
            continue

        # Initialize each tile for the right band with bulk offset
        for jj, tile in tiles.items():
            tile.reset_band_index(idx)
            tile.add_correction(np.float32(-2 * np.pi * bulk_offsets[idx, jj]))
            tile.increment_correction()

        # Adjust the tiles
        _adjust_tiles(tiles, overlap_map, gen_settings, max_degree, debug_stats=False)

        # Write file to output band-by-band
        for ii in idx:
            fname = fnames[ii]
            write_merged_band(tiles, fname, ii, tiledata.shape, like=like_slc_file)

    return fnames

unwrap_tiles(stack, g_time, gen_settings, solv_settings)

Unwrap each tile and save to h5.

Source code in src/spurt/workflows/emcf/_unwrap.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def unwrap_tiles(
    stack: spurt.io.SLCStackReader,
    g_time: spurt.graph.PlanarGraphInterface,
    gen_settings: GeneralSettings,
    solv_settings: SolverSettings,
) -> None:
    """Unwrap each tile and save to h5."""
    # Load tile set
    tile_json = gen_settings.tiles_jsonname
    tiledata = spurt.utils.TileSet.from_json(tile_json)

    mp_context = mp.get_context("fork")
    with ProcessPoolExecutor(
        max_workers=solv_settings.num_parallel_tiles, mp_context=mp_context
    ) as executor:
        futures = {}

        # Iterate over tiles
        for tt in range(tiledata.ntiles):
            tfname = str(gen_settings.tile_filename(tt))
            if Path(tfname).is_file():
                logger.info(f"Tile {tt+1} already processed. Skipping...")
                continue

            futures[
                executor.submit(
                    _unwrap_one_tile,
                    stack,
                    tile_json,
                    tfname,
                    g_time,
                    solv_settings,
                    tt,
                )
            ] = tt

        for fut in as_completed(futures):
            fut.result()
            futures.pop(fut)