我有一些来自chirpS数据库的时空数据。它是一个NetCDF,包含全世界每天的降水量,其空间分辨率为1x1km2。该数据集具有3个维度(“时间”,“经度”,“纬度”)。
我想根据每个像素的坐标(“纬度”和“经度”)的时间分布来对该降水数据进行分类。因此,我希望应用二值化的维度是“时间”域。
这是StackOverflow(see in here)中已经讨论的类似问题。在我的情况下,它们的问题与我的问题之间的区别在于,我需要根据每个特定像素的时间分布对数据进行二值化,而不是对我所有的坐标(像素)应用单一值范围进行二值化。因此,我希望有不同的合并阈值(“ n”个阈值集),数据集中的每个“ n”个像素都有一个。
据我了解,在Xarray的DataArray / DataSet的每个坐标(像素)上应用函数的最简单,最快的方法是使用xarray.apply_ufunc。
对于二值化,我使用的是熊猫qcut方法,该方法只需要一组值和一些给定的相对频率(即:[0.1%,0.5%,25%,99%])即可为它工作。
由于pandas分箱功能需要一个数据数组,并且还返回另一个二值化数据数组,所以我知道我必须在U_function(described in here)中使用参数“ vectorize” = True。>
最后,当我运行分析时,结果Xarray数据集最终在处理后丢失了“时间”维。另外,我不确定该处理是否真正返回了具有正确分类数据的Xarray数据集。
这是可复制的代码段。注意,“ ds_binned”的“时间”维丢失了。因此,我必须稍后将装箱的数据重新插入到原始xarray数据集(ds)中。另请注意,尺寸设置不正确。这也给我的分析带来了麻烦。
import pandas as pd
pd.set_option('display.width',50000)
pd.set_option('display.max_rows',50000)
pd.set_option('display.max_columns',5000)
import numpy as np
import xarray as xr
from dask.diagnostics import ProgressBar
ds = xr.tutorial.open_dataset('rasm').load()
def parse_datetime(time):
return pd.to_datetime([str(x) for x in time])
ds.coords['time'] = parse_datetime(ds.coords['time'].values)
def binning_function(x,distribution_type='Positive',b=False):
y = np.where(np.abs(x)==np.inf,x)
y = np.where(np.isnan(y),y)
if np.all(y) == 0:
return x
else:
Classified = pd.qcut(y,np.linspace(0.01,1,10))
return Classified.codes
def xarray_parse_extremes(ds,dim=['time'],dask='allowed',new_dim_name=['classes'],kwargs={'b': False,'distribution_type':'Positive'}):
filtered = xr.apply_ufunc(binning_function,ds,dask=dask,vectorize=True,input_core_dims=[dim],#exclude_dims = [dim],output_core_dims=[new_dim_name],kwargs=kwargs,output_dtypes=[float],join='outer',dataset_fill_value=np.nan,).compute()
return filtered
with ProgressBar():
da_binned = xarray_parse_extremes(ds['Tair'],['time'],dask='allowed')
da_binned.name = 'classes'
ds_binned = da_binned.to_dataset()
ds['classes'] = (('y','x','time'),ds_binned['classes'].values)
mask = (ds['classes'] >= 5) & (ds['classes'] != 0)
ds.where(mask,drop=True).resample({'time':'Y'}).count('time')['Tair'].isel({'time':-1}).plot()
print(ds)
(ds.where(mask,drop=True).resample({'time':'Y'}).count('time')['Tair']
.to_dataframe().dropna().sort_values('Tair',ascending=False)
)
delayed_to_netcdf = ds.to_netcdf(r'F:\Philipe\temp\teste_tutorial.nc',engine='netcdf4',compute =False)
print('saving data classified')
with ProgressBar():
delayed_to_netcdf.compute()