Python argparse 模块,Namespace() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用argparse.Namespace()。
def _parse_args(args):
    """Parse the build tool's command line.

    :param args: list of argument strings (without the program name).
    :return: an ``argparse.Namespace``. If ``--version`` appears anywhere in
        ``args`` a namespace holding only ``version=True`` is returned and
        nothing else is parsed.
    """
    # Short-circuit so that ``--version`` works without the otherwise
    # required positional ``script`` argument.
    if '--version' in args:
        return argparse.Namespace(version=True)

    parser = argparse.ArgumentParser()
    parser.add_argument('script', help='Script to run')
    parser.add_argument('target', nargs='?', default='build', help='Target object to build; defaults to \'build\'')
    parser.add_argument('--version', action='store_true', help='Print version info and exit')
    # The following switches are on/off toggles; without ``store_true`` they
    # would each swallow the next command-line token as their value.
    parser.add_argument('--clear', action='store_true', help='Clear output directory')
    parser.add_argument('--clear-cache', action='store_true', help='Clear cache before compiling')
    parser.add_argument('--threads', '-t', type=int, help='Set thread count; defaults to cores*2')
    parser.add_argument('--no-threading', '-nt', action='store_true', help='disable multithreaded compiling')
    # TODO: make target '*' instead of '?' so multiple targets can be run
    # from the same command.
    return parser.parse_args(args)
def parse(self, argv):
    """
    Parse every declared parameter out of ``argv``.

    Each entry of ``self.params`` is handed to ``self.parse_parameter``; the
    result is stored under the parameter's ``dest`` (or ``name`` when
    ``dest`` is falsy). Messages of raised ``ParamException`` are collected
    in ``self._errors`` and the parsed values end up in ``self.data``.

    :param argv: arguments.
    """
    self.data = None
    self._errors = []
    parsed = {}
    for position, spec in enumerate(self.params):
        try:
            value = self.parse_parameter(spec, argv, position)
            parsed[spec['dest'] or spec['name']] = value
        except ParamException as exc:
            self._errors.append(str(exc))
    self.data = Namespace(**parsed)
async def list_action_remove(self, player, values, map_dictionary, view, **kwargs):
    """Handle the "remove map" action triggered from a map list view.

    Declared ``async`` because the body awaits the permission check, the
    confirmation dialog and the removal itself; ``await`` is a syntax error
    inside a plain ``def``.

    :param player: player that triggered the action.
    :param values: unused here.
    :param map_dictionary: mapping with at least the ``name`` and ``id`` keys.
    :param view: view that is refreshed once the map has been removed.
    """
    # Check permission.
    if not await self.instance.permission_manager.has_permission(player, 'admin:remove_map'):
        await self.instance.chat(
            '$f00You don\'t have the permission to perform this action!',
            player
        )
        return

    # Ask for confirmation; a truthy answer from the dialog means "cancel".
    cancel = bool(await ask_confirmation(player, 'Are you sure you want to remove the map \'{}\'$z$s from the server?'.format(
        map_dictionary['name']
    ), size='sm'))
    if cancel:
        return

    # Simulate command.
    await self.remove_map(player, Namespace(nr=map_dictionary['id']))

    # Reload parent view.
    await view.refresh(player)
def run_with_args(args, parser):
    # type: (argparse.Namespace,argparse.ArgumentParser) -> int
    """Run ``whatstyle`` (optionally under the profiler) and report the run time.

    Returns the value produced by ``whatstyle`` or ``OK`` when the run was
    profiled or cut short by a tolerated I/O error.
    """
    set_logging_parameters(args, parser)
    started_at = time.time()
    ret = OK
    try:
        if not args.profile:
            ret = whatstyle(args, parser)
        else:
            outline("Profiling...")
            profile("ret = whatstyle(args,parser)", locals(), globals())
    except IOError as exc:
        # If the output is piped into a pager like 'less' we get a broken pipe
        # when the pager is quit early and that is ok.
        if exc.errno != errno.EPIPE and str(exc) != 'Stream closed':
            raise
        # NOTE(review): the nesting of this close was reconstructed; it is
        # assumed to belong to the tolerated-error path only.
        if not PY2:
            sys.stderr.close()
    elapsed = time.time() - started_at
    iprint(INFO_TIME, 'Run time: %s seconds' % elapsed)
    return ret
def test_start_object(self):
    """Start a PJFServer and check both the HTTP and HTTPS endpoints answer."""
    settings = Namespace(ports={"servers": {"HTTP_PORT": 8080, "HTTPS_PORT": 8443}},
                         html=False, level=6, command=["radamsa"], stdin=True,
                         json={"a": "test"}, indent=True, strong_fuzz=False, url_encode=False,
                         parameters=[], notify=False, debug=False, content_type="text/plain",
                         utf8=False, nologo=True)
    server = PJFServer(configuration=PJFConfiguration(settings))
    server.run()
    json_http = urllib2.urlopen("http://127.0.0.1:8080").read()
    # The HTTPS check is optional: it only runs when `requests` is installed.
    try:
        import requests
        requests.packages.urllib3.disable_warnings()
        https_response = requests.get('https://127.0.0.1:8443', verify=False)
        json_https = https_response.content
        self.assertTrue(json_https)
    except ImportError:
        pass
    self.assertTrue(json_http)
    server.stop()
