Skip to content

Index

BBox dataclass

Utility class for managing tile bounds.

We follow shapely convention to store the bounds. The bounds should also be interpreted similar to python's indexing - i.e, left edge inclusive. See https://shapely.readthedocs.io/en/stable/manual.html#object.bounds for details on shapely convention.

Source code in src/spurt/utils/_tiling.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@dataclass
class BBox:
    """Utility class for managing tile bounds.

    We follow shapely convention to store the bounds. The bounds should
    also be interpreted similar to python's indexing - i.e, left edge
    inclusive. See https://shapely.readthedocs.io/en/stable/manual.html#object.bounds
    for details on shapely convention.
    """

    xmin: int
    ymin: int
    xmax: int
    ymax: int

    @classmethod
    def from_shapely_bounds(cls, bounds: Sequence[int]):
        """Create using any sequence in shapely convention."""
        if len(bounds) != 4:
            errmsg = f"bounds array must have size == 4, got {len(bounds)}"
            raise ValueError(errmsg)
        return cls(*bounds)

    def intersects(self, box: BBox) -> bool:
        """Check if two boxes intersect."""
        return not (
            (self.xmax < box.xmin)
            or (self.xmin > box.xmax)
            or (self.ymin > box.ymax)
            or (self.ymax < box.ymin)
        )

    def tolist(self) -> list[int]:
        """List of integers in shapely convention."""
        return [self.xmin, self.ymin, self.xmax, self.ymax]

    @property
    def space(self) -> tuple[slice, slice]:
        """Slice notation for use with NumPy arrays."""
        return (slice(self.xmin, self.xmax), slice(self.ymin, self.ymax))

    @property
    def count(self) -> int:
        """Number of pixels in tile."""
        return (self.xmax - self.xmin) * (self.ymax - self.ymin)

count property

Number of pixels in tile.

space property

Slice notation for use with NumPy arrays.

from_shapely_bounds(bounds) classmethod

Create using any sequence in shapely convention.

Source code in src/spurt/utils/_tiling.py
34
35
36
37
38
39
40
@classmethod
def from_shapely_bounds(cls, bounds: Sequence[int]):
    """Create using any sequence in shapely convention."""
    if len(bounds) != 4:
        errmsg = f"bounds array must have size == 4, got {len(bounds)}"
        raise ValueError(errmsg)
    return cls(*bounds)

intersects(box)

Check if two boxes intersect.

Source code in src/spurt/utils/_tiling.py
42
43
44
45
46
47
48
49
def intersects(self, box: BBox) -> bool:
    """Check if two boxes intersect."""
    return not (
        (self.xmax < box.xmin)
        or (self.xmin > box.xmax)
        or (self.ymin > box.ymax)
        or (self.ymax < box.ymin)
    )

tolist()

List of integers in shapely convention.

Source code in src/spurt/utils/_tiling.py
51
52
53
def tolist(self) -> list[int]:
    """List of integers in shapely convention."""
    return [self.xmin, self.ymin, self.xmax, self.ymax]

TileSet

Utility class for managing collection of tiles.

Source code in src/spurt/utils/_tiling.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
class TileSet:
    """Utility class for managing collection of tiles."""

    def __init__(
        self,
        shape: tuple[int, int],
        tiles: list[BBox],
    ):
        self._shape: tuple[int, int] = shape
        self._tiles: list[BBox] = tiles

    @property
    def shape(self) -> tuple[int, int]:
        return self._shape

    @property
    def tiles(self) -> list[BBox]:
        return self._tiles

    @property
    def ntiles(self) -> int:
        return len(self._tiles)

    @classmethod
    def from_json(cls, fname: Path) -> TileSet:
        """Load tiles from a JSON file."""
        with fname.open(mode="r") as fid:
            jdict = json.load(fid)

        shape = jdict["shape"]
        tiles: list[BBox] = []
        for tt in jdict["tiles"]:
            tiles.append(BBox.from_shapely_bounds(tt["bounds"]))

        return cls(shape, tiles)

    def to_json(self, fname: Path) -> None:
        """Write tiles to a JSON file."""
        tiles = []
        for tt in self.tiles:
            tiles.append({"bounds": tt.tolist()})

        jdict: dict = {
            "shape": self.shape,
            "tiles": tiles,
        }
        with fname.open(mode="w") as fid:
            fid.write(json.dumps(jdict, indent=4))

    @classmethod
    def single_tile(cls, shape: tuple[int, int]) -> TileSet:
        """Return tileset with single tile corresponding to shape."""
        return cls(shape, [BBox.from_shapely_bounds((0, 0, shape[0], shape[1]))])

    def get_overlaps(self) -> list[tuple[int, int]]:
        """Return list of pairs of overlapping tiles."""
        olaps: list[tuple[int, int]] = []

        ntiles = self.ntiles
        for ii in range(ntiles - 1):
            box1 = self._tiles[ii]
            for jj in range(ii + 1, ntiles):
                if box1.intersects(self._tiles[jj]):
                    olaps.append((ii, jj))

        return olaps

    def dilate(self, factor: float) -> TileSet:
        """Dilate current tile set."""
        tiles: list[BBox] = []
        shape = self.shape

        # Iterate over and dilate rectangles
        for tt in self.tiles:
            rdiff = tt.xmax - tt.xmin
            r0 = int(max(0, tt.xmin - factor * rdiff))
            r1 = int(min(shape[0], tt.xmax + factor * rdiff))
            cdiff = tt.ymax - tt.ymin
            c0 = int(max(0, tt.ymin - factor * cdiff))
            c1 = int(min(shape[1], tt.ymax + factor * cdiff))

            tiles.append(BBox(r0, c0, r1, c1))

        return TileSet(shape, tiles)

