Python unittest2 模块,SkipTest() 实例源码
我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用unittest2.SkipTest()。
def download_test_files_if_not_present(self):
'''
Download %s file at G-node for testing
url_for_tests is global at beginning of this file.
''' % self.ioclass.__name__
if not self.use_network:
raise unittest.SkipTest("Requires download of data from the web")
url = url_for_tests+self.shortname
try:
make_all_directories(self.files_to_download, self.local_test_dir)
download_test_file(self.files_to_download,
self.local_test_dir, url)
except IOError as exc:
raise unittest.SkipTest(exc)
def test_paged_result_handling(self):
if PROTOCOL_VERSION < 2:
raise unittest.SkipTest("Paging requires native protocol 2+,currently using: {0}".format(PROTOCOL_VERSION))
# addresses #225
class PagingTest(Model):
id = columns.Integer(primary_key=True)
val = columns.Integer()
sync_table(PagingTest)
PagingTest.create(id=1, val=1)
PagingTest.create(id=2, val=2)
session = get_session()
with mock.patch.object(session, 'default_fetch_size', 1):
results = PagingTest.objects()[:]
assert len(results) == 2
def setUp(self):
"""
Test is skipped if run with native protocol version <4
"""
self.support_v5 = True
if PROTOCOL_VERSION < 4:
raise unittest.SkipTest(
"Native protocol 4,0+ is required for custom payloads,currently using %r"
% (PROTOCOL_VERSION,))
try:
self.cluster = Cluster(protocol_version=ProtocolVersion.MAX_SUPPORTED, allow_beta_protocol_version=True)
self.session = self.cluster.connect()
except NoHostAvailable:
log.info("Protocol Version 5 not supported,")
self.cluster = Cluster(protocol_version=PROTOCOL_VERSION)
self.session = self.cluster.connect()
self.support_v5 = False
self.nodes_currently_failing = []
self.node1, self.node2, self.node3 = get_cluster().nodes.values()
def setUp(self):
"""
Test is skipped if run with cql version < 2
"""
if PROTOCOL_VERSION < 2:
raise unittest.SkipTest(
"Protocol 2.0+ is required for Lightweight transactions,currently testing against %r"
% (PROTOCOL_VERSION,))
self.cluster = Cluster(protocol_version=PROTOCOL_VERSION)
self.session = self.cluster.connect()
ddl = '''
CREATE TABLE test3rf.lwt (
k int PRIMARY KEY,
v int )'''
self.session.execute(ddl)
def test_cql_compatibility(self):
if CASS_SERVER_VERSION >= (3, 0):
raise unittest.SkipTest("cql compatibility does not apply Cassandra 3.0+")
# having more than one non-PK column is okay if there aren't any
# clustering columns
create_statement = self.make_create_statement(["a"], [], ["b", "c", "d"], compact=True)
self.session.execute(create_statement)
tablemeta = self.get_table_metadata()
self.assertEqual([u'a'], [c.name for c in tablemeta.partition_key])
self.assertEqual([], tablemeta.clustering_key)
self.assertEqual([u'a', u'b', u'c', u'd'], sorted(tablemeta.columns.keys()))
self.assertTrue(tablemeta.is_cql_compatible)
# ... but if there are clustering columns,it's not CQL compatible.
# This is a hacky way to simulate having clustering columns.
tablemeta.clustering_key = ["foo", "bar"]
tablemeta.columns["foo"] = None
tablemeta.columns["bar"] = None
self.assertFalse(tablemeta.is_cql_compatible)
def test_replicas(self):
"""
Ensure cluster.metadata.get_replicas return correctly when not attached to keyspace
"""
if murmur3 is None:
raise unittest.SkipTest('the murmur3 extension is not available')
cluster = Cluster(protocol_version=PROTOCOL_VERSION)
self.assertEqual(cluster.metadata.get_replicas('test3rf', 'key'), [])
cluster.connect('test3rf')
self.assertNotEqual(list(cluster.metadata.get_replicas('test3rf', six.b('key'))), [])
host = list(cluster.metadata.get_replicas('test3rf', six.b('key')))[0]
self.assertEqual(host.datacenter, 'dc1')
self.assertEqual(host.rack, 'r1')
cluster.shutdown()
def test_paged_result_handling(self):
if PROTOCOL_VERSION < 2:
raise unittest.SkipTest(
"Paging requires native protocol 2+,"
"currently using: {0}".format(PROTOCOL_VERSION)
)
# addresses #225
class PagingTest(Model):
id = columns.Integer(primary_key=True)
val = columns.Integer()
sync_table(self.conn, PagingTest)
PagingTest.create(self.conn, id=1, val=1)
PagingTest.create(self.conn, id=2, val=2)
with mock.patch.object(self.conn.session, 1):
results = PagingTest.objects().find_all(self.conn)
assert len(results) == 2
drop_table(self.conn, PagingTest)
def test_skiptest_in_setupclass(self):
class Test(unittest2.TestCase):
@classmethod
def setUpClass(cls):
raise unittest2.SkipTest('foo')
def test_one(self):
pass
def test_two(self):
pass
result = self.runTests(Test)
self.assertEqual(result.testsRun, 0)
self.assertEqual(len(result.errors), 0)
self.assertEqual(len(result.skipped), 1)
skipped = result.skipped[0][0]
self.assertEqual(str(skipped),
'setUpClass (%s.%s)' % (__name__,
getattr(Test, '__qualname__', Test.__name__)))
def test_skiptest_in_setupmodule(self):
class Test(unittest2.TestCase):
def test_one(self):
pass
def test_two(self):
pass
class Module(object):
@staticmethod
def setUpModule():
raise unittest2.SkipTest('foo')
Test.__module__ = 'Module'
sys.modules['Module'] = Module
result = self.runTests(Test)
self.assertEqual(result.testsRun, 'setUpModule (Module)')
def test_discover_with_init_module_that_raises_SkipTest_on_import(self):
vfs = {abspath('/foo'): ['my_package'],
abspath('/foo/my_package'): ['__init__.py', 'test_module.py']}
self.setup_import_issue_package_tests(vfs)
import_calls = []
def _get_module_from_name(name):
import_calls.append(name)
raise unittest.SkipTest('skipperoo')
loader = unittest.TestLoader()
loader._get_module_from_name = _get_module_from_name
suite = loader.discover(abspath('/foo'))
self.assertIn(abspath('/foo'), sys.path)
self.assertEqual(suite.countTestCases(), 1)
result = unittest.TestResult()
suite.run(result)
self.assertEqual(len(result.skipped), 1)
self.assertEqual(result.testsRun, 1)
self.assertEqual(import_calls, ['my_package'])
# Check picklability
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
pickle.loads(pickle.dumps(suite, proto))
def assert_garbage_collect_test_after_run(self, TestSuiteClass):
if not unittest.BaseTestSuite._cleanup:
raise unittest.SkipTest("Suite cleanup is disabled")
class Foo(unittest.TestCase):
def test_nothing(self):
pass
test = Foo('test_nothing')
wref = weakref.ref(test)
suite = TestSuiteClass([wref()])
suite.run(unittest.TestResult())
del test
# for the benefit of non-reference counting implementations
gc.collect()
self.assertEqual(suite._tests, [None])
self.assertIsNone(wref())
def testSocketAuthInstallPlugin(self):
# needs plugin. lets install it.
cur = self.connections[0].cursor()
try:
cur.execute("install plugin auth_socket soname 'auth_socket.so'")
TestAuthentication.socket_found = True
self.socket_plugin_name = 'auth_socket'
self.realtestSocketAuth()
except pymysql.err.InternalError:
try:
cur.execute("install soname 'auth_socket'")
TestAuthentication.socket_found = True
self.socket_plugin_name = 'unix_socket'
self.realtestSocketAuth()
except pymysql.err.InternalError:
TestAuthentication.socket_found = False
raise unittest2.SkipTest('we couldn\'t install the socket plugin')
finally:
if TestAuthentication.socket_found:
cur.execute("uninstall plugin %s" % self.socket_plugin_name)
def test_json(self):
args = self.databases[0].copy()
args["charset"] = "utf8mb4"
conn = pymysql.connect(**args)
if not self.mysql_server_is(conn, (5, 7, 0)):
raise SkipTest("JSON type is not supported on MySQL <= 5.6")
self.safe_create_table(conn, "test_json", """\
create table test_json (
id int not null,
json JSON not null,
primary key (id)
);""")
cur = conn.cursor()
json_str = u'{"hello": "?????"}'
cur.execute("INSERT INTO test_json (id,`json`) values (42,%s)", (json_str,))
cur.execute("SELECT `json` from `test_json` WHERE `id`=42")
res = cur.fetchone()[0]
self.assertEqual(json.loads(res), json.loads(json_str))
cur.execute("SELECT CAST(%s AS JSON) AS x",))
res = cur.fetchone()[0]
self.assertEqual(json.loads(res), json.loads(json_str))
def testSocketAuthInstallPlugin(self):
# needs plugin. lets install it.
cur = self.connections[0].cursor()
try:
cur.execute("install plugin auth_socket soname 'auth_socket.so'")
TestAuthentication.socket_found = True
self.socket_plugin_name = 'auth_socket'
self.realtestSocketAuth()
except pymysql.err.InternalError:
try:
cur.execute("install soname 'auth_socket'")
TestAuthentication.socket_found = True
self.socket_plugin_name = 'unix_socket'
self.realtestSocketAuth()
except pymysql.err.InternalError:
TestAuthentication.socket_found = False
raise unittest2.SkipTest('we couldn\'t install the socket plugin')
finally:
if TestAuthentication.socket_found:
cur.execute("uninstall plugin %s" % self.socket_plugin_name)
def test_json(self):
args = self.databases[0].copy()
args["charset"] = "utf8mb4"
conn = pymysql.connect(**args)
if not self.mysql_server_is(conn, json.loads(json_str))
def requires(resource, msg=None):
"""Raise ResourceDenied if the specified resource is not available.
If the caller's module is __main__ then automatically return True. The
possibility of False being returned occurs when regrtest.py is
executing.
"""
if resource == 'gui' and not _is_gui_available():
raise unittest.SkipTest("Cannot use the 'gui' resource")
# see if the caller's module is __main__ - if so,treat as if
# the resource was set
if sys._getframe(1).f_globals.get("__name__") == "__main__":
return
if not is_resource_enabled(resource):
if msg is None:
msg = "Use of the %r resource not enabled" % resource
raise ResourceDenied(msg)
def bigaddrspacetest(f):
"""Decorator for tests that fill the address space."""
def wrapper(self):
if max_memuse < MAX_Py_ssize_t:
if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
raise unittest.SkipTest(
"not enough memory: try a 32-bit build instead")
else:
raise unittest.SkipTest(
"not enough memory: %.1fG minimum needed"
% (MAX_Py_ssize_t / (1024 ** 3)))
else:
return f(self)
return wrapper
#=======================================================================
# unittest integration.
def testSocketAuthInstallPlugin(self):
# needs plugin. lets install it.
cur = self.connections[0].cursor()
try:
cur.execute("install plugin auth_socket soname 'auth_socket.so'")
TestAuthentication.socket_found = True
self.socket_plugin_name = 'auth_socket'
self.realtestSocketAuth()
except pymysql.err.InternalError:
try:
cur.execute("install soname 'auth_socket'")
TestAuthentication.socket_found = True
self.socket_plugin_name = 'unix_socket'
self.realtestSocketAuth()
except pymysql.err.InternalError:
TestAuthentication.socket_found = False
raise unittest2.SkipTest('we couldn\'t install the socket plugin')
finally:
if TestAuthentication.socket_found:
cur.execute("uninstall plugin %s" % self.socket_plugin_name)
def test_json(self):
args = self.databases[0].copy()
args["charset"] = "utf8mb4"
conn = pymysql.connect(**args)
if not self.mysql_server_is(conn, json.loads(json_str))
def requires(resource,treat as if
# the resource was set
if sys._getframe(1).f_globals.get("__name__") == "__main__":
return
if not is_resource_enabled(resource):
if msg is None:
msg = "Use of the %r resource not enabled" % resource
raise ResourceDenied(msg)
def bigaddrspacetest(f):
"""Decorator for tests that fill the address space."""
def wrapper(self):
if max_memuse < MAX_Py_ssize_t:
if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
raise unittest.SkipTest(
"not enough memory: try a 32-bit build instead")
else:
raise unittest.SkipTest(
"not enough memory: %.1fG minimum needed"
% (MAX_Py_ssize_t / (1024 ** 3)))
else:
return f(self)
return wrapper
#=======================================================================
# unittest integration.
def requires(resource,treat as if
# the resource was set
if sys._getframe(1).f_globals.get("__name__") == "__main__":
return
if not is_resource_enabled(resource):
if msg is None:
msg = "Use of the %r resource not enabled" % resource
raise ResourceDenied(msg)
def bigaddrspacetest(f):
"""Decorator for tests that fill the address space."""
def wrapper(self):
if max_memuse < MAX_Py_ssize_t:
if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
raise unittest.SkipTest(
"not enough memory: try a 32-bit build instead")
else:
raise unittest.SkipTest(
"not enough memory: %.1fG minimum needed"
% (MAX_Py_ssize_t / (1024 ** 3)))
else:
return f(self)
return wrapper
#=======================================================================
# unittest integration.
def get_test_client(nowait=False, **kwargs):
# construct kwargs from the environment
kw = {'timeout': 30}
if 'TEST_ES_CONNECTION' in os.environ:
from elasticsearch import connection
kw['connection_class'] = getattr(connection, os.environ['TEST_ES_CONNECTION'])
kw.update(kwargs)
client = Elasticsearch([os.environ.get('TEST_ES_SERVER', {})], **kw)
# wait for yellow status
for _ in range(1 if nowait else 100):
try:
client.cluster.health(wait_for_status='yellow')
return client
except ConnectionError:
time.sleep(.1)
else:
# timeout
raise SkipTest("Elasticsearch failed to start.")
def testSocketAuthInstallPlugin(self):
# needs plugin. lets install it.
cur = self.connections[0].cursor()
try:
cur.execute("install plugin auth_socket soname 'auth_socket.so'")
TestAuthentication.socket_found = True
self.socket_plugin_name = 'auth_socket'
self.realtestSocketAuth()
except pymysql.err.InternalError:
try:
cur.execute("install soname 'auth_socket'")
TestAuthentication.socket_found = True
self.socket_plugin_name = 'unix_socket'
self.realtestSocketAuth()
except pymysql.err.InternalError:
TestAuthentication.socket_found = False
raise unittest2.SkipTest('we couldn\'t install the socket plugin')
finally:
if TestAuthentication.socket_found:
cur.execute("uninstall plugin %s" % self.socket_plugin_name)
def testSocketAuthInstallPlugin(self):
# needs plugin. lets install it.
cur = self.connections[0].cursor()
try:
cur.execute("install plugin auth_socket soname 'auth_socket.so'")
TestAuthentication.socket_found = True
self.socket_plugin_name = 'auth_socket'
self.realtestSocketAuth()
except pymysql.err.InternalError:
try:
cur.execute("install soname 'auth_socket'")
TestAuthentication.socket_found = True
self.socket_plugin_name = 'unix_socket'
self.realtestSocketAuth()
except pymysql.err.InternalError:
TestAuthentication.socket_found = False
raise unittest2.SkipTest('we couldn\'t install the socket plugin')
finally:
if TestAuthentication.socket_found:
cur.execute("uninstall plugin %s" % self.socket_plugin_name)
def testSocketAuthInstallPlugin(self):
# needs plugin. lets install it.
cur = self.connections[0].cursor()
try:
cur.execute("install plugin auth_socket soname 'auth_socket.so'")
TestAuthentication.socket_found = True
self.socket_plugin_name = 'auth_socket'
self.realtestSocketAuth()
except pymysql.err.InternalError:
try:
cur.execute("install soname 'auth_socket'")
TestAuthentication.socket_found = True
self.socket_plugin_name = 'unix_socket'
self.realtestSocketAuth()
except pymysql.err.InternalError:
TestAuthentication.socket_found = False
raise unittest2.SkipTest('we couldn\'t install the socket plugin')
finally:
if TestAuthentication.socket_found:
cur.execute("uninstall plugin %s" % self.socket_plugin_name)
def test_json(self):
args = self.databases[0].copy()
args["charset"] = "utf8mb4"
conn = pymysql.connect(**args)
if not self.mysql_server_is(conn, json.loads(json_str))
def requires(resource,treat as if
# the resource was set
if sys._getframe(1).f_globals.get("__name__") == "__main__":
return
if not is_resource_enabled(resource):
if msg is None:
msg = "Use of the %r resource not enabled" % resource
raise ResourceDenied(msg)
def bigaddrspacetest(f):
"""Decorator for tests that fill the address space."""
def wrapper(self):
if max_memuse < MAX_Py_ssize_t:
if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
raise unittest.SkipTest(
"not enough memory: try a 32-bit build instead")
else:
raise unittest.SkipTest(
"not enough memory: %.1fG minimum needed"
% (MAX_Py_ssize_t / (1024 ** 3)))
else:
return f(self)
return wrapper
#=======================================================================
# unittest integration.
def testSocketAuthInstallPlugin(self):
# needs plugin. lets install it.
cur = self.connections[0].cursor()
try:
cur.execute("install plugin auth_socket soname 'auth_socket.so'")
TestAuthentication.socket_found = True
self.socket_plugin_name = 'auth_socket'
self.realtestSocketAuth()
except pymysql.err.InternalError:
try:
cur.execute("install soname 'auth_socket'")
TestAuthentication.socket_found = True
self.socket_plugin_name = 'unix_socket'
self.realtestSocketAuth()
except pymysql.err.InternalError:
TestAuthentication.socket_found = False
raise unittest2.SkipTest('we couldn\'t install the socket plugin')
finally:
if TestAuthentication.socket_found:
cur.execute("uninstall plugin %s" % self.socket_plugin_name)
def test_json(self):
args = self.databases[0].copy()
args["charset"] = "utf8mb4"
conn = pymysql.connect(**args)
if not self.mysql_server_is(conn, json.loads(json_str))
def testSocketAuthInstallPlugin(self):
# needs plugin. lets install it.
cur = self.connections[0].cursor()
try:
cur.execute("install plugin auth_socket soname 'auth_socket.so'")
TestAuthentication.socket_found = True
self.socket_plugin_name = 'auth_socket'
self.realtestSocketAuth()
except pymysql.err.InternalError:
try:
cur.execute("install soname 'auth_socket'")
TestAuthentication.socket_found = True
self.socket_plugin_name = 'unix_socket'
self.realtestSocketAuth()
except pymysql.err.InternalError:
TestAuthentication.socket_found = False
raise unittest2.SkipTest('we couldn\'t install the socket plugin')
finally:
if TestAuthentication.socket_found:
cur.execute("uninstall plugin %s" % self.socket_plugin_name)
def test_json(self):
args = self.databases[0].copy()
args["charset"] = "utf8mb4"
conn = pymysql.connect(**args)
if not self.mysql_server_is(conn, json.loads(json_str))
def requires(resource,treat as if
# the resource was set
if sys._getframe(1).f_globals.get("__name__") == "__main__":
return
if not is_resource_enabled(resource):
if msg is None:
msg = "Use of the %r resource not enabled" % resource
raise ResourceDenied(msg)
def bigaddrspacetest(f):
"""Decorator for tests that fill the address space."""
def wrapper(self):
if max_memuse < MAX_Py_ssize_t:
if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
raise unittest.SkipTest(
"not enough memory: try a 32-bit build instead")
else:
raise unittest.SkipTest(
"not enough memory: %.1fG minimum needed"
% (MAX_Py_ssize_t / (1024 ** 3)))
else:
return f(self)
return wrapper
#=======================================================================
# unittest integration.
def requires(resource,treat as if
# the resource was set
if sys._getframe(1).f_globals.get("__name__") == "__main__":
return
if not is_resource_enabled(resource):
if msg is None:
msg = "Use of the %r resource not enabled" % resource
raise ResourceDenied(msg)
def bigaddrspacetest(f):
"""Decorator for tests that fill the address space."""
def wrapper(self):
if max_memuse < MAX_Py_ssize_t:
if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
raise unittest.SkipTest(
"not enough memory: try a 32-bit build instead")
else:
raise unittest.SkipTest(
"not enough memory: %.1fG minimum needed"
% (MAX_Py_ssize_t / (1024 ** 3)))
else:
return f(self)
return wrapper
#=======================================================================
# unittest integration.
def setUpClass(cls):
ReusedPySparkTestCase.setUpClass()
cls.tempdir = tempfile.NamedTemporaryFile(delete=False)
try:
cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
except py4j.protocol.Py4JError:
cls.tearDownClass()
raise unittest.SkipTest("Hive is not available")
except TypeError:
cls.tearDownClass()
raise unittest.SkipTest("Hive is not available")
os.unlink(cls.tempdir.name)
_scala_HiveContext =\
cls.sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(cls.sc._jsc.sc())
cls.sqlCtx = HiveContext(cls.sc, _scala_HiveContext)
cls.testData = [Row(key=i, value=str(i)) for i in range(100)]
cls.df = cls.sc.parallelize(cls.testData).toDF()
def quiet_run(self, result, func, *args, **kwargs):
try:
func(*args, **kwargs)
except (KeyboardInterrupt, SystemExit):
raise
except unittest.SkipTest as e:
if hasattr(result, 'addSkip'):
result.addSkip(self, str(e))
else:
warnings.warn("TestResult has no addSkip method,skips not reported",
RuntimeWarning, 2)
result.addSuccess(self)
return False
except:
result.addError(self, self.__exc_info())
return False
return True
def testSocketAuthInstallPlugin(self):
# needs plugin. lets install it.
cur = self.connections[0].cursor()
try:
cur.execute("install plugin auth_socket soname 'auth_socket.so'")
TestAuthentication.socket_found = True
self.socket_plugin_name = 'auth_socket'
self.realtestSocketAuth()
except pymysql.err.InternalError:
try:
cur.execute("install soname 'auth_socket'")
TestAuthentication.socket_found = True
self.socket_plugin_name = 'unix_socket'
self.realtestSocketAuth()
except pymysql.err.InternalError:
TestAuthentication.socket_found = False
raise unittest2.SkipTest('we couldn\'t install the socket plugin')
finally:
if TestAuthentication.socket_found:
cur.execute("uninstall plugin %s" % self.socket_plugin_name)
def setUp(self):
self.dirname = os.path.join(tempfile.gettempdir(),
'files_for_testing_neo',
'klustakwik/test1')
if not os.path.exists(self.dirname):
raise unittest.SkipTest('data directory does not exist: ' +
self.dirname)
def test1(self):
"""Tests that files can be loaded by basename"""
kio = KlustaKwikIO(filename=os.path.join(self.dirname, 'basename'))
if not BaseTestIO.use_network:
raise unittest.SkipTest("Requires download of data from the web")
fetfiles = kio._fp.read_filenames('fet')
self.assertEqual(len(fetfiles), 2)
self.assertEqual(os.path.abspath(fetfiles[0]),
os.path.abspath(os.path.join(self.dirname,
'basename.fet.0')))
self.assertEqual(os.path.abspath(fetfiles[1]),
'basename.fet.1')))
def test3(self):
"""Tests that files can be loaded by basename2"""
kio = KlustaKwikIO(filename=os.path.join(self.dirname, 'basename2'))
if not BaseTestIO.use_network:
raise unittest.SkipTest("Requires download of data from the web")
clufiles = kio._fp.read_filenames('clu')
self.assertEqual(len(clufiles), 1)
self.assertEqual(os.path.abspath(clufiles[1]),
'basename2.clu.1')))
def setUp(self):
self.dirname = os.path.join(tempfile.gettempdir(),
'klustakwik/test2')
if not os.path.exists(self.dirname):
raise unittest.SkipTest('data directory does not exist: ' +
self.dirname)
def setUp(self):
self.dirname = os.path.join(tempfile.gettempdir(),
'klustakwik/test3')
if not os.path.exists(self.dirname):
raise unittest.SkipTest('data directory does not exist: ' +
self.dirname)
def setUp(self):
super(CommonTests, self).setUp()
data_dir = os.path.join(self.local_test_dir,
'Cheetah_v{}'.format(self.cheetah_version))
self.sn = os.path.join(data_dir, 'original_data')
self.pd = os.path.join(data_dir, 'plain_data')
if not os.path.exists(self.sn):
raise unittest.SkipTest('data file does not exist:' + self.sn)
def setUp(self):
BaseTestIO.setUp(self)
if sys.platform.startswith('win'):
distantfile = 'http://download.multichannelsystems.com/download_data/software/neuroshare/nsMCDLibrary_3.7b.zip'
localfile = os.path.join(tempfile.gettempdir(),'nsMCDLibrary_3.7b.zip')
if not os.path.exists(localfile):
urlretrieve(distantfile, localfile)
if platform.architecture()[0].startswith('64'):
self.dllname = os.path.join(tempfile.gettempdir(),'Matlab/Matlab-Import-Filter/Matlab_Interface/nsMCDLibrary64.dll')
if not os.path.exists(self.dllname):
zip = zipfile.ZipFile(localfile)
zip.extract('Matlab/Matlab-Import-Filter/Matlab_Interface/nsMCDLibrary64.dll', path = tempfile.gettempdir())
else:
self.dllname = os.path.join(tempfile.gettempdir(),'Matlab/Matlab-Import-Filter/Matlab_Interface/nsMCDLibrary.dll')
if not os.path.exists(self.dllname):
zip = zipfile.ZipFile(localfile)
zip.extract('Matlab/Matlab-Import-Filter/Matlab_Interface/nsMCDLibrary.dll', path = tempfile.gettempdir())
elif sys.platform.startswith('linux'):
if platform.architecture()[0].startswith('64'):
distantfile = 'http://download.multichannelsystems.com/download_data/software/neuroshare/nsMCDLibrary_Linux64_3.7b.tar.gz'
localfile = os.path.join(tempfile.gettempdir(),'nsMCDLibrary_Linux64_3.7b.tar.gz')
else:
distantfile = 'http://download.multichannelsystems.com/download_data/software/neuroshare/nsMCDLibrary_Linux32_3.7b.tar.gz'
localfile = os.path.join(tempfile.gettempdir(),'nsMCDLibrary_Linux32_3.7b.tar.gz')
if not os.path.exists(localfile):
urlretrieve(distantfile, localfile)
self.dllname = os.path.join(tempfile.gettempdir(),'nsMCDLibrary/nsMCDLibrary.so')
if not os.path.exists(self.dllname):
tar = tarfile.open(localfile)
tar.extract('nsMCDLibrary/nsMCDLibrary.so', path = tempfile.gettempdir())
else:
raise unittest.SkipTest("Not currently supported on OS X")
def setUp(self):
self.dirname = os.path.join(tempfile.gettempdir(),
'klustakwik/test1')
if not os.path.exists(self.dirname):
raise unittest.SkipTest('data directory does not exist: ' +
self.dirname)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。