def cli_args():
    """Parse the command line arguments.

    Prints the help text and exits with status 1 when the program is started
    without any argument.

    :return: The parsed arguments.
    :rtype: argparse.Namespace
    """
    parser = argparse.ArgumentParser()
    # --check and --mitigate are switches; ``store_true`` keeps them from
    # consuming the following token as a value.
    parser.add_argument('-c', '--check', action='store_true',
                        help="check if the system is vulnerable to WCry")
    parser.add_argument('-m', '--mitigate', action='store_true',
                        help="mitigate the system's vulnerability by disabling the"
                             " SMBv1 protocol,if necessary; implies --check")
    parser.add_argument('-f', '--fix', action='store_true')
    parser.add_argument('--download-directory',
                        help="Optionally specify a directory where the Microsoft"
                             " KB update is saved when using --fix")
    if len(sys.argv) == 1:
        parser.print_help()
        sys.exit(1)
    return parser.parse_args()
def register_command(handler: Callable[[argparse.Namespace], None],
                     main_parser: Optional[ArgParserType]=None,
                     ) -> Callable[[argparse.Namespace], None]:
    """Register ``handler`` as a sub-command of ``main_parser``.

    The sub-command is named after the handler (underscores become dashes);
    the handler's docstring is used as its description and the docstring's
    first paragraph as its help text. A handler without a docstring is
    registered without description or help instead of raising
    ``AttributeError``.

    :param handler: callable receiving the parsed namespace.
    :param main_parser: parser to attach to; ``global_argparser`` when None.
    :return: a wrapper around ``handler`` that additionally exposes
        ``register_command`` (for nested sub-commands) and ``add_argument``
        (bound to the new sub-parser).
    """
    if main_parser is None:
        main_parser = global_argparser

    # One sub-parsers action per parser, cached by the parser's identity.
    parser_key = id(main_parser)
    subparsers = _subparsers.get(parser_key)
    if subparsers is None:
        subparsers = main_parser.add_subparsers(title='commands',
                                                dest='command')
        _subparsers[parser_key] = subparsers

    @functools.wraps(handler)
    def wrapped(args):
        handler(args)

    doc = handler.__doc__
    doc_summary = doc.split('\n\n')[0] if doc else None
    inner_parser = subparsers.add_parser(handler.__name__.replace('_', '-'),
                                         description=doc,
                                         help=doc_summary)
    inner_parser.set_defaults(function=wrapped)
    wrapped.register_command = functools.partial(register_command,
                                                 main_parser=inner_parser)
    wrapped.add_argument = inner_parser.add_argument
    return wrapped
def __call__(
        self,
        parser,  # type: argparse.ArgumentParser
        namespace,  # type: argparse.Namespace
        values,  # type: Union[ARGPARSE_TEXT,Sequence[Any],None]
        option_string=None  # type: Optional[ARGPARSE_TEXT]
):
    # type: (...) -> None
    """Store ``values`` on the namespace, refusing a second assignment.

    :raises parser.error: if destination is already set
    """
    current = getattr(namespace, self.dest)  # type: ignore # typeshed doesn't know about Action.dest yet?
    if current is None:
        setattr(namespace, self.dest, values)  # type: ignore # typeshed doesn't know about Action.dest yet?
        return
    message = '{} argument may not be specified more than once'.format(option_string)
    parser.error(message)
def start(self):
    """Set up the sockets, optionally drop privileges and start the workers.

    Three ``Process`` workers are started (receiver, scheduler, responder);
    they share ``self.running`` as a stop flag and the first and last also
    share ``self.queue``.
    """
    self.setup_sockets()

    import StaticUPnP_Settings
    permissions = Namespace(**StaticUPnP_Settings.permissions)
    print(permissions)
    if permissions.drop_permissions:
        self.drop_privileges(permissions.user, permissions.group)

    self.running = Value(ctypes.c_int, 1)
    self.queue = Queue()
    self.reciever_thread = Process(target=self.socket_handler, args=(self.queue, self.running))
    self.reciever_thread.start()
    self.schedule_thread = Process(target=self.schedule_handler, args=(self.running,))
    self.schedule_thread.start()
    # NOTE(review): this call was syntactically broken (positional argument
    # after a keyword, unbalanced parenthesis). The argument tuple is assumed
    # to mirror socket_handler's (queue, running) -- confirm against
    # response_handler's signature.
    self.response_thread = Process(target=self.response_handler, args=(self.queue, self.running))
    self.response_thread.start()
