Python pandas module: DateOffset() example source code
We extracted the following 25 code examples from open-source Python projects to illustrate how to use pandas.DateOffset().
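Before the examples, a quick orientation (not taken from any of the quoted projects): pd.DateOffset is a calendar-aware duration, so adding a month to a month-end timestamp clips to the shorter month instead of overflowing.

import pandas as pd

ts = pd.Timestamp("2015-01-31")
print(ts + pd.DateOffset(months=1))   # 2015-02-28 00:00:00, clipped to month end
print(ts - pd.DateOffset(days=1))     # 2015-01-30 00:00:00
print(ts + pd.DateOffset(hours=12))   # 2015-01-31 12:00:00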
def round_timestamp_to_sleep_date(timeseries):
    """
    Not my proudest function ... this isn't as efficient as it could be,
    but I'm struggling with the pandas syntax to find the perfect one-liner.
    This could be much more performant, but I need time to sit down and
    figure it out.
    """
    sleep_dates = []
    for value in timeseries:
        # Timestamps before the cutoff hour count toward the previous day.
        if value.hour < SLEEP_CUTOFF_TIME:
            result = value - pd.DateOffset(days=1)
        else:
            result = value
        sleep_dates.append(result)
    index = pd.DatetimeIndex(sleep_dates)
    return index
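The vectorized one-liner the docstring is looking for might be sketched as follows, assuming timeseries is a DatetimeIndex and SLEEP_CUTOFF_TIME is an integer hour; this is an illustration, not the original project's code.

def round_timestamp_to_sleep_date_vectorized(timeseries):
    # Subtract one day wherever the hour is before the cutoff, in one pass.
    shift = pd.to_timedelta((timeseries.hour < SLEEP_CUTOFF_TIME).astype(int), unit="D")
    return pd.DatetimeIndex(timeseries - shift)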
def bug_timeseries2seqs(data, timestamps, length=3, T=48):
    # have a bug: the final segment (breakpoints[-1] .. len(timestamps)) is
    # never appended, so the trailing run of data is silently dropped, and
    # the window size is hard-coded to 3 rather than using `length`.
    if type(timestamps[0]) != pd.Timestamp:
        timestamps = string2timestamp(timestamps, T=T)

    offset = pd.DateOffset(minutes=24 * 60 // T)

    breakpoints = [0]
    for i in range(1, len(timestamps)):
        if timestamps[i-1] + offset != timestamps[i]:
            breakpoints.append(i)

    X = []
    Y = []
    for b in range(1, len(breakpoints)):
        print('breakpoints: ', breakpoints[b-1], breakpoints[b])
        idx = range(breakpoints[b-1], breakpoints[b])
        for i in range(len(idx) - 3):
            x = np.vstack(data[idx[i:i+3]])
            y = data[idx[i+3]]
            X.append(x)
            Y.append(y)
    X = np.asarray(X)
    Y = np.asarray(Y)
    print("X shape: ", X.shape, "Y shape:", Y.shape)
    return X, Y
def periods(today=None):
    """
    Construct a series of Period objects.
    :param today: If not specified, use today's date. Specifying today
                  explicitly is useful in unit tests.
    :return: pd.Series mapping a label to a Period
    """
    today = today or pd.Timestamp("today")

    def __f(offset, today):
        return Period(start=today - offset, end=today)

    offset = pd.Series(dtype=object)
    offset["Two weeks"] = pd.DateOffset(weeks=2)
    offset["Month-to-Date"] = pd.offsets.MonthBegin()
    offset["Year-to-Date"] = pd.offsets.YearBegin()
    offset["One Month"] = pd.DateOffset(months=1)
    offset["Three Months"] = pd.DateOffset(months=3)
    offset["One Year"] = pd.DateOffset(years=1)
    offset["Three Years"] = pd.DateOffset(years=3)
    offset["Five Years"] = pd.DateOffset(years=5)
    offset["Ten Years"] = pd.DateOffset(years=10)
    return offset.apply(__f, today=today)
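A minimal usage sketch (assumed, not from the source project; it presumes the project's Period type exposes the start it was built with):

p = periods(today=pd.Timestamp("2017-06-30"))
print(p["One Month"].start)      # 2017-05-30: plain calendar arithmetic
print(p["Year-to-Date"].start)   # 2017-01-01: YearBegin rolls back to Jan 1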
def plotDailyStatsSleep(stats, columns=None):
    """
    Plot daily stats. Fill the whole date range, putting NaN for days
    without measures.
    :param stats: data to plot
    :param columns: columns to plot; defaults to sleep inefficiency and hours
    """
    MEASURE_NAME = 'date'
    if not columns:
        columns = ['sleep_inefficiency', 'sleep_hours']
    dataToPlot = _prepareDailyStats(stats, columns)
    f, axes = getAxes(2, 1)
    xTicksDiv = min(10, len(dataToPlot))
    #xticks = [(x - pd.DateOffset(years=1, day=2)).date() for x in stats.date]
    xticks = [x.date() for x in dataToPlot.date]
    # Keep only every n-th tick label, blanking the rest.
    keptticks = xticks[::int(len(xticks) / xTicksDiv)]
    xticks = ['' for _ in xticks]
    xticks[::int(len(xticks) / xTicksDiv)] = keptticks
    for i, c in enumerate(columns):
        g = sns.pointplot(x=MEASURE_NAME, y=NAMES[c], data=dataToPlot, ax=axes[i])
        g.set_xticklabels([])
        g.set_xlabel('')
        g.set_xticklabels(xticks, rotation=45)
    sns.plt.show()  # note: sns.plt only exists in old seaborn versions
def get_sim_index(self, tmin, tmax, freq, warmup):
    """Method to get the indices for the simulation, including the warmup
    period.

    Parameters
    ----------
    tmin : pd.Timestamp
        Start time of the simulation.
    tmax : pd.Timestamp
        End time of the simulation.
    freq : str
        Frequency string for the simulation index.
    warmup : int
        Number of warmup days prepended before tmin.

    Returns
    -------
    pd.DatetimeIndex
    """
    sim_index = pd.date_range(tmin - pd.DateOffset(days=warmup), tmax,
                              freq=freq)
    return sim_index
def fetch_data(self, state):
    query = QUERY % {'state': NAMES_TO_CODES[state], 'year': YEAR}
    dataframe = run_query(query, cache_key='temperature-%s' % NAMES_TO_CODES[state])
    dataframe['date'] = pd.to_datetime(dataframe[['year', 'month', 'day']])
    dataframe['date_readable'] = dataframe['date'].apply(lambda x: x.strftime("%Y-%m-%d"))
    # Half a day on each side, for drawing day-wide bars centred on the date.
    # (The original used pd.DateOffset(days=0.5); hours=12 expresses the same
    # shift without relying on fractional offset arguments.)
    dataframe['left'] = dataframe.date - pd.DateOffset(hours=12)
    dataframe['right'] = dataframe.date + pd.DateOffset(hours=12)
    dataframe = dataframe.set_index(['date'])
    dataframe.sort_index(inplace=True)
    return dataframe
def _is_offset(self, arr_or_obj):
    """ check if obj or all elements of list-like is DateOffset """
    if isinstance(arr_or_obj, pd.DateOffset):
        return True
    elif is_list_like(arr_or_obj):
        return all(isinstance(x, pd.DateOffset) for x in arr_or_obj)
    else:
        return False
def _reference_dates(self, start_date, end_date):
    """
    Get reference dates for the holiday.

    Return reference dates for the holiday also returning the year
    prior to the start_date and year following the end_date. This ensures
    that any offsets to be applied will yield the holidays within
    the passed in dates.
    """
    if self.start_date is not None:
        start_date = self.start_date.tz_localize(start_date.tz)

    if self.end_date is not None:
        end_date = self.end_date.tz_localize(start_date.tz)

    year_offset = DateOffset(years=1)
    reference_start_date = Timestamp(
        datetime(start_date.year - 1, self.month, self.day))

    reference_end_date = Timestamp(
        datetime(end_date.year + 1, self.month, self.day))
    # Don't process unnecessary holidays
    dates = DatetimeIndex(start=reference_start_date,
                          end=reference_end_date,
                          freq=year_offset, tz=start_date.tz)

    return dates
def test_catch_infinite_loop(self):
    # DateOffset(minute=5) *sets* the minute field rather than adding
    # minutes, so repeatedly applying it never advances the date.
    offset = datetools.DateOffset(minute=5)
    # blow up, don't loop forever
    self.assertRaises(Exception, date_range, datetime(2011, 11, 11),
                      datetime(2011, 11, 12), freq=offset)
def test_series_interpolate_intraday(self):
    # #1698
    index = pd.date_range('1/1/2012', periods=4, freq='12D')
    ts = pd.Series([0, 12, 24, 36], index)
    new_index = index.append(index + pd.DateOffset(days=1)).sort_values()

    exp = ts.reindex(new_index).interpolate(method='time')

    index = pd.date_range('1/1/2012', periods=4, freq='12H')
    ts = pd.Series([0, 12, 24, 36], index)
    new_index = index.append(index + pd.DateOffset(hours=1)).sort_values()
    result = ts.reindex(new_index).interpolate(method='time')

    self.assert_numpy_array_equal(result.values, exp.values)
def test_intersection_bug_1708(self):
    from pandas import DateOffset
    index_1 = date_range('1/1/2012', periods=4, freq='12H')
    index_2 = index_1 + DateOffset(hours=1)

    result = index_1 & index_2
    self.assertEqual(len(result), 0)
def check_complete(self):
    missing_timestamps = []
    offset = pd.DateOffset(minutes=24 * 60 // self.T)
    pd_timestamps = self.pd_timestamps
    i = 1
    while i < len(pd_timestamps):
        if pd_timestamps[i-1] + offset != pd_timestamps[i]:
            missing_timestamps.append("(%s -- %s)" % (pd_timestamps[i-1], pd_timestamps[i]))
        i += 1
    for v in missing_timestamps:
        print(v)
    assert len(missing_timestamps) == 0
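The contiguity test used here (and in the timeseries2seqs functions) relies on comparing timestamps shifted by one frame; a minimal sketch with T=48, i.e. 30-minute frames:

T = 48
offset = pd.DateOffset(minutes=24 * 60 // T)   # 30 minutes per frame
t0 = pd.Timestamp("2016-01-01 00:00")
t1 = pd.Timestamp("2016-01-01 00:30")
t2 = pd.Timestamp("2016-01-01 01:30")          # one frame is missing before this
assert t0 + offset == t1                       # consecutive frames pass
assert t1 + offset != t2                       # a gap fails the check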
def timeseries2seqs(data, timestamps, length=3, T=48):
    raw_ts = copy(timestamps)
    if type(timestamps[0]) != pd.Timestamp:
        timestamps = string2timestamp(timestamps, T=T)

    offset = pd.DateOffset(minutes=24 * 60 // T)

    breakpoints = [0]
    for i in range(1, len(timestamps)):
        if timestamps[i-1] + offset != timestamps[i]:
            print(timestamps[i-1], timestamps[i], raw_ts[i-1], raw_ts[i])
            breakpoints.append(i)
    breakpoints.append(len(timestamps))
    X = []
    Y = []
    for b in range(1, len(breakpoints)):
        print('breakpoints: ', breakpoints[b-1], breakpoints[b])
        idx = range(breakpoints[b-1], breakpoints[b])
        for i in range(len(idx) - length):
            x = np.vstack(data[idx[i:i+length]])
            y = data[idx[i+length]]
            X.append(x)
            Y.append(y)
    X = np.asarray(X)
    Y = np.asarray(Y)
    print("X shape: ", X.shape, "Y shape:", Y.shape)
    return X, Y
def timeseries2seqs_Meta(data, timestamps, length=3, T=48):
    raw_ts = copy(timestamps)
    if type(timestamps[0]) != pd.Timestamp:
        timestamps = string2timestamp(timestamps, T=T)

    offset = pd.DateOffset(minutes=24 * 60 // T)

    breakpoints = [0]
    for i in range(1, len(timestamps)):
        if timestamps[i-1] + offset != timestamps[i]:
            print(timestamps[i-1], timestamps[i], raw_ts[i-1], raw_ts[i])
            breakpoints.append(i)
    breakpoints.append(len(timestamps))
    X = []
    Y = []
    avail_timestamps = []
    for b in range(1, len(breakpoints)):
        print('breakpoints: ', breakpoints[b-1], breakpoints[b])
        idx = range(breakpoints[b-1], breakpoints[b])
        for i in range(len(idx) - length):
            avail_timestamps.append(raw_ts[idx[i+length]])
            x = np.vstack(data[idx[i:i+length]])
            y = data[idx[i+length]]
            X.append(x)
            Y.append(y)
    X = np.asarray(X)
    Y = np.asarray(Y)
    print("X shape: ", X.shape, "Y shape:", Y.shape)
    return X, Y, avail_timestamps
def period_by_hours(x, separation):
    ''' Aggregate x by hour interval.
    The computation would be simple if intervals were not allowed
    to span the day boundary.
    '''
    print(separation)
    assert isinstance(separation, list)
    assert all([sep < 24 for sep in separation])
    separation.sort()

    if 0 in separation:
        separation.append(24)
        hour_categ = pd.cut(x.dt.hour, separation, right=False)
        date_categ = x.dt.date
        return date_categ.astype(str) + ' ' + hour_categ.astype(str)
    else:
        hour = x.dt.hour
        hour_categ = pd.cut(hour, separation, right=False).astype(str)
        night_categ = '[' + str(separation[-1]) + ',' + str(separation[0]) + ')'
        hour_categ[(hour < separation[0]) | (hour >= separation[-1])] = night_categ
        assert hour_categ.nunique(dropna=False) == len(separation)
        date_categ = x.dt.date.astype(str)
        # shift the early-morning hours back by one day
        decale = x.dt.date[x.dt.hour < separation[1]] + pd.DateOffset(days=-1)
        date_categ[x.dt.hour < separation[1]] = decale.astype(str)
        assert all(date_categ.str.len() == 10)
        return date_categ + ' ' + hour_categ
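A minimal usage sketch (assumed, not from the source project), using a separation that starts at midnight so the simple branch is taken:

s = pd.Series(pd.to_datetime(["2017-01-02 03:00", "2017-01-02 10:00"]))
print(period_by_hours(s, [0, 6, 12, 18]).tolist())
# ['2017-01-02 [0, 6)', '2017-01-02 [6, 12)']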
### 4 - special
def test_medians_no_series_keys(self):
    guac = test_util.load_dataset('bike_sharing', target='count')
    guac.make_time_series('datetime', prediction_length=1, frequency=pd.DateOffset(hours=1))
    medians = HistoricalMedians([1], guac.config, guac.logger)
    out = medians.execute(guac.data)
    out.df = out.df.sort_values('datetime')
    self.assertTrue(np.isnan(out.df['count_median_1'].iloc[0]))
    self.assertAlmostEqual(out.df['count_median_1'].iloc[1], 16, delta=1)
def shift_dates(self, h):
    """ Auxiliary function for creating dates for forecasts

    Parameters
    ----------
    h : int
        How many steps to forecast

    Returns
    ----------
    A transformed date_index object
    """
    date_index = copy.deepcopy(self.index)
    date_index = date_index[self.max_lag:len(date_index)]

    if self.is_pandas is True:
        if isinstance(date_index, pd.core.indexes.datetimes.DatetimeIndex):
            if pd.infer_freq(date_index) in ['H', 'M', 'S']:
                # note: DateOffset(n) without keywords counts n in days,
                # so this shifts by `.seconds` days, not seconds
                for t in range(h):
                    date_index += pd.DateOffset((date_index[len(date_index)-1] - date_index[len(date_index)-2]).seconds)
            else:  # Assume higher frequency (configured for days)
                for t in range(h):
                    date_index += pd.DateOffset((date_index[len(date_index)-1] - date_index[len(date_index)-2]).days)
        elif isinstance(date_index, pd.core.indexes.numeric.Int64Index):
            for i in range(h):
                new_value = date_index.values[len(date_index.values)-1] + (date_index.values[len(date_index.values)-1] - date_index.values[len(date_index.values)-2])
                date_index = pd.Int64Index(np.append(date_index.values, new_value))
    else:
        for t in range(h):
            date_index.append(date_index[len(date_index)-1] + 1)

    return date_index
def getDatasForOneRouteForOneDepartureDate(route, departureDate):
    X = getoneRouteData(datas, route)
    minDeparture = np.amin(X[:, 8])
    maxDeparture = np.amax(X[:, 8])
    print(minDeparture)
    print(maxDeparture)

    # get specific departure date datas
    X = X[np.where(X[:, 8] == departureDate)[0], :]

    # get the x values
    xaxis = X[:, 9]  # observed date state
    print(xaxis)
    xaxis = departureDate - 1 - xaxis
    print(xaxis)

    tmp = xaxis
    startdate = "20151109"
    xaxis = [pd.to_datetime(startdate) + pd.DateOffset(days=state) for state in tmp]
    print(xaxis)

    # get the y values
    yaxis = X[:, 12]

    # every monday
    mondays = WeekdayLocator(MONDAY)
    # every month, on the 1st
    months = MonthLocator(range(1, 13), bymonthday=1, interval=1)
    days = WeekdayLocator(byweekday=1, interval=2)
    monthsFmt = DateFormatter("%b. %d, %Y")

    fig, ax = plt.subplots()
    ax.plot_date(xaxis, yaxis, 'r--')
    ax.plot_date(xaxis, yaxis, 'bo')
    ax.xaxis.set_major_locator(days)
    ax.xaxis.set_major_formatter(monthsFmt)
    #ax.xaxis.set_minor_locator(mondays)
    ax.autoscale_view()
    #ax.xaxis.grid(False, 'major')
    #ax.xaxis.grid(True, 'minor')
    ax.grid(True)

    plt.xlabel('Date')
    plt.ylabel('Price in Euro')

    fig.autofmt_xdate()
    plt.show()
    """
    # plot
    line1, = plt.plot(xaxis, yaxis, 'r--')
    line2, = plt.plot(xaxis, yaxis, 'bo')
    #plt.legend([line2], ["Price"])
    plt.xlabel('States')
    plt.ylabel('Price in Euro')
    plt.show()
    """
def __init__(self, name, year=None, month=None, day=None, offset=None,
             observance=None, start_date=None, end_date=None,
             days_of_week=None):
    """
    Parameters
    ----------
    name : str
        Name of the holiday, defaults to class name
    offset : array of pandas.tseries.offsets or
             class from pandas.tseries.offsets
        computes offset from date
    observance : function
        computes when holiday is given a pandas Timestamp
    days_of_week : tuple
        provide a tuple of days e.g. (0,1,2,3,) for Monday through Thursday
        Monday=0, ..., Sunday=6

    Examples
    --------
    >>> from pandas.tseries.holiday import Holiday, nearest_workday
    >>> from pandas import DateOffset
    >>> from dateutil.relativedelta import MO
    >>> USMemorialDay = Holiday('MemorialDay', month=5, day=24,
    ...                         offset=DateOffset(weekday=MO(1)))
    >>> USLaborDay = Holiday('Labor Day', month=9, day=1,
    ...                      offset=DateOffset(weekday=MO(1)))
    >>> July3rd = Holiday('July 3rd', month=7, day=3)
    >>> NewYears = Holiday('New Years Day', month=1, day=1,
    ...                    observance=nearest_workday)
    >>> July3rd = Holiday('July 3rd', month=7, day=3,
    ...                   days_of_week=(0, 1, 2, 3))
    """
    if offset is not None and observance is not None:
        raise NotImplementedError("Cannot use both offset and observance.")

    self.name = name
    self.year = year
    self.month = month
    self.day = day
    self.offset = offset
    self.start_date = Timestamp(
        start_date) if start_date is not None else start_date
    self.end_date = Timestamp(
        end_date) if end_date is not None else end_date
    self.observance = observance
    assert (days_of_week is None or type(days_of_week) == tuple)
    self.days_of_week = days_of_week
def create_dataset(self, len_closeness=3, len_trend=3, TrendInterval=7, len_period=3, PeriodInterval=1):
    """current version
    """
    # offset_week = pd.DateOffset(days=7)
    offset_frame = pd.DateOffset(minutes=24 * 60 // self.T)
    XC = []
    XP = []
    XT = []
    Y = []
    timestamps_Y = []
    depends = [range(1, len_closeness+1),
               [PeriodInterval * self.T * j for j in range(1, len_period+1)],
               [TrendInterval * self.T * j for j in range(1, len_trend+1)]]

    i = max(self.T * TrendInterval * len_trend, self.T * PeriodInterval * len_period, len_closeness)
    while i < len(self.pd_timestamps):
        Flag = True
        for depend in depends:
            if Flag is False:
                break
            Flag = self.check_it([self.pd_timestamps[i] - j * offset_frame for j in depend])

        if Flag is False:
            i += 1
            continue
        x_c = [self.get_matrix(self.pd_timestamps[i] - j * offset_frame) for j in depends[0]]
        x_p = [self.get_matrix(self.pd_timestamps[i] - j * offset_frame) for j in depends[1]]
        x_t = [self.get_matrix(self.pd_timestamps[i] - j * offset_frame) for j in depends[2]]
        y = self.get_matrix(self.pd_timestamps[i])
        if len_closeness > 0:
            XC.append(np.vstack(x_c))
        if len_period > 0:
            XP.append(np.vstack(x_p))
        if len_trend > 0:
            XT.append(np.vstack(x_t))
        Y.append(y)
        timestamps_Y.append(self.timestamps[i])
        i += 1
    XC = np.asarray(XC)
    XP = np.asarray(XP)
    XT = np.asarray(XT)
    Y = np.asarray(Y)
    print("XC shape: ", XC.shape, "XP shape: ", XP.shape, "XT shape: ", XT.shape, "Y shape:", Y.shape)
    return XC, XP, XT, Y, timestamps_Y
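For orientation, the three entries of depends sample history at closeness (adjacent frames), period (daily), and trend (weekly) horizons; a sketch of the offsets produced by the default arguments:

T = 48  # frames per day
len_closeness, len_period, len_trend = 3, 3, 3
PeriodInterval, TrendInterval = 1, 7
depends = [list(range(1, len_closeness + 1)),                           # 1, 2, 3 frames back
           [PeriodInterval * T * j for j in range(1, len_period + 1)],  # 48, 96, 144 (1-3 days)
           [TrendInterval * T * j for j in range(1, len_trend + 1)]]    # 336, 672, 1008 (1-3 weeks)
print(depends)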
def timeseries2seqs_peroid_trend(data, timestamps, length=3, T=48, peroid=pd.DateOffset(days=7), peroid_len=2):
    raw_ts = copy(timestamps)
    if type(timestamps[0]) != pd.Timestamp:
        timestamps = string2timestamp(timestamps, T=T)

    # timestamps index
    timestamp_idx = dict()
    for i, t in enumerate(timestamps):
        timestamp_idx[t] = i

    offset = pd.DateOffset(minutes=24 * 60 // T)

    breakpoints = [0]
    for i in range(1, len(timestamps)):
        if timestamps[i-1] + offset != timestamps[i]:
            print(timestamps[i-1], timestamps[i], raw_ts[i-1], raw_ts[i])
            breakpoints.append(i)
    breakpoints.append(len(timestamps))
    X = []
    Y = []
    for b in range(1, len(breakpoints)):
        print('breakpoints: ', breakpoints[b-1], breakpoints[b])
        idx = range(breakpoints[b-1], breakpoints[b])
        for i in range(len(idx) - length):
            # period
            target_timestamp = timestamps[i+length]

            legal_idx = []
            for pi in range(1, 1+peroid_len):
                if target_timestamp - peroid * pi not in timestamp_idx:
                    break
                legal_idx.append(timestamp_idx[target_timestamp - peroid * pi])
            # print("len: ", len(legal_idx), peroid_len)
            if len(legal_idx) != peroid_len:
                continue

            legal_idx += idx[i:i+length]

            # trend
            x = np.vstack(data[legal_idx])
            y = data[idx[i+length]]
            X.append(x)
            Y.append(y)
    X = np.asarray(X)
    Y = np.asarray(Y)
    print("X shape: ", X.shape, "Y shape:", Y.shape)
    return X, Y
def Blue_Green(Name_NC_ET, Name_NC_P, Name_NC_ETref, Startdate, Enddate, Additional_Months):
    """
    This function splits evapotranspiration into green and blue evapotranspiration.

    Parameters
    ----------
    Name_NC_ET : str
        Path to the .nc file containing ET data
    Name_NC_P : str
        Path to the .nc file containing P data (including moving average period)
    Name_NC_ETref : str
        Path to the .nc file containing ETref data (including moving average period)
    Additional_Months : integer
        Number that defines the amount of months taken into account for the
        moving average

    Returns
    -------
    ET_Blue : array
        Array[time,lat,lon] containing Blue Evapotranspiration
    ET_Green : array
        Array[time,lat,lon] containing Green Evapotranspiration
    """
    import wa.General.raster_conversions as RC

    # Define startdate and enddate with moving average
    Startdate_Moving_Average = pd.Timestamp(Startdate) - pd.DateOffset(months=Additional_Months)
    Enddate_Moving_Average = pd.Timestamp(Enddate) + pd.DateOffset(months=Additional_Months)
    Startdate_Moving_Average_String = '%d-%02d-%02d' % (Startdate_Moving_Average.year, Startdate_Moving_Average.month, Startdate_Moving_Average.day)
    Enddate_Moving_Average_String = '%d-%02d-%02d' % (Enddate_Moving_Average.year, Enddate_Moving_Average.month, Enddate_Moving_Average.day)

    # Extract ETref data from NetCDF file
    ETref = RC.Open_nc_array(Name_NC_ETref, Startdate=Startdate_Moving_Average_String, Enddate=Enddate_Moving_Average_String)

    # Extract P data from NetCDF file
    P = RC.Open_nc_array(Name_NC_P, Startdate=Startdate_Moving_Average_String, Enddate=Enddate_Moving_Average_String)

    # Extract ET data from NetCDF file
    ET = RC.Open_nc_array(Name_NC_ET, Startdate=Startdate, Enddate=Enddate)

    # Apply moving average over the Additional_Months window on each side
    Pavg = RC.Moving_average(P, Additional_Months, Additional_Months)
    ETrefavg = RC.Moving_average(ETref, Additional_Months, Additional_Months)

    # Calculate aridity index
    Pavg[Pavg == 0] = 0.0001
    phi = ETrefavg / Pavg

    # Calculate Budyko
    Budyko = Calc_budyko(phi)

    # Calculate ETgreen
    ETgreen = np.minimum(Budyko * P[Additional_Months:-Additional_Months, :, :], ET)

    # Calculate ETblue
    ETblue = ET - ETgreen

    return (ETblue, ETgreen)