dc_chunker.py

import math
from datetime import datetime
from itertools import groupby
import xarray as xr
import numpy as np
import os


def create_geographic_chunks(longitude=None, latitude=None, geographic_chunk_size=0.5):
    """Chunk a parameter set defined by a latitude and longitude range.

    Process the lat/lon parameters defined for loading Data Cube data, splitting the
    full area into chunks of at most geographic_chunk_size square degrees.

    Args:
        latitude: Latitude range to split
        longitude: Longitude range to split
        geographic_chunk_size: Maximum area of a single chunk, in square degrees

    Returns:
        A list of dicts containing longitude and latitude ranges that can be used to update load params
    """
    assert latitude and longitude, "Longitude and latitude are both required kwargs."

    square_area = (latitude[1] - latitude[0]) * (longitude[1] - longitude[0])
    geographic_chunks = math.ceil(square_area / geographic_chunk_size)

    # we're splitting across latitudes and not longitudes
    # this can be a fp value, no issue there.
    latitude_chunk_size = (latitude[1] - latitude[0]) / geographic_chunks
    latitude_ranges = [(latitude[0] + latitude_chunk_size * chunk_number,
                        latitude[0] + latitude_chunk_size * (chunk_number + 1))
                       for chunk_number in range(geographic_chunks)]
    longitude_ranges = [longitude for __ in latitude_ranges]

    return [{'longitude': pair[0], 'latitude': pair[1]} for pair in zip(longitude_ranges, latitude_ranges)]
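
# Illustrative usage (a minimal sketch; the bounds below are hypothetical, not part of
# this module). Splitting a 1 x 1 degree area with the default chunk size of 0.5 square
# degrees yields two latitude bands that both carry the full longitude range:
#
#     create_geographic_chunks(longitude=(34.0, 35.0), latitude=(-1.0, 0.0))
#     # -> [{'longitude': (34.0, 35.0), 'latitude': (-1.0, -0.5)},
#     #     {'longitude': (34.0, 35.0), 'latitude': (-0.5, 0.0)}]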


def combine_geographic_chunks(chunks):
    """Combine a group of chunks generated by create_geographic_chunks

    Combines the chunks, eliminating duplicate latitude indices, then reindexes
    on latitude to ensure that the resulting dataset is identical to what
    would be generated in a single monolithic load.

    Args:
        chunks: list of xarray Datasets to combine

    Returns:
        xarray Dataset representing the combined product.
    """
    # we aren't doing the xr.Dataset combine_first as it causes mem to spike to 10+Gb for small areas
    data_types = {data_var: chunks[0][data_var].dtype for data_var in list(chunks[0].data_vars)}

    combined_data = []
    valid_latitudes = None
    for chunk in chunks:
        if valid_latitudes is None:
            combined_data.append(chunk.copy(deep=True))
            valid_latitudes = chunk.latitude.values
            continue
        # Mask latitudes that already exist in previously combined chunks so they can be dropped.
        duplicate_latitudes = chunk.latitude.isin(valid_latitudes)
        masked_dataset = chunk.where(~duplicate_latitudes, drop=True)
        # required to preserve dtype - they are lost during .where for some reason.
        for data_var in masked_dataset.data_vars:
            masked_dataset[data_var] = masked_dataset[data_var].astype(data_types[data_var])
        combined_data.append(masked_dataset)
        valid_latitudes = np.append(valid_latitudes, masked_dataset.latitude.values)

    combined_data = xr.concat(combined_data, dim='latitude')
    indices = {
        'latitude': sorted(combined_data.latitude.values, reverse=True),
    }
    return combined_data.reindex(indices, copy=False)
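
# Illustrative usage (a minimal sketch; the coordinates and the 'ndvi' variable are
# hypothetical). Two latitude bands that share a boundary row merge back into one
# dataset with the duplicate row dropped and the original dtypes preserved:
#
#     lons = np.array([34.0, 34.25])
#     chunk_a = xr.Dataset({'ndvi': (('latitude', 'longitude'), np.ones((3, 2), dtype=np.int16))},
#                          coords={'latitude': [0.0, -0.25, -0.5], 'longitude': lons})
#     chunk_b = xr.Dataset({'ndvi': (('latitude', 'longitude'), np.zeros((3, 2), dtype=np.int16))},
#                          coords={'latitude': [-0.5, -0.75, -1.0], 'longitude': lons})
#     combined = combine_geographic_chunks([chunk_a, chunk_b])
#     # combined.latitude -> [0.0, -0.25, -0.5, -0.75, -1.0]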


def create_time_chunks(datetime_list, _reversed=False, time_chunk_size=10):
    """Create an iterable containing groups of acquisition dates

    Separate a list of datetimes into chunks of at most time_chunk_size acquisitions.

    Args:
        datetime_list: List or iterable of datetimes to chunk
        kwargs:
            _reversed (optional): if False (default) sort acquisitions least recent -> most recent,
                if True sort most recent -> least recent
            time_chunk_size (optional): maximum number of acquisitions per chunk; None puts
                everything in a single chunk

    Returns:
        iterable of time chunks
    """
    datetimes_sorted = sorted(datetime_list, reverse=_reversed)

    if time_chunk_size is None:
        return [datetimes_sorted]

    return _chunk_iterable(datetimes_sorted, time_chunk_size)
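
# Illustrative usage (a minimal sketch; the dates are hypothetical). Twelve acquisition
# dates with the default time_chunk_size of 10 are sorted and split into a chunk of ten
# followed by a chunk of two:
#
#     dates = [datetime(2017, month, 1) for month in range(1, 13)]
#     create_time_chunks(dates)
#     # -> [[datetime(2017, 1, 1), ..., datetime(2017, 10, 1)],
#     #     [datetime(2017, 11, 1), datetime(2017, 12, 1)]]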


def group_datetimes_by_year(datetime_list):
    """Group a list of datetimes by year"""
    data = {}
    for key, val in groupby(sorted(datetime_list, key=lambda x: x.year), lambda y: y.year):
        data[key] = list(val)
    return data
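
# Illustrative usage (a minimal sketch; the dates are hypothetical):
#
#     group_datetimes_by_year([datetime(2016, 5, 1), datetime(2017, 1, 1), datetime(2017, 6, 1)])
#     # -> {2016: [datetime(2016, 5, 1)], 2017: [datetime(2017, 1, 1), datetime(2017, 6, 1)]}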


def group_datetimes_by_month(datetime_list, months=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]):
    """Group an iterable of datetimes by month with an inclusion list"""
    month_filtered = filter(lambda x: x.month in months, datetime_list)
    data = {}
    # groupby requires a sorted list
    for key, val in groupby(sorted(month_filtered, key=lambda x: x.month), lambda y: y.month):
        data[key] = list(val)
    return data
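
# Illustrative usage (a minimal sketch; the dates and month filter are hypothetical).
# Only acquisitions whose month appears in the inclusion list are grouped:
#
#     group_datetimes_by_month([datetime(2017, 1, 5), datetime(2017, 1, 20), datetime(2017, 3, 2)], months=[1, 2])
#     # -> {1: [datetime(2017, 1, 5), datetime(2017, 1, 20)]}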


def _chunk_iterable(_iterable, chunk_size):
    """Split an iterable into chunk_sized parts"""
    chunks = [_iterable[index:index + chunk_size] for index in range(0, len(_iterable), chunk_size)]
    return chunks


def generate_baseline(_iterable, window_length):
    """Generate a sliding baseline of an iterable

    Creates a list of sliding baselines for the iterable. e.g. if you pass in
    a list of len==5 with a baseline length of 2, we will generate:
        [
            [elem0 (first element), elem1, elem2],
            [elem1, elem2, elem3],
            [elem2, elem3, elem4 (last element)]
        ]
    The first element in each list is the element that the baseline is created for, followed by
    window_length number of elements as the baseline.

    Args:
        _iterable: iterable to create baselines for
        window_length: Number of elements to form a baseline

    Returns:
        list like [[window_1], [window_2], [window_3] ...]
    """
    if len(_iterable) <= window_length:
        return [_iterable]

    num_windows = len(_iterable) - window_length
    return [_iterable[window:window + window_length + 1] for window in range(num_windows)]
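
# Illustrative usage (a minimal sketch; the values are hypothetical). A five-element
# list with window_length=2 yields three windows, each holding the target element
# followed by its two-element baseline:
#
#     generate_baseline([0, 1, 2, 3, 4], window_length=2)
#     # -> [[0, 1, 2], [1, 2, 3], [2, 3, 4]]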