def get_interface_addresses(logger):
    """Return the IPv4 addresses configured or discovered for this host.

    Addresses listed in ``StaticUPnP_Settings.ip_addresses`` are returned
    as-is. When that list is empty, addresses are discovered with
    ``netifaces`` on the configured ``include`` interfaces (or on all
    interfaces when ``include`` is empty), minus those listed in
    ``exclude``; they are appended to the settings list and logged.

    :param logger: logger used to report each discovered address.
    :return: the (possibly extended) ``ip_addresses`` list from the settings.
    """
    import StaticUPnP_Settings
    interface_config = Namespace(**StaticUPnP_Settings.interfaces)
    ip_addresses = StaticUPnP_Settings.ip_addresses
    if ip_addresses:
        return ip_addresses

    import netifaces
    candidates = interface_config.include or netifaces.interfaces()
    # Build a new list rather than calling ``remove``: that raised ValueError
    # for an excluded interface that is not present and also mutated the
    # ``include`` list owned by the settings module.
    excluded = set(interface_config.exclude)
    ifs = [iface for iface in candidates if iface not in excluded]

    for i in ifs:
        addrs = netifaces.ifaddresses(i)
        for addr in addrs.get(netifaces.AF_INET, ()):
            ip_addresses.append(addr['addr'])
            logger.info("Regestering multicast on %s: %s", i, addr['addr'])
    return ip_addresses
def parse_args() -> argparse.Namespace:  # pragma: no cover
    """
    Parses the Command Line Arguments using argparse

    :return: The parsed arguments
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("connection", help="The Type of Connection to use")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Activates verbose output")
    # --debug and --quiet are switches like --verbose; without
    # ``store_true`` they would require a value on the command line.
    parser.add_argument("-d", "--debug", action="store_true",
                        help="Activates debug-level logging output")
    parser.add_argument("-q", "--quiet", action="store_true",
                        help="disables all text output")
    parser.add_argument("-c", "--config",
                        default=os.path.join(os.path.expanduser("~"),
                                             ".kudubot"),
                        help="Overrides the configuration directory location")
    return parser.parse_args()
def test_init_2(tmpdir):
    "it should open sam file if provided"
    # NOTE(review): the column alignment inside the fixture text may have been
    # lost when this snippet was extracted -- confirm against make_bam's format.
    make_bam(tmpdir.strpath, """
123456789_123456789_
r1 + ...........
r1 - ......*....
r2 + .........*.
r2 - .....*.......
""")
    o = Namespace(query="test.vcf", cfdna=tmpdir.join("test.bam").strpath, gdna=None, output=None)
    init(o)
    assert isinstance(o.cfdna, AlignmentFile)
    # Identity check: ``== None`` can be fooled by a custom ``__eq__``.
    assert o.gdna is None
def test_init_3(tmpdir):
    "it should generate a proper output file name if not provided"
    # NOTE(review): this snippet arrived truncated (``o`` was never defined
    # and make_bam received no fixture). The fixture and namespace below are
    # reconstructed from test_init_2 -- confirm against the original test.
    make_bam(tmpdir.strpath, """
123456789_123456789_
r1 + ...........
r1 - ......*....
r2 + .........*.
r2 - .....*.......
""")
    o = Namespace(query="test.vcf", cfdna=tmpdir.join("test.bam").strpath, gdna=None, output=None)
    init(o)
    assert o.output is not None
def test_aggregate_reads_3():
    "it should ignore when 3+ reads share the same name"
    # NOTE(review): the read tuples have differing lengths, which suggests the
    # fixture was damaged during extraction -- verify against the original.
    opts = Namespace(verbos=False, qual=20, mismatch_limit=-1)
    sample_reads = (
        ("r1", 'A', 60, 2, 11, -1, 9, False, True),
        ("r1", 'C', -9, True, True),
        ("r2", 0, True, False)
    )
    outcome = aggregate_reads(opts, sample_reads)
    unique_pairs, unique_single, _, nerror, *_ = outcome
    assert len(unique_pairs) == 0
    assert len(unique_single) == 1
    assert nerror == 3
def test_count_different_type_1():
    "it should NOT count dna that more than 10% reads have different bases"
    opts = Namespace(verbos=False, allow_inconsist=False)
    forward_reads = [('A', 60), ('T', 60)]
    reverse_reads = [('A', 60)]
    pair = {
        ('ref', True): forward_reads,
        ('ref', False): reverse_reads,
    }
    counts = count_different_type(opts, pair, {}, 'T', 'A')
    # The function is expected to yield exactly thirteen counters.
    mor, mnr, msr, oor, onr, osr, moa, mna, msa, ooa, ona, osa, inconsis = counts
    assert inconsis == 6
    assert sum((mor, osa)) == 0
def test_get_reads_1(tmpdir):
    "it should get all but only the reads that covers the given position"
    make_bam(tmpdir.strpath, """