dilate(factor)

Dilate current tile set.

Source code in src/spurt/utils/_tiling.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def dilate(self, factor: float) -> TileSet:
    """Dilate current tile set."""
    tiles: list[BBox] = []
    shape = self.shape

    # Iterate over and dilate rectangles
    for tt in self.tiles:
        rdiff = tt.xmax - tt.xmin
        r0 = int(max(0, tt.xmin - factor * rdiff))
        r1 = int(min(shape[0], tt.xmax + factor * rdiff))
        cdiff = tt.ymax - tt.ymin
        c0 = int(max(0, tt.ymin - factor * cdiff))
        c1 = int(min(shape[1], tt.ymax + factor * cdiff))

        tiles.append(BBox(r0, c0, r1, c1))

    return TileSet(shape, tiles)

from_json(fname) classmethod

Load tiles from a JSON file.

Source code in src/spurt/utils/_tiling.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
@classmethod
def from_json(cls, fname: Path) -> TileSet:
    """Load tiles from a JSON file."""
    with fname.open(mode="r") as fid:
        jdict = json.load(fid)

    shape = jdict["shape"]
    tiles: list[BBox] = []
    for tt in jdict["tiles"]:
        tiles.append(BBox.from_shapely_bounds(tt["bounds"]))

    return cls(shape, tiles)

get_overlaps()

Return list of pairs of overlapping tiles.

Source code in src/spurt/utils/_tiling.py
120
121
122
123
124
125
126
127
128
129
130
131
def get_overlaps(self) -> list[tuple[int, int]]:
    """Return list of pairs of overlapping tiles."""
    olaps: list[tuple[int, int]] = []

    ntiles = self.ntiles
    for ii in range(ntiles - 1):
        box1 = self._tiles[ii]
        for jj in range(ii + 1, ntiles):
            if box1.intersects(self._tiles[jj]):
                olaps.append((ii, jj))

    return olaps

single_tile(shape) classmethod

Return tileset with single tile corresponding to shape.

Source code in src/spurt/utils/_tiling.py
115
116
117
118
@classmethod
def single_tile(cls, shape: tuple[int, int]) -> TileSet:
    """Return tileset with single tile corresponding to shape."""
    return cls(shape, [BBox.from_shapely_bounds((0, 0, shape[0], shape[1]))])

to_json(fname)

Write tiles to a JSON file.

Source code in src/spurt/utils/_tiling.py
102
103
104
105
106
107
108
109
110
111
112
113
def to_json(self, fname: Path) -> None:
    """Write tiles to a JSON file."""
    tiles = []
    for tt in self.tiles:
        tiles.append({"bounds": tt.tolist()})

    jdict: dict = {
        "shape": self.shape,
        "tiles": tiles,
    }
    with fname.open(mode="w") as fid:
        fid.write(json.dumps(jdict, indent=4))

create_tiles_density(points, shape, max_tiles)

Tile set with non-overlapping tiles based on density of points.

This is based on the nice implementation found here: https://mathoverflow.net/questions/412127/partitioning-unit-square-with-equal-frequency-rectangles

Parameters:

Name Type Description Default
points ndarray

2D array with point coordinates within the shape.

required
shape tuple[int, int]

Shape of rectangle to tile up.

required
max_tiles int

Maximum number of tiles. Actual number of tiles is perfect square '<= max_tiles'.

