Python requests.auth 模块,HTTPBasicAuth() 实例源码
我们从 Python 开源项目中提取了以下 49 个代码示例,用于说明如何使用 requests.auth.HTTPBasicAuth()。
def auth():
    """Demonstrate the authentication styles accepted by ``requests.get``.

    Issues one GET per variant (Basic as an object, Basic as a tuple,
    Digest, OAuth2) and discards every response. Returns ``None``.
    """
    github_user_url = 'https://api.github.com/user'

    # Basic authentication: explicit helper object, then the tuple shorthand.
    requests.get(github_user_url, auth=HTTPBasicAuth('user', 'pass'))
    requests.get(github_user_url, auth=('user', 'pass'))

    # Digest authentication.
    digest_url = 'http://httpbin.org/digest-auth/auth/user/pass'
    requests.get(digest_url, auth=HTTPDigestAuth('user', 'pass'))

    # OAuth2 authentication (presumably OAuth2 comes from requests-oauthlib).
    verify_url = 'https://api.twitter.com/1.1/account/verify_credentials.json'
    oauth2 = OAuth2('YOUR_APP_KEY', 'YOUR_APP_SECRET', 'USER_OAUTH_TOKEN')
    requests.get(verify_url, auth=oauth2)
def __init__(self, base_url, *args, **kwargs):
    """Create the session and remember the base URL it talks to.

    Credentials embedded in ``base_url`` (``scheme://user:pass@host/...``)
    are removed from the stored URL and installed as HTTP basic auth.

    :param base_url: root URL of the service, optionally with credentials
    :param args: accepted but not used by this initializer
    :param kwargs: forwarded to ``requests.Session.__init__``
    """
    requests.Session.__init__(self, **kwargs)
    self.base_url = base_url
    # Check for basic authentication
    parsed_url = urlparse(self.base_url)
    if parsed_url.username and parsed_url.password:
        # Strip only the "user:pass@" prefix. Rebuilding the netloc from
        # ``hostname`` would drop the brackets around an IPv6 literal
        # (producing an unparseable URL) and lower-case the host.
        netloc = parsed_url.netloc.rpartition('@')[2]
        self.base_url = urlunparse(parsed_url._replace(netloc=netloc))
        # configure requests to use basic auth
        self.auth = HTTPBasicAuth(parsed_url.username, parsed_url.password)
def __init__(self, api_url, auth_custom=None, auth_user=None, auth_pass=None, login=None):
    """
    Store the API location and the credentials to use against it.

    :param api_url: str Moira API URL; a trailing '/' is appended if missing
    :param auth_custom: dict of extra auth headers merged over the default
    :param auth_user: str auth user
    :param auth_pass: str auth password
    :param login: str stored under the 'X-Webauth-User' key of auth_custom
    """
    self.api_url = api_url if api_url.endswith('/') else api_url + '/'
    # Basic auth is configured only when both user and password are truthy.
    use_basic_auth = auth_user and auth_pass
    self.auth = HTTPBasicAuth(auth_user, auth_pass) if use_basic_auth else None
    default_headers = {'X-Webauth-User': login}
    if auth_custom:
        default_headers.update(auth_custom)
    self.auth_custom = default_headers
def __init__(
    self,
    *,
    broker_url,
    pact_dir='.',
    user=None,
    password=None,
    authentication=False
):
    """Record broker settings and, if requested, basic-auth credentials.

    :param broker_url: broker base URL; trailing '/' characters are removed
    :param pact_dir: stored as ``self.pact_dir``
    :param user: username, required when ``authentication`` is truthy
    :param password: password, required when ``authentication`` is truthy
    :param authentication: when truthy, build ``self._auth`` from the
        credentials
    :raises ValueError: authentication requested without both credentials
    """
    self.broker_url = broker_url.rstrip('/')
    self.pact_dir = pact_dir
    self.username = user
    self.password = password
    self.authentication = authentication
    self._auth = None

    if not self.authentication:
        return
    if self.username and self.password:
        self._auth = HTTPBasicAuth(self.username, self.password)
        return
    raise ValueError(
        'When authentication is True,username and password '
        'must be provided.'
    )
def test_pull_pact_authentication(mock_get, pull_pact_response):
    """Pulling a pact with authentication enabled sends basic credentials."""
    credentials = {'user': 'user', 'password': 'password'}
    mock_get.return_value = pull_pact_response

    # NOTE(review): the casing of `brokerClient` / `PACT_broKER_URL` looks
    # unusual -- confirm against the project's real names.
    client = brokerClient(
        broker_url=settings.PACT_broKER_URL,
        authentication=True,
        **credentials
    )
    client.pull_pact(provider=PROVIDER, consumer=CONSUMER)

    expected_auth = HTTPBasicAuth(credentials['user'], credentials['password'])
    mock_get.assert_called_once_with(EXPECTED_PULL_PACT_URL, auth=expected_auth)