123456789_123456789_12
r1 + ...........
r1 - ......*....
r2 + .........*.
r2 - .....*.......
r3 + ...........
r3 - ....*......
r4 + ...........
r4 - ...........
123456789_123456789_12
""")
    o = Namespace(verbos=False, mismatch_limit=-1)
    sam = AlignmentFile(tmpdir.join("test.bam").strpath)
    # Every call passes the same (options, file, contig, position) arguments
    # as the first one; the later calls had lost ``sam`` and ``'ref'``.
    assert sum(1 for _ in get_reads(o, sam, 'ref', '4')) == 2
    assert sum(1 for _ in get_reads(o, sam, 'ref', '12')) == 7
    assert sum(1 for _ in get_reads(o, sam, 'ref', '20')) == 2
def test_get_reads_2(tmpdir):
    "it should read properties correctly"
    make_bam(tmpdir.strpath, """
123456789_123
r1 + ...*.......
r1 - .*.........
""")
    o = Namespace(verbos=False, mismatch_limit=-1)
    sam = AlignmentFile(tmpdir.join("test.bam").strpath)
    # Same call shape as in test_get_reads_1: (options, file, contig, position).
    # The call had lost the ``sam`` and ``'ref'`` arguments.
    r = next(get_reads(o, sam, 'ref', '4'))
    assert r[0] == "r1"  # name
    assert r[3] == 0  # 0-based pos
    assert r[4] == 11  # length
    assert r[5] == -1  # mismatch, not calculated
    assert r[6] == 2  # mate pos
    assert r[7] == 13  # template length
    assert r[8] == False  # is_reverse
    assert r[9] == True  # paired and mapped
def test_pad_softclip_2(tmpdir):
    "it should ignore more than two reads which share the same name"
    make_bam(tmpdir.strpath, """
r1 + __.*.......
r1 - .*.......__
r1 - .*.......__
r2 + .*.......__
r2 - .*.......__
""")
    opts = Namespace(verbos=False, mismatch_limit=-1)
    bam_path = tmpdir.join("test.bam").strpath
    sam = AlignmentFile(bam_path)
    adjusted_pos = pad_softclip(sam)
    # Count the entries whose start position is not the -1 marker.
    kept = [startpos for startpos, length in adjusted_pos.values() if startpos != -1]
    assert len(kept) == 1
def test_pad_softclip_3(tmpdir):
    "it should pad softclipped bases"
    make_bam(tmpdir.strpath, """
