Python pandas 模块,rolling_min() 实例源码
我们从Python开源项目中,提取了以下25个代码示例,用于说明如何使用pandas.rolling_min()。
def VCI(df, n, rng = 8):
    """Express open/high/low/close relative to a rolling mid-price.

    Every price column is transformed as ``(price - avg_pr) / avg_tr`` where
    ``avg_pr`` is the mean of the ``n``-row rolling averages of ``high`` and
    ``low`` and ``avg_tr`` is a volatility unit chosen by ``n`` (see below).

    Rolling statistics use ``Series.rolling`` because the module-level
    ``pd.rolling_max`` / ``pd.rolling_min`` / ``pd.rolling_mean`` helpers
    have been removed from pandas.

    :param df: DataFrame with ``open``, ``high``, ``low``, ``close`` columns.
    :param n: window of the mid-price average; ``n > 7`` selects the
        range-based volatility unit, otherwise the true-range based one.
    :param rng: window of the high/low range used when ``n > 7``.
    :return: DataFrame with columns ``VCIO``, ``VCIH``, ``VCIL``, ``VCIC``.
    """
    if n > 7:
        span = df.high.rolling(rng).max() - df.low.rolling(rng).min()
        # Current range plus four copies lagged by rng, 2*rng, 3*rng, 4*rng
        # rows, divided by 25 (i.e. one fifth of their average).
        avg_tr = sum(span.shift(rng * lag) for lag in range(5)) / 25.0
    else:
        # Row-wise max of (high - low) and |close - previous close|.
        tr = pd.concat(
            [df.high - df.low, (df.close - df.close.shift(1)).abs()],
            join='outer', axis=1).max(axis=1)
        avg_tr = tr.rolling(n).mean() * 0.16
    avg_pr = (df.high.rolling(n).mean() + df.low.rolling(n).mean()) / 2.0
    columns = [
        pd.Series((df[field] - avg_pr) / avg_tr, name=label)
        for field, label in (('open', 'VCIO'), ('high', 'VCIH'),
                             ('low', 'VCIL'), ('close', 'VCIC'))
    ]
    return pd.concat(columns, axis=1)
def getwilliam(close, high, low, n=14):
    '''
    Compute ``100 - 100 * (close - LL) / (HH - LL)`` where ``HH`` / ``LL`` are
    the rolling ``n``-row maximum of ``high`` and minimum of ``low``.

    The rolling results are re-indexed to 0..len-1 before being combined
    with ``close``, so ``close`` is expected to carry that same positional
    index. NaN and +/-inf results (warm-up rows, zero high/low range) are
    replaced by 0.

    :param DataFrame close: close prices
    :param DataFrame high: high prices
    :param DataFrame low: low prices
    :param int n: rolling window length (14 rows by default)
    :return: DataFrame w: the oscillator values
    '''
    # Series/DataFrame.rolling replaces the removed pd.rolling_max/min helpers.
    high = high.rolling(n).max()
    high.index = range(high.shape[0])
    low = low.rolling(n).min()
    low.index = range(low.shape[0])
    w = 100 - 100 * (close - low) / (high - low)
    return w.replace([np.nan, np.inf, -np.inf], 0)
def rolling_min(series, window=14, min_periods=None):
    """Return the rolling minimum of ``series``.

    :param series: a pandas Series/DataFrame, or any sequence that
        ``pd.Series`` can wrap (list, ndarray, ...).
    :param window: number of rows in the rolling window.
    :param min_periods: minimum observations required for a value;
        defaults to ``window``.
    :return: the rolling minimum, NaN where fewer than ``min_periods``
        observations are available.

    Only the absence of the ``.rolling`` accessor triggers a fallback.
    Genuine errors (for example ``min_periods > window``) propagate instead
    of being swallowed by a blanket ``except BaseException``.
    """
    if min_periods is None:
        min_periods = window
    target = series if hasattr(series, 'rolling') else pd.Series(series)
    if hasattr(target, 'rolling'):
        return target.rolling(window=window, min_periods=min_periods).min()
    # pandas releases that predate the .rolling() accessor.
    return pd.rolling_min(series, window=window, min_periods=min_periods)