required
Source code in src/spurt/utils/_tiling.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
def create_tiles_density(
    points: np.ndarray, shape: tuple[int, int], max_tiles: int
) -> TileSet:
    """Tile set with non-overlapping tiles based on density of points.

    This is based on the nice implementation found here:
    https://mathoverflow.net/questions/412127/partitioning-unit-square-with-equal-frequency-rectangles

    Parameters
    ----------
    points: np.ndarray
        2D array with point coordinates within the shape.
    shape: tuple[int, int]
        Shape of rectangle to tile up.
    max_tiles: int
        Maximum number of tiles.
        Actual number of tiles is perfect square '<= max_tiles'.
    """
    if (points.ndim != 2) or (points.shape[1] != 2):
        errmsg = f"Point coordinates must be a Nx2 array. Got {points.shape}"
        raise ValueError(errmsg)

    if (shape[0] <= 0) and (shape[1] <= 0):
        errmsg = f"Invalid shape provided to tiler: {shape}"
        raise ValueError(errmsg)

    if max_tiles <= 0:
        errmsg = f"Maximum tiles should at least be 1. Got {max_tiles}"
        raise ValueError(errmsg)

    # Compute tiles per dim
    tiles_per_dim: int = int(np.sqrt(max_tiles))
    max_tiles = tiles_per_dim * tiles_per_dim

    # If only 1 tile was requested
    if max_tiles == 1:
        return TileSet.single_tile(shape)

    bounds = ((0, shape[0]), (0, shape[1]))
    splits = split_rectangle(points, bounds, max_tiles)
    tiles = []

    for s in splits:
        # Reorder indices
        t = [s[0][0], s[1][0], s[0][1], s[1][1]]
        r0 = int(max(0, t[0]))
        r1 = int(min(shape[0], t[2]))
        c0 = int(max(0, t[1]))
        c1 = int(min(shape[1], t[3]))

        tiles.append(BBox.from_shapely_bounds([r0, c0, r1, c1]))

    return TileSet(shape, tiles)

create_tiles_regular(shape, max_tiles)

Tile set with approximately regularly sized non-overlapping tiles.

Parameters:

Name Type Description Default
shape tuple[int, int]

Shape of rectangle to tile up.

required
max_tiles int

Maximum number of tiles. Actual number of tiles is perfect square '<= max_tiles'.

required
Source code in src/spurt/utils/_tiling.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def create_tiles_regular(shape: tuple[int, int], max_tiles: int) -> TileSet:
    """Tile set with approximately regularly sized non-overlapping tiles.

    Parameters
    ----------
    shape: tuple[int, int]
        Shape of rectangle to tile up.
    max_tiles: int
        Maximum number of tiles.
        Actual number of tiles is perfect square '<= max_tiles'.
    """
    if (shape[0] <= 0) and (shape[1] <= 0):
        errmsg = f"Invalid shape provided to tiler: {shape}"
        raise ValueError(errmsg)

    if max_tiles <= 0:
        errmsg = f"Maximum tiles should at least be 1. Got {max_tiles}"
        raise ValueError(errmsg)

    # Compute tiles per dimension
    tiles_per_dim: int = int(np.sqrt(max_tiles))

    # If only 1 tile was requested
    if tiles_per_dim == 1:
        return TileSet.single_tile(shape)

    # Generate tile extents
    rows = np.ogrid[0 : shape[0] : 1j * (tiles_per_dim + 1)]  # type: ignore[misc]
    cols = np.ogrid[0 : shape[1] : 1j * (tiles_per_dim + 1)]  # type: ignore[misc]
    tiles: list[BBox] = []

    # Counter for rows
    for ii in range(tiles_per_dim):
        r0 = int(rows[ii])
        r1 = int(rows[ii + 1])
        # Counter for columns
        for jj in range(tiles_per_dim):
            c0 = int(cols[jj])
            c1 = int(cols[jj + 1])
            tiles.append(BBox.from_shapely_bounds((r0, c0, r1, c1)))

    return TileSet(shape, tiles)

get_cpu_count()

Get the number of CPUs available to the current process.

This function accounts for the possibility of a Docker container with limited CPU resources on a larger machine (which is ignored by multiprocessing.cpu_count()). This is derived from isce-framework/dolphin.

Returns:

Type Description
int

The number of CPUs available to the current process.

Source code in src/spurt/utils/_cpu.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def get_cpu_count() -> int:
    """Get the number of CPUs available to the current process.

    This function accounts for the possibility of a Docker container with
    limited CPU resources on a larger machine (which is ignored by
    `multiprocessing.cpu_count()`). This is derived from
    isce-framework/dolphin.

    Returns
    -------
    int
        The number of CPUs available to the current process.
    """

    def get_cpu_quota() -> int:
        return int(Path("/sys/fs/cgroup/cpu/cpu.cfs_quota_us").read_text())

    def get_cpu_period() -> int:
        return int(Path("/sys/fs/cgroup/cpu/cpu.cfs_period_us").read_text())

    try:
        cfs_quota_us = get_cpu_quota()
        cfs_period_us = get_cpu_period()
        if cfs_quota_us > 0 and cfs_period_us > 0:
            return math.ceil(cfs_quota_us / cfs_period_us)
    except Exception:
        pass
    return cpu_count()