def __init__(self, server_url, project="DefaultCollection", user=None, password=None, verify=False,
             auth_type=HTTPBasicAuth,
             connect_timeout=20, read_timeout=180, ):
    """
    Build the REST client used for every call to the TFS server.

    :param server_url: url to TFS server,e.g. https://tfs.example.com/
    :param project: Collection or Collection\\Project
    :param user: username,or DOMAIN\\username
    :param password: password
    :param verify: True|False - verify HTTPS cert
    :param auth_type: authentication class handed to the HTTP client
    :param connect_timeout: Requests CONNECTION timeout,sec or None
    :param read_timeout: Requests READ timeout,sec or None
    :raises ValueError: if ``user`` or ``password`` is ``None``
    """
    credentials_missing = any(value is None for value in (user, password))
    if credentials_missing:
        raise ValueError('User name and api-key must be specified!')

    # The two timeouts travel together as a (connect, read) pair.
    timeouts = (connect_timeout, read_timeout)
    self.rest_client = TFSHTTPClient(
        server_url,
        project=project,
        user=user,
        password=password,
        verify=verify,
        timeout=timeouts,
        auth_type=auth_type,
    )
def _compose_args(self, method, data=None):
    """Build the keyword arguments for one HTTP call.

    :param method: HTTP method name; ``'get'`` puts the payload under
        ``'params'``, anything else under ``'data'``
    :param data: optional per-call values merged over the
        ``'common-params'`` entry of ``self._structure``
    :returns: dict with the payload, ``'headers'`` and, depending on
        ``self._structure``, ``'auth'`` and/or ``'verify'``
    """
    query = 'params' if method == 'get' else 'data'

    # Work on copies: updating the dicts stored in self._structure in place
    # would leak this call's data (and the credentials added below) into
    # every later call.
    payload = dict(self._structure.get('common-params', {}))
    if data is not None:
        payload.update(data)
    args = {
        query: payload,
        'headers': dict(self._structure.get('common-headers', {})),
    }

    auth = self._structure.get('auth')
    if auth == 'params':
        # Credentials travel inside the payload itself.
        payload.update({'username': self.billing_username,
                        'password': self.billing_password})
    elif auth == 'headers':
        args['auth'] = HTTPBasicAuth(
            self.billing_username, self.billing_password)

    # Equality (not identity) is kept on purpose so that a configured 0
    # still disables verification, exactly as before.
    if self._structure.get('verify') == False:  # noqa: E712
        args['verify'] = False
    return args
def grant_access_token(self, code):
    """Exchange an authorization code for Bitbucket OAuth2 tokens.

    Stores the access token, refresh token and token type on the instance.

    :param code: authorization code sent as the ``code`` form field
    :returns: the decoded JSON token response
    :raises BitbucketAPIError: when the token endpoint answers with an
        error status
    """
    res = self._session.post(
        'https://bitbucket.org/site/oauth2/access_token',
        data={
            'grant_type': 'authorization_code',
            'code': code,
        },
        auth=HTTPBasicAuth(self._oauth_key, self._oauth_secret)
    )
    try:
        res.raise_for_status()
    except requests.RequestException as reqe:
        error_info = res.json()
        raise BitbucketAPIError(
            res.status_code,
            error_info.get('error', ''),
            # The call was left unclosed in the original; it mirrors the
            # 'error' lookup above with an empty-string default.
            error_info.get('error_description', ''),
            request=reqe.request,
            response=reqe.response
        )
    data = res.json()
    self._access_token = data['access_token']
    self._refresh_token = data['refresh_token']
    self._token_type = data['token_type']
    return data
def refresh_token(self):
    """
    Request a new token now, whether or not the current one has expired.

    The previous refresh token is carried over when the response does not
    include one. The result is assigned to ``self.token``.
    Raises HTTPError on any non 2xx response code
    """
    previous_refresh_token = self._token['refresh_token']
    form = {
        'refresh_token': previous_refresh_token,
        'grant_type': 'refresh_token'
    }
    credentials = HTTPBasicAuth(self.client_id, self.client_secret)
    response = self.session.post(self.TOKEN_URL, data=form, auth=credentials)
    response.raise_for_status()

    new_token = response.json()
    # Turn the relative lifetime into an absolute epoch timestamp.
    new_token['expires_at'] = int(time.time()) + new_token['expires_in']
    if 'refresh_token' not in new_token:
        new_token['refresh_token'] = previous_refresh_token
    self.token = new_token
def jsonrpc_call(url, params=None, verify_ssl_cert=True,
                 username=None, password=None):
    """POST a JSON-RPC 2.0 request and return its ``result`` member.

    The elapsed request time is added to the module-level
    ``method_cumulative_calltime`` entry for the method.

    :param url: endpoint to POST to
    :param params: JSON-RPC params; defaults to an empty dict
    :param verify_ssl_cert: passed to requests as ``verify``
    :param username: basic-auth user (used only together with password)
    :param password: basic-auth password
    :raises JsonRpcCallFailed: when the response has no ``result`` member
    """
    global method_cumulative_calltime
    # A None sentinel avoids sharing one mutable default dict across calls.
    if params is None:
        params = {}
    # NOTE(review): `method` is not defined in this block -- presumably a
    # parameter or module-level name lost from the snippet; confirm.
    payload = {"method": method, "params": params, "jsonrpc": "2.0", "id": 0}
    kwargs = {
        "url": url,
        "headers": {'content-type': 'application/json'},
        "data": json.dumps(payload),
        "verify": verify_ssl_cert,
    }
    if username and password:
        kwargs["auth"] = HTTPBasicAuth(username, password)
    begin = time.time()
    response = requests.post(**kwargs).json()
    method_cumulative_calltime[method] += (time.time() - begin)
    if "result" not in response:
        raise JsonRpcCallFailed(payload, response)
    return response["result"]
