Python pandas 模块,Int64Index() 实例源码
我们从 Python 开源项目中提取了以下 14 个代码示例,用于说明如何使用 pandas.Int64Index()。
def __init__(self, data, **kwargs):
    """Set up the source from a DataFrame.

    Parameters
    ----------
    data : pd.DataFrame
        Its index must be a ``DatetimeIndex`` and its columns a
        ``pd.Int64Index`` (integer SIDs); both are checked with ``assert``.
    **kwargs
        Optional ``start`` and ``end`` overrides.  When absent they default
        to the first and last entries of the filled frame's index.  Every
        keyword is also forwarded to ``hash_args``.
    """
    index_type = pd.tseries.index.DatetimeIndex
    assert isinstance(data.index, index_type)
    # Only accept integer SIDs as the items of the DataFrame
    assert isinstance(data.columns, pd.Int64Index)

    # TODO: is forward-filling correct/necessary here?
    # Missing values are forward filled before anything else reads the frame.
    self.data = data.fillna(method='ffill')
    filled = self.data

    # Unpack the config keywords, falling back to the frame's own bounds.
    self.start = kwargs.get('start', filled.index[0])
    self.end = kwargs.get('end', filled.index[-1])
    self.sids = filled.columns

    # Hash value for downstream sorting (computed from the unfilled input).
    self.arg_string = hash_args(data, **kwargs)

    self._raw_data = None
    self.started_sids = set()
def __init__(self, data, **kwargs):
    """Set up the source from a Panel.

    The body reads ``data`` throughout, so it must be a parameter; without
    it every call failed with ``NameError``.

    Parameters
    ----------
    data : pd.Panel
        Its ``major_axis`` must be a ``DatetimeIndex`` and its ``items`` a
        ``pd.Int64Index`` (integer SIDs); both are checked with ``assert``.
    **kwargs
        Optional ``start`` and ``end`` overrides.  When absent they default
        to the first and last entries of the filled panel's ``major_axis``.
        Every keyword is also forwarded to ``hash_args``.
    """
    assert isinstance(data.major_axis, pd.tseries.index.DatetimeIndex)
    # Only accept integer SIDs as the items of the Panel
    assert isinstance(data.items, pd.Int64Index)

    # TODO: is filling correct/necessary here?
    # Missing values are filled through the mapping {'volume': 0}; nothing
    # is forward filled in this variant.
    self.data = data.fillna(value={'volume': 0})

    # Unpack config dictionary with default values.
    self.start = kwargs.get('start', self.data.major_axis[0])
    self.end = kwargs.get('end', self.data.major_axis[-1])
    self.sids = self.data.items

    # Hash value for downstream sorting.
    self.arg_string = hash_args(data, **kwargs)

    self._raw_data = None
    self.started_sids = set()
def _dt_to_epoch_ns(dt_series):
    """Return the int64 representation of a timeseries, expressed in UTC.

    Parameters
    ----------
    dt_series : pd.Series
        The timeseries to convert; only its ``.values`` are used.

    Returns
    -------
    idx
        ``np.int64`` view of the UTC index built from the values
        (nanoseconds since the epoch for nanosecond-resolution input).
    """
    as_index = pd.to_datetime(dt_series.values)
    # Naive timestamps are declared to be UTC; aware ones are converted.
    as_utc = (
        as_index.tz_localize('UTC')
        if as_index.tzinfo is None
        else as_index.tz_convert('UTC')
    )
    return as_utc.view(np.int64)
