微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Python urllib.error 模块-code() 实例源码

Python urllib.error 模块,code() 实例源码

我们从Python开源项目中,提取了以下10代码示例,用于说明如何使用urllib.error.code()

项目:gaffer-tools    作者:gchq    | 项目源码 | 文件源码
def execute_get(self, operation, headers={}):
        url = self._host + operation.get_url()
        headers['Content-Type'] = 'application/json;charset=utf-8'
        request = urllib.request.Request(url, headers=headers)

        try:
            response = self._opener.open(request)
        except urllib.error.HTTPError as error:
            error_body = error.read().decode('utf-8')
            new_error_string = ('HTTP error ' +
                                str(error.code) + ' ' +
                                error.reason + ': ' +
                                error_body)
            raise ConnectionError(new_error_string)

        return response.read().decode('utf-8')
项目:gaffer-tools    作者:gchq    | 项目源码 | 文件源码
def is_operation_supported(self, operation=None, headers={}):
        url = self._host + '/graph/operations/' + operation.get_operation()
        headers['Content-Type'] = 'application/json;charset=utf-8'

        request = urllib.request.Request(url, headers=headers)

        try:
            response = self._opener.open(request)
        except urllib.error.HTTPError as error:
            error_body = error.read().decode('utf-8')
            new_error_string = ('HTTP error ' +
                                str(error.code) + ' ' +
                                error.reason + ': ' +
                                error_body)
            raise ConnectionError(new_error_string)

        response_text = response.read().decode('utf-8')

        return response_text
项目:stewie    作者:nnewman    | 项目源码 | 文件源码
def main():
    parser = argparse.ArgumentParser()

    parser.add_argument('-q', '--term', dest='term', default=DEFAULT_TERM,
                        type=str, help='Search term (default: %(default)s)')
    parser.add_argument('-l', '--location', dest='location',
                        default=DEFAULT_LOCATION, type=str,
                        help='Search location (default: %(default)s)')

    input_values = parser.parse_args()

    try:
        query_api(input_values.term, input_values.location)
    except urllib.error.HTTPError as error:
        sys.exit(
                'Encountered HTTP error {0}. Abort '
                'program.'.format(error.code))
项目:lib9    作者:Jumpscale    | 项目源码 | 文件源码
def getDetailedBinaryLog(self, module, starttime, stoptime, filename):
        url = self.getDetailed_url % {
            "addr": module, "start": starttime, "stop": stoptime}

        if isinstance(self.urlopener, urllib.request.OpenerDirector):
            url_f = self.urlopener.open(url, timeout=30)
        else:
            url_f = self.urlopener.open(url)

        if url_f.code != 200:
            url_f.close()
            raise j.exceptions.RuntimeError(
                "Server fault code %s" % (url_f.code))
        else:
            with open(filename, 'wb') as fp:
                while True:
                    chunk = url_f.read(65536)
                    if not chunk:
                        break
                    fp.write(chunk)
            url_f.close()
项目:py-search    作者:vieira-rafael    | 项目源码 | 文件源码
def defaultPrintAnswerOut(code,additional):
项目:py-search    作者:vieira-rafael    | 项目源码 | 文件源码
def http_error_301(self, req, fp, code, msg, headers):
 self.preProcessingRedirection(req, headers)     result = super(SmartRedirectHandler, self).http_error_301(req, headers) self.postProcessingRedirection(result)
 return result  def http_error_302(self, headers): self.preProcessingRedirection(req, headers)       result = super(SmartRedirectHandler, self).http_error_302(req, headers) self.postProcessingRedirection(result) return result
 def preProcessingRedirection(self, headers):       location = '' for i in headers._headers: if i[0] == 'Location':             location = i[1].strip()         req.add_header('Host',urlparse(location).netloc)
        printAnswer(code, str(msg) + " " + location)        printHeaders(headers._headers,'Set-Cookie')
项目:py-search    作者:vieira-rafael    | 项目源码 | 文件源码
def requestC(opener,url, headers, data, method = 'POST'):   [answer, code] = requestB(opener, method)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_includes_code_and_msg(self):
        def raise_error():
            raise UCSM_XML_API_Error('bad', 4224)

        error = self.assertRaises(UCSM_XML_API_Error, raise_error)

        self.assertEqual('bad', error.args[0])
        self.assertEqual(4224, error.code)
项目:lib9    作者:Jumpscale    | 项目源码 | 文件源码
def _send_data(self, url):
        if isinstance(self.urlopener, timeout=30)
        else:
            url_f = self.urlopener.open(url)
        data = url_f.read()
        url_f.close()
        if url_f.code != 200:
            raise j.exceptions.RuntimeError(
                "Server fault code %s message %s" % (url_f.code, data))
        return data
项目:gaffer-tools    作者:gchq    | 项目源码 | 文件源码
def execute_operation_chain(self, operation_chain, headers={}):
        """
        This method queries Gaffer with the provided operation chain.
        """

        # Construct the full URL path to the Gaffer server
        url = self._host + '/graph/operations/execute'

        if hasattr(operation_chain, "to_json"):
            op_chain_json_obj = operation_chain.to_json()
        else:
            op_chain_json_obj = operation_chain

        # Query Gaffer
        if self._verbose:
            print('\nQuery operations:\n' +
                  json.dumps(op_chain_json_obj, indent=4) + '\n')

        # Convert the query dictionary into JSON and post the query to Gaffer
        json_body = bytes(json.dumps(op_chain_json_obj), 'ascii')
        headers['Content-Type'] = 'application/json;charset=utf-8'

        request = urllib.request.Request(url, headers=headers, data=json_body)

        try:
            response = self._opener.open(request)
        except urllib.error.HTTPError as error:
            error_body = error.read().decode('utf-8')
            new_error_string = ('HTTP error ' +
                                str(error.code) + ' ' +
                                error.reason + ': ' +
                                error_body)
            raise ConnectionError(new_error_string)
        response_text = response.read().decode('utf-8')

        if self._verbose:
            print('Query response: ' + response_text)

        if response_text is not None and response_text is not '':
            result = json.loads(response_text)
        else:
            result = None

        return g.JsonConverter.from_json(result)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