Python nose 模块,SkipTest() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用nose.SkipTest()。
def skip_if(predicate, reason=None):
"""Skip a test if predicate is true."""
reason = reason or predicate.__name__
from nose import SkipTest
def decorate(fn):
fn_name = fn.__name__
def maybe(*args, **kw):
if predicate():
msg = "'%s' skipped: %s" % (
fn_name, reason)
raise SkipTest(msg)
else:
return fn(*args, **kw)
return function_named(maybe, fn_name)
return decorate
def setup_class(cls):
if not tests.is_datastore_supported():
raise nose.SkipTest("Datastore not supported")
p.load('datastore')
ctd.CreateTestData.create()
cls.sysadmin_user = model.User.get('testsysadmin')
cls.normal_user = model.User.get('annafan')
resource = model.Package.get('annakarenina').resources[0]
cls.data = {
'resource_id': resource.id,
'aliases': u'b\xfck2',
'fields': [{'id': 'book', 'type': 'text'},
{'id': 'author',
{'id': 'rating with %', 'type': 'text'}],
'records': [{'book': 'annakarenina', 'author': 'tolstoy',
'rating with %': '90%'},
{'book': 'warandpeace',
'rating with %': '42%'}]
}
engine = db._get_engine(
{'connection_url': config['ckan.datastore.write_url']})
cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
set_url_type(
model.Package.get('annakarenina').resources, cls.sysadmin_user)
def setup_class(cls):
wsgiapp = middleware.make_app(config['global_conf'], **config)
cls.app = paste.fixture.TestApp(wsgiapp)
if not tests.is_datastore_supported():
raise nose.SkipTest("Datastore not supported")
p.load('datastore')
p.load('datapusher')
p.load('test_datapusher_plugin')
resource = factories.Resource(url_type='datastore')
cls.dataset = factories.Dataset(resources=[resource])
cls.sysadmin_user = factories.User(name='testsysadmin', sysadmin=True)
cls.normal_user = factories.User(name='annafan')
engine = db._get_engine(
{'connection_url': config['ckan.datastore.write_url']})
cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
def test_interrupted_systemcall(self):
'''
Make sure interrupted system calls don't break the world,since we
can't control what all signals our connection thread will get
'''
if 'linux' not in platform:
raise SkipTest('Unable to reproduce error case on'
' non-linux platforms')
path = 'interrupt_test'
value = b"1"
self.client.create(path, value)
# set the euid to the current process' euid.
# glibc sends SIGRT to all children,which will interrupt the
# system call
os.seteuid(os.geteuid())
# basic sanity test that it worked alright
assert self.client.get(path)[0] == value
def setup_class(cls):
if not pylons.config.get('ckan.datastore.read_url'):
raise nose.SkipTest('Datastore runs on legacy mode,skipping...')
engine = db._get_engine(
{'connection_url': pylons.config['ckan.datastore.write_url']}
)
cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
datastore_test_helpers.clear_db(cls.Session)
create_tables = [
u'CREATE TABLE test_a (id_a text)',
u'CREATE TABLE test_b (id_b text)',
u'CREATE TABLE "TEST_C" (id_c text)',
u'CREATE TABLE test_d ("?/?" integer)',
]
for create_table_sql in create_tables:
cls.Session.execute(create_table_sql)
def skip(msg=None):
"""Decorator factory - mark a test function for skipping from test suite.
Parameters
----------
msg : string
Optional message to be added.
Returns
-------
decorator : function
Decorator,which,when applied to a function,causes SkipTest
to be raised,with the optional message added.
"""
return skipif(True,msg)
def test_obj_del(self):
"""Test that object's __del__ methods are called on exit."""
if sys.platform == 'win32':
try:
import win32api
except ImportError:
raise SkipTest("Test requires pywin32")
src = ("class A(object):\n"
" def __del__(self):\n"
" print 'object A deleted'\n"
"a = A()\n")
self.mktmp(py3compat.doctest_refactor_print(src))
if dec.module_not_available('sqlite3'):
err = 'WARNING: IPython History requires SQLite,your history will not be saved\n'
else:
err = None
tt.ipexec_validate(self.fname, 'object A deleted', err)
def test_not_writable_ipdir():
tmpdir = tempfile.mkdtemp()
os.name = "posix"
env.pop('IPYTHON_DIR', None)
env.pop('IPYTHONDIR', None)
env.pop('XDG_CONFIG_HOME', None)
env['HOME'] = tmpdir
ipdir = os.path.join(tmpdir, '.ipython')
os.mkdir(ipdir, 0o555)
try:
open(os.path.join(ipdir, "_foo_"), 'w').close()
except IOError:
pass
else:
# I can still write to an unwritable dir,
# assume I'm root and skip the test
raise SkipTest("I can't create directories that I can't write to")
with AssertPrints('is not a writable location', channel='stderr'):
ipdir = paths.get_ipython_dir()
env.pop('IPYTHON_DIR', None)
def monkeypatch_xunit():
try:
knownfailureif(True)(lambda: None)()
except Exception as e:
KnownFailureTest = type(e)
def addError(self, test, err, capt=None):
if issubclass(err[0], KnownFailureTest):
err = (SkipTest,) + err[1:]
return self.orig_addError(test, capt)
Xunit.orig_addError = Xunit.addError
Xunit.addError = addError
#-----------------------------------------------------------------------------
# Check which dependencies are installed and greater than minimum version.
#-----------------------------------------------------------------------------
def __call__(self, fn):
@decorator
def decorate(fn, *args, **kw):
if self.predicate():
if self.reason:
msg = "'%s' : %s" % (
fn.__name__,
self.reason
)
else:
msg = "'%s': %s" % (
fn.__name__, self.predicate
)
raise SkipTest(msg)
else:
if self._fails_on:
with self._fails_on.fail_if(name=fn.__name__):
return fn(*args, **kw)
else:
return fn(*args, **kw)
return decorate(fn)
def test_status_exceptions_can_be_pickled_across_processes(self):
try:
import jobrunner
except ImportError:
raise SkipTest("jobrunner not installed,skipping")
runner = jobrunner.JobRunner(jobrunner.JobRunner.RUN_MODE_MULTIPROCESS,
runnables=[raise_an_exception],
auto_assert=False)
result = runner.run()[0]
self.assertTrue(result.exception_occured())
self.assertEqual(str(result.err_type), str(nifpga.FifoTimeoutError))
self.assertIn("session: 0xbeef", result.err_class)
if python_version == 2:
self.assertIn("a bogus string argument: 'I am a string'", result.err_class)
else:
self.assertIn("a bogus string argument: b'I am a string'", result.err_class)
def test_complex_signature_py3(self):
if six.PY2:
raise SkipTest()
with LogCapture() as l:
# without this exec Python 2 and pyflakes complain about syntax errors etc
exec (
"""@trace_call(self.logger)
def foo(a,b,c,d,e,*varargs_,f=None,g='G',h='H',i='ii',j='jj',**varkwargs_: None):
pass
foo('a','b',*['c','d'],e='E',f='F',Z='Z',**{'g':'g','h':'h'})
""", locals(), globals()
)
l.check(
(
'test.v0_1.test_base',
'DEBUG',
"calling foo(a='a',b='b',c='c',d='d',"
"g='g',h='h',varkwargs_={'Z': 'Z'},varargs_=<class '%s._empty'>,"
"i='ii',j='jj')" % (INSPECT_MODULE_NAME,) # prefix does not work because of the eval,inspect module is for pypy3
),
)
def testWLS(self):
# WLS centered SS changed (fixed) in 0.5.0
sm_version = sm.version.version
if sm_version < LooseVersion('0.5.0'):
raise nose.SkipTest("WLS centered SS not fixed in statsmodels"
" version {0}".format(sm_version))
X = DataFrame(np.random.randn(30, 4), columns=['A', 'B', 'C', 'D'])
Y = Series(np.random.randn(30))
weights = X.std(1)
self._check_wls(X, Y, weights)
weights.ix[[5, 15]] = np.nan
Y[[2, 21]] = np.nan
self._check_wls(X, weights)
def test_compat():
# test we have compat with our version of nu
from pandas.computation import _NUMEXPR_INSTALLED
try:
import numexpr as ne
ver = ne.__version__
if ver == LooseVersion('2.4.4'):
assert not _NUMEXPR_INSTALLED
elif ver < LooseVersion('2.1'):
with tm.assert_produces_warning(UserWarning,
check_stacklevel=False):
assert not _NUMEXPR_INSTALLED
else:
assert _NUMEXPR_INSTALLED
except ImportError:
raise nose.SkipTest("not testing numexpr version compat")
def check_invalid_numexpr_version(engine, parser):
def testit():
a, b = 1, 2
res = pd.eval('a + b', engine=engine, parser=parser)
tm.assert_equal(res, 3)
if engine == 'numexpr':
try:
import numexpr as ne
except ImportError:
raise nose.SkipTest("no numexpr")
else:
if ne.__version__ < LooseVersion('2.1'):
with tm.assertRaisesRegexp(ImportError, "'numexpr' version is "
".+,must be >= 2.1"):
testit()
elif ne.__version__ == LooseVersion('2.4.4'):
raise nose.SkipTest("numexpr version==2.4.4")
else:
testit()
else:
testit()
def get_expected_pow_result(self, lhs, rhs):
try:
expected = _eval_single_bin(lhs, '**', rhs, self.engine)
except ValueError as e:
msg = 'negative number cannot be raised to a fractional power'
try:
emsg = e.message
except AttributeError:
emsg = e
emsg = u(emsg)
if emsg == msg:
if self.engine == 'python':
raise nose.SkipTest(emsg)
else:
expected = np.nan
else:
raise
return expected
def test_legacy_pickle(self):
if PY3:
raise nose.SkipTest("testing for legacy pickles not "
"support on py3")
path = tm.get_data_path('multiindex_v1.pickle')
obj = pd.read_pickle(path)
obj2 = MultiIndex.from_tuples(obj.values)
self.assertTrue(obj.equals(obj2))
res = obj.get_indexer(obj)
exp = np.arange(len(obj))
assert_almost_equal(res, exp)
res = obj.get_indexer(obj2[::-1])
exp = obj.get_indexer(obj[::-1])
exp2 = obj2.get_indexer(obj2[::-1])
assert_almost_equal(res, exp)
assert_almost_equal(exp, exp2)
def test_quantile_interpolation_np_lt_1p9(self):
# GH #10174
if not _np_version_under1p9:
raise nose.SkipTest("Numpy version is greater than 1.9")
from numpy import percentile
# interpolation = linear (default case)
q = self.ts.quantile(0.1, interpolation='linear')
self.assertEqual(q, percentile(self.ts.valid(), 10))
q1 = self.ts.quantile(0.1)
self.assertEqual(q1, 10))
# interpolation other than linear
expErrMsg = "Interpolation methods other than "
with tm.assertRaisesRegexp(ValueError, expErrMsg):
self.ts.quantile(0.9, interpolation='nearest')
# object dtype
with tm.assertRaisesRegexp(ValueError, expErrMsg):
q = Series(self.ts, dtype=object).quantile(0.7,
interpolation='higher')
def test_to_excel(self):
try:
import xlwt # noqa
import xlrd # noqa
import openpyxl # noqa
from pandas.io.excel import ExcelFile
except ImportError:
raise nose.SkipTest("need xlwt xlrd openpyxl")
for ext in ['xls', 'xlsx']:
path = '__tmp__.' + ext
with ensure_clean(path) as path:
self.panel.to_excel(path)
try:
reader = ExcelFile(path)
except ImportError:
raise nose.SkipTest("need xlwt xlrd openpyxl")
for item, df in self.panel.iteritems():
recdf = reader.parse(str(item), index_col=0)
assert_frame_equal(df, recdf)
def test_to_excel_xlsxwriter(self):
try:
import xlrd # noqa
import xlsxwriter # noqa
from pandas.io.excel import ExcelFile
except ImportError:
raise nose.SkipTest("Requires xlrd and xlsxwriter. Skipping test.")
path = '__tmp__.xlsx'
with ensure_clean(path) as path:
self.panel.to_excel(path, engine='xlsxwriter')
try:
reader = ExcelFile(path)
except ImportError as e:
raise nose.SkipTest("cannot write excel file: %s" % e)
for item, df in self.panel.iteritems():
recdf = reader.parse(str(item), index_col=0)
assert_frame_equal(df, recdf)
def test_setitem_ndarray(self):
raise nose.SkipTest("skipping for now")
# from pandas import DateRange,datetools
# timeidx = DateRange(start=datetime(2009,1,1),
# end=datetime(2009,12,31),
# offset=datetools.MonthEnd())
# lons_coarse = np.linspace(-177.5,177.5,72)
# lats_coarse = np.linspace(-87.5,87.5,36)
# P = Panel(items=timeidx,major_axis=lons_coarse,
# minor_axis=lats_coarse)
# data = np.random.randn(72*36).reshape((72,36))
# key = datetime(2009,2,28)
# P[key] = data#
# assert_almost_equal(P[key].values,data)
def test_shift(self):
raise nose.SkipTest("skipping for now")
# # major
# idx = self.panel.major_axis[0]
# idx_lag = self.panel.major_axis[1]
# shifted = self.panel.shift(1)
# assert_frame_equal(self.panel.major_xs(idx),
# shifted.major_xs(idx_lag))
# # minor
# idx = self.panel.minor_axis[0]
# idx_lag = self.panel.minor_axis[1]
# shifted = self.panel.shift(1,axis='minor')
# assert_frame_equal(self.panel.minor_xs(idx),
# shifted.minor_xs(idx_lag))
# self.assertRaises(Exception,self.panel.shift,axis='items')
def test_multiindex_get(self):
raise nose.SkipTest("skipping for now")
# ind = MultiIndex.from_tuples([('a',('a',2),('b',2)],
# names=['first','second'])
# wp = Panel(np.random.random((4,5,5)),
# items=ind,
# major_axis=np.arange(5),
# minor_axis=np.arange(5))
# f1 = wp['a']
# f2 = wp.ix['a']
# assert_panel_equal(f1,f2)
# self.assertTrue((f1.items == [1,2]).all())
# self.assertTrue((f2.items == [1,2]).all())
# ind = MultiIndex.from_tuples([('a',1)],'second'])
def test_unpacker_hook_refcnt(self):
if not hasattr(sys, 'getrefcount'):
raise nose.SkipTest('no sys.getrefcount()')
result = []
def hook(x):
result.append(x)
return x
basecnt = sys.getrefcount(hook)
up = Unpacker(object_hook=hook, list_hook=hook)
assert sys.getrefcount(hook) >= basecnt + 2
up.feed(packb([{}]))
up.feed(packb([{}]))
assert up.unpack() == [{}]
assert up.unpack() == [{}]
assert result == [{}, [{}], {}, [{}]]
del up
assert sys.getrefcount(hook) == basecnt
def test_built_in_round(self):
if not compat.PY3:
raise nose.SkipTest("build in round cannot be overriden "
"prior to Python 3")
# GH11763
# Here's the test frame we'll be working with
df = DataFrame(
{'col1': [1.123, 2.123, 3.123], 'col2': [1.234, 2.234, 3.234]})
# Default round to integer (i.e. decimals=0)
expected_rounded = DataFrame(
{'col1': [1., 2., 3.], 'col2': [1., 3.]})
assert_frame_equal(round(df), expected_rounded)
# Clip
def test_split_block_at(self):
# with dup column support this method was taken out
# GH3679
raise nose.SkipTest("skipping for now")
bs = list(self.fblock.split_block_at('a'))
self.assertEqual(len(bs), 1)
self.assertTrue(np.array_equal(bs[0].items, ['c', 'e']))
bs = list(self.fblock.split_block_at('c'))
self.assertEqual(len(bs), 2)
self.assertTrue(np.array_equal(bs[0].items, ['a']))
self.assertTrue(np.array_equal(bs[1].items, ['e']))
bs = list(self.fblock.split_block_at('e'))
self.assertEqual(len(bs), ['a', 'c']))
# bblock = get_bool_ex(['f'])
# bs = list(bblock.split_block_at('f'))
# self.assertEqual(len(bs),0)
def test_partial_ix_missing(self):
raise nose.SkipTest("skipping for now")
result = self.ymd.ix[2000, 0]
expected = self.ymd.ix[2000]['A']
assert_series_equal(result, expected)
# need to put in some work here
# self.ymd.ix[2000,0] = 0
# self.assertTrue((self.ymd.ix[2000]['A'] == 0).all())
# Pretty sure the second (and maybe even the first) is already wrong.
self.assertRaises(Exception, self.ymd.ix.__getitem__, (2000, 6))
self.assertRaises(Exception, 6), 0)
# ---------------------------------------------------------------------
def test_header_not_enough_lines_as_recarray(self):
if compat.is_platform_windows():
raise nose.SkipTest(
"segfaults on win-64,only when all tests are run")
data = ('skip this\n'
'skip this\n'
'a,c\n'
'1,3\n'
'4,6')
reader = TextReader(StringIO(data), delimiter=',', header=2,
as_recarray=True)
header = reader.header
expected = [['a', 'b', 'c']]
self.assertEqual(header, expected)
recs = reader.read()
expected = {'a': [1, 4], 'b': [2, 5], 'c': [3, 6]}
assert_array_dicts_equal(expected, recs)
# not enough rows
self.assertRaises(parser.CParserError, TextReader, StringIO(data),
delimiter=', header=5, as_recarray=True)
def test_numpy_string_dtype_as_recarray(self):
data = """\
a,1
aa,2
aaa,3
aaaa,4
aaaaa,5"""
if compat.is_platform_windows():
raise nose.SkipTest(
"segfaults on win-64,only when all tests are run")
def _make_reader(**kwds):
return TextReader(StringIO(data), header=None,
**kwds)
reader = _make_reader(dtype='S4', as_recarray=True)
result = reader.read()
self.assertEqual(result['0'].dtype, 'S4')
ex_values = np.array(['a', 'aa', 'aaa', 'aaaa', 'aaaa'], dtype='S4')
self.assertTrue((result['0'] == ex_values).all())
self.assertEqual(result['1'].dtype, 'S4')
def configure(self, options, conf):
super(NoseSQLAlchemy, self).configure(options, conf)
plugin_base.pre_begin(options)
plugin_base.set_coverage_flag(options.enable_plugin_coverage)
plugin_base.set_skip_test(nose.SkipTest)
def skip(reason):
"""
Unconditionally skip a test.
"""
def decorator(test_item):
if not (isinstance(test_item, type) and issubclass(test_item, TestCase)):
@functools.wraps(test_item)
def skip_wrapper(*args, **kwargs):
raise SkipTest(reason)
test_item = skip_wrapper
test_item.__unittest_skip__ = True
test_item.__unittest_skip_why__ = reason
return test_item
return decorator
def setUp(self):
super(BaseZMQTestCase, self).setUp()
if self.green and not have_gevent:
raise SkipTest("requires gevent")
self.context = self.Context.instance()
self.sockets = []
def ping_pong_json(self, s1, s2, o):
if jsonapi.jsonmod is None:
raise SkipTest("No json library")
s1.send_json(o)
o2 = s2.recv_json()
s2.send_json(o2)
o3 = s1.recv_json()
return o3
def assertRaisesErrno(self, errno, func, **kwargs):
if errno == zmq.EAGAIN:
raise SkipTest("Skipping because we're green.")
try:
func(*args, **kwargs)
except zmq.ZMQError:
e = sys.exc_info()[1]
self.assertEqual(e.errno, "wrong error raised,expected '%s' \
got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno)))
else:
self.fail("Function did not raise any error")
def skip_green(self):
raise SkipTest("Skipping because we are green")
def setUp(self):
if not socks:
raise nose.SkipTest('socks module unavailable')
if not subprocess:
raise nose.SkipTest('subprocess module unavailable')
# start a short-lived miniserver so we can get a likely port
# for the proxy
self.httpd, self.proxyport = miniserver.start_server(
miniserver.ThisDirHandler)
self.httpd.shutdown()
self.httpd, self.port = miniserver.start_server(
miniserver.ThisDirHandler)
self.pidfile = tempfile.mktemp()
self.logfile = tempfile.mktemp()
fd, self.conffile = tempfile.mkstemp()
f = os.fdopen(fd, 'w')
our_cfg = tinyproxy_cfg % {'user': os.getlogin(),
'pidfile': self.pidfile,
'port': self.proxyport,
'logfile': self.logfile}
f.write(our_cfg)
f.close()
try:
# TODO use subprocess.check_call when 2.4 is dropped
ret = subprocess.call(['tinyproxy', '-c', self.conffile])
self.assertEqual(0, ret)
except OSError, e:
if e.errno == errno.ENOENT:
raise nose.SkipTest('tinyproxy not available')
raise
def configure(self, conf)
plugin_base.pre_begin(options)
plugin_base.set_coverage_flag(options.enable_plugin_coverage)
plugin_base.set_skip_test(nose.SkipTest)
def test_plugin_load():
try:
# attempt to load dependencies of plugins
import pldns
except ImportError:
raise nose.SkipTest
expected_names = set(['TFO', 'ECN', 'DSCP', 'UDPZero', 'UDPOpts', 'DNSResolv', 'H2', 'EvilBit'])
names = set()
for plugin in pathspider.cmd.measure.plugins:
assert issubclass(plugin, pathspider.base.Spider)
names.add(plugin.__name__)
assert names == expected_names
def test_issue_23():
if not IN_TRAVIS:
raise SkipTest()
pe.get_book(url="https://github.com/pyexcel/pyexcel-ods/raw/master/tests/fixtures/white_space.ods", library='pyexcel-odsr'); # flake8: noqa
def setup_class(cls):
if p.toolkit.check_ckan_version(max_version='2.4.99'):
raise nose.SkipTest()
super(TestTranslations, cls).setup_class()
def setup_class(cls):
if not tests.is_datastore_supported():
raise nose.SkipTest("Datastore not supported")
p.load('datastore')
ctd.CreateTestData.create()
cls.sysadmin_user = model.User.get('testsysadmin')
cls.normal_user = model.User.get('annafan')
resource = model.Package.get('annakarenina').resources[0]
cls.data = dict(
resource_id=resource.id,
force=True,
fields=[
{'id': 'id'},
{'id': 'date', 'type':'date'},
{'id': 'x'},
{'id': 'y'},
{'id': 'z'},
{'id': 'country'},
{'id': 'title'},
{'id': 'lat'},
{'id': 'lon'}
],
records=[
{'id': 0, 'date': '2011-01-01', 'x': 1, 'y': 2, 'z': 3, 'country': 'DE', 'title': 'first 99', 'lat':52.56, 'lon':13.40},
{'id': 1, 'date': '2011-02-02', 'x': 2, 'y': 4, 'z': 24, 'country': 'UK', 'title': 'second', 'lat':54.97, 'lon':-1.60},
{'id': 2, 'date': '2011-03-03', 'x': 3, 'y': 6, 'z': 9, 'country': 'US', 'title': 'third', 'lat':40.00, 'lon':-75.5},
{'id': 3, 'date': '2011-04-04', 'x': 4, 'y': 8, 'z': 6, 'title': 'fourth', 'lat':57.27, 'lon':-6.20},
{'id': 4, 'date': '2011-05-04', 'x': 5, 'y': 10, 'z': 15, 'title': 'fifth', 'lat':51.58, 'lon':0},
{'id': 5, 'date': '2011-06-02', 'x': 6, 'y': 12, 'z': 18, 'title': 'sixth 53.56', 'lat':51.04, 'lon':7.9}
]
)
postparams = '%s=1' % json.dumps(cls.data)
auth = {'Authorization': str(cls.normal_user.apikey)}
res = cls.app.post('/api/action/datastore_create', params=postparams,
extra_environ=auth)
res_dict = json.loads(res.body)
assert res_dict['success'] is True
def test_pg_version_check(self):
if not tests.is_datastore_supported():
raise nose.SkipTest("Datastore not supported")
engine = db._get_engine(
{'connection_url': config['sqlalchemy.url']})
connection = engine.connect()
assert db._pg_version_is_at_least(connection, '8.0')
assert not db._pg_version_is_at_least(connection, '10.0')
def setup_class(cls):
wsgiapp = middleware.make_app(config['global_conf'], **config)
cls.app = paste.fixture.TestApp(wsgiapp)
if not tests.is_datastore_supported():
raise nose.SkipTest("Datastore not supported")
p.load('datastore')
ctd.CreateTestData.create()
cls.sysadmin_user = model.User.get('testsysadmin')
cls.normal_user = model.User.get('annafan')
resource = model.Package.get('annakarenina').resources[0]
cls.data = {
'resource_id': resource.id,
'force': True,
'aliases': 'books',
'fields': [{'id': u'b\xfck',
{'id': 'published'},
{'id': u'characters', u'type': u'_text'},
{'id': 'random_letters', 'type': 'text[]'}],
'records': [{u'b\xfck': 'annakarenina',
'author': 'tolstoy',
'published': '2005-03-01',
'nested': ['b', {'moo': 'moo'}],
u'characters': [u'Princess Anna', u'Sergius'],
'random_letters': ['a', 'e', 'x']},
{u'b\xfck': 'warandpeace',
'nested': {'a': 'b'}, 'random_letters': []}]
}
postparams = '%s=1' % json.dumps(cls.data)
auth = {'Authorization': str(cls.sysadmin_user.apikey)}
res = cls.app.post('/api/action/datastore_create',
extra_environ=auth)
res_dict = json.loads(res.body)
assert res_dict['success'] is True
engine = db._get_engine({
'connection_url': config['ckan.datastore.write_url']})
cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
def setup_class(cls):
if not tests.is_datastore_supported():
raise nose.SkipTest("Datastore not supported")
p.load('datastore')
ctd.CreateTestData.create()
cls.sysadmin_user = model.User.get('testsysadmin')
cls.normal_user = model.User.get('annafan')
set_url_type(
model.Package.get('annakarenina').resources, cls.sysadmin_user)
resource = model.Package.get('annakarenina').resources[0]
cls.data = {
'resource_id': resource.id,
{'id': 'nested', 'type': 'json'},
{'id': 'characters', 'type': 'text[]'},
{'id': 'published'}],
'primary_key': u'b\xfck',
'published': '2005-03-01', 'nested': ['b', {'moo': 'moo'}]},
'nested': {'a':'b'}}
]
}
postparams = '%s=1' % json.dumps(cls.data)
auth = {'Authorization': str(cls.sysadmin_user.apikey)}
res = cls.app.post('/api/action/datastore_create',
extra_environ=auth)
res_dict = json.loads(res.body)
assert res_dict['success'] is True
engine = db._get_engine(
{'connection_url': config['ckan.datastore.write_url']})
cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
def setup_class(cls):
if not tests.is_datastore_supported():
raise nose.SkipTest("Datastore not supported")
p.load('datastore')
ctd.CreateTestData.create()
cls.sysadmin_user = model.User.get('testsysadmin')
cls.normal_user = model.User.get('annafan')
set_url_type(
model.Package.get('annakarenina').resources, cls.sysadmin_user)
resource = model.Package.get('annakarenina').resources[0]
hhguide = u"hitchhiker's guide to the galaxy"
cls.data = {
'resource_id': resource.id,
'nested': {'a':'b'}},
{'author': 'adams',
'characters': ['Arthur Dent', 'Marvin'],
'nested': {'foo': 'bar'},
u'b\xfck': hhguide}
]
}
postparams = '%s=1' % json.dumps(cls.data)
auth = {'Authorization': str(cls.sysadmin_user.apikey)}
res = cls.app.post('/api/action/datastore_create',
extra_environ=auth)
res_dict = json.loads(res.body)
assert res_dict['success'] is True
engine = db._get_engine(
{'connection_url': config['ckan.datastore.write_url']})
cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
def setup_class(cls):
if not is_datastore_supported():
raise nose.SkipTest("Datastore not supported")
plugin = p.load('datastore')
if plugin.legacy_mode:
# make sure we undo adding the plugin
p.unload('datastore')
raise nose.SkipTest("Info is not supported in legacy mode")
def setup_class(cls):
wsgiapp = middleware.make_app(config['global_conf'], **config)
cls.app = paste.fixture.TestApp(wsgiapp)
if not tests.is_datastore_supported():
raise nose.SkipTest("Datastore not supported")
p.load('datastore')
ctd.CreateTestData.create()
cls.sysadmin_user = model.User.get('testsysadmin')
cls.normal_user = model.User.get('annafan')
engine = db._get_engine(
{'connection_url': config['ckan.datastore.write_url']})
cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
set_url_type(
model.Package.get('annakarenina').resources, cls.sysadmin_user)
def setup_class(cls):
wsgiapp = middleware.make_app(config['global_conf'], **config)
cls.app = paste.fixture.TestApp(wsgiapp)
if not tests.is_datastore_supported():
raise nose.SkipTest("Datastore not supported")
p.load('datastore')
p.load('datapusher')
ctd.CreateTestData.create()
cls.sysadmin_user = model.User.get('testsysadmin')
cls.normal_user = model.User.get('annafan')
engine = db._get_engine(
{'connection_url': config['ckan.datastore.write_url']})
cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
set_url_type(
model.Package.get('annakarenina').resources, cls.sysadmin_user)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。