geo_util.py

import csv
import itertools
import time

import fiona
import geopandas as gpd
import numpy as np
import pandas as pd
from scipy.spatial.distance import cdist
from shapely.geometry.multipolygon import MultiPolygon
from shapely.geometry.polygon import Polygon
from sklearn.ensemble import RandomForestRegressor
from sklearn.neighbors import KNeighborsRegressor

# Swiss coordinate reference systems: CH1903 / LV03 and CH1903+ / LV95
LV03 = 'epsg:21781'
LV95 = 'epsg:2056'

def ceil_to_interval(x, round_int):
    return np.ceil(x / round_int) * round_int

def floor_to_interval(x, round_int):
    return np.floor(x / round_int) * round_int

def round_to_interval(x, round_int):
    return np.round(x / round_int) * round_int

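# Usage sketch for the rounding helpers (illustrative values):
#   round_to_interval(np.array([12.3, 47.9]), 5)  ->  array([10., 50.])
#   ceil_to_interval(12.3, 5)                     ->  15.0
#   floor_to_interval(12.3, 5)                    ->  10.0
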
def write_row_to_csv(filename, content_vector, initialize=False, delimiter=','):
    # write a single row to a csv file; initialize=True overwrites the file,
    # otherwise the row is appended
    open_mode = 'w' if initialize else 'a'
    with open(filename, open_mode, newline='') as csvfile:
        w = csv.writer(csvfile, delimiter=delimiter)
        w.writerow(content_vector)

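# Usage sketch (hypothetical file name): write a header once, then append rows.
#   write_row_to_csv('results.csv', ['id', 'area'], initialize=True)
#   write_row_to_csv('results.csv', [42, 123.4])
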
def load_from_file(filepath, layer_name, batch_id=0, batch_size=None):
    # load a given batch of rows from file and pre-process the data
    # returns a geopandas dataframe with polygons and database information
    # open file with fiona
    file = fiona.open(filepath, layer=layer_name)
    # load the selected rows from the file
    if batch_size is None:
        start = 0
        end = len(file)
    else:
        start = batch_id * batch_size
        end = min((batch_id + 1) * batch_size, len(file))
    data = gpd.GeoDataFrame.from_features(file[start:end])
    if len(data) == 0:
        raise ValueError('Rooftop dataframe is empty - no data loaded from file.')
    return data

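# Usage sketch (hypothetical path and layer name; assumes a GeoPackage with a
# layer called 'rooftops'): load the second batch of 1000 features.
#   batch = load_from_file('buildings.gpkg', 'rooftops', batch_id=1, batch_size=1000)
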
def load_file(fp, columns=None, rows=None, crs=None, bbox=None, **kwargs):
    # read a vector file, optionally restricted to a subset of columns, a number
    # of rows, or a bounding box (note: the crs argument is currently unused)
    if columns is not None:
        all_cols = gpd.read_file(fp, rows=0, **kwargs).columns
        ignore_cols = list(all_cols[np.logical_not(all_cols.isin(columns))])
    else:
        ignore_cols = None
    return gpd.read_file(fp, ignore_fields=ignore_cols, rows=rows, bbox=bbox, **kwargs)

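# Usage sketch (hypothetical path): read only two attribute columns within a
# bounding box given as (minx, miny, maxx, maxy) in the file's CRS.
#   gdf = load_file('buildings.gpkg', columns=['HEIGHT', 'TYPE'],
#                   bbox=(2600000, 1200000, 2601000, 1201000))
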
def spatially_interpolate(dataIn, coordsOut, interpVar, coordNamesIn=['x', 'y'], weights='distance'):
    # interpolate a variable in 2D from its coordinates, using inverse-distance
    # weighted k-nearest neighbours (k = 4)
    interpolator = KNeighborsRegressor(n_neighbors=4, weights=weights)
    interpolator.fit(dataIn[coordNamesIn], dataIn[interpVar])
    return interpolator.predict(coordsOut)

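# Minimal sketch with synthetic data: the query point is equidistant from all
# four samples, so the prediction is their mean.
#   pts = pd.DataFrame({'x': [0, 0, 1, 1], 'y': [0, 1, 0, 1], 'z': [0., 1., 1., 2.]})
#   spatially_interpolate(pts, [[0.5, 0.5]], 'z')  ->  array([1.])
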
def spatially_interpolate_RF(dataIn, coordsOut, interpVar, coordNamesIn=['x', 'y'], n_est=200):
    # interpolate a variable from its coordinates in coordNamesIn, using a
    # random forest regressor on all available cores
    tt = time.time()
    interpolator = RandomForestRegressor(n_estimators=n_est, n_jobs=-1)
    interpolator.fit(dataIn[coordNamesIn], dataIn[interpVar])
    print('Fitted estimator in %.2fs' % (time.time() - tt))
    return interpolator.predict(coordsOut)

def map_coords_to_pixel(coords, offset, pixel_res, decimals=1):
    # snap coordinates to the pixel grid defined by offset and pixel_res;
    # offset should be either a scalar or of type np.array()
    tmp_coords = coords - offset
    tmp_coords_rounded = round_to_interval(tmp_coords, pixel_res)
    return (tmp_coords_rounded + offset).round(decimals)

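# Worked example: with an offset of 0.5 on a 1 m grid, the pixel centres lie at
# ..., 2.5, 3.5, ..., so 3.2 snaps to 3.5.
#   map_coords_to_pixel(np.array([3.2]), 0.5, 1)  ->  array([3.5])
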
def select_by_coords(gdf, X0, X1, Y0, Y1):
    # keep only the geometries whose bounds lie fully inside the box [X0, X1] x [Y0, Y1]
    bounds = gdf.bounds
    return gdf[(bounds.minx >= X0) & (bounds.maxx <= X1) & (bounds.miny >= Y0) & (bounds.maxy <= Y1)]

def coords_to_point(df, coord_names=['x', 'y']):
    # attach a point geometry built from the coordinate columns
    xy = df[coord_names].values
    return gpd.GeoDataFrame(df.copy(), geometry=gpd.points_from_xy(xy[:, 0], xy[:, 1]))

def create_point_grid(delta, minx, maxx, miny, maxy, NX=None, NY=None, coord_names=['x', 'y']):
    # compute the number of required points and get the vectors for x and y
    if NX is None:
        NX = (maxx - minx) / delta + 1
    x = np.linspace(minx, maxx, int(NX))
    if NY is None:
        NY = (maxy - miny) / delta + 1
    y = np.linspace(miny, maxy, int(NY))
    # get all combinations of x and y and create a dataframe
    xy = np.asarray(list(itertools.product(x, y)))
    return gpd.GeoDataFrame(xy, columns=coord_names, geometry=gpd.points_from_xy(xy[:, 0], xy[:, 1]))

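# Usage sketch: a 3 x 3 grid of points with 10 m spacing.
#   grid = create_point_grid(10, 0, 20, 0, 20)
#   len(grid)  ->  9
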
def create_grid_from_centers(coords, pixel_res, coord_names=['x', 'y']):
    # loop through all center points and build the square pixel around each one
    # (generate a "fishnet")
    polygons = []
    for idx, data in coords.iterrows():
        Xleft = data[coord_names[0]] - 0.5 * pixel_res
        Xright = data[coord_names[0]] + 0.5 * pixel_res
        Ytop = data[coord_names[1]] + 0.5 * pixel_res
        Ybottom = data[coord_names[1]] - 0.5 * pixel_res
        polygons.append(Polygon([(Xleft, Ytop), (Xright, Ytop), (Xright, Ybottom), (Xleft, Ybottom)]))
    return gpd.GeoDataFrame({'geometry': polygons,
                             coord_names[0]: coords[coord_names[0]],
                             coord_names[1]: coords[coord_names[1]]})

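# Usage sketch: combine with create_point_grid to build a fishnet of
# 10 m x 10 m cells centred on the grid points.
#   centers = create_point_grid(10, 0, 20, 0, 20)
#   fishnet = create_grid_from_centers(centers, 10)
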
def create_grid_from_corners(coords, coord_names=['x', 'y']):
    # build the grid cells spanned by the sorted unique x and y corner
    # coordinates (generate a "fishnet")
    polygons = []
    X = np.sort(coords[coord_names[0]].unique())
    Y = np.sort(coords[coord_names[1]].unique())
    Xleft = X[:-1]
    Xright = X[1:]
    Ybottom = Y[:-1]
    Ytop = Y[1:]
    NX = len(X) - 1
    NY = len(Y) - 1
    for y in range(NY):
        for x in range(NX):
            xleft = Xleft[x]
            xright = Xright[x]
            ybottom = Ybottom[y]
            ytop = Ytop[y]
            polygons.append(Polygon([(xleft, ytop), (xright, ytop), (xright, ybottom), (xleft, ybottom)]))
    return gpd.GeoDataFrame({'geometry': polygons})

def explode_multipolygons(indf, ID_NAME='index'):
    # split MultiPolygon rows into one row per Polygon, duplicating the attributes
    # ADAPTED FROM: https://gist.github.com/mhweber/cf36bb4e09df9deee5eb54dc6be74d26
    workdf = indf.copy()
    workdf[ID_NAME] = indf.index
    out_rows = []
    for idx, row in workdf.iterrows():
        if isinstance(row.geometry, Polygon):
            out_rows.append(row.to_frame().T)
        if isinstance(row.geometry, MultiPolygon):
            geomdf = gpd.GeoDataFrame(pd.DataFrame(list(row.geometry.geoms), columns=['geometry']))
            geomdf[ID_NAME] = idx
            multdf = geomdf.merge(workdf.drop(columns=['geometry']), on=ID_NAME, how='left')
            out_rows.append(multdf)
    outdf = gpd.GeoDataFrame(pd.concat(out_rows, ignore_index=True))
    return outdf[workdf.columns]  # put columns in the right order

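# Note: recent geopandas versions provide a built-in alternative,
#   indf.explode(index_parts=False)
# which may be preferable where available.
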
def extract_geom_from_collection(df, keep_geom=['Polygon', 'MultiPolygon']):
    # Extract the requested geometry types from a geopandas dataframe, looping
    # through the dataframe and unpacking its geometry collections
    # To obtain geometry collections, set keep_geom_type = False in the overlay function
    good_geoms = df[df.geometry.geom_type.isin(keep_geom)]
    collections = df[df.geometry.geom_type == 'GeometryCollection']
    collected_geoms = []
    for idx, data in collections.iterrows():
        for item in data.geometry.geoms:
            if item.geom_type in keep_geom:
                # copy the row before replacing its geometry, so that collections
                # with several kept geometries yield independent rows
                new_row = data.copy()
                new_row['geometry'] = item
                collected_geoms.append(new_row)
    return pd.concat([good_geoms, gpd.GeoDataFrame(collected_geoms)])

def overlay_and_clean(df1, df2, keep_geom=['Polygon', 'MultiPolygon'], how='intersection', tolerance=None):
    # Perform the overlay operation with all geometries, then extract only the
    # keep_geom types, including those nested in geometry collections
    tt = time.time()
    overlay = gpd.overlay(df1, df2, how=how, keep_geom_type=False)
    print('Overlaid dataframes in %.2fs' % (time.time() - tt))
    tt = time.time()
    cleaned_df = extract_geom_from_collection(overlay, keep_geom=keep_geom)
    if tolerance is not None:
        # drop sliver geometries below the area tolerance
        cleaned_df = cleaned_df[cleaned_df.area > tolerance]
    print('Cleaned geometries in %.2fs' % (time.time() - tt))
    return cleaned_df

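# Usage sketch (hypothetical inputs): intersect building footprints with pixel
# polygons and drop slivers smaller than 1 m^2.
#   pieces = overlay_and_clean(buildings, pixels, tolerance=1.0)
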
def fill_na_by_closest(df, nan_field, coords=['x', 'y']):
    # fill missing values of nan_field in place, using the values of the
    # spatially closest point that has data
    if type(nan_field) is str:
        nan_field = [nan_field]
    # split the data in two subsets: the rows with nans and those with existing data
    remaining_nan = df[(df[nan_field].isna().all(axis=1)) & np.logical_not(df[coords].isna().all(axis=1))].copy()
    matched_data = df.dropna(subset=nan_field)
    idx_closest_coord = []
    tt = time.time()
    for idx, remaining_val in remaining_nan.iterrows():
        # Compute the distances from this point to all points with data
        dist_matrix = cdist(remaining_val[coords].values.reshape((1, len(coords))), matched_data[coords])
        # Get the iloc index in the matched_data df of the point closest to the remaining_nan
        idx_closest_coord.append(dist_matrix.argmin())
    print('Filled NaN values in %.2fs' % (time.time() - tt))
    # Add the field values of the closest point (by iloc index) to the remaining nan df
    for field_name in nan_field:
        remaining_nan[field_name] = matched_data.iloc[idx_closest_coord][field_name].values
    # replace the respective rows in the joint dataframe
    # Note: df is passed by reference, so the changes are made in the caller's dataframe
    df.loc[remaining_nan.index, :] = remaining_nan

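# Usage sketch (hypothetical dataframe): fill missing 'height' values in place
# from the nearest point that has one.
#   fill_na_by_closest(points_df, 'height')
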
def dissolve_overlapping_polygons(df, by='dummy'):
    # merge overlapping polygons by dissolving on a constant key, then split the
    # result back into individual polygons
    if by not in df.columns:
        df.loc[:, by] = 1
    tt = time.time()
    df_dissolved = df.dissolve(by=by)
    print('Dissolved polygons in %.2fs' % (time.time() - tt))
    tt = time.time()
    df_split = explode_multipolygons(df_dissolved)
    print('Split into individual polygons in %.2fs' % (time.time() - tt))
    return df_split

def clip_and_dissolve(data, pixels, name, pixel_ID='FID', pixel_res=None, keep_cols=None):
    # clip the data to the pixel polygons and aggregate it per pixel, computing
    # the covered area (and the area fraction, if pixel_res is given)
    tt = time.time()
    data_clipped = gpd.overlay(data, pixels, how='intersection')
    print('Clipped %s in %.2fs' % (name, time.time() - tt))
    tt = time.time()
    data_per_pixel = data_clipped.dissolve(by=pixel_ID)
    print('Dissolved %s by %s in %.2fs' % (name, pixel_ID, time.time() - tt))
    # compute area per pixel
    output_cols = ['geometry', pixel_ID, '%s_area' % name]
    data_per_pixel['%s_area' % name] = data_per_pixel.geometry.area
    # compute percentage area per pixel
    if pixel_res is not None:
        data_per_pixel['%s_area_perc' % name] = data_per_pixel['%s_area' % name] / pixel_res**2
        output_cols.append('%s_area_perc' % name)
    if keep_cols is not None:
        output_cols.extend(keep_cols)
    return data_per_pixel.reset_index()[output_cols]

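# Usage sketch (hypothetical inputs): per-pixel building area and coverage
# fraction for a fishnet of 100 m pixels.
#   per_pixel = clip_and_dissolve(buildings, fishnet, 'building', pixel_res=100)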