# ---------------------------------------------
def rolling_max(series, window=14, min_periods=None):
    """Return the rolling maximum of ``series``.

    NOTE(review): the listing only preserved a mangled one-line header for
    this function; the signature and body here mirror ``rolling_min`` -
    confirm against the upstream project.

    :param series: a pandas Series/DataFrame, or any sequence that
        ``pd.Series`` can wrap (list, ndarray, ...).
    :param window: number of rows in the rolling window.
    :param min_periods: minimum observations required for a value;
        defaults to ``window``.
    :return: the rolling maximum, NaN where fewer than ``min_periods``
        observations are available.
    """
    if min_periods is None:
        min_periods = window
    target = series if hasattr(series, 'rolling') else pd.Series(series)
    if hasattr(target, 'rolling'):
        return target.rolling(window=window, min_periods=min_periods).max()
    # pandas releases that predate the .rolling() accessor.
    return pd.rolling_max(series, window=window, min_periods=min_periods)
# ---------------------------------------------
def gen_config_file(filename):
    """Write the 'psar_test' simulation settings to ``filename`` as JSON.

    :param filename: path of the JSON file to create or overwrite.
    :return: the settings dict that was written.
    """
    strategy_params = {
        'capital': 10000,
        'offset': 0,
        'chan': 20,
        'use_chan': True,
        'sar_params': {'iaf': 0.02, 'maxaf': 0.2, 'incr': 0.02},
        'trans_cost': 0.0,
        'close_daily': True,
        'unit': 1,
        'stoploss': 0.0,
        # alternative: 'proc_args': {'minlist': [1500]}
        'proc_args': {'freq': 15},
        'pos_args': {},
        'pos_update': False,
    }
    sim_config = {
        'sim_func': 'bktest_psar_test.psar_test_sim',
        'scen_keys': ['freq'],
        'sim_name': 'psar_test',
        # wider list kept from the original comment:
        # 'm','RM','y','p','a','rb','SR','TA','MA','i','ru','j','jm','ag','cu','au','al','zn'
        'products': ['m'],
        'start_date': '20141101',
        'end_date': '20151118',
        'freq': ['15m', '60m'],
        'pos_class': 'strat.TradePos',
        'proc_func': 'dh.min_freq_group',
        'config': strategy_params,
    }
    with open(filename, 'w') as outfile:
        json.dump(sim_config, outfile)
    return sim_config
