Python httplib 模块,responses() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用httplib.responses()。
def echo_json_response(handler,code,status=None,results=None):
"""Takes a JSON package and returns it to the user w/ full HTTP headers"""
if handler is None or code is None:
return False
if status is None:
status = httplib.responses[code]
if results is None:
results = {}
json_res = {'code': code, 'status': status, 'results' : results}
json_response = json.dumps(json_res)
if isinstance(handler, BaseHTTPRequestHandler):
handler.send_response(code)
handler.send_header('Content-Type', 'application/json')
handler.end_headers()
handler.wfile.write(json_response)
return True
elif isinstance(handler, tornado.web.RequestHandler):
handler.set_status(code)
handler.set_header('Content-Type', 'application/json')
handler.write(json_response)
return True
else:
return False
def handle_event(self, event):
if isinstance(event, h2.events.ResponseReceived):
headers = self.build_http_headers(event.headers)
status_code = int(headers.pop(':status'))
start_line = ResponseStartLine(
'HTTP/2.0', status_code, httplib.responses[status_code]
)
self.headers_received(start_line, headers)
elif isinstance(event, h2.events.DataReceived):
self.data_received(event.data)
elif isinstance(event, h2.events.StreamEnded):
self._stream_ended = True
self.context.remove_stream_delegate(self.stream_id)
if len(self._pushed_responses) == len(self._pushed_streams):
self.finish()
elif isinstance(event, h2.events.PushedStreamReceived):
stream = self.from_push_stream(event)
self._pushed_streams[event.pushed_stream_id] = stream
else:
logger.warning('ignored event: %r,%r', event, event.__dict__)
def handle_event(self, h2.events.ResponseReceived):
headers = self.build_http_headers(event.headers)
status_code = int(headers.pop(':status'))
start_line = httputil.ResponseStartLine(
'HTTP/2.0', event.__dict__)
def __call__(self, environ, start_response):
handler = web.Application.__call__(self, HTTPRequest(environ)) # ????<????>????:
assert handler._finished # ??
status = str(handler._status_code) + " " + \
httplib.responses[handler._status_code]
headers = handler._headers.items()
for cookie_dict in getattr(handler, "_new_cookies", []):
for cookie in cookie_dict.values():
headers.append(("Set-Cookie", cookie.OutputString(None)))
start_response(status, headers)
return handler._write_buffer
def _test_connectivity(self, param, LADS):
"""
Called when the user depresses the test connectivity button on the Phantom UI.
This query returns a list of configured applications
https://api.a10networks.com/api/v2/applications
"""
self.debug_print("%s _test_connectivity %s" % (A10_LADS_Connector.BANNER, param))
msg = "test connectivity to %s status_code: " % (LADS.dashboard)
if LADS.genericGET(uri="/api/v2/applications"):
# True is success
return self.set_status_save_progress(phantom.APP_SUCCESS, msg + "%s %s apps: %s" %
(LADS.status_code, httplib.responses[LADS.status_code], LADS.get_names(LADS.response)))
else:
# None or False,is a failure based on incorrect IP address,username,passords
return self.set_status_save_progress(phantom.APP_ERROR, msg + "%s %s" % (LADS.status_code, LADS.response))
def _test_connectivity(self, param):
"""
Called when the user depresses the test connectivity button on the Phantom UI.
Use a basic query to determine if the IP address,username and password is correct,
curl -k -u admin:redacted -X GET https://192.0.2.1/mgmt/tm/ltm/
"""
self.debug_print("%s TEST_CONNECTIVITY %s" % (F5_Connector.BANNER, param))
config = self.get_config()
host = config.get("device")
F5 = iControl.BIG_IP(host=host,
username=config.get("username"),
password=config.get("password"),
uri="/mgmt/tm/sys/software/image",
method="GET")
msg = "test connectivity to %s status_code: " % host
if F5.genericGET():
# True is success
return self.set_status_save_progress(phantom.APP_SUCCESS, msg + "%s %s" % (F5.status_code, httplib.responses[F5.status_code]))
else:
# None or False, F5.response))
def __write_error(self, error_message=None):
"""Return the HTTP status line and body for a given error code and message.
Args:
status_code: HTTP status code to be returned.
error_message: Error message to be returned.
Returns:
Tuple (http_status,body):
http_status: HTTP status line,e.g. 200 OK.
body: Body of the HTTP request.
"""
if error_message is None:
error_message = httplib.responses[status_code]
status = '%d %s' % (status_code, httplib.responses[status_code])
message = EndpointsErrorMessage(
state=EndpointsErrorMessage.State.APPLICATION_ERROR,
error_message=error_message)
return status, self.__PROTOJSON.encode_message(message)
def __init__(self, *args, **kwargs):
self.pushed_responses = kwargs.pop('pushed_responses', [])
self.new_request = kwargs.pop('new_request', None)
super(HTTP2Response, self).__init__(*args, **kwargs)
reason = kwargs.pop('reason', None)
self.reason = reason or httputil.responses.get(self.code, "UnkNown")
def write_error(self, **kwargs):
self.render("pages/error.html", message=httplib.responses[status_code], error=status_code)
def write_error(self, **kwargs):
try:
req_resp = stats.request(str(get_ip(self.request)))
except:
error("Errored while handling request IP -- still served...")
self.render("pages/error.html", error=status_code)
def build_response(self, result):
data = {
'result': {
'status': self.status,
},
}
if self.status:
data['result']['value'] = result
body = json.dumps(data)
headers = Headers(SUCCESSFUL_HEADERS)
response = MockResponse(b'HTTP/1.1', self.response_code, httplib.responses[self.response_code], headers, body)
return response
def test_responses(self):
self.assertEqual(httplib.responses[httplib.NOT_FOUND], "Not Found")
def http_open(self, req):
import mimetools, copy
from StringIO import StringIO
self.requests.append(copy.deepcopy(req))
if self._count == 0:
self._count = self._count + 1
name = httplib.responses[self.code]
msg = mimetools.Message(StringIO(self.headers))
return self.parent.error(
"http", req, MockFile(), self.code, name, msg)
else:
self.req = req
msg = mimetools.Message(StringIO("\r\n\r\n"))
return MockResponse(200, "OK", msg, "", req.get_full_url())
def test_responses(self):
self.assertEqual(httplib.responses[httplib.NOT_FOUND], "Not Found")
def wasLastResponseDelayed():
"""
Returns True if the last web request resulted in a time-delay
"""
# 99.9999999997440% of all non time-based sql injection affected
# response times should be inside +-7*stdev([normal response times])
# Math reference: http://www.answers.com/topic/standard-deviation
deviation = stdev(kb.responseTimes.get(kb.responseTimeMode, []))
threadData = getCurrentThreadData()
if deviation and not conf.direct and not conf.disableStats:
if len(kb.responseTimes[kb.responseTimeMode]) < MIN_TIME_RESPONSES:
warnMsg = "time-based standard deviation method used on a model "
warnMsg += "with less than %d response times" % MIN_TIME_RESPONSES
logger.warn(warnMsg)
lowerStdLimit = average(kb.responseTimes[kb.responseTimeMode]) + TIME_STDEV_COEFF * deviation
retVal = (threadData.lastQueryDuration >= max(MIN_VALID_DELAYED_RESPONSE, lowerStdLimit))
if not kb.testMode and retVal:
if kb.adjustTimeDelay is None:
msg = "do you want sqlmap to try to optimize value(s) "
msg += "for DBMS delay responses (option '--time-sec')? [Y/n] "
kb.adjustTimeDelay = ADJUST_TIME_DELAY.disABLE if not readInput(msg, default='Y', boolean=True) else ADJUST_TIME_DELAY.YES
if kb.adjustTimeDelay is ADJUST_TIME_DELAY.YES:
adjustTimeDelay(threadData.lastQueryDuration, lowerStdLimit)
return retVal
else:
delta = threadData.lastQueryDuration - conf.timeSec
if Backend.getIdentifiedDbms() in (DBMS.MysqL,): # MysqL's SLEEP(X) lasts 0.05 seconds shorter on average
delta += 0.05
return delta >= 0
def showHttpErrorCodes():
"""
Shows all HTTP error codes raised till Now
"""
if kb.httpErrorCodes:
warnMsg = "HTTP error codes detected during run:\n"
warnMsg += ",".join("%d (%s) - %d times" % (code, httplib.responses[code] \
if code in httplib.responses else '?', count) \
for code, count in kb.httpErrorCodes.items())
logger.warn(warnMsg)
if any((str(_).startswith('4') or str(_).startswith('5')) and _ != httplib.INTERNAL_SERVER_ERROR and _ != kb.originalCode for _ in kb.httpErrorCodes.keys()):
msg = "too many 4xx and/or 5xx HTTP error codes "
msg += "Could mean that some kind of protection is involved (e.g. WAF)"
logger.debug(msg)
def get_error_html(self, **kwargs):
if status_code in [404, 500, 503, 403]:
filename = os.path.join(os.path.join(getsettings('BASE_DIR'),'templates'), '%d.html' % status_code)
if os.path.exists(filename):
with io.open(filename, 'r') as f:
data = f.read()
return data
import httplib
return "<html><title>%(code)d: %(message)s</title>" \
"<body class='bodyErrorPage'>%(code)d: %(message)s</body></html>" % {
"code": status_code,
"message": httplib.responses[status_code],
}
def __call__(self, start_response):
handler = web.Application.__call__(self, HTTPRequest(environ))
assert handler._finished
status = str(handler._status_code) + " " + \
httplib.responses[handler._status_code]
headers = handler._headers.items()
for cookie_dict in getattr(handler, cookie.OutputString(None)))
start_response(status,
[(native_str(k), native_str(v)) for (k,v) in headers])
return handler._write_buffer
def set_status(self, status_code):
"""Sets the status code for our response."""
assert status_code in httplib.responses
self._status_code = status_code
def _generate_headers(self):
lines = [utf8(self.request.version + " " +
str(self._status_code) +
" " + httplib.responses[self._status_code])]
lines.extend([(utf8(n) + b(": ") + utf8(v)) for n, v in
itertools.chain(self._headers.iteritems(), self._list_headers)])
for cookie_dict in getattr(self, []):
for cookie in cookie_dict.values():
lines.append(utf8("Set-Cookie: " + cookie.OutputString(None)))
return b("\r\n").join(lines) + b("\r\n\r\n")
def _handle_request_exception(self, e):
if isinstance(e, HTTPError):
if e.log_message:
format = "%d %s: " + e.log_message
args = [e.status_code, self._request_summary()] + list(e.args)
logging.warning(format, *args)
if e.status_code not in httplib.responses:
logging.error("Bad HTTP status code: %d", e.status_code)
self.send_error(500, exc_info=sys.exc_info())
else:
self.send_error(e.status_code, exc_info=sys.exc_info())
else:
logging.error("Uncaught exception %s\n%r", self._request_summary(),
self.request, exc_info=True)
self.send_error(500, exc_info=sys.exc_info())
def assert_Meta(self, Meta):
"""
Verify that the response Metadata is correct.
:param Meta: the response Meta object
"""
self.assert_equal(Meta.user_xid, self.test_user['xid'])
self.assert_equal(Meta.message, httplib.responses[httplib.OK])
self.assert_equal(Meta.code, httplib.OK)
self.assert_datetime_within(Meta.time, window=11)
def add_response_codes(self, status_dict):
for code, info in status_dict.items():
swagger_rsp = self.responses.setdefault(code, {})
if not info['reason']:
try:
code = int(code)
info['reason'] = http_client.responses[code]
except (KeyError, TypeError, ValueError):
info['reason'] = 'UnkNown'
tokens = info['description'].split(maxsplit=2)
if tokens:
tokens[0] = tokens[0].title()
swagger_rsp['description'] = '{}\n\n{}'.format(
info['reason'], ' '.join(tokens)).strip()
def generate_swagger(self):
swagger = {'summary': self.summary, 'description': self.description}
if self.parameters:
swagger['parameters'] = self.parameters
if self.responses:
swagger['responses'] = self.responses
else: # swagger requires at least one response
swagger['responses'] = {'default': {'description': ''}}
# figure out where to put the response schema and response
# header details. This is probably going to change in the
# future since it is `hinky' at best.
default_code = 'default'
status_codes = sorted(int(code)
for code in swagger['responses']
if code.isdigit())
for code in status_codes:
if 200 <= code < 400:
default_code = str(code)
break
if default_code in swagger['responses']:
if self.default_response_schema:
swagger['responses'][default_code]['schema'] = \
self.default_response_schema
if self.response_headers:
swagger['responses'][default_code]['headers'] = \
self.response_headers
return swagger
def set_status(self, status_code):
"""Sets the status code for our response."""
assert status_code in httplib.responses # ?? assert ?? ?????,???,????
self._status_code = status_code
# ?? HTTP???
# ?? value ??,? ??????
def get_error_html(self, **kwargs):
"""Override to implement custom error pages.
If this error was caused by an uncaught exception,the
exception object can be found in kwargs e.g. kwargs['exception']
"""
return "<html><title>%(code)d: %(message)s</title>" \
"<body>%(code)d: %(message)s</body></html>" % {
"code": status_code,
}
# ????: ?????? ???-?? (?????)
#
def _generate_headers(self):
lines = [self.request.version + " " + str(self._status_code) + " " +
httplib.responses[self._status_code]]
lines.extend(["%s: %s" % (n, v) for n, v in self._headers.iteritems()])
for cookie_dict in getattr(self, []):
for cookie in cookie_dict.values():
lines.append("Set-Cookie: " + cookie.OutputString(None))
return "\r\n".join(lines) + "\r\n\r\n"
# ??????
def _test_connectivity(self, param):
"""
Called when the user depresses the test connectivity button on the Phantom UI.
"""
self.debug_print("%s TEST_CONNECTIVITY %s" % (Tower_Connector.BANNER, param))
status_code = self.aaaLogin()
msg = "Test connectivity to %s,status_code: %s %s" % (self.tower_instance, httplib.responses[status_code])
if status_code == requests.codes.ok: # evaluates True for good status (e.g. 200)
return self.set_status_save_progress(phantom.APP_SUCCESS, msg)
else:
return self.set_status_save_progress(phantom.APP_ERROR, msg)
def launch_job(self, job_template_id):
"""launch the job specified by its job_template_id
"""
self.debug_print("%s LAUNCH_JOB parameters:\n%s\n%s" % (Tower_Connector.BANNER, job_template_id))
action_result = ActionResult(dict(param)) # Add an action result to the App Run
self.add_action_result(action_result)
URI = "https://%s/api/v1/job_templates/%s/launch/" % (self.tower_instance, job_template_id)
header = self.HEADER
header["Authorization"] = "Token %s" % self.token
# Ansible 3.0,Prompt on launch must be specified in the job template on the Tower GUI
# On the Phantom GUI specify as follows: "malicIoUs_ip=203.0.113.10,foo=bar" with no leading spaces
try:
data = dict(extra_vars='%s' % dict((e.split('=') for e in param["extra vars"].split(','))))
except KeyError:
data = {} # extra vars is optional
try:
r = requests.post(URI, headers=header, data=json.dumps(data), verify=False)
except requests.ConnectionError as e:
self.set_status_save_progress(phantom.APP_ERROR, str(e))
return "ConnectionError"
if r.status_code in Tower_Connector.GOOD_LAUNCH_STATUS:
self.job_id = r.json()['job']
return httplib.responses[r.status_code]
def __call__(self,v) in headers])
return handler._write_buffer
def set_status(self, status_code):
"""Sets the status code for our response."""
assert status_code in httplib.responses
self._status_code = status_code
def _generate_headers(self):
lines = [utf8(self.request.version + " " +
str(self._status_code) +
" " + httplib.responses[self._status_code])]
lines.extend([(utf8(n) + b(": ") + utf8(v)) for n, []):
for cookie in cookie_dict.values():
lines.append(utf8("Set-Cookie: " + cookie.OutputString(None)))
return b("\r\n").join(lines) + b("\r\n\r\n")
def _handle_request_exception(self, exc_info=sys.exc_info())
def wasLastResponseDelayed():
"""
Returns True if the last web request resulted in a time-delay
"""
# 99.9999999997440% of all non time-based sql injection affected
# response times should be inside +-7*stdev([normal response times])
# Math reference: http://www.answers.com/topic/standard-deviation
deviation = stdev(kb.responseTimes)
threadData = getCurrentThreadData()
if deviation and not conf.direct:
if len(kb.responseTimes) < MIN_TIME_RESPONSES:
warnMsg = "time-based standard deviation method used on a model "
warnMsg += "with less than %d response times" % MIN_TIME_RESPONSES
logger.warn(warnMsg)
lowerStdLimit = average(kb.responseTimes) + TIME_STDEV_COEFF * deviation
retVal = (threadData.lastQueryDuration >= max(MIN_VALID_DELAYED_RESPONSE, lowerStdLimit))
if not kb.testMode and retVal:
if kb.adjustTimeDelay is None:
msg = "do you want sqlmap to try to optimize value(s) "
msg += "for DBMS delay responses (option '--time-sec')? [Y/n] "
choice = readInput(msg, default='Y')
kb.adjustTimeDelay = ADJUST_TIME_DELAY.disABLE if choice.upper() == 'N' else ADJUST_TIME_DELAY.YES
if kb.adjustTimeDelay is ADJUST_TIME_DELAY.YES:
adjustTimeDelay(threadData.lastQueryDuration, lowerStdLimit)
return retVal
else:
return (threadData.lastQueryDuration - conf.timeSec) >= 0
def showHttpErrorCodes():
"""
Shows all HTTP error codes raised till Now
"""
if kb.httpErrorCodes:
warnMsg = "HTTP error codes detected during run:\n"
warnMsg += ", count in kb.httpErrorCodes.items())
logger.warn(warnMsg)
if any((str(_).startswith('4') or str(_).startswith('5')) and _ != httplib.INTERNAL_SERVER_ERROR and _ != kb.originalCode for _ in kb.httpErrorCodes.keys()):
msg = "too many 4xx and/or 5xx HTTP error codes "
msg += "Could mean that some kind of protection is involved (e.g. WAF)"
logger.debug(msg)
def test_responses(self):
self.assertEqual(httplib.responses[httplib.NOT_FOUND], "Not Found")
def test_responses(self):
self.assertEqual(httplib.responses[httplib.NOT_FOUND], "Not Found")
def wasLastResponseDelayed():
"""
Returns True if the last web request resulted in a time-delay
"""
# 99.9999999997440% of all non time-based sql injection affected
# response times should be inside +-7*stdev([normal response times])
# Math reference: http://www.answers.com/topic/standard-deviation
deviation = stdev(kb.responseTimes.get(kb.responseTimeMode, []))
threadData = getCurrentThreadData()
if deviation and not conf.direct:
if len(kb.responseTimes[kb.responseTimeMode]) < MIN_TIME_RESPONSES:
warnMsg = "time-based standard deviation method used on a model "
warnMsg += "with less than %d response times" % MIN_TIME_RESPONSES
logger.warn(warnMsg)
lowerStdLimit = average(kb.responseTimes[kb.responseTimeMode]) + TIME_STDEV_COEFF * deviation
retVal = (threadData.lastQueryDuration >= max(MIN_VALID_DELAYED_RESPONSE, lowerStdLimit)
return retVal
else:
return (threadData.lastQueryDuration - conf.timeSec) >= 0
def showHttpErrorCodes():
"""
Shows all HTTP error codes raised till Now
"""
if kb.httpErrorCodes:
warnMsg = "HTTP error codes detected during run:\n"
warnMsg += ", count in kb.httpErrorCodes.items())
logger.warn(warnMsg)
if any((str(_).startswith('4') or str(_).startswith('5')) and _ != httplib.INTERNAL_SERVER_ERROR and _ != kb.originalCode for _ in kb.httpErrorCodes.keys()):
msg = "too many 4xx and/or 5xx HTTP error codes "
msg += "Could mean that some kind of protection is involved (e.g. WAF)"
logger.debug(msg)
def __call__(self, start_response):
with self._lock:
app = self._app
error = self._error
if app:
return app(environ, start_response)
else:
start_response('%d %s' % (error, httplib.responses[error]), [])
return []
def _redirect_302_path_info(self, updated_path_info, start_response):
"""Redirect to an updated path.
Respond to the current request with a 302 Found status with an updated path
but preserving the rest of the request.
Notes:
- Wsgi does not make the fragment available so we are not able to preserve
it. Luckily prod does not preserve the fragment so it works out.
Args:
updated_path_info: the new HTTP path to redirect to.
environ: Wsgi environ object.
start_response: Wsgi start response callable.
Returns:
Wsgi-compatible iterable object representing the body of the response.
"""
correct_url = urlparse.urlunsplit(
(environ['wsgi.url_scheme'],
environ['HTTP_HOST'],
urllib.quote(updated_path_info),
self._quote_querystring(environ['QUERY_STRING']),
None))
content_type = 'text/html; charset=utf-8'
output = _REDIRECT_HTML % {
'content-type': content_type,
'status': httplib.FOUND,
'correct-url': correct_url
}
start_response('%d %s' % (httplib.FOUND, httplib.responses[httplib.FOUND]),
[('Content-Type', content_type),
('Location', correct_url),
('Content-Length', str(len(output)))])
return output
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。