# geo_util.py
# Geospatial utility functions (from repository R10679 hybrid_potential)
# standard library
import csv
import itertools
import time

# third-party
import fiona
import geopandas as gpd
import matplotlib  # imported but not used in the functions below
import numpy as np
import pandas as pd
from scipy.spatial.distance import cdist
from shapely.geometry.multipolygon import MultiPolygon
from shapely.geometry.polygon import Polygon
from sklearn.ensemble import RandomForestRegressor
from sklearn.neighbors import KNeighborsRegressor

# Swiss national coordinate reference systems (old LV03 and new LV95 frames)
LV03 = 'epsg:21781'
LV95 = 'epsg:2056'

def ceil_to_interval(x, round_int):
    # round x up to the nearest multiple of round_int
    return np.ceil(x / round_int) * round_int


def floor_to_interval(x, round_int):
    # round x down to the nearest multiple of round_int
    return np.floor(x / round_int) * round_int


def round_to_interval(x, round_int):
    # round x to the nearest multiple of round_int
    return np.round(x / round_int) * round_int

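# Usage sketch for the interval helpers (hypothetical values, not from the
# original file): snapping coordinates to a 100 m raster interval.
#   >>> ceil_to_interval(np.array([130., 151.]), 100)   # -> array([200., 200.])
#   >>> floor_to_interval(np.array([130., 151.]), 100)  # -> array([100., 100.])
#   >>> round_to_interval(np.array([130., 151.]), 100)  # -> array([100., 200.])
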
def write_row_to_csv(filename, content_vector, initialize=False, delimiter=','):
    # writes a row to a csv file; initialize=True (over)writes the file
    # instead of appending to it
    if initialize:
        open_mode = 'w'
    else:
        open_mode = 'a'
    # newline='' prevents blank lines between rows on Windows
    with open(filename, open_mode, newline='') as csvfile:
        w = csv.writer(csvfile, delimiter=delimiter)
        w.writerow(content_vector)

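# Usage sketch (hypothetical filename): write a header once, then append rows.
#   >>> write_row_to_csv('log.csv', ['batch', 'runtime'], initialize=True)
#   >>> write_row_to_csv('log.csv', [0, 12.3])
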
def load_from_file(filepath, layer_name, batch_id=0, batch_size=None):
    # load a given batch of rows from file and pre-process the data;
    # returns a geopandas dataframe with polygons and database information
    # open file with fiona (closed automatically when the block exits)
    with fiona.open(filepath, layer=layer_name) as file:
        # load selected rows from file
        if batch_size is None:
            start = 0
            end = len(file)
        else:
            start = batch_id * batch_size
            end = min((batch_id + 1) * batch_size, len(file))
        data = gpd.GeoDataFrame.from_features(file[start:end])
    if len(data) == 0:
        raise ValueError('Rooftop dataframe is empty - no data loaded from file.')
    return data

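# Usage sketch (hypothetical path and layer name): load the second batch of
# 1000 features from a GeoPackage layer.
#   >>> rooftops = load_from_file('data.gpkg', 'rooftops', batch_id=1, batch_size=1000)
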
def load_file(fp, columns=None, rows=None, crs=None, bbox=None, **kwargs):
    # Read a vector file, optionally restricted to the given attribute columns,
    # number of rows and bounding box (note: crs is accepted but not used here)
    if columns is not None:
        # read the header only to obtain the full column list
        all_cols = gpd.read_file(fp, rows=0, **kwargs).columns
        ignore_cols = list(all_cols[np.logical_not(all_cols.isin(columns))])
    else:
        ignore_cols = None
    return gpd.read_file(fp, ignore_fields=ignore_cols, rows=rows, bbox=bbox, **kwargs)

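# Usage sketch (hypothetical path and column name): read only one attribute
# column, restricted to a 1 km bounding box in the LV95 frame.
#   >>> gdf = load_file('buildings.shp', columns=['height'],
#   ...                 bbox=(2600000, 1200000, 2601000, 1201000))
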
def spatially_interpolate(dataIn, coordsOut, interpVar, coordNamesIn=['x', 'y'], weights='distance'):
    # Interpolate a variable according to its coordinates in 2D, using
    # (distance-weighted) k-nearest-neighbour regression
    interpolator = KNeighborsRegressor(n_neighbors=4, weights=weights)
    interpolator.fit(dataIn[coordNamesIn], dataIn[interpVar])
    return interpolator.predict(coordsOut)

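# Usage sketch (hypothetical dataframes and variable name): interpolate a
# measured variable 'irradiance' from station coordinates onto grid points.
#   >>> grid['irradiance'] = spatially_interpolate(stations, grid[['x', 'y']], 'irradiance')
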
def spatially_interpolate_RF(dataIn, coordsOut, interpVar, coordNamesIn=['x', 'y'], n_est=200):
    # Interpolate a variable according to its coordinates in the coordNamesIn
    # columns, using a random forest regressor
    tt = time.time()
    interpolator = RandomForestRegressor(n_estimators=n_est, n_jobs=-1)
    interpolator.fit(dataIn[coordNamesIn], dataIn[interpVar])
    print('Fitted estimator in %.2f s' % (time.time() - tt))
    return interpolator.predict(coordsOut)

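# Usage sketch (same hypothetical data as above): the random-forest variant is
# slower to fit but can capture non-smooth spatial patterns.
#   >>> grid['irradiance'] = spatially_interpolate_RF(stations, grid[['x', 'y']],
#   ...                                               'irradiance', n_est=100)
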
def map_coords_to_pixel(coords, offset, pixel_res, decimals=1):
    # snap coordinates to the pixel grid defined by offset and pixel_res;
    # offset should be either scalar or of type np.array()
    tmp_coords = coords - offset
    tmp_coords_rounded = round_to_interval(tmp_coords, pixel_res)
    return (tmp_coords_rounded + offset).round(decimals)

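# Usage sketch (hypothetical values): snap a coordinate to a 100 m pixel grid
# whose origin is shifted by half a pixel.
#   >>> map_coords_to_pixel(np.array([2600037.2, 1200081.9]), offset=50, pixel_res=100)
#   array([2600050., 1200050.])
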
def select_by_coords(gdf, X0, X1, Y0, Y1):
    # select all geometries whose bounding box lies fully within [X0, X1] x [Y0, Y1]
    bounds = gdf.bounds
    return gdf[(bounds.minx >= X0) & (bounds.maxx <= X1)
               & (bounds.miny >= Y0) & (bounds.maxy <= Y1)]

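# Usage sketch: keep only geometries fully inside a 1 km LV95 tile.
#   >>> subset = select_by_coords(gdf, 2600000, 2601000, 1200000, 1201000)
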
def coords_to_point(df, coord_names=['x', 'y']):
    # build a GeoDataFrame with point geometries from the coordinate columns
    xy = df[coord_names].values
    return gpd.GeoDataFrame(df.copy(), geometry=gpd.points_from_xy(xy[:, 0], xy[:, 1]))

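# Usage sketch: attach point geometries to a plain dataframe with 'x'/'y'
# columns (note: no CRS is set on the result).
#   >>> points = coords_to_point(df)
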
def create_point_grid(delta, minx, maxx, miny, maxy, NX=None, NY=None, coord_names=['x', 'y']):
    # compute the number of required points and get the vectors for x and y
    if NX is None:
        NX = (maxx - minx) / delta + 1
    x = np.linspace(minx, maxx, int(NX))
    if NY is None:
        NY = (maxy - miny) / delta + 1
    y = np.linspace(miny, maxy, int(NY))
    # get all combinations of x and y and create a dataframe
    xy = np.asarray(list(itertools.product(x, y)))
    return gpd.GeoDataFrame(xy, columns=coord_names,
                            geometry=gpd.points_from_xy(xy[:, 0], xy[:, 1]))

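# Usage sketch (hypothetical extent): points every 100 m over a 1 km x 1 km
# tile; bounds are inclusive, so this gives 11 x 11 = 121 points.
#   >>> grid = create_point_grid(100, 2600000, 2601000, 1200000, 1201000)
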
def create_grid_from_centers(coords, pixel_res, coord_names=['x', 'y']):
    # loop through all pixel centers and build the square cell around each one
    # (generate a "fishnet" of polygons)
    polygons = []
    for idx, data in coords.iterrows():
        Xleft = data[coord_names[0]] - 0.5 * pixel_res
        Xright = data[coord_names[0]] + 0.5 * pixel_res
        Ytop = data[coord_names[1]] + 0.5 * pixel_res
        Ybottom = data[coord_names[1]] - 0.5 * pixel_res
        polygons.append(Polygon([(Xleft, Ytop), (Xright, Ytop),
                                 (Xright, Ybottom), (Xleft, Ybottom)]))
    return gpd.GeoDataFrame({'geometry': polygons,
                             coord_names[0]: coords[coord_names[0]],
                             coord_names[1]: coords[coord_names[1]]})

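# Usage sketch: turn the point grid from the sketch above into square 100 m
# pixels centered on each point.
#   >>> pixels = create_grid_from_centers(grid, 100)
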
def create_grid_from_corners(coords, coord_names=['x', 'y']):
    # loop through all corner coordinates and build one polygon between each
    # pair of neighbouring corners (generate a "fishnet")
    polygons = []
    X = np.sort(coords[coord_names[0]].unique())
    Y = np.sort(coords[coord_names[1]].unique())
    Xleft = X[:-1]
    Xright = X[1:]
    Ybottom = Y[:-1]
    Ytop = Y[1:]
    NX = len(X) - 1
    NY = len(Y) - 1
    for y in range(NY):
        for x in range(NX):
            xleft = Xleft[x]
            xright = Xright[x]
            ybottom = Ybottom[y]
            ytop = Ytop[y]
            polygons.append(Polygon([(xleft, ytop), (xright, ytop),
                                     (xright, ybottom), (xleft, ybottom)]))
    return gpd.GeoDataFrame({'geometry': polygons})

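# Usage sketch: the corner-based variant builds cells between neighbouring
# coordinates, so the 11 x 11 point grid above yields 10 x 10 = 100 cells.
#   >>> cells = create_grid_from_corners(grid)
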
def explode_multipolygons(indf, ID_NAME='index'):
    # split each MultiPolygon row into one row per polygon, copying attributes
    # CODE FROM: https://gist.github.com/mhweber/cf36bb4e09df9deee5eb54dc6be74d26
    workdf = indf.copy()
    workdf[ID_NAME] = indf.index
    out_rows = []
    for idx, row in workdf.iterrows():
        if type(row.geometry) == Polygon:
            out_rows.append(row.to_frame().T)
        if type(row.geometry) == MultiPolygon:
            # one row per member polygon (.geoms is required for shapely >= 1.8)
            geomdf = gpd.GeoDataFrame(pd.DataFrame(list(row.geometry.geoms), columns=['geometry']))
            geomdf[ID_NAME] = idx
            # re-attach the non-geometry attributes of the original row
            multdf = geomdf.merge(workdf.drop(columns=['geometry']), on=ID_NAME, how='left')
            out_rows.append(multdf)
    # DataFrame.append was removed in pandas 2.0; pd.concat is the replacement
    outdf = gpd.GeoDataFrame(pd.concat(out_rows, ignore_index=True))
    return outdf[workdf.columns]  # put columns in the right order

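# Usage sketch (hypothetical dataframe): after a dissolve, split the resulting
# multipolygons back into single polygons while keeping all attributes.
#   >>> singles = explode_multipolygons(dissolved_gdf, ID_NAME='orig_index')
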
def extract_geom_from_collection(df, keep_geom=['Polygon', 'MultiPolygon']):
    # Extracts geometries from a geopandas dataframe, by looping through the
    # dataframe and its geometry collections.
    # To obtain geometry collections, set keep_geom_type=False in the overlay function.
    good_geoms = df[df.geometry.geom_type.isin(keep_geom)]
    collections = df[df.geometry.geom_type == 'GeometryCollection']
    collected_geoms = []
    for idx, data in collections.iterrows():
        for item in data.geometry.geoms:
            if item.geom_type in keep_geom:
                # copy the row, so that collections with several kept members
                # do not all end up pointing at the last geometry
                row = data.copy()
                row['geometry'] = item
                collected_geoms.append(row)
    return pd.concat([good_geoms, gpd.GeoDataFrame(collected_geoms)])

def overlay_and_clean(df1, df2, keep_geom=['Polygon', 'MultiPolygon'], how='intersection', tolerance=None):
    # Performs the overlay operation with all geometries, and extracts only the
    # keep_geom types, including those nested inside geometry collections
    tt = time.time()
    overlay = gpd.overlay(df1, df2, how=how, keep_geom_type=False)
    print('Overlaid dataframes in %.2f s' % (time.time() - tt))
    tt = time.time()
    cleaned_df = extract_geom_from_collection(overlay, keep_geom=keep_geom)
    if tolerance is not None:
        # drop sliver geometries below the area tolerance
        cleaned_df = cleaned_df[cleaned_df.area > tolerance]
    print('Cleaned geometries in %.2f s' % (time.time() - tt))
    return cleaned_df

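# Usage sketch (hypothetical layers): intersect two polygon layers, keep only
# (multi)polygons, and drop slivers smaller than 1 m^2.
#   >>> clean = overlay_and_clean(parcels, zones, tolerance=1.0)
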
def fill_na_by_closest(df, nan_field, coords=['x', 'y']):
    # fill NaN entries of nan_field with the values of the spatially closest
    # row that has data; df is modified in place
    if type(nan_field) is str:
        nan_field = [nan_field]
    # split the data in two subsets: rows with NaNs and rows with existing data
    # (.copy() avoids a SettingWithCopyWarning when filling the fields below)
    remaining_nan = df[(df[nan_field].isna().all(axis=1))
                       & np.logical_not(df[coords].isna().all(axis=1))].copy()
    matched_data = df.dropna(subset=nan_field)
    idx_closest_coord = []
    tt = time.time()
    for idx, remaining_val in remaining_nan.iterrows():
        # Compute the distances from this row to all rows with existing data
        dist_matrix = cdist(remaining_val[coords].values.reshape((1, len(coords))),
                            matched_data[coords])
        # Get the iloc index in the matched_data df for the point closest to the remaining_nan
        idx_closest_coord.append(dist_matrix.argmin())
    print('Filled NaN values in %.2f s' % (time.time() - tt))
    # Add the field values of the closest point (by iloc index) to the remaining nan df
    for field_name in nan_field:
        remaining_nan[field_name] = matched_data.iloc[idx_closest_coord][field_name].values
    # replace the respective rows in the joint dataframe
    # Note: df passes by reference, so changes are made in the passed dataframe
    df.loc[remaining_nan.index, :] = remaining_nan

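# Usage sketch (hypothetical field name): fill missing 'zone_code' values from
# the nearest row with data; note the in-place modification of df.
#   >>> fill_na_by_closest(df, 'zone_code')
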
def dissolve_overlapping_polygons(df, by='dummy'):
    # merge overlapping polygons: dissolve all geometries by the given column
    # (adding a constant dummy column if it does not exist), then split the
    # resulting multipolygons back into individual polygons
    if by not in df.columns:
        df.loc[:, by] = 1
    tt = time.time()
    df_dissolved = df.dissolve(by=by)
    print('Dissolved polygons in %.2f s' % (time.time() - tt))
    tt = time.time()
    df_split = explode_multipolygons(df_dissolved)
    print('Split into individual polygons in %.2f s' % (time.time() - tt))
    return df_split

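# Usage sketch: merge mutually overlapping footprints into disjoint polygons.
#   >>> disjoint = dissolve_overlapping_polygons(footprints)
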
def clip_and_dissolve(data, pixels, name, pixel_ID='FID', pixel_res=None, keep_cols=None):
    # clip data to the pixel grid and merge the clipped geometries per pixel,
    # computing the (relative) area that they cover in each pixel
    tt = time.time()
    data_clipped = gpd.overlay(data, pixels, how='intersection')
    print('Clipped %s in %.2f s' % (name, time.time() - tt))
    tt = time.time()
    data_per_pixel = data_clipped.dissolve(by=pixel_ID)
    print('Dissolved %s by %s in %.2f s' % (name, pixel_ID, time.time() - tt))
    # compute area per pixel
    output_cols = ['geometry', pixel_ID, '%s_area' % name]
    data_per_pixel['%s_area' % name] = data_per_pixel.geometry.area
    # compute percentage area per pixel
    if pixel_res is not None:
        data_per_pixel['%s_area_perc' % name] = data_per_pixel['%s_area' % name] / pixel_res ** 2
        output_cols.append('%s_area_perc' % name)
    if keep_cols is not None:
        output_cols.extend(keep_cols)
    return data_per_pixel.reset_index()[output_cols]

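# Usage sketch (hypothetical layers): per-pixel area of 'forest' polygons on a
# 100 m pixel grid, keeping the pixel-center coordinates.
#   >>> forest_px = clip_and_dissolve(forest, pixels, 'forest', pixel_res=100,
#   ...                               keep_cols=['x', 'y'])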