# Pre-process `mdf` with self.proc_func, derive TR / channel / ATR / MA columns on the
# result, join their one-row-lagged values (plus the open) back onto `mdf`, and store
# the forward-filled frame in self.df with datetime/cost/pos/Traded_price columns.
# NOTE(review): this listing has lost its indentation, and the `self.win == 0` branch
# has unbalanced parentheses - part of that expression appears to be missing from the
# source. Restore it from the upstream project before running.
# NOTE(review): pd.rolling_max / pd.rolling_min are not available in current pandas;
# Series.rolling(...).max()/.min() is the replacement.
def process_data(self, mdf):
xdf = self.proc_func(mdf, **self.proc_args)
# self.win selects how the range column `tr` is built.
if self.win == -1:
# row-wise max of (high - low) and |close - previous close|
tr= pd.concat([xdf.high - xdf.low, abs(xdf.close - xdf.close.shift(1))],
join='outer', axis=1).max(axis=1)
elif self.win == 0:
tr = pd.concat([(pd.rolling_max(xdf.high, 2) - pd.rolling_min(xdf.close, 2))*self.multiplier,
(pd.rolling_max(xdf.close, 2) - pd.rolling_min(xdf.low,
xdf.high - xdf.close,
xdf.close - xdf.low],
join='outer', axis=1).max(axis=1)
else:
# row-wise max of (rolling high - rolling min close) and (rolling max close - rolling low)
tr= pd.concat([pd.rolling_max(xdf.high, self.win) - pd.rolling_min(xdf.close, self.win),
pd.rolling_max(xdf.close, self.win) - pd.rolling_min(xdf.low, self.win)], axis=1).max(axis=1)
xdf['TR'] = tr
xdf['chanh'] = self.chan_high(xdf['high'], self.chan, **self.chan_func['high']['args'])
# NOTE(review): unlike chan_high above, no self.chan argument is passed here - confirm.
xdf['chanl'] = self.chan_low(xdf['low'], **self.chan_func['low']['args'])
xdf['ATR'] = dh.ATR(xdf, n = self.atr_len)
xdf['MA'] = dh.MA(xdf, n=self.atr_len, field = 'close')
# All derived columns are shifted by one row; only `open` is taken unshifted.
xdata = pd.concat([xdf['TR'].shift(1), xdf['MA'].shift(1), xdf['ATR'].shift(1),
xdf['chanh'].shift(1), xdf['chanl'].shift(1),
xdf['open']], axis=1, keys=['tr','ma', 'atr', 'chanh', 'chanl', 'dopen']).fillna(0)
self.df = mdf.join(xdata, how = 'left').fillna(method='ffill')
self.df['datetime'] = self.df.index
self.df['cost'] = 0
self.df['pos'] = 0
self.df['Traded_price'] = self.df['open']
# NOTE(review): garbled snippet - the `def` header is spliced with the tail of a
# `...join(xdata, how = 'left').fillna(method='ffill')` statement and the body in
# between is missing. Not valid Python as shown; recover it from the upstream project.
def process_data(self, how = 'left').fillna(method='ffill')
# Initialise bookkeeping columns on self.df.
self.df['datetime'] = self.df.index
self.df['cost'] = 0
self.df['pos'] = 0
self.df['Traded_price'] = self.df['open']
# NOTE(review): garbled snippet - the `def` header is spliced with the tail of a
# `...join(xdata, how = 'left').fillna(method='ffill')` statement and the body in
# between is missing. Not valid Python as shown; recover it from the upstream project.
def process_data(self, how = 'left').fillna(method='ffill')
# Initialise bookkeeping columns on self.df.
self.df['datetime'] = self.df.index
self.df['cost'] = 0
self.df['pos'] = 0
self.df['Traded_price'] = self.df['open']
# Build the settings dict for the 'DTChan_VecSim' simulation.
# NOTE(review): garbled snippet - the 'param' tuples, the `chan_func` dict and the
# `config` dict all have unbalanced brackets, and the trailing `outfile)` looks like
# the end of a lost `json.dump(sim_config, outfile)` call. Not valid Python as shown;
# recover the literals from the upstream project.
def gen_config_file(filename):
sim_config = {}
sim_config['sim_class'] = 'bktest_dtchan_vecsim.DTChanSim'
sim_config['sim_func'] = 'run_vec_sim'
sim_config['scen_keys'] = ['param', 'chan']
sim_config['sim_name'] = 'DTChan_VecSim'
sim_config['products'] = ['m', 'RM', 'y', 'p', 'a', 'rb', 'SR', 'TA', 'MA', 'i', 'ru', 'j' ]
sim_config['start_date'] = '20150102'
sim_config['end_date'] = '20170428'
sim_config['param'] = [
(0.5, 0.5, 0.0), (0.6, (0.7, (0.8, \
(0.9, (1.0, (1.1, \
(0.5, 1, \
(0.2, 2, (0.25,2, (0.3, (0.35,\
(0.4, (0.45,(0.5, 4,(0.3,\
]
sim_config['chan'] = [3, 5, 10, 15, 20]
sim_config['pos_class'] = 'strat.TradePos'
sim_config['proc_func'] = 'dh.day_split'
sim_config['offset'] = 1
chan_func = {'high': {'func': 'pd.rolling_max', 'args':{}},
'low': {'func': 'pd.rolling_min',
}
config = {'capital': 10000,
'use_chan': True,
'close_daily': False,
'min_range': 0.0035,
'proc_args': {'minlist':[1500]},
'chan_func': chan_func, outfile)
return sim_config
def CMI(df, n):
    """Return ``|close - close n rows ago| / (n-row high - n-row low) * 100``.

    Rolling extremes use ``Series.rolling`` because ``pd.rolling_max`` /
    ``pd.rolling_min`` have been removed from pandas.

    :param df: DataFrame with ``close``, ``high`` and ``low`` columns.
    :param n: look-back length in rows.
    :return: Series named ``'CMI<n>'``; rows without a value ``n`` rows back
        are NaN.
    """
    net_move = (df['close'] - df['close'].shift(n)).abs()
    span = df['high'].rolling(n).max() - df['low'].rolling(n).min()
    return pd.Series(net_move / span * 100, name='CMI' + str(n))
# Distance of close from the rolling high / rolling low, each divided by ATR(df, atr_n);
# returned as a two-column frame named CPLUNGER_H<n> and CPLUNGER_L<n>.
# NOTE(review): `n` is used below but is not a parameter of this function - it was
# presumably dropped from the signature in this listing; confirm against upstream.
# NOTE(review): ATR is not defined in this snippet; presumably a sibling helper.
# NOTE(review): pd.rolling_max / pd.rolling_min are not available in current pandas.
def CHENow_PLUNGER(df, atr_n = 40):
atr = ATR(df, atr_n)
high = pd.Series((pd.rolling_max(df['high'], n) - df['close'])/atr, name = 'CPLUNGER_H'+ str(n))
low = pd.Series((df['close'] - pd.rolling_min(df['low'], n))/atr, name = 'CPLUNGER_L'+ str(n))
return pd.concat([high,low], axis=1)
#Donchian Channel
# Rolling minimum of df[field], returned as a Series named
# 'DONCH_L' + first letter of `field` upper-cased + str(n).
# NOTE(review): `n` is used below but is not a parameter of this function - it was
# presumably dropped from the signature in this listing; confirm against upstream.
# NOTE(review): pd.rolling_min is not available in current pandas.
def DONCH_L(df, field = 'low'):
DC_L = pd.rolling_min(df[field], n)
return pd.Series(DC_L, name = 'DONCH_L'+ field[0].upper() + str(n))
# Locate close within the rolling high/low range (rescaled to [-1, 1]), smooth it with
# an exponentially weighted mean, apply 0.5 * log((1 + x) / (1 - x)), and smooth again.
# NOTE(review): `n` is used below but is not a parameter of this function - presumably
# dropped from the signature in this listing; confirm against upstream.
# NOTE(review): the second pd.ewma(...) call has unbalanced parentheses (compare with
# the first one, which passes `adjust = False`); not valid Python as shown.
# NOTE(review): pd.rolling_max / pd.rolling_min / pd.ewma are not available in current
# pandas (.rolling() / .ewm() are the replacements).
def FISHER(df, smooth_p = 0.7, smooth_i = 0.7):
roll_high = pd.rolling_max(df.high, n)
roll_low = pd.rolling_min(df.low, n)
price_loc = (df.close - roll_low)/(roll_high - roll_low) * 2.0 - 1
sm_price = pd.Series(pd.ewma(price_loc, com = 1.0/smooth_p - 1, adjust = False), name = 'FISHER_P')
fisher_ind = 0.5 * np.log((1 + sm_price)/(1 - sm_price))
sm_fisher = pd.Series(pd.ewma(fisher_ind, com = 1.0/smooth_i - 1, name = 'FISHER_I')
return pd.concat([sm_price, sm_fisher], axis=1)
# Ratio of (close - rolling low) to a rolling-high based denominator, returned as a
# Series named "WPR_<n>".
# NOTE(review): the expression is truncated in this listing - the window argument of
# pd.rolling_max and the rest of the denominator are missing, leaving unbalanced
# parentheses. Not valid Python as shown; recover it from the upstream project.
# NOTE(review): pd.rolling_max / pd.rolling_min are not available in current pandas.
def WPR(df, n):
res = pd.Series((df['close'] - pd.rolling_min(df['low'], n))/(pd.rolling_max(df['high'], name = "WPR_%s" % str(n))
return res
# Range series: row-wise max of several high/low/close spreads, returned as a Series
# named 'DTRNG<win>_<ratio>'.
# NOTE(review): the `win == 0` branch is truncated in this listing (the second
# pd.rolling_min call never closes), so the block is not valid Python as shown.
# NOTE(review): `ratio` only appears in the Series name in the visible code.
# NOTE(review): pd.rolling_max / pd.rolling_min are not available in current pandas.
def DT_RNG(df, win = 2, ratio = 0.7):
if win == 0:
tr_ts = pd.concat([(pd.rolling_max(df['high'], 2) - pd.rolling_min(df['close'], 2))*0.5,
(pd.rolling_max(df['close'], 2) - pd.rolling_min(df['low'],
df['high'] - df['close'],
df['close'] - df['low']],
join='outer', axis=1).max(axis=1)
else:
# row-wise max of (rolling high - rolling min close) and (rolling max close - rolling low)
tr_ts = pd.concat([pd.rolling_max(df['high'], win) - pd.rolling_min(df['close'], win),
pd.rolling_max(df['close'], win) - pd.rolling_min(df['low'], win)],
join='outer', axis=1).max(axis=1)
return pd.Series(tr_ts, name = 'DTRNG%s_%s' % (win, ratio))
def ichimoku(price_objs):
    """
    Compute the Ichimoku cloud lines for ``price_objs``.

    Rolling highs/lows use ``Series.rolling`` because ``pd.rolling_max`` /
    ``pd.rolling_min`` have been removed from pandas.

    :param price_objs: iterable of objects exposing ``created_on`` and
        ``price`` attributes. Rows are used in the order given (the function
        does not sort them).
    :return: dict with keys ``tenkan_sen``, ``kijun_sen``,
        ``senkou_span_a``, ``senkou_span_b`` (Series) and ``chikou_span``
        (the date/price frame shifted 22 rows back).
    """
    # Materialise once: the input is walked twice, which would silently
    # yield an empty price column for a one-shot iterator.
    price_objs = list(price_objs)
    _prices = pd.DataFrame({
        'date': [pd.to_datetime(str(obj.created_on)) for obj in price_objs],
        'price': [obj.price for obj in price_objs],
    })
    price = _prices['price']

    def midpoint(window):
        # (rolling high + rolling low) / 2 over `window` rows.
        band = price.rolling(window=window)
        return (band.max() + band.min()) / 2

    # Tenkan-sen (Conversion Line): (9-period high + 9-period low) / 2
    tenkan_sen = midpoint(9)
    # Kijun-sen (Base Line): (26-period high + 26-period low) / 2
    kijun_sen = midpoint(26)
    # Senkou Span A (Leading Span A): (Conversion Line + Base Line) / 2,
    # shifted 26 rows.
    senkou_span_a = ((tenkan_sen + kijun_sen) / 2).shift(26)
    # Senkou Span B (Leading Span B): (52-period high + 52-period low) / 2,
    # shifted 26 rows.
    senkou_span_b = midpoint(52).shift(26)
    # Whole date/price frame shifted back 22 rows (offset kept from the
    # original implementation).
    chikou_span = _prices.shift(-22)

    return {
        'tenkan_sen': tenkan_sen,
        'kijun_sen': kijun_sen,
        'senkou_span_a': senkou_span_a,
        'senkou_span_b': senkou_span_b,
        'chikou_span': chikou_span,
    }
# NOTE(review): truncated duplicate of the ichimoku snippet - the listing stops after
# the opening of the `d` dict, so as shown this only collects dates/prices and
# returns None.
def ichimoku(price_objs):
"""
computes the ichimoku cloud for price_objs
"""
dates = [pd.to_datetime(str(obj.created_on)) for obj in price_objs]
prices = [obj.price for obj in price_objs]
d = {'date': dates,
}
# Build the settings dict for the 'DT_psar' simulation.
# NOTE(review): garbled snippet - the 'param' list and the `chan_func` dict are cut
# off, the opening of the `config` dict is missing, and the trailing `outfile)` looks
# like the end of a lost `json.dump(sim_config, outfile)` call. Not valid Python as
# shown; recover the literals from the upstream project.
def gen_config_file(filename):
sim_config = {}
sim_config['sim_class'] = 'bktest_dtsplit_psar.DTStopSim'
sim_config['sim_func'] = 'run_loop_sim'
sim_config['scen_keys'] = ['param', 'stoploss']
sim_config['sim_name'] = 'DT_psar'
sim_config['products'] = ['m', 'j' ]
sim_config['start_date'] = '20150102'
sim_config['end_date'] = '20160311'
sim_config['param'] = [
(0.5, \
#(0.2,4,0.5,0.0),(0.25,(0.3,(0.35,\
#(0.4,(0.45,(0.5,\
]
sim_config['stoploss'] = [1, 3]
sim_config['pos_class'] = 'strat.TargetTrailTradePos'
sim_config['proc_func'] = 'dh.day_split'
sim_config['offset'] = 1
chan_func = {'high': {'func': 'pd.rolling_max',
'chan': 10,
'use_chan': False,
'min_range': 0.004,
'pos_update': True,
'atr_period': 10,
'chan_func': chan_func, outfile)
return sim_config
# Vectorised simulation: derive range / channel / band columns on the pre-processed
# frame, join them onto self.df, turn band and channel breaks into a position series,
# compute costs, and return (mdf, closed trades).
# NOTE(review): garbled snippet - the first statements (the self.proc_func call and the
# `tr` construction) are spliced together with unbalanced brackets, and the
# pd.concat(...) that builds `xdata` is missing its closing `]` and a column for the
# 'xopen' key. Not valid Python as shown; recover it from the upstream project.
# NOTE(review): uses Python 2 `print` statements and pandas APIs that current pandas no
# longer provides (pd.rolling_max, pd.rolling_mean, DataFrame.ix).
def run_vec_sim(self):
xdf = self.proc_func(self.df,
join='outer', 2)) * self.multiplier,
(pd.rolling_max(xdf.close,
xdf.close - xdf.low],
join='outer',
pd.rolling_max(xdf.close, axis=1).max(axis=1)
xdf['tr'] = tr
xdf['chan_h'] = self.chan_high(xdf, **self.chan_func['high']['args'])
xdf['chan_l'] = self.chan_low(xdf, **self.chan_func['low']['args'])
xdf['atr'] = dh.ATR(xdf, self.machan)
xdf['ma'] = pd.rolling_mean(xdf.close, self.machan)
# Band half-width: the larger of min_rng * open and k * previous row's tr.
xdf['rng'] = pd.DataFrame([self.min_rng * xdf['open'], self.k * xdf['tr'].shift(1)]).max()
# The band is widened by a factor (1 + f) when the open is on the far side of the
# previous row's moving average.
xdf['upper'] = xdf['open'] + xdf['rng'] * (1 + (xdf['open'] < xdf['ma'].shift(1))*self.f)
xdf['lower'] = xdf['open'] - xdf['rng'] * (1 + (xdf['open'] > xdf['ma'].shift(1))*self.f)
xdata = pd.concat([xdf['upper'], xdf['lower'],
xdf['chan_h'].shift(1), xdf['chan_l'].shift(1), keys=['upper','lower', 'chan_h', 'chan_l', 'xopen']).fillna(0)
mdf = self.df.join(xdata, how = 'left').fillna(method='ffill')
mdf['dt_signal'] = np.nan
# Choose which prices are compared against the upper / lower bands.
if self.price_mode == "HL":
up_price = mdf['high']
dn_price = mdf['low']
elif self.price_mode == "TP":
up_price = (mdf['high'] + mdf['low'] + mdf['close'])/3.0
dn_price = up_price
elif self.price_mode == "CL":
up_price = mdf['close']
dn_price = mdf['close']
else:
# NOTE(review): up_price / dn_price stay undefined on this path, so the next
# statement would raise NameError.
print "unsupported price mode"
mdf.ix[up_price >= mdf['upper'], 'dt_signal'] = 1
mdf.ix[dn_price <= mdf['lower'], 'dt_signal'] = -1
if self.close_daily:
mdf.ix[ mdf['min_id'] >= self.exit_min, 'dt_signal'] = 0
# Keep the raw (not forward-filled) signal for the channel add-on test below.
addon_signal = copy.deepcopy(mdf['dt_signal'])
mdf['dt_signal'] = mdf['dt_signal'].fillna(method='ffill').fillna(0)
mdf['chan_sig'] = np.nan
# NOTE(review): `combo_signal` is a bare name here (not self.combo_signal) - confirm.
if combo_signal:
mdf.ix[(up_price >= mdf['chan_h']) & (addon_signal > 0), 'chan_sig'] = 1
mdf.ix[(dn_price <= mdf['chan_l']) & (addon_signal < 0), 'chan_sig'] = -1
else:
mdf.ix[(mdf['high'] >= mdf['chan_h']), 'chan_sig'] = 1
mdf.ix[(mdf['low'] <= mdf['chan_l']), 'chan_sig'] = -1
mdf['chan_sig'] = mdf['chan_sig'].fillna(method='ffill').fillna(0)
# Base size unit[0], plus unit[1] when the channel signal agrees in sign.
pos = mdf['dt_signal'] * (self.unit[0] + (mdf['chan_sig'] * mdf['dt_signal'] > 0) * self.unit[1])
# Positions take effect on the following row; the last three rows are forced flat.
mdf['pos'] = pos.shift(1).fillna(0)
mdf.ix[-3:, 'pos'] = 0
mdf['cost'] = abs(mdf['pos'] - mdf['pos'].shift(1)) * (self.offset + mdf['open'] * self.tcost)
mdf['cost'] = mdf['cost'].fillna(0.0)
mdf['Traded_price'] = mdf['open']
self.closed_Trades = backtest.simdf_to_Trades1(mdf, slippage = self.offset )
return mdf, self.closed_Trades
# Simulation driven by a config dict: derive range and channel columns on the
# pre-processed frame, join them onto `mdf`, build long and short signals from
# range/channel breaks, and return (mdf, closed trades).
# NOTE(review): garbled snippet - the `tr` construction and the pd.concat(...) that
# builds `xdata` have unbalanced brackets and missing arguments. Not valid Python as
# shown; recover it from the upstream project.
# NOTE(review): uses a Python 2 `print` statement and pandas APIs that current pandas
# no longer provides (pd.rolling_max, pd.rolling_min, DataFrame.ix).
def dual_thrust_sim( mdf, config):
close_daily = config['close_daily']
offset = config['offset']
k = config['param'][0]
win = config['param'][1]
proc_func = config['proc_func']
proc_args = config['proc_args']
chan_func = config['chan_func']
# NOTE(review): eval() executes whatever string the config supplies - only safe when
# the config comes from a trusted source.
chan_high = eval(chan_func['high']['func'])
chan_low = eval(chan_func['low']['func'])
tcost = config['trans_cost']
min_rng = config['min_range']
chan = config['chan']
xdf = proc_func(mdf, **proc_args)
tr= pd.concat([pd.rolling_max(xdf.high, win) - pd.rolling_min(xdf.close,
pd.rolling_max(xdf.close, win) - pd.rolling_min(xdf.low,
join='outer', axis=1).max(axis=1)
xdf['tr'] = tr
xdf['chan_h'] = chan_high(xdf, chan, **chan_func['high']['args'])
# NOTE(review): unlike chan_high above, no `chan` argument is passed here - confirm.
xdf['chan_l'] = chan_low(xdf, **chan_func['low']['args'])
#sar_param = config['sar_param']
#sar = dh.SAR(xdf,**sar_param)
#sar_signal = pd.Series(0,index = sar.index)
#sar_signal[(sar < xdf['close'])] = 1
#sar_signal[(sar > xdf['close'])] = -1
#xdf['atr'] = dh.ATR(xdf,chan)
xdata = pd.concat([xdf['tr'].shift(1), xdf['chan_h'].shift(1),
xdf['open']],'chan_h', 'xopen'])
mdf = mdf.join(xdata, how = 'left').fillna(method='ffill')
# Breakout distance: the larger of min_rng * xopen and k * tr.
rng = pd.DataFrame([min_rng * mdf['xopen'], k * mdf['tr']]).max()
# Long: enter (1) when high clears both xopen + rng and chan_h; exit (0) when low
# reaches either xopen - rng or chan_l.
long_signal = pd.Series(np.nan, index = mdf.index)
long_signal[(mdf['high'] >= mdf['xopen'] + rng) & (mdf['high'] >= mdf['chan_h']) ] = 1
long_signal[(mdf['low'] <= mdf['xopen'] - rng) | (mdf['low'] <= mdf['chan_l'])] = 0
if close_daily:
long_signal[(mdf['min_id']>=config['exit_min'])] = 0
# Signals act on the following row and persist until replaced.
long_signal = long_signal.shift(1)
long_signal = long_signal.fillna(method='ffill').fillna(0)
# Short: mirror image of the long rules.
short_signal = pd.Series(np.nan, index = mdf.index)
short_signal[(mdf['low'] <= mdf['xopen'] - rng) & (mdf['low'] <= mdf['chan_l']) ] = -1
short_signal[(mdf['high'] >= mdf['xopen'] + rng) | (mdf['high'] >= mdf['chan_h'])] = 0
if close_daily:
short_signal[(mdf['min_id']>=config['exit_min'])] = 0
short_signal = short_signal.shift(1)
short_signal = short_signal.fillna(method='ffill').fillna(0)
if len(mdf[(long_signal>0)&(short_signal<0)]) > 0:
print "Warning: long and short signal happen at the same time"
mdf['pos'] = long_signal + short_signal
# The last three rows are forced flat.
mdf.ix[-3:, 'pos'] = 0
mdf['cost'] = abs(mdf['pos'] - mdf['pos'].shift(1)) * (offset + mdf['open'] * tcost)
mdf['cost'] = mdf['cost'].fillna(0.0)
mdf['Traded_price'] = mdf.open + (mdf['pos'] - mdf['pos'].shift(1)) * offset
closed_Trades = backtest.simdf_to_Trades1(mdf, slippage = offset )
return (mdf, closed_Trades)
# NOTE(review): garbled snippet - the first statement splices the self.proc_func call
# with the tail of a `... 'dt_signal'] = 0` assignment, and the body of the
# `if self.combo_signal:` branch is spliced with a `... 'pos'] = 0` assignment; `mdf`
# and `up_price` are never defined in the visible code. Not valid Python as shown;
# recover it from the upstream project.
def run_vec_sim(self):
xdf = self.proc_func(self.df, 'dt_signal'] = 0
addon_signal = copy.deepcopy(mdf['dt_signal'])
mdf['dt_signal'] = mdf['dt_signal'].fillna(method='ffill').fillna(0)
mdf['chan_sig'] = np.nan
if self.combo_signal:
mdf.ix[(up_price >= mdf['chan_h']) & (addon_signal > 0), 'pos'] = 0
mdf['cost'] = abs(mdf['pos'] - mdf['pos'].shift(1)) * (self.offset + mdf['open'] * self.tcost)
mdf['cost'] = mdf['cost'].fillna(0.0)
mdf['Traded_price'] = mdf['open']
self.closed_Trades = simdf_to_Trades1(mdf, slippage = self.offset )
return (mdf, self.closed_Trades)
# NOTE(review): garbled snippet - `high` is documented and used in the body but is
# missing from the signature; the RSV.replace(...) call has no replacement value; and
# the `D.iloc[0, col])` line is cut off (unbalanced bracket). Not valid Python as
# shown; recover it from the upstream project.
# NOTE(review): pd.rolling_max / pd.rolling_min are not available in current pandas.
def getKDJ(close, low):
'''
calculate KDJ value
:param DataFrame close:close price
:param DataFrame high:highest price of a day
:param DataFrame low: lowest price of a day
:return: [DataFrame,DataFrame,DataFrame] [RSV,K,D,KDJ]:KDJ value and some subproducts
'''
# interval over which KDJ is calculated
kdj_interval = 9
N = 3
# calculate RSV
# keep close from row (kdj_interval - 1) onward, rebuilt with a fresh 0-based index
close = pd.DataFrame(close.iloc[(kdj_interval - 1):, :].values)
# calculate maximum in (kdj_interval) days in high value
high_max_in_interval = pd.rolling_max(high, kdj_interval)
# rolling_max leaves the first (kdj_interval-1) rows as np.nan; drop them
high_max_in_interval.dropna(inplace=True)
# reset the index to 0,1,2,... so it lines up with the rebuilt `close` frame
high_max_in_interval.index = range(high_max_in_interval.shape[0])
low_min_in_interval = pd.rolling_min(low, kdj_interval)
low_min_in_interval.dropna(inplace=True)
low_min_in_interval.index = range(low_min_in_interval.shape[0])
# calculate RSV
RSV = 100 * (close - low_min_in_interval) / (high_max_in_interval - low_min_in_interval)
# replace np.nan and np.inf in RSV because there might be 0 in the denominator of the last formula
RSV.replace([np.nan,-np.inf], inplace=True)
# get matrix shape
[row, col] = RSV.shape
# calculate K
# assuming N equals n in the formula
# initialize the first row of K with 50 (the matching line for D is garbled in this listing)
K = pd.DataFrame(np.zeros([row, col]))
D = pd.DataFrame(np.zeros([row, col]))
K.iloc[0, :] = 50 * np.ones([1, col])
D.iloc[0, col])
# calculate K and D iteratively
# NOTE(review): the recurrences below carry no (N - 1) weight on the previous value and
# D subtracts its previous value - confirm against the intended KDJ formula.
for i in range(1, row):
K.iloc[i, :] = (RSV.iloc[i, :] + K.iloc[(i - 1), :]) / N
D.iloc[i, :] = (K.iloc[i, :] - D.iloc[(i - 1), :]) / N
KDJ = 3 * K - 2 * D
return [RSV, K, D, KDJ]
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。