def test_abc_types(self):
    """Check that concrete pandas objects register with their ABC markers.

    Each assertion pairs one pandas object (built inline or taken from a
    fixture attribute on ``self``) with the ``com.ABC*`` class it is expected
    to be an instance of.
    """
    # NOTE(review): four list literals were truncated in the source
    # (``[1,`` / ``['a',``).  They are restored to match the intact
    # ``['a', 'b', 'c']`` and ``[1, 2, 3]`` literals on the first two lines
    # -- confirm against upstream.
    self.assertIsInstance(pd.Index(['a', 'b', 'c']), com.ABCIndex)
    self.assertIsInstance(pd.Int64Index([1, 2, 3]), com.ABCInt64Index)
    self.assertIsInstance(pd.Float64Index([1, 2, 3]), com.ABCFloat64Index)
    self.assertIsInstance(self.multi_index, com.ABCMultiIndex)
    self.assertIsInstance(self.datetime_index, com.ABCDatetimeIndex)
    self.assertIsInstance(self.timedelta_index, com.ABCTimedeltaIndex)
    self.assertIsInstance(self.period_index, com.ABCPeriodIndex)
    self.assertIsInstance(self.categorical_df.index,
                          com.ABCCategoricalIndex)
    self.assertIsInstance(pd.Index(['a', 'b', 'c']), com.ABCIndexClass)
    self.assertIsInstance(pd.Int64Index([1, 2, 3]), com.ABCIndexClass)
    self.assertIsInstance(pd.Series([1, 2, 3]), com.ABCSeries)
    self.assertIsInstance(self.df, com.ABCDataFrame)
    self.assertIsInstance(self.df.to_panel(), com.ABCPanel)
    self.assertIsInstance(self.sparse_series, com.ABCSparseSeries)
    self.assertIsInstance(self.sparse_array, com.ABCSparseArray)
    self.assertIsInstance(self.categorical, com.ABCCategorical)
    self.assertIsInstance(pd.Period('2012', freq='A-DEC'), com.ABCPeriod)
def setattributeindex(self, instance, value):
    """Relabel the buses of ``instance`` with the labels in ``value``.

    Every ``F_BUS`` / ``T_BUS`` entry of ``instance.branch`` and every
    ``GEN_BUS`` entry of ``instance.gen`` is looked up in the current bus
    index and replaced by the entry of ``value`` at the same position.  The
    load columns are then renamed (or the load frame is created), the bus
    index is replaced by ``value``, and integer-style bus names are turned
    into ``'Bus<n>'`` strings.
    """
    previous_index = instance.bus.index

    def relabel(old_label):
        # Position of the old label in the old index selects the new label.
        return value[previous_index.get_loc(old_label)]

    instance.branch['F_BUS'] = instance.branch['F_BUS'].apply(relabel)
    instance.branch['T_BUS'] = instance.branch['T_BUS'].apply(relabel)
    instance.gen['GEN_BUS'] = instance.gen['GEN_BUS'].apply(relabel)

    try:
        has_load = instance.bus_name.isin(instance.load.columns)
        instance.load.columns = [
            new_label
            for flag, new_label in zip(has_load, value)
            if flag == True  # noqa: E712 -- comparison kept as written
        ]
    except ValueError:
        # Fall back to assigning every new label as a load column.
        instance.load.columns = value
    except AttributeError:
        # No usable load frame (or bus names without ``isin``): start from a
        # single all-zero row with one column per new label.
        instance.load = pd.DataFrame(0, index=range(0, 1), columns=value, dtype='float')

    instance.bus.index = value

    # Two separate checks joined by ``or`` so the second type is only looked
    # up when the first does not match.
    if isinstance(instance.bus_name, pd.RangeIndex) or isinstance(instance.bus_name, pd.Int64Index):
        logger.debug('Forcing string types for all bus names')
        instance.bus_name = ['Bus{}'.format(b) for b in instance.bus_name]