def _handle_basic_auth_401(self, r, kwargs):
    """Resend the request behind ``r`` with HTTP basic credentials.

    Presumably invoked when ``r`` is a 401 response (the status itself is
    not checked here).

    :param r: the response whose request is copied and resent
    :param kwargs: keyword arguments forwarded to ``r.connection.send``
    :returns: the new response, with ``r`` appended to its history
    """
    if self.pos is not None:
        # Rewind the request body to the recorded position before resending.
        r.request.body.seek(self.pos)
    # Consume content and release the original connection
    # to allow our new request to reuse the same one.
    r.content
    r.raw.release_conn()
    prep = r.request.copy()
    if not hasattr(prep, '_cookies'):
        prep._cookies = cookies.RequestsCookieJar()
    # Carry cookies set by the first response over to the retried request.
    cookies.extract_cookies_to_jar(prep._cookies, r.request, r.raw)
    prep.prepare_cookies(prep._cookies)
    # Replace this handler's auth with basic auth and apply it to the copy.
    self.auth = auth.HTTPBasicAuth(self.username, self.password)
    prep = self.auth(prep)
    _r = r.connection.send(prep, **kwargs)
    _r.history.append(r)
    _r.request = prep
    return _r
def add_strategy(self, domain, strategy):
    """Register an authentication strategy for a domain.

    :param str domain: The domain you wish to match against. For example:
        ``'https://api.github.com'``
    :param str strategy: The authentication strategy you wish to use for
        that domain. For example: ``('username','password')`` or
        ``requests.HTTPDigestAuth('username','password')``

    .. code-block:: python

        a = AuthHandler({})
        a.add_strategy('https://api.github.com',('username','password'))
    """
    # A plain tuple is shorthand for basic authentication.
    resolved = HTTPBasicAuth(*strategy) if isinstance(strategy, tuple) else strategy
    self.strategies[self._key_from_url(domain)] = resolved
def get_strategy_for(self, url):
    """Look up the authentication strategy registered for a URL.

    :param str url: The full URL you will be making a request against. For
        example,``'https://api.github.com/user'``
    :returns: the registered strategy, or a ``NullAuthStrategy`` instance
        when nothing is registered for the URL's key.

    .. code-block:: python

        import requests
        a = AuthHandler({'example.com': ('foo', 'bar')})
        strategy = a.get_strategy_for('http://example.com/example')
        assert isinstance(strategy, requests.auth.HTTPBasicAuth)
    """
    lookup_key = self._key_from_url(url)
    fallback = NullAuthStrategy()
    return self.strategies.get(lookup_key, fallback)
def _auth(http_auth):
    """Translate an auth configuration mapping into a requests auth object.

    :param http_auth: falsy for "no authentication", otherwise a mapping
        with ``auth_type`` (``'basic'`` or ``'digest'``), ``username`` and
        ``password``
    :returns: ``None``, an ``HTTPBasicAuth`` or an ``HTTPDigestAuth``
    :raises Exception: when ``auth_type`` is neither supported value
    """
    if http_auth:
        scheme = http_auth['auth_type']
        if scheme == 'basic':
            return HTTPBasicAuth(http_auth['username'], http_auth['password'])
        if scheme == 'digest':
            return HTTPDigestAuth(http_auth['username'], http_auth['password'])
        raise Exception('Authorization information is not valid.')
    return None
def _handle_basic_auth_401(self, r, kwargs):
    """Resend the request behind ``r`` with HTTP basic credentials.

    NOTE(review): this snippet arrived truncated (no colon, no body before
    ``_r``); signature and body are restored from the complete definition
    of the same name elsewhere in this file -- confirm against the source.

    :param r: the response whose request is copied and resent
    :param kwargs: keyword arguments forwarded to ``r.connection.send``
    :returns: the new response, with ``r`` appended to its history
    """
    if self.pos is not None:
        r.request.body.seek(self.pos)
    # Consume content and release the original connection
    # to allow our new request to reuse the same one.
    r.content
    r.raw.release_conn()
    prep = r.request.copy()
    if not hasattr(prep, '_cookies'):
        prep._cookies = cookies.RequestsCookieJar()
    cookies.extract_cookies_to_jar(prep._cookies, r.request, r.raw)
    prep.prepare_cookies(prep._cookies)
    self.auth = auth.HTTPBasicAuth(self.username, self.password)
    prep = self.auth(prep)
    _r = r.connection.send(prep, **kwargs)
    _r.history.append(r)
    _r.request = prep
    return _r
def add_strategy(self, domain, strategy):
    """Register an authentication strategy for a domain.

    NOTE(review): the truncated snippet used ``domain`` and ``strategy``
    without defining them; the signature and tuple check are restored from
    the complete definition of the same name in this file -- confirm.

    :param str domain: the domain to match against
    :param strategy: an auth object, or a ``(username, password)`` tuple
        which is converted to ``HTTPBasicAuth``
    """
    # Turn tuples into Basic Authentication objects
    if isinstance(strategy, tuple):
        strategy = HTTPBasicAuth(*strategy)
    key = self._key_from_url(domain)
    self.strategies[key] = strategy