123456789_123
r1 + __.*.......
r1 - .*.........
r2 - ...*.......
r2 + .*.......__
""")
    opts = Namespace(verbos=False, mismatch_limit=-1)
    bam_path = tmpdir.join("test.bam").strpath
    sam = AlignmentFile(bam_path)
    adjusted_pos = pad_softclip(sam)
    expected = (0, 13)  # (0-based position, length)
    assert adjusted_pos["r1"] == expected
    assert adjusted_pos["r2"] == expected
def get_api_id(config, args):
    """
    Look up the API ID in the terraform outputs, falling back to AWS when
    that raises for any reason.

    :param config: configuration
    :type config: :py:class:`~.Config`
    :param args: command line arguments
    :type args: :py:class:`argparse.Namespace`
    :return: API Gateway ID
    :rtype: str
    """
    try:
        logger.debug('Trying to get terraform rest_api_id output')
        tf_outputs = terraformRunner(config, args.tf_path)._get_outputs()
        api_id = tf_outputs['rest_api_id']
        logger.debug("terraform rest_api_id output: '%s'", api_id)
    except Exception:
        # Any failure on the terraform path triggers the AWS fallback.
        logger.info('Unable to find API rest_api_id from terraform state;'
                    ' querying AWS.', exc_info=1)
        api_id = AWSInfo(config).get_api_id()
        logger.debug("AWS API ID: '%s'", api_id)
    return api_id
def _predict(predictors: Dict[str, str]):
    """Build the ``predict`` sub-command handler bound to ``predictors``."""

    def predict_inner(args: argparse.Namespace) -> None:
        predictor = _get_predictor(args, predictors)

        # Nothing would be printed or written, so stop before doing any work.
        if args.silent and not args.output_file:
            print("--silent specified without --output-file.")
            print("Exiting early because no output will be created.")
            sys.exit(0)

        # ExitStack allows us to conditionally context-manage `output_file`,
        # which may or may not exist
        with ExitStack() as stack:
            input_file = stack.enter_context(args.input_file)  # type: ignore
            output_file = None
            if args.output_file:
                output_file = stack.enter_context(args.output_file)  # type: ignore
            print_to_console = not args.silent
            _run(predictor, input_file, output_file, args.batch_size, print_to_console, args.cuda_device)

    return predict_inner
def get_parsed_args(self, comp_words):
    """Return whatever the patched parser could parse out of ``comp_words``.

    Parsing is best-effort: any exception (including the ``SystemExit``
    argparse raises on errors) is swallowed and the namespace filled so far
    is returned. While parsing, ``sys.stderr`` is redirected to the null
    device; it is always restored afterwards.
    """
    active_parsers = self._patch_argument_parser()
    parsed_args = argparse.Namespace()
    self.completing = True

    if USING_PYTHON2:
        # Python 2 argparse only properly works with byte strings.
        comp_words = [ensure_bytes(word) for word in comp_words]

    saved_stderr = sys.stderr
    try:
        with io.open(os.devnull, "w") as devnull:
            sys.stderr = devnull
            active_parsers[0].parse_known_args(comp_words, namespace=parsed_args)
    except BaseException:
        # Deliberate: a partial parse is still useful to the caller.
        pass
    finally:
        # Restore stderr even when parsing fails; otherwise it would stay
        # pointed at the null device and the handle would leak.
        sys.stderr = saved_stderr

    self.completing = False
    return parsed_args
def test_with_config(self, is_file, open_):
    """read_user_config should return the parsed YAML of the user config."""
    # Create our stand-in config file. ``reporoot`` must be nested under
    # ``local_paths`` for the expected dictionary below to be produced.
    config_file = textwrap.dedent(u"""\
        ---
        local_paths:
          reporoot: ~/Code
        publish: local
    """)
    is_file.return_value = True
    open_.return_value = io.StringIO(config_file)

    # Get the config and test the result.
    flags = Namespace(user_config='/bogus/file.yaml', command='not_init')
    user_config = main.read_user_config(flags)
    assert user_config == {
        'local_paths': {'reporoot': '~/Code'},
        'publish': 'local',
    }
def test_with_ssh_repo(self, login):
    """A pull request is created for the repository named in the task kwargs."""
    # Set up test data to return when we attempt to make the
    # pull request. (``MagicMock`` is the class name in the mock library;
    # ``Magicmock`` does not exist.)
    gh = mock.MagicMock(spec=github3.github.GitHub)
    login.return_value = gh
    url = 'https://github.com/me/repo/pulls/1/'
    gh.repository().create_pull.return_value = Namespace(html_url=url)

    # Run the task.
    task = github.CreateGitHubPullRequest()
    pr = task.execute(**self.task_kwargs)

    # Assert we got the correct result.
    assert pr.html_url == url

    # Assert that the correct methods were called.
    login.assert_called_once_with('lukesneeringer', '1335020400')
    gh.repository.assert_called_with('me', 'repo')
    gh.repository().create_pull.assert_called_once_with(
        base='master',
        body='This pull request was generated by artman. '
             'Please review it thoroughly before merging.',
        head='pubsub-python-v1',
        title='Python GAPIC: Pubsub v1',
    )
def test_with_http_url(self, login):
    """An HTTP(S) repository location resolves to the same owner and repo."""
    # Set up test data to return when we attempt to make the
    # pull request. (``MagicMock`` is the class name in the mock library;
    # ``Magicmock`` does not exist.)
    gh = mock.MagicMock(spec=github3.github.GitHub)
    login.return_value = gh
    url = 'https://github.com/me/repo/pulls/1/'
    gh.repository().create_pull.return_value = Namespace(html_url=url)

    # Run the task.
    task = github.CreateGitHubPullRequest()
    pr = task.execute(**dict(self.task_kwargs, git_repo={
        'location': 'https://github/me/repo/',
    }))

    # Assert we got the correct result.
    assert pr.html_url == url

    # Assert that the correct repository method was still called.
    gh.repository.assert_called_with('me', 'repo')
def print_args(self):
    """Print out all the arguments in this parser, grouped by argument group.

    A group's title line is printed only when at least one of its
    destinations is present in ``self.opt``.
    """
    if not self.opt:
        self.parse_args(print_args=False)
    values = {str(key): str(value) for key, value in self.opt.items()}
    for group in self._action_groups:
        group_dict = {
            a.dest: getattr(self.args, a.dest, None)
            for a in group._group_actions
        }
        header_pending = True
        for key in vars(argparse.Namespace(**group_dict)):
            if key not in values:
                continue
            if header_pending:
                print('[ ' + group.title + ': ] ')
                header_pending = False
            print('[ ' + key + ': ' + values[key] + ' ]')
def _test_standalone_sequana(qtbot, tmpdir):
    """Drive the GUI in sequana mode: save the project, run it, then stop it."""
    wkdir = TemporaryDirectory()
    sample = os.path.realpath(sequana_data("Hm2_GTGAAA_L005_R1_001.fastq.gz"))
    inputdir = sample.rsplit(os.sep, 1)[0]

    # Standalone for sequana given a wkdir and pipeline and input_dir
    args = Namespace(pipeline="quality_control", wkdir=wkdir.name,
                     input_directory=inputdir)
    widget = sequana_gui.SequanaGUI(ipython=False, user_options=args)
    qtbot.addWidget(widget)
    assert widget.mode == "sequana"
    widget.force = True
    widget.save_project()
    widget.click_run()

    # Poll the process state, sleeping half a second per round, until the
    # accumulated wait reaches five seconds.
    waited = 0
    while widget.process.state() and waited < 5:
        time.sleep(0.5)
        waited += 0.5
    widget.click_stop()
    time.sleep(1)
def _setup_arg_parser(argv):
    """ Parse arguments from command line

    ``--logconf`` defaults to ``logging.ini`` when such a file exists in the
    current working directory, otherwise to ``None``.

    Args:
        argv list: by default it's command line arguments
    Returns:
        argparse.Namespace: parsed argument
    """
    default_logconf = 'logging.ini' if os.path.isfile('logging.ini') else None

    parser = argparse.ArgumentParser(
        description='MysqL binlog to Google Cloud Pub/Sub')
    parser.add_argument('conf',
                        help='configuration file for publishing')
    parser.add_argument('--loglevel', '-l', default=None,
                        help='log level for root')
    parser.add_argument('--logconf', default=default_logconf,
                        help='INI file log configuration')
    return parser.parse_args(argv)
def RunTestsCommand(args):
    """Checks test type and dispatches to the appropriate function.

    Args:
        args: argparse.Namespace object.
    Returns:
        Integer indicated exit code.
    Raises:
        Exception: Unknown command name passed in, or an exception from an
            individual test runner.
    """
    command = args.command
    ProcessCommonoptions(args)
    logging.info('command: %s', ' '.join(sys.argv))
    # Platform mode takes precedence over the per-command dispatch below.
    if args.enable_platform_mode or command in _DEFAULT_PLATFORM_MODE_TESTS:
        return RunTestsInPlatformMode(args)
    if command == 'python':
        return _RunPythonTests(args)
    # The message previously read 'UnkNown test type.' (corrupted casing).
    raise Exception('Unknown test type.')
def __init__(self, args=None, parse_args=True):
    """Create the CLI helper and, when requested, parse and apply arguments.

    NOTE(review): the nesting below is reconstructed; everything that reads
    ``self.args`` is assumed to run only when ``parse_args`` is true, since
    the empty default namespace has neither ``settings`` nor ``quiet``.
    """
    self.cli = CLI()
    self.args = argparse.Namespace()
    self.unkNown_args = []
    if parse_args:
        parsed, leftover = self.parse_arguments(args=args)
        self.args, self.unkNown_args = parsed, leftover
        self.settings = self.args.settings or os.environ.get('CLINNER_SETTINGS')

        # Inject parameters related to current stage as environment variables
        self.inject()

        # Load settings
        settings.build_from_module(self.args.settings)

        # Apply quiet mode
        if self.args.quiet:
            self.cli.disable()
def add_domains(args_or_config, domains):
    """Registers new domains to be used during the current client run.

    Domains are not added to the list of requested domains if they have
    already been registered.

    :param args_or_config: parsed command line arguments
    :type args_or_config: argparse.Namespace or
        configuration.NamespaceConfig
    :param str domains: one or more comma separated domains
    :returns: domains after they have been normalized and validated
    :rtype: `list` of `str`
    """
    registered = args_or_config.domains
    validated_domains = []
    for raw in domains.split(","):
        checked = util.enforce_domain_sanity(raw.strip())
        validated_domains.append(checked)
        if checked in registered:
            continue
        registered.append(checked)
    return validated_domains
def update(namespace, username=None):
    """
    Fill a Namespace (as returned by ArgumentParser) with the config values
    of ``username`` (section "DEFAULT" when no name is given).

    The namespace is returned untouched when no config file exists; the
    program exits with status 1 when the user has no config section.
    """
    if not os.path.isfile(_get_config_path()):
        return namespace
    section = username if username else "DEFAULT"
    conf = _get_config()
    if section in conf:
        return update_namespace(namespace, conf[section])
    print("User {} is not configured.".format(section))
    sys.exit(1)
    return update_namespace(namespace, conf[section])
def test_remove_values_having_hyphen(self):
    """remove_values must cope with section and option names containing hyphens."""
    api_exts = "dvr,l3-flavors,rbac-policies,project-id"
    remove_exts = ["dvr", "project-id"]
    option = "network-feature-enabled.api-extensions"
    args = Namespace(remove={option: remove_exts})
    self.conf = self._get_conf("v2.0", "v3")
    self.conf.set("network-feature-enabled", "api-extensions", api_exts)
    self.conf.remove_values(args)
    remaining = self.conf.get("network-feature-enabled", "api-extensions").split(',')
    # Removed extensions must be gone, all others must survive.
    for ext in api_exts.split(','):
        present = ext in remaining
        if ext in remove_exts:
            self.assertFalse(present)
        else:
            self.assertTrue(present)
def parse_arguments(args: list) -> Namespace:
    """
    Parse the invocation arguments and return them as a Namespace whose
    ``base``, ``local``, ``remote`` and ``merged`` paths are made absolute.
    """
    parser = ArgumentParser(description="A tool to combine multiple merge tools")
    for short, long_name in (('-b', '--base'), ('-l', '--local'),
                             ('-r', '--remote'), ('-m', '--merged')):
        parser.add_argument(short, long_name, required=True)

    parsed_arg = parser.parse_args(args)
    # convert to absolute path
    for field in ('base', 'local', 'remote', 'merged'):
        setattr(parsed_arg, field, os.path.abspath(getattr(parsed_arg, field)))
    return parsed_arg
def merge(config: RawConfigParser,
          args: Namespace,
          launcher: ToolsLauncher,
          analyser: ConflictedFileAnalyser) -> int:
    """
    Try each configured merge tool in turn until one returns 0.

    config -- the current amt configuration
    args -- the arguments with the base, local, remote and merged file names
    launcher -- the launcher helper
    analyser -- passed through to merge_with_tool

    Returns 0 on the first success, otherwise the last tool's result
    (ERROR_NO_TOOL when the loop never assigns one).
    """
    if not config.has_option(SECT_AMT, OPT_TOOLS):
        raise RuntimeError('Missing the {0}.{1} configuration'.format(SECT_AMT, OPT_TOOLS))

    tool_names = config.get(SECT_AMT, OPT_TOOLS).split(';')
    merge_result = ERROR_NO_TOOL
    for name in tool_names:
        merge_result = merge_with_tool(name, config, args, launcher, analyser)
        if merge_result == 0:
            return 0

    print(" [AMT] ? Sorry,it seems we can't solve it this time")
    return merge_result
def setUp(self):
    """Install a fresh options namespace and create the session and manager."""
    cfg = argparse.Namespace()
    options.cfg = cfg
    cfg.corpus = None
    cfg.current_server = "MockConnection"
    cfg.MODE = QUERY_MODE_TOKENS
    cfg.stopword_list = []
    cfg.context_mode = CONTEXT_NONE
    cfg.drop_on_na = True
    cfg.drop_duplicates = True
    cfg.benchmark = False
    cfg.verbose = False

    self.Session = Session()
    self.Session.Resource = BaseResource()

    self.manager = Manager()
def setUp(self):
    """Prepare options, a mock session and one corpus-to-external table link."""
    self.maxDiff = None
    cfg = argparse.Namespace()
    options.cfg = cfg
    cfg.number_of_tokens = 0
    cfg.limit_matches = False
    cfg.regexp = False
    cfg.query_case_sensitive = False
    options.get_configuration_type = lambda: sql_MysqL
    options.get_resource = _monkeypatch_get_resource

    session = MockOptions()
    self.Session = session
    session.Resource = self.resource
    session.Lexicon = None
    session.Corpus = None

    self.link = coquery.links.Link(
        self.resource.name, "corpus_word",
        self.external.name, "word_label",
        join="LEFT JOIN")
    # Register the link under the server name set here.
    cfg.current_server = "Default"
    cfg.table_links = {}
    cfg.table_links[cfg.current_server] = [self.link]
def setUp(self):
    """Prepare options, a mock session and one corpus-to-external table link."""
    self.maxDiff = None
    options.cfg = argparse.Namespace()
    options.cfg.number_of_tokens = 0
    options.cfg.limit_matches = False
    options.cfg.regexp = False
    options.cfg.query_case_sensitive = False
    options.get_configuration_type = lambda: sql_MysqL
    options.get_resource = _monkeypatch_get_resource
    self.Session = MockOptions()
    self.Session.Resource = self.resource
    self.Session.Lexicon = None
    self.Session.Corpus = None

    # NOTE(review): the Link call arrived truncated to
    # ``Link(self.resource.name, join="LEFT JOIN")``. The missing arguments
    # are restored from the otherwise identical setUp that precedes this one
    # -- confirm against the original test class.
    self.link = coquery.links.Link(
        self.resource.name, "corpus_word",
        self.external.name, "word_label",
        join="LEFT JOIN")
    options.cfg.current_server = "Default"
    options.cfg.table_links = {}
    options.cfg.table_links[options.cfg.current_server] = [self.link]
def setUp(self):
    """Create the mocks and the two flag namespaces used by the tests."""
    for attribute in ('server', 'flow', 'storage', 'credentials'):
        setattr(self, attribute, mock.Mock())

    flow = self.flow
    flow.step1_get_authorize_url.return_value = 'http://example.com/auth'
    flow.step2_exchange.return_value = self.credentials

    # Flags without the local web server ...
    self.flags = argparse.Namespace(
        noauth_local_webserver=True, logging_level='INFO')
    # ... and flags with it enabled.
    self.server_flags = argparse.Namespace(
        noauth_local_webserver=False,
        logging_level='INFO',
        auth_host_port=[8080, ],
        auth_host_name='localhost')
def parse(self, args):
    """
    Parse the command line arguments, translating argparse's ``SystemExit``
    into ``AppExit``.

    @param args: the list of user-provided command line arguments --
                 normally sys.argv[1:]
    @type args: tuple of str

    @return: an object initialized with the parsed arguments
    @rtype: argparse.Namespace
    """
    try:
        namespace = self.parser.parse_args(args)
    except SystemExit as se:
        # A positive exit code is reported as a usage error, anything else
        # as a clean exit.
        if not se.code > 0:
            raise AppExit(code=ExitCodes.CLEAN)
        raise AppExit('Bad command line', ExitCodes.USAGE_ERROR)
    return namespace
def __init__(self, args):
    """Min Mapping Quality Zero constructor.

    The threshold comes from ``args.mq_score`` for a Namespace, or from
    ``args[self.parameter]`` (converted to float) for a dict; otherwise the
    default threshold is kept.
    """
    # This needs to happen first, because threshold is initialised here.
    super(MQ0FFilter, self).__init__(args)

    # Change the threshold to custom gq value.
    self.threshold = self._default_threshold

    if isinstance(args, argparse.Namespace):
        self.threshold = args.mq_score
        return
    if not isinstance(args, dict):
        return
    try:
        self.threshold = float(args.get(self.parameter))
    except (TypeError, ValueError):
        logging.error("Could not retrieve threshold from %s", args.get(self.parameter))
        logging.error("This parameter requires to be a float!")
        raise Exception("Could not create MQ0F filter from parameters: %s" % args)
def __init__(self, args):
    """AD Ratio constructor.

    The threshold comes from ``args.ad_ratio`` for a Namespace, or from
    ``args[self.parameter]`` (converted to float) for a dict; otherwise the
    default threshold is kept.

    NOTE(review): the dict branch arrived truncated and was rebuilt to match
    the MQ0FFilter constructor -- confirm against the original class.
    """
    # This needs to happen first, because threshold is initialised here.
    super(DP4Filter, self).__init__(args)

    # Change the threshold to custom dp value.
    self.threshold = self._default_threshold
    if isinstance(args, argparse.Namespace):
        self.threshold = args.ad_ratio
    elif isinstance(args, dict):
        try:
            self.threshold = float(args.get(self.parameter))
        except (TypeError, ValueError):
            logging.error("Could not retrieve threshold from %s", args.get(self.parameter))
            logging.error("This parameter requires to be a float!")
            raise Exception("Could not create DP4 filter from parameters: %s" % args)
def __init__(self, args):
    """Min Mapping Quality constructor.

    For a dict the threshold is ``args[self.parameter]`` converted to int;
    otherwise the default threshold is kept.

    NOTE(review): this snippet arrived truncated (``super(MQFilter, dict):``)
    and was rebuilt to match the MQ0FFilter constructor. The Namespace
    attribute ``mq_score`` is an assumption -- confirm against the original.
    """
    # This needs to happen first, because threshold is initialised here.
    super(MQFilter, self).__init__(args)

    self.threshold = self._default_threshold
    if isinstance(args, argparse.Namespace):
        self.threshold = args.mq_score  # presumably; see NOTE above
    elif isinstance(args, dict):
        try:
            self.threshold = int(args.get(self.parameter))
        except (TypeError, ValueError):
            logging.error("Could not retrieve threshold from %s", args.get(self.parameter))
            logging.error("This parameter requires to be an integer!")
            raise Exception("Could not create MQ filter from parameters: %s" % args)
def __init__(self, args):
    """GQ filter constructor.

    The threshold comes from ``args.gq_score`` for a Namespace, or from
    ``args[self.parameter]`` (converted to int) for a dict; otherwise the
    default threshold is kept.

    NOTE(review): this snippet arrived truncated
    (``super(GQFilter, argparse.Namespace):``) and was rebuilt to match the
    MQ0FFilter constructor -- confirm against the original class.
    """
    # This needs to happen first, because threshold is initialised here.
    super(GQFilter, self).__init__(args)

    self.threshold = self._default_threshold
    if isinstance(args, argparse.Namespace):
        self.threshold = args.gq_score
    elif isinstance(args, dict):
        try:
            self.threshold = int(args.get(self.parameter))
        except (TypeError, ValueError):
            logging.error("Could not retrieve threshold from %s", args.get(self.parameter))
            logging.error("This parameter requires to be an integer!")
            raise Exception("Could not create GQ filter from parameters: %s" % args)
def parse_linter_args():
    """Parse command-line arguments and set up the command-line
    interface and help.

    ``--directory`` defaults to the current working directory and ``--rules``
    to ``rules.yml`` next to this script.

    Returns
    -------
    argparse.Namespace
        object subclass with arguments as attributes for each given argument
    """
    default_directory = os.getcwd()
    default_rules = os.path.join(get_current_script_dir(), 'rules.yml')

    parser = argparse.ArgumentParser()
    exclusive = parser.add_mutually_exclusive_group()
    exclusive.add_argument(
        '-d', '--directory',
        help="The local path to your repository's base directory. Defaults to the current working directory.",
        default=default_directory,
    )
    parser.add_argument(
        '-r', '--rules',
        help='The path to the rules configuration file,a YAML file containing the rules you would like to check for. Defaults to path/to/openlinter/rules.yml.',
        default=default_rules,
    )
    parser.add_argument('-v', '--version', action='version', version='1.0.1')
    return parser.parse_args()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。