def setUpClass(cls):
    """Build the class-wide calendar, asset ids, asset finder and lifetimes mask.

    The results are stored on ``cls`` under double-underscore attribute
    names, in this order: calendar, assets, finder context manager, entered
    finder, lifetimes mask.
    """
    cls.__calendar = date_range('2014', '2015', freq=Trading_day)

    assets = Int64Index(arange(1, 20))
    cls.__assets = assets

    # Every asset is described over the full span of the calendar.
    first_session = cls.__calendar[0]
    last_session = cls.__calendar[-1]
    equity_info = make_simple_equity_info(assets, first_session, last_session)

    # The context manager is entered by hand and kept on the class;
    # NOTE(review): the matching ``__exit__`` is presumably called from a
    # teardown hook that is not visible here -- confirm.
    cls.__tmp_finder_ctx = tmp_asset_finder(equities=equity_info)
    cls.__finder = cls.__tmp_finder_ctx.__enter__()

    # The mask only covers the final 30 calendar entries.
    trailing_sessions = cls.__calendar[-30:]
    cls.__mask = cls.__finder.lifetimes(
        trailing_sessions,
        include_start_date=False,
    )
def test_outer_join_sort(self):
    """Outer-joining an integer index with a date index matches an object union.

    Both the join and the construction of the expected value are required to
    emit a ``RuntimeWarning``.
    """
    shuffled_ints = Index(np.random.permutation(15))
    dates = tm.makeDateIndex(10)

    with tm.assert_produces_warning(RuntimeWarning):
        joined = shuffled_ints.join(dates, how='outer')

    # The date index goes first in the expected union because DatetimeIndex
    # has join precedence over Int64Index.
    with tm.assert_produces_warning(RuntimeWarning):
        dates_as_objects = dates.astype(object)
        ints_as_objects = shuffled_ints.astype(object)
        expected = dates_as_objects.union(ints_as_objects)

    tm.assert_index_equal(joined, expected)
def test_reindex_doesnt_preserve_type_if_target_is_empty_index(self):
    """Reindexing onto an empty typed index yields the target's dtype (GH7774)."""
    source = pd.Index(list('abc'))

    def reindexed_scalar_type(target):
        # ``reindex`` returns a pair; only the new index (item 0) matters.
        new_index = source.reindex(target)[0]
        return new_index.dtype.type

    empty_targets = (
        (pd.Int64Index, np.int64),
        (pd.Float64Index, np.float64),
        (pd.DatetimeIndex, np.datetime64),
    )
    for index_cls, scalar_type in empty_targets:
        self.assertEqual(reindexed_scalar_type(index_cls([])), scalar_type)

    # Same check per level when the target is an empty two-level MultiIndex.
    empty_multi = pd.MultiIndex(
        [pd.Int64Index([]), pd.Float64Index([])], [[], []])
    reindexed = source.reindex(empty_multi)[0]
    for level_number, scalar_type in enumerate((np.int64, np.float64)):
        self.assertEqual(reindexed.levels[level_number].dtype.type, scalar_type)
def setattributeindex(self, instance, value):
    """Relabel the generators of ``instance`` with the labels in ``value``.

    The body operates on ``instance`` throughout, so it must be a parameter;
    without it every call failed with ``NameError``.

    Parameters
    ----------
    instance
        Object exposing ``gen`` and ``gencost`` (both with an assignable
        ``index``) and a ``gen_name`` attribute.
    value
        New labels, assigned as the index of both ``gen`` and ``gencost``.
    """
    instance.gen.index = value
    instance.gencost.index = value

    # Integer-style generator names are replaced by 'GenCo<n>' strings.
    # ``RangeIndex`` is tested first so a default integer index is handled
    # without touching ``pd.Int64Index``.
    # NOTE(review): ``pd.Int64Index`` is presumably unavailable on newer
    # pandas releases -- confirm the version this file targets.
    if isinstance(instance.gen_name, pd.RangeIndex) or isinstance(instance.gen_name, pd.Int64Index):
        instance.gen_name = ['GenCo{}'.format(g) for g in instance.gen_name]