def test_get_podm_status_Offline_by_http_exception(self, mock_get):
    """pod_status reports OFFLINE for connection, SSL and timeout errors.

    NOTE(review): the SSL and timeout sections arrived truncated; they are
    rebuilt to mirror the complete first section -- confirm.
    """
    mock_get.side_effect = requests.ConnectionError
    self.assertEqual(redfish.pod_status('url', 'username', 'password'),
                     constants.PODM_STATUS_OFFLINE)
    # Was spelled `asset_called_once_with`, which a Mock accepts silently
    # and therefore checked nothing.
    # NOTE(review): if pod_status passes further keyword arguments to the
    # mocked call, they must be added here.
    mock_get.assert_called_once_with('url',
                                     auth=auth.HTTPBasicAuth('username',
                                                             'password'))
    # SSL Error
    mock_get.side_effect = requests.exceptions.SSLError
    self.assertEqual(redfish.pod_status('url', 'username', 'password'),
                     constants.PODM_STATUS_OFFLINE)
    self.assertEqual(mock_get.call_count, 2)
    # Timeout
    mock_get.side_effect = requests.Timeout
    self.assertEqual(redfish.pod_status('url', 'username', 'password'),
                     constants.PODM_STATUS_OFFLINE)
    self.assertEqual(mock_get.call_count, 3)
def _upload(url, data_file, username, password):
    """
    Upload will submit a results file via polarion ReST interface.

    :param url: target URL; a ``localhost`` URL is treated as unconfigured
    :param data_file: passed as the POST body
    :param username: basic-auth user
    :param password: basic-auth password
    :returns: the HTTP status code when it equals ``codes.ok``
    :raises exceptions.RequestException: for any other status code
    """
    # Raw string: the same pattern written as a normal literal contains the
    # invalid escape sequences "\:" and "\/", which newer Python versions
    # warn about.
    url_match = r'(http(s)?)\:\/\/localhost'
    if re.search(url_match, url):
        print("Please configure url settings.")
        exit(1)
    polarion_request = post(url,
                            data=data_file,
                            auth=auth.HTTPBasicAuth(username,
                                                    password))
    status_code = polarion_request.status_code
    if status_code == codes.ok:
        return status_code
    else:
        print("Results upload Failed with the follow: {}".format(
            polarion_request.status_code))
        raise exceptions.RequestException
def division_standings():
    """Fetch the division standings feed and store every team's rank.

    Reads ``division_team_standings.json`` under ``base_url`` with basic
    auth and inserts one ``(team_id, rank)`` row per team into the
    ``division_standings`` table of ``nflpool.sqlite``.
    """
    url = base_url + 'division_team_standings.json'
    response = requests.get(url,
                            auth=HTTPBasicAuth(secret.msf_username, secret.msf_pw))
    data = response.json()

    rows = []
    for division in data['divisionteamstandings']['division']:
        for team in division['teamentry']:
            rows.append((team['team']['ID'], team['rank']))

    # One connection for the whole batch instead of one per team, closed
    # even if an insert fails.
    conn = sqlite3.connect('nflpool.sqlite')
    try:
        cur = conn.cursor()
        cur.executemany('''INSERT INTO division_standings(team_id,rank)
        VALUES(?,?)''', rows)
        conn.commit()
    finally:
        conn.close()
# Get Playoff Standings for each team (need number 5 & 6 in each conference for Wild Card picks)
def playoff_standings():
    """Fetch the playoff standings feed and store every team's rank.

    NOTE(review): the request call and the SQL statement arrived truncated;
    both are completed to mirror ``division_standings`` in this file
    (basic auth from ``secret``, two-column parameterized insert) --
    confirm against the source.
    """
    url = base_url + 'playoff_team_standings.json'
    response = requests.get(url,
                            auth=HTTPBasicAuth(secret.msf_username, secret.msf_pw))
    data = response.json()

    rows = []
    for conference in data['playoffteamstandings']['conference']:
        for team in conference['teamentry']:
            rows.append((team['team']['ID'], team['rank']))

    # One connection for the whole batch, closed even if an insert fails.
    conn = sqlite3.connect('nflpool.sqlite')
    try:
        cur = conn.cursor()
        cur.executemany('''INSERT INTO playoff_rankings(team_id, rank)
        VALUES(?,?)''', rows)
        conn.commit()
    finally:
        conn.close()
# Get individual statistics for each category
def parse_auth_argument(args):
    """Turn the parsed ``auth`` option into something usable for requests.

    :param args: namespace with ``auth`` plus the credentials the selected
        scheme needs
    :returns: ``HTTPBasicAuth`` for ``'basic'``, the result of
        ``get_oauth1session`` for ``'oauth'``, otherwise ``args.auth``
        unchanged

    Prints a hint and exits with status 3 when required options are missing.
    """
    scheme = args.auth

    if scheme == 'basic':
        has_credentials = args.username and args.password
        if not has_credentials:
            print("For basic authentication,'username' and 'password' "
                  "parameter are needed")
            exit(3)
        return HTTPBasicAuth(args.username, args.password)

    if scheme == 'oauth':
        has_keys = args.consumer_key and args.private_key
        if not has_keys:
            print("For oauth authentication,'consumer-key' "
                  "and 'private-key' parameter are needed")
            exit(3)
        return get_oauth1session(args.consumer_key, args.consumer_secret,
                                 args.private_key, args.passphrase)

    return scheme
def parse_auth_argument(args):
    """Turn the parsed ``auth`` option into something usable for requests.

    NOTE(review): this snippet arrived cut in the middle of a string
    literal; the missing part is restored from the complete definition of
    the same name in this file -- confirm against the source.

    :returns: ``HTTPBasicAuth`` for ``'basic'``, the result of
        ``get_oauth1session`` for ``'oauth'``, otherwise ``args.auth``
    """
    auth = args.auth
    if auth == 'basic':
        if not (args.username and args.password):
            print("For basic authentication,'username' and 'password' "
                  "parameter are needed")
            exit(3)
        auth = HTTPBasicAuth(args.username, args.password)
    elif auth == 'oauth':
        if not (args.consumer_key and args.private_key):
            print("For oauth authentication,'consumer-key' "
                  "and 'private-key' parameter are needed")
            exit(3)
        auth = get_oauth1session(args.consumer_key, args.consumer_secret,
                                 args.private_key, args.passphrase)
    return auth
def parse_auth_argument(args):
    """Turn the parsed ``auth`` option into something usable for requests.

    NOTE(review): this snippet arrived cut in the middle of a string
    literal; the missing part is restored from the complete definition of
    the same name in this file -- confirm against the source.

    :returns: ``HTTPBasicAuth`` for ``'basic'``, the result of
        ``get_oauth1session`` for ``'oauth'``, otherwise ``args.auth``
    """
    auth = args.auth
    if auth == 'basic':
        if not (args.username and args.password):
            print("For basic authentication,'username' and 'password' "
                  "parameter are needed")
            exit(3)
        auth = HTTPBasicAuth(args.username, args.password)
    elif auth == 'oauth':
        if not (args.consumer_key and args.private_key):
            print("For oauth authentication,'consumer-key' "
                  "and 'private-key' parameter are needed")
            exit(3)
        auth = get_oauth1session(args.consumer_key, args.consumer_secret,
                                 args.private_key, args.passphrase)
    return auth
def post_result(url, user, password, verify, data, debug):
    """POST ``data`` as JSON with basic auth and report any failure.

    :param url: endpoint to POST to
    :param user: basic-auth user
    :param password: basic-auth password
    :param verify: passed to requests as ``verify``
    :param data: object sent as the JSON body
    :param debug: when truthy, the response object is reported via ``err``
    :returns: the ``requests`` response
    """
    response = requests.post(
        url,
        json=data,
        verify=verify,
        auth=HTTPBasicAuth(user, password),
    )
    if debug:
        err("Request result: " + str(response))

    status = response.status_code
    if status == 403:
        err("HTTP 403 Forbidden - Does your bitbucket user have rights to the repo?")
    elif status == 401:
        err("HTTP 401 Unauthorized - Are your bitbucket credentials correct?")

    # Anything but 204 (success per the Bitbucket docs) gets its JSON dumped.
    if status != 204:
        err(json_pp(response.json()))
    return response
# Stop all this from executing if we were imported,say,for testing.
def refresh(self, session=None, auth=None):
    """
    Refreshes the token.

    :param session: :class:`requests_oauthlib.OAuth2Session` for refreshing token with.
    :param auth: :class:`requests.auth.HTTPBasicAuth`
    :raises NotRefreshabletokenError: when ``self.can_refresh`` is falsy
    :raises TokenInvalidError: on an invalid grant or a missing token
    :raises ImproperlyConfigured: when the client credentials are rejected
    """
    # NOTE(review): `NotRefreshabletokenError` has unusual casing -- confirm
    # it matches the exception class defined by the project.
    if not self.can_refresh:
        raise NotRefreshabletokenError()

    if not session:
        session = OAuth2Session(app_settings.ESI_SSO_CLIENT_ID)
    if not auth:
        auth = HTTPBasicAuth(app_settings.ESI_SSO_CLIENT_ID, app_settings.ESI_SSO_CLIENT_SECRET)
    try:
        token = session.refresh_token(app_settings.ESI_TOKEN_URL,
                                      refresh_token=self.refresh_token, auth=auth)
        self.access_token = token['access_token']
        # Was `timezone.Now()`. Assuming `timezone` is django.utils.timezone
        # (ImproperlyConfigured / self.save() suggest Django), the function
        # is lowercase `now`.
        self.created = timezone.now()
        self.save()
    except (InvalidGrantError, MissingTokenError):
        raise TokenInvalidError()
    except InvalidClientError:
        raise ImproperlyConfigured('Verify ESI_SSO_CLIENT_ID and ESI_SSO_CLIENT_SECRET settings.')
def whois_search(self, query='', field=''):
    """
    Run a WHOIS search against the PassiveTotal v2 API.

    :param query: the search term
    :type str:
    :param field: WHOIS field to execute the search on: domain,email,
        name,organization,address,phone,nameserver
    :type str:
    :returns: the decoded JSON body on HTTP 200, otherwise ``None``
    """
    log.debug('Permforming WHOIS search with query: {0} and field: '
              '{1}'.format(query, field))
    response = requests.get(
        'https://api.passivetotal.org/v2/whois/search',
        auth=HTTPBasicAuth(self.username, self.apikey),
        proxies=self.proxies,
        params={'query': query, 'field': field},
    )
    if response.status_code != 200:
        log.error('HTTP code {0} returned during WHOIS search '
                  'request.'.format(response.status_code))
        return None
    return json.loads(response.text)
def cuckoo_check_if_dup(self, sha256):
    """
    Ask Cuckoo whether a sample with this SHA-256 is already known.

    Returns True only for an OK response with status 200; any other
    response, or any exception (which is printed), yields False.
    """
    try:
        print("Looking for tasks for: {}".format(sha256))
        lookup_url = urljoin(self.url_base, "/files/view/sha256/{}".format(sha256))
        res = requests.get(
            lookup_url,
            verify=False,
            auth=HTTPBasicAuth(self.api_user, self.api_passwd),
            timeout=60,
        )
        if not (res and res.ok and res.status_code == 200):
            return False
        print("Sample found in SandBox,with ID: {}".format(res.json().get("sample", {}).get("id", 0)))
        return True
    except Exception as e:
        print(e)
    return False
def postfile(self, artifact, fileName):
    """
    Send a file to Cuckoo

    :param artifact: path of the file to upload
    :param fileName: name reported to Cuckoo for the upload

    The outcome is printed; request errors are caught and printed too.
    """
    # Read through a context manager so the handle is closed right away
    # instead of being left to the garbage collector.
    with open(artifact, "rb") as handle:
        files = {"file": (fileName, handle.read())}
    try:
        res = requests.post(urljoin(self.url_base, "tasks/create/file").encode("utf-8"), files=files, auth=HTTPBasicAuth(
            self.api_user,
            self.api_passwd
        ),
            verify=False)
        if res and res.ok:
            print("Cuckoo Request: {},Task created with ID: {}".format(res.status_code, res.json()["task_id"]))
        else:
            print("Cuckoo Request Failed: {}".format(res.status_code))
    except Exception as e:
        print("Cuckoo Request Failed: {}".format(e))
    return
def posturl(self, scanUrl):
    """
    Send a URL to Cuckoo

    NOTE(review): the request call and the success branch arrived
    truncated; they are completed to mirror ``postfile`` in this file
    (same credentials, ``verify=False``, same messages) -- confirm.

    :param scanUrl: the URL to submit for analysis
    """
    data = {"url": scanUrl}
    try:
        res = requests.post(urljoin(self.url_base, "tasks/create/url").encode("utf-8"), data=data, auth=HTTPBasicAuth(
            self.api_user,
            self.api_passwd
        ),
            verify=False)
        if res and res.ok:
            print("Cuckoo Request: {},Task created with ID: {}".format(res.status_code, res.json()["task_id"]))
        else:
            print("Cuckoo Request Failed: {}".format(res.status_code))
    except Exception as e:
        print("Cuckoo Request Failed: {}".format(e))
    return
def run(self, args, logger):
    """Fetch a client-credentials access token and write it to a file.

    :param args: namespace providing ``filename``, the file to write
    :param logger: logger used for progress and error messages
    :returns: 0 on success, 1 when the token endpoint does not answer 200
    """
    token_response = request(
        'POST',
        'https://eu.battle.net/oauth/token',
        auth=HTTPBasicAuth(config.API_KEY, config.API_SECRET),
        params={'grant_type': 'client_credentials'},
        allow_redirects=False,
    )
    if token_response.status_code != 200:
        logger.error("Failed to get access token got %s: %s" % (token_response.status_code, token_response.content))
        return 1

    body = token_response.json()
    token_value = body['access_token']
    logger.info("writing access_token to %s,expires in %s" % (args.filename, body['expires_in']))
    with open(args.filename, 'w') as token_file:
        token_file.write(token_value)
    return 0
def ch138():
    """Solve the captcha form 1001 times in a row, printing each result.

    Every round reads the expected captcha value out of the form's inline
    script, requests a fresh captcha, submits the value and prints the
    text of the first alert box in the reply.
    """
    s = requests.Session()
    auth = HTTPBasicAuth('captcha', 'QJc9U6wxD4SFT0u')
    url = 'http://captcha.ringzer0team.com:7421'
    # range / print() run on Python 2 and 3 alike; the previous xrange and
    # print statement were Python-2-only.
    for _ in range(1001):
        time.sleep(0.10)
        r = s.get('{0}/form1.PHP'.format(url), auth=auth)
        m = re.search(r'if \(A == "([a-z0-9]*)"\)', r.text)
        captcha = m.group(1)
        r = s.get('{0}/captcha/captchabroken.PHP?new'.format(url), auth=auth)
        payload = {'captcha': captcha}
        r = s.post('{0}/captcha1.PHP'.format(url), auth=auth, data=payload)
        doc = lxml.html.document_fromstring(r.text)
        alert = doc.xpath('//div[contains(@class,"alert")]')[0]
        msg = alert.text_content().strip()
        print(msg)
def __init__(self, host, port, username, password, verify):
    """
    Initializes the epoRemote with the information for the target ePO instance

    ``username`` and ``password`` are documented and used below but were
    missing from the signature, and the log call passed two values for
    three placeholders; both are restored here.

    :param host: the hostname of the ePO to run remote commands on
    :param port: the port of the desired ePO
    :param username: the username to run the remote commands as
    :param password: the password for the ePO user
    :param verify: Whether to verify the ePO server's certificate
    """
    logger.debug('Initializing epoRemote for ePO {} on port {} with user {}'.format(host, port, username))
    self._baseurl = 'https://{}:{}/remote'.format(host, port)
    self._auth = HTTPBasicAuth(username, password)
    self._session = requests.Session()
    self._verify = verify
def add_strategy(self, domain, strategy):
    """Register an authentication strategy for a domain.

    NOTE(review): the truncated snippet used ``domain`` and ``strategy``
    without defining them; the signature and tuple check are restored from
    the complete definition of the same name in this file -- confirm.

    :param str domain: the domain to match against
    :param strategy: an auth object, or a ``(username, password)`` tuple
        which is converted to ``HTTPBasicAuth``
    """
    # Turn tuples into Basic Authentication objects
    if isinstance(strategy, tuple):
        strategy = HTTPBasicAuth(*strategy)
    key = self._key_from_url(domain)
    self.strategies[key] = strategy
def artifactory_repo_present(data):
    """PUT the repository definition in ``data`` to Artifactory.

    ``state``, ``user`` and ``password`` are removed from ``data`` (the
    caller's dict is modified) before the rest is sent as the JSON body.

    :returns: ``(False, True, {...})`` on HTTP 200;
        ``(False, False, <json>)`` when a 400 reply reports that the key
        already exists; otherwise ``(True, <status/response dict>)``.
    """
    del data['state']
    json_headers = {
        "Content-Type": "application/json"
    }
    user = data['user']
    password = data['password']
    del data['user']
    del data['password']

    url = "{}/{}/{}".format(data['artifactory'], 'api/repositories', data['key'])
    result = requests.put(url, json.dumps(data), headers=json_headers,
                          auth=HTTPBasicAuth(user, password))

    status = result.status_code
    if status == 200:
        return False, True, {"status": result.status_code}
    if status == 400 and 'errors' in result.json():
        for entry in result.json()['errors']:
            if 'key already exists' in entry['message']:
                return False, False, result.json()

    # default: something went wrong
    # NOTE(review): this path returns two values while the others return
    # three -- confirm what callers unpack.
    failure = {"status": result.status_code, 'response': result.json()}
    return True, failure
def artifactory_license(data):
    """POST the license payload in ``data`` to Artifactory.

    ``user`` and ``password`` are removed from ``data`` (the caller's dict
    is modified) before the rest is sent as the JSON body.

    NOTE(review): this snippet arrived truncated between the request call
    and the final return. The request arguments, the success branch and
    the shape of the failure return are reconstructed by analogy with
    ``artifactory_repo_present`` in this file -- confirm all three against
    the source.
    """
    headers = {
        "Content-Type": "application/json"
    }
    user = data['user']
    password = data['password']
    del data['user']
    del data['password']
    url = "{}/{}".format(data['artifactory'], 'api/system/license')
    result = requests.post(url, json.dumps(data), headers=headers,
                           auth=HTTPBasicAuth(user, password))
    if result.status_code == 200:
        return False, True, result.json()
    # default: something went wrong
    meta = {"status": result.status_code, 'response': result.json()}
    return True, meta
def post(self, data):
    """
    Post data to the associated endpoint and await the server's response.

    :param data: the data to be posted.
    :type data: str or json
    :returns: the response text, or a dict with ``code``, ``name`` and an
        empty ``message`` when the response body is empty
    :raises ValueError: if the username or password is not set
    """
    if self._username is None or self._password is None:
        raise ValueError("Username or Password is not set")
    auth = HTTPBasicAuth(self._username, self._password)
    # `data` was accepted but never sent; it is now the request body.
    # NOTE(review): if callers pass a dict meant to be JSON-encoded, this
    # should be `json=data` instead -- confirm against the API.
    resp = requests.post(self._endpoint, data=data, json=None,
                         verify=self._verify_ssl, timeout=self._timeout,
                         auth=auth)
    if resp.text == '':
        return {"code": resp.status_code, "name": resp.reason, "message": ""}
    return resp.text
def overwrite_comment(self, url, body):
    """Replace the comment at the given API url with ``body``.

    Sends a PATCH with a JSON ``{"body": ...}`` payload using basic auth
    built from ``self.name`` and ``self.token``; raises on an HTTP error
    status.
    """
    credentials = HTTPBasicAuth(self.name, self.token)
    encoded = json.dumps({"body": body})
    response = self.requests.patch(url, data=encoded, auth=credentials)
    response.raise_for_status()
def post_comment(self, body):
    """ Add github comment using url endpoint

    NOTE(review): the request call arrived truncated; its arguments are
    completed to mirror ``overwrite_comment`` in this file. ``url`` is not
    defined in this block -- presumably a parameter or attribute lost
    from the snippet; confirm against the source.
    """
    payload = { "body": body }
    res = self.requests.post(
        url,
        data = json.dumps(payload),
        auth = HTTPBasicAuth(
            self.name,
            self.token
        )
    )
    res.raise_for_status()
def _get_access_token(self):
    """Get token and refresh_token from mailup.

    Posts a password-grant request authenticated with the client id and
    secret, stores the refresh token on the instance and returns the
    access token.
    """
    password_grant = {
        "grant_type": "password",
        "username": self.username,
        "password": self.password,
    }
    client_auth = HTTPBasicAuth(self.client_id, self.client_secret)
    response = self.post(ENDPOINT["token"], data=password_grant, auth=client_auth)
    tokens = response_parser(response)
    self.refresh_token = tokens["refresh_token"]
    return tokens["access_token"]
def get_host_id(name):
    """Return the id of the single Tower host called ``name``.

    Exits with status 1 (after printing a message) when no host or more
    than one host matches.
    """
    s = requests.Session()
    url = tower_base_url+"hosts/"
    headers = {
        'content-type': "application/json"
    }
    querystring = {"name": name}
    # `url` and `headers` were built but never passed; Session.request
    # requires the URL as its second argument.
    response = s.request("GET", url, headers=headers, params=querystring,
                         auth=HTTPBasicAuth(args.tower_username, args.tower_password))
    results = response.json()['results']
    if len(results) < 1:
        print("No host found with that name.")
        sys.exit(1)
    elif len(results) > 1:
        print("Multiple hosts found with that name,so I won't remove any of them.")
        sys.exit(1)
    else:
        return results[0]['id']
def add_host(host_name, inventory):
    """Create an enabled Tower host in the given inventory.

    :param host_name: name of the host to create
    :param inventory: value sent as the host's ``inventory`` field
    """
    s = requests.Session()
    url = tower_base_url+"hosts/"
    headers = {
        'content-type': "application/json"
    }
    payload = {
        "name": host_name,
        "enabled": True,
        "inventory": inventory
    }
    # `url` and `headers` were built but never passed; Session.request
    # requires the URL as its second argument.
    s.request("POST", url, headers=headers, data=json.dumps(payload),
              auth=HTTPBasicAuth(args.tower_username, args.tower_password))
def get_jwt_for_registry(auth_url, registry, appname):
    """Request a push/pull token for ``appname`` from the auth service.

    Credentials come from the docker auth config resolved for
    ``registry``.

    :returns: the token string, or ``''`` when the request fails, the
        token is empty, or any exception occurs (a warning is emitted in
        that last case)
    """
    try:
        # get auth username and password from dockercfg
        cfg = auth.resolve_authconfig(auth.load_config(), registry=registry)
        username = cfg['username' if 'username' in cfg else 'Username']
        password = cfg['password' if 'password' in cfg else 'Password']
        # only use `lain.local` as service
        query = (auth_url, "lain.local", appname, username)
        url = "%s?service=%s&scope=repository:%s:push,pull&account=%s" % query
        response = requests.get(url, auth=HTTPBasicAuth(username, password))
        if response.status_code < 400 and response.json()['token']:
            return response.json()['token']
    except Exception as e:
        warn("can not load registry auth config : %s,need lain login first." % e)
    return ''
def test_oauth_token(self):
    """oauth_token() makes a POST to /oauth/token with the appropriate headers and query params"""
    request_mock = Mock()
    uaac = UAAClient('http://example.com', 'foo', False)
    uaac._request = request_mock

    uaac.oauth_token('foo', 'bar', 'baz')

    positional, keywords = request_mock.call_args
    assert positional == ('/oauth/token', 'POST')
    assert keywords['params'] == {
        'code': 'foo',
        'grant_type': 'authorization_code',
        'response_type': 'token'
    }
    sent_auth = keywords['auth']
    assert isinstance(sent_auth, HTTPBasicAuth)
    assert sent_auth.username == 'bar'
    assert sent_auth.password == 'baz'
def test_get_client_token(self):
    """_get_client_token() makes a POST to /oauth/token with the appropriate headers and query params"""
    # NOTE(review): this test arrived truncated. The constructor arguments,
    # the call_args unpacking and the expected params are restored from
    # ``test_oauth_token`` and ``_get_client_token`` in this file --
    # confirm against the source.
    uaac = UAAClient('http://example.com', 'foo', False)
    m = Mock()
    uaac._request = m
    uaac._get_client_token('bar', 'baz')
    args, kwargs = m.call_args
    assert args == ('/oauth/token', 'POST')
    assert kwargs['params'] == {
        'grant_type': 'client_credentials',
        'response_type': 'token'
    }
    assert isinstance(kwargs['auth'], HTTPBasicAuth)
    assert kwargs['auth'].username == 'bar'
    assert kwargs['auth'].password == 'baz'
def _get_client_token(self, client_id, client_secret):
    """Request a client-credentials token and return its access token.

    Args:
        client_id: The oauth client id that this code was generated for
        client_secret: The secret for the client_id above

    Raises:
        UAAError: there was an error getting the token

    Returns:
        The ``access_token`` entry of the response, or None if absent
    """
    grant_params = {
        'grant_type': 'client_credentials',
        'response_type': 'token'
    }
    client_auth = HTTPBasicAuth(client_id, client_secret)
    token_response = self._request(
        '/oauth/token',
        'POST',
        params=grant_params,
        auth=client_auth
    )
    return token_response.get('access_token', None)
def complete_pair(self, pin):
    """Finish pairing using the PIN currently shown on the device screen.

    :param pin: the PIN, sent as the basic-auth password with an empty user
    :returns: ``(response, True)`` when registration answers 200 and the
        follow-up ``connect()`` reports ``True``; ``(response, False)``
        when ``connect()`` does not; ``(None, False)`` when registration
        fails
    """
    registration = self._build_json_payload(
        "actRegister",
        [{"clientid":self.device_id, "nickname":self.nickname},
         [{"value":"no", "function":"WOL"}]])
    # Kept on the object in case it is needed again later.
    self.auth = HTTPBasicAuth('', pin)
    r = self.do_POST(url='/sony/accessControl', payload=registration, auth=self.auth)
    if r.status_code != 200:
        return None, False

    print("have paired")
    self.paired = True
    # Connect again so the cookies get set up properly.
    _, connected = self.connect()
    return r, connected is True
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。