def shift_dates(self, h):
    """Auxiliary function for creating dates for forecasts.

    Works on a deep copy of ``self.index`` with the first ``self.max_lag``
    entries dropped, so ``self.index`` itself is left untouched.

    Parameters
    ----------
    h : int
        How many steps to forecast.

    Returns
    ----------
    The transformed index: a list grown by ``h`` entries (each the previous
    last entry plus one) when ``self.is_pandas`` is not ``True``; otherwise
    the pandas index after the branch matching its type has run.
    """
    shifted = copy.deepcopy(self.index)
    shifted = shifted[self.max_lag:len(shifted)]

    if self.is_pandas is not True:
        for _ in range(h):
            shifted.append(shifted[len(shifted) - 1] + 1)
        return shifted

    if isinstance(shifted, pd.core.indexes.datetimes.DatetimeIndex):
        # 'H', 'M', 'S' use the seconds component of the last gap; anything
        # else is assumed to be a higher frequency (configured for days).
        use_seconds = pd.infer_freq(shifted) in ['H', 'M', 'S']
        for _ in range(h):
            last_pos = len(shifted) - 1
            gap = shifted[last_pos] - shifted[last_pos - 1]
            amount = gap.seconds if use_seconds else gap.days
            # NOTE(review): this adds the offset to every entry rather than
            # appending new ones -- confirm that is what callers expect.
            shifted += pd.DateOffset(amount)
    elif isinstance(shifted, pd.core.indexes.numeric.Int64Index):
        for _ in range(h):
            raw = shifted.values
            last_pos = len(raw) - 1
            # Next value continues the spacing of the final two entries.
            next_value = raw[last_pos] + (raw[last_pos] - raw[last_pos - 1])
            shifted = pd.Int64Index(np.append(raw, next_value))

    return shifted
def get_adjustments(self,
                    zero_qtr_data,
                    requested_qtr_data,
                    last_per_qtr,
                    dates,
                    assets,
                    columns,
                    **kwargs):
    """
    Collect the adjustments for the given estimates data, one sid at a time.

    Parameters
    ----------
    zero_qtr_data : pd.DataFrame
        The 'time zero' data for each calendar date per sid.  Sorted by
        index **in place** by this method.
    requested_qtr_data : pd.DataFrame
        The requested quarter data for each calendar date per sid.
    last_per_qtr : pd.DataFrame
        A DataFrame with a column MultiIndex of [self.estimates.columns,
        normalized_quarters, sid] giving the timeline of estimates for a
        particular sid and quarter.
    dates : pd.DatetimeIndex
        The calendar dates for which estimates data is requested.
    assets : pd.Int64Index
        An index of all the assets from the raw data.
    columns : list of BoundColumn
        The columns for which adjustments need to be calculated.
    kwargs :
        Additional keyword arguments forwarded unchanged to
        `get_adjustments_for_sid`.

    Returns
    -------
    col_to_all_adjustments : dict[int -> AdjustedArray]
        The dictionary handed to every `get_adjustments_for_sid` call,
        which is expected to fill it in.
    """
    zero_qtr_data.sort_index(inplace=True)

    # Keep only the LAST record of each (sid, quarter) group so that the
    # most up-to-date event date wins if the event date changed.
    # NOTE(review): the odd casing of ``norMALIZED_QUARTERS`` is kept as
    # found; the constant is defined outside this block -- confirm its name.
    per_quarter = zero_qtr_data.groupby(
        level=[SID_FIELD_NAME, norMALIZED_QUARTERS]
    )
    quarter_shifts = per_quarter.nth(-1)

    col_to_all_adjustments = {}
    # Map every sid to its position within ``assets``.
    sid_to_idx = {sid: position for position, sid in enumerate(assets)}

    per_sid = quarter_shifts.groupby(level=SID_FIELD_NAME)
    per_sid.apply(
        self.get_adjustments_for_sid,
        dates,
        requested_qtr_data,
        last_per_qtr,
        sid_to_idx,
        columns,
        col_to_all_adjustments,
        **kwargs
    )
    return col_to_all_adjustments
def test_categorical_df_concat(self):
    """``categorical_df_concat`` stacks frames and keeps categorical columns.

    Three three-row frames with categorical 'A' and 'C' columns and an int64
    'B' column are concatenated.  The result is compared with a hand-built
    frame, and the category sets of 'A' and 'C' are compared separately.
    """
    # NOTE(review): most literals in this fixture were truncated in the
    # source (unterminated ``pd.Series([...`` calls and missing closing
    # brackets).  The values below are a self-consistent reconstruction:
    # every input column has three rows and ``expected`` is the row-wise
    # concatenation of the inputs, each keeping its own 0..2 index.
    # Confirm against upstream before relying on the exact values.
    inp = [
        pd.DataFrame(
            {
                'A': pd.Series(['a', 'b', 'c'], dtype='category'),
                'B': pd.Series([100, 102, 103], dtype='int64'),
                'C': pd.Series(['x', 'x', 'x'], dtype='category'),
            }
        ),
        pd.DataFrame(
            {
                'A': pd.Series(['c', 'b', 'd'], dtype='category'),
                'B': pd.Series([103, 102, 104], dtype='int64'),
                'C': pd.Series(['y', 'y', 'y'], dtype='category'),
            }
        ),
        pd.DataFrame(
            {
                'A': pd.Series(['a', 'b', 'd'], dtype='category'),
                'B': pd.Series([101, 102, 104], dtype='int64'),
                'C': pd.Series(['z', 'z', 'z'], dtype='category'),
            }
        ),
    ]
    result = categorical_df_concat(inp)

    expected = pd.DataFrame(
        {
            'A': pd.Series(
                ['a', 'b', 'c', 'c', 'b', 'd', 'a', 'b', 'd'],
                dtype='category'
            ),
            'B': pd.Series(
                [100, 102, 103, 103, 102, 104, 101, 102, 104],
                dtype='int64'
            ),
            'C': pd.Series(
                ['x', 'x', 'x', 'y', 'y', 'y', 'z', 'z', 'z'],
                dtype='category'
            ),
        },
    )
    expected.index = pd.Int64Index([0, 1, 2, 0, 1, 2, 0, 1, 2])
    assert_equal(expected, result)
    assert_equal(
        expected['A'].cat.categories,
        result['A'].cat.categories
    )
    assert_equal(
        expected['C'].cat.categories,
        result['C'].cat.categories
    )
def infer_freq(index, warn=True):
    """
    Infer the most likely frequency given the input index. If the frequency is
    uncertain, a warning will be printed.

    Parameters
    ----------
    index : DatetimeIndex or TimedeltaIndex
        if passed a Series will use the values of the series (NOT THE INDEX)
    warn : boolean, default True

    Returns
    -------
    freq : string or None
        None if no discernible frequency

    Raises
    ------
    TypeError
        if the index is not datetime-like (a Series of non-convertible
        dtype, a period-like input, or an Int64Index / Float64Index)
    ValueError
        if there are less than three values.
    """
    import pandas as pd

    if isinstance(index, com.ABCSeries):
        values = index._values
        if not (com.is_datetime64_dtype(values) or
                com.is_timedelta64_dtype(values) or
                values.dtype == object):
            raise TypeError("cannot infer freq from a non-convertible "
                            "dtype on a Series of {0}".format(index.dtype))
        # From here on work with the Series' values, not the Series.
        index = values

    if com.is_period_arraylike(index):
        raise TypeError("Periodindex given. Check the `freq` attribute "
                        "instead of using infer_freq.")
    elif isinstance(index, pd.TimedeltaIndex):
        # The pandas class is spelled ``TimedeltaIndex``; the previous
        # ``pd.timedeltaIndex`` lookup raised AttributeError.
        # NOTE(review): ``_timedeltaFrequencyInferer`` is defined outside
        # this block and kept as found -- confirm its spelling.
        inferer = _timedeltaFrequencyInferer(index, warn=warn)
        return inferer.get_freq()

    if isinstance(index, pd.Index) and not isinstance(index, pd.DatetimeIndex):
        if isinstance(index, (pd.Int64Index, pd.Float64Index)):
            raise TypeError("cannot infer freq from a non-convertible index "
                            "type {0}".format(type(index)))
        index = index.values

    if not isinstance(index, pd.DatetimeIndex):
        try:
            index = pd.DatetimeIndex(index)
        except AmbiguousTimeError:
            # Retry from the integer representation of the values.
            index = pd.DatetimeIndex(index.asi8)

    inferer = _FrequencyInferer(index, warn=warn)
    return inferer.get_freq()
def test_constructor_dtypes(self):
    """``Index(...)`` dispatches to the expected subclass for each dtype.

    Each loop builds a handful of equivalent inputs (array or list, with or
    without an explicit ``dtype``) and checks the class of the result.
    """
    # NOTE(review): most list literals in this test were truncated in the
    # source (e.g. ``np.array([1, dtype=int)``, ``datetime(2011, ...``) and
    # the assertion names were mis-cased.  Elided elements are restored by
    # analogy with the intact entries (``[1., 2., 3.]``,
    # ``[True, False, True]``, the two ``np.datetime64`` dates); only the
    # constructor variants visible in the fragments are included.  Confirm
    # against upstream.
    for idx in [Index(np.array([1, 2, 3], dtype=int)),
                Index(np.array([1, 2, 3], dtype=int), dtype=int),
                Index([1, 2, 3], dtype=int)]:
        self.assertIsInstance(idx, Int64Index)

    # these should coerce
    for idx in [Index(np.array([1., 2., 3.], dtype=float), dtype=int),
                Index([1., 2., 3.], dtype=int)]:
        self.assertIsInstance(idx, Int64Index)

    for idx in [Index(np.array([1., 2., 3.], dtype=float)),
                Index(np.array([1., 2., 3.], dtype=float), dtype=float)]:
        self.assertIsInstance(idx, Float64Index)

    for idx in [Index(np.array([True, False, True], dtype=bool)),
                Index([True, False, True]),
                Index(np.array([True, False, True], dtype=bool), dtype=bool)]:
        self.assertIsInstance(idx, Index)
        self.assertEqual(idx.dtype, object)

    for idx in [Index(np.array([1, 2, 3], dtype=int), dtype='category'),
                Index(np.array([np.datetime64('2011-01-01'),
                                np.datetime64('2011-01-02')]),
                      dtype='category'),
                Index([datetime(2011, 1, 1), datetime(2011, 1, 2)],
                      dtype='category')]:
        self.assertIsInstance(idx, CategoricalIndex)

    for idx in [Index(np.array([np.datetime64('2011-01-01'),
                                np.datetime64('2011-01-02')])),
                Index([datetime(2011, 1, 1), datetime(2011, 1, 2)])]:
        self.assertIsInstance(idx, DatetimeIndex)

    # An explicit object dtype must prevent the datetime specialisation.
    # NOTE(review): the source asserted ``isinstance(idx, object)``, which is
    # always true; replaced with the Index/object-dtype pair used by the
    # boolean loop above -- confirm.
    for idx in [Index(np.array([np.datetime64('2011-01-01'),
                                np.datetime64('2011-01-02')]), dtype=object),
                Index([datetime(2011, 1, 1),
                       datetime(2011, 1, 2)], dtype=object)]:
        self.assertNotIsInstance(idx, DatetimeIndex)
        self.assertIsInstance(idx, Index)
        self.assertEqual(idx.dtype, object)

    for idx in [Index(np.array([np.timedelta64(1, 'D'), np.timedelta64(
            1, 'D')])), Index([timedelta(1), timedelta(1)])]:
        self.assertIsInstance(idx, TimedeltaIndex)

    # Likewise for timedeltas with an explicit object dtype.
    for idx in [Index(np.array([np.timedelta64(1, 'D'),
                                np.timedelta64(1, 'D')]), dtype=object),
                Index([timedelta(1), timedelta(1)], dtype=object)]:
        self.assertNotIsInstance(idx, TimedeltaIndex)
        self.assertIsInstance(idx, Index)
        self.assertEqual(idx.dtype, object)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。