Python codecs 模块：register_error() 实例源码
我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 codecs.register_error()。
def error_handler(error):
    """Error handler for surrogateescape decoding.

    Should be used with an ASCII-compatible encoding (e.g. 'latin-1' or
    'utf-8').  Replaces any invalid byte sequences with surrogate code
    points, as specified in
    https://docs.python.org/2/library/codecs.html#codecs.register_error.
    """
    # We can't use this with UnicodeEncodeError; the UTF-8 encoder doesn't
    # raise an error for surrogates.  Instead, use encode.
    if not isinstance(error, UnicodeDecodeError):
        raise error
    # Portability shim: the original was Python-2-only (`unichr`, and bytes
    # indexing yielding 1-char strings).  Support Python 3 as well, where
    # indexing bytes yields ints and `unichr` does not exist.
    try:
        _chr = unichr  # Python 2
    except NameError:
        _chr = chr     # Python 3
    result = []
    for i in range(error.start, error.end):
        byte = error.object[i]
        if not isinstance(byte, int):  # Python 2: b[i] is a 1-char str
            byte = ord(byte)
        if byte < 128:
            # An ASCII byte can never be invalid for an ASCII-compatible
            # encoding; re-raise rather than mask a caller bug.
            raise error
        # Map the invalid byte to the low-surrogate range U+DC80..U+DCFF.
        result.append(_chr(0xdc00 + byte))
    return ''.join(result), error.end
def test_xmlcharnamereplace(self):
# This time use a named character entity for unencodable
# characters,if one is available.
def xmlcharnamereplace(exc):
if not isinstance(exc, UnicodeEncodeError):
raise TypeError("don't kNow how to handle %r" % exc)
l = []
for c in exc.object[exc.start:exc.end]:
try:
l.append("&%s;" % html.entities.codepoint2name[ord(c)])
except KeyError:
l.append("&#%d;" % ord(c))
return ("".join(l), exc.end)
codecs.register_error(
"test.xmlcharnamereplace", xmlcharnamereplace)
sin = "\xab\u211c\xbb = \u2329\u1234\u20ac\u232a"
sout = b"«ℜ» = ⟨ሴ€⟩"
self.assertEqual(sin.encode("ascii", "test.xmlcharnamereplace"), sout)
sout = b"\xabℜ\xbb = ⟨ሴ€⟩"
self.assertEqual(sin.encode("latin-1", sout)
sout = b"\xabℜ\xbb = ⟨ሴ\xa4⟩"
self.assertEqual(sin.encode("iso-8859-15", sout)
def test_decoding_callbacks(self):
# This is a test for a decoding callback handler
# that allows the decoding of the invalid sequence
# "\xc0\x80" and returns "\x00" instead of raising an error.
# All other illegal sequences will be handled strictly.
def relaxedutf8(exc):
if not isinstance(exc, UnicodeDecodeError):
raise TypeError("don't kNow how to handle %r" % exc)
if exc.object[exc.start:exc.start+2] == b"\xc0\x80":
return ("\x00", exc.start+2) # retry after two bytes
else:
raise exc
codecs.register_error("test.relaxedutf8", relaxedutf8)
# all the "\xc0\x80" will be decoded to "\x00"
sin = b"a\x00b\xc0\x80c\xc3\xbc\xc0\x80\xc0\x80"
sout = "a\x00b\x00c\xfc\x00\x00"
self.assertEqual(sin.decode("utf-8", "test.relaxedutf8"), sout)
# "\xc0\x81" is not valid and a UnicodeDecodeError will be raised
sin = b"\xc0\x80\xc0\x81"
self.assertRaises(UnicodeDecodeError, sin.decode,
"utf-8", "test.relaxedutf8")
def test_longstrings(self):
# test long strings to check for memory overflow problems
errors = [ "strict", "ignore", "replace", "xmlcharrefreplace",
"backslashreplace"]
# register the handlers under different names,
# to prevent the codec from recognizing the name
for err in errors:
codecs.register_error("test." + err, codecs.lookup_error(err))
l = 1000
errors += [ "test." + err for err in errors ]
for uni in [ s*l for s in ("x", "\u3042", "a\xe4") ]:
for enc in ("ascii", "latin-1", "iso-8859-1", "iso-8859-15",
"utf-8", "utf-7", "utf-16", "utf-32"):
for err in errors:
try:
uni.encode(enc, err)
except UnicodeError:
pass
def test_badhandlerresults(self):
results = ( 42, "foo", (1,2,3), ("foo", 1, 3), None),),) )
encs = ("ascii", "iso-8859-15")
for res in results:
codecs.register_error("test.badhandler", lambda x: res)
for enc in encs:
self.assertRaises(
TypeError,
"\u3042".encode,
enc,
"test.badhandler"
)
for (enc, bytes) in (
("ascii", b"\xff"),
("utf-8",
("utf-7", b"+x-"),
("unicode-internal", b"\x00"),
):
self.assertRaises(
TypeError,
bytes.decode,
"test.badhandler"
)
def test_incrementalencoder_error_callback(self):
    # An incremental encoder must honour changes to its ``errors``
    # attribute between calls, including custom registered handlers.
    # NOTE(review): the two assertEqual calls near the end had their
    # ``True`` argument fused with the expected value by the scrape.
    inv = self.unmappedunicode
    e = self.incrementalencoder()
    self.assertRaises(UnicodeEncodeError, e.encode, inv, True)
    e.errors = 'ignore'
    self.assertEqual(e.encode(inv, True), b'')
    e.reset()
    def tempreplace(exc):
        return ('called', exc.end)
    codecs.register_error('test.incremental_error_callback', tempreplace)
    e.errors = 'test.incremental_error_callback'
    self.assertEqual(e.encode(inv, True), b'called')
    # again
    e.errors = 'ignore'
    self.assertEqual(e.encode(inv, True), b'')
def test_all(self):
api = (
"encode", "decode",
"register", "CodecInfo", "Codec", "IncrementalEncoder",
"IncrementalDecoder", "StreamReader", "StreamWriter", "lookup",
"getencoder", "getdecoder", "getincrementalencoder",
"getincrementaldecoder", "getreader", "getwriter",
"register_error", "lookup_error",
"strict_errors", "replace_errors", "ignore_errors",
"xmlcharrefreplace_errors", "backslashreplace_errors",
"open", "EncodedFile",
"iterencode", "iterdecode",
"BOM", "BOM_BE", "BOM_LE",
"BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE",
"BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE",
"BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE", # Undocumented
"StreamReaderWriter", "StreamRecoder",
)
self.assertEqual(sorted(api), sorted(codecs.__all__))
for api in codecs.__all__:
getattr(codecs, api)
# NOTE(review): the lines below were a corrupted, duplicated scrape of the
# Python 2 variants of the tests above -- several source lines had been
# fused together mid-statement.  Reconstructed to one clean copy of each
# test.  They still require Python 2 (u''-centric expected values,
# htmlentitydefs, str.decode) -- confirm against the page they were
# scraped from.
def test_xmlcharnamereplace(self):
    # This time use a named character entity for unencodable
    # characters, if one is available.
    def xmlcharnamereplace(exc):
        if not isinstance(exc, UnicodeEncodeError):
            raise TypeError("don't know how to handle %r" % exc)
        l = []
        for c in exc.object[exc.start:exc.end]:
            try:
                l.append(u"&%s;" % htmlentitydefs.codepoint2name[ord(c)])
            except KeyError:
                l.append(u"&#%d;" % ord(c))
        return (u"".join(l), exc.end)
    codecs.register_error(
        "test.xmlcharnamereplace", xmlcharnamereplace)
    sin = u"\xab\u211c\xbb = \u2329\u1234\u20ac\u232a"
    sout = "&laquo;&real;&raquo; = &lang;&#4660;&euro;&rang;"
    self.assertEqual(sin.encode("ascii", "test.xmlcharnamereplace"), sout)
    sout = "\xab&real;\xbb = &lang;&#4660;&euro;&rang;"
    self.assertEqual(sin.encode("latin-1", "test.xmlcharnamereplace"), sout)
    sout = "\xab&real;\xbb = &lang;&#4660;\xa4&rang;"
    self.assertEqual(sin.encode("iso-8859-15", "test.xmlcharnamereplace"), sout)
def test_decoding_callbacks(self):
    # This is a test for a decoding callback handler
    # that allows the decoding of the invalid sequence
    # "\xc0\x80" and returns "\x00" instead of raising an error.
    # All other illegal sequences will be handled strictly.
    def relaxedutf8(exc):
        if not isinstance(exc, UnicodeDecodeError):
            raise TypeError("don't know how to handle %r" % exc)
        if exc.object[exc.start:exc.start+2] == "\xc0\x80":
            return (u"\x00", exc.start+2) # retry after two bytes
        else:
            raise exc
    codecs.register_error("test.relaxedutf8", relaxedutf8)
    # all the "\xc0\x80" will be decoded to "\x00"
    sin = "a\x00b\xc0\x80c\xc3\xbc\xc0\x80\xc0\x80"
    sout = u"a\x00b\x00c\xfc\x00\x00"
    self.assertEqual(sin.decode("utf-8", "test.relaxedutf8"), sout)
    # "\xc0\x81" is not valid and a UnicodeDecodeError will be raised
    sin = "\xc0\x80\xc0\x81"
    self.assertRaises(UnicodeDecodeError, sin.decode,
                      "utf-8", "test.relaxedutf8")
def test_longstrings(self):
    # test long strings to check for memory overflow problems
    errors = [ "strict", "ignore", "replace", "xmlcharrefreplace",
               "backslashreplace"]
    # register the handlers under different names,
    # to prevent the codec from recognizing the name
    for err in errors:
        codecs.register_error("test." + err, codecs.lookup_error(err))
    l = 1000
    errors += [ "test." + err for err in errors ]
    for uni in [ s*l for s in (u"x", u"\u3042", u"a\xe4") ]:
        for enc in ("ascii", "latin-1", "iso-8859-1", "iso-8859-15",
                    "utf-8", "utf-7", "utf-16"):
            for err in errors:
                try:
                    uni.encode(enc, err)
                except UnicodeError:
                    pass
def test_badhandlerresults(self):
    results = ( 42, u"foo", (1,2,3), (u"foo", 1, 3), (u"foo", None), (u"foo",) )
    encs = ("ascii", "latin-1", "iso-8859-1", "iso-8859-15")
    for res in results:
        codecs.register_error("test.badhandler", lambda x: res)
        for enc in encs:
            self.assertRaises(
                TypeError,
                u"\u3042".encode,
                enc,
                "test.badhandler"
            )
        for (enc, bytes) in (
            ("ascii", "\xff"),
            ("utf-8", "\xff"),
            ("utf-7", "+x-"),
            ("unicode-internal", "\x00"),
        ):
            self.assertRaises(
                TypeError,
                bytes.decode,
                enc,
                "test.badhandler"
            )
def test_incrementalencoder_error_callback(self):
    inv = self.unmappedunicode
    e = self.incrementalencoder()
    self.assertRaises(UnicodeEncodeError, e.encode, inv, True)
    e.errors = 'ignore'
    self.assertEqual(e.encode(inv, True), '')
    e.reset()
    def tempreplace(exc):
        return (u'called', exc.end)
    codecs.register_error('test.incremental_error_callback', tempreplace)
    e.errors = 'test.incremental_error_callback'
    self.assertEqual(e.encode(inv, True), 'called')
    # again
    e.errors = 'ignore'
    self.assertEqual(e.encode(inv, True), '')
def test_decoding_callbacks(self):
# This is a test for a decoding callback handler
# that allows the decoding of the invalid sequence
# "\xc0\x80" and returns "\x00" instead of raising an error.
# All other illegal sequences will be handled strictly.
def relaxedutf8(exc):
if not isinstance(exc, "test.relaxedutf8")
def test_longstrings(self):
# test long strings to check for memory overflow problems
errors = [ "strict", err)
except UnicodeError:
pass
def test_badhandlerresults(self):
results = ( 42,
"test.badhandler"
)
def test_incrementalencoder_error_callback(self):
inv = self.unmappedunicode
e = self.incrementalencoder()
self.assertRaises(UnicodeEncodeError, '')
def test_decoding_callbacks(self):
# This is a test for a decoding callback handler
# that allows the decoding of the invalid sequence
# "\xc0\x80" and returns "\x00" instead of raising an error.
# All other illegal sequences will be handled strictly.
def relaxedutf8(exc):
if not isinstance(exc, "test.relaxedutf8")
def test_longstrings(self):
# test long strings to check for memory overflow problems
errors = [ "strict", err)
except UnicodeError:
pass
def test_decoding_callbacks(self):
# This is a test for a decoding callback handler
# that allows the decoding of the invalid sequence
# "\xc0\x80" and returns "\x00" instead of raising an error.
# All other illegal sequences will be handled strictly.
def relaxedutf8(exc):
if not isinstance(exc, "test.relaxedutf8")
def test_longstrings(self):
# test long strings to check for memory overflow problems
errors = [ "strict", err)
except UnicodeError:
pass
def test_badhandlerresults(self):
results = ( 42,
"test.badhandler"
)
def test_incrementalencoder_error_callback(self):
inv = self.unmappedunicode
e = self.incrementalencoder()
self.assertRaises(UnicodeEncodeError, '')
def test_all(self):
api = (
"encode",
)
self.assertCountEqual(api, codecs.__all__)
for api in codecs.__all__:
getattr(codecs, api)
def test_decoding_callbacks(self):
# This is a test for a decoding callback handler
# that allows the decoding of the invalid sequence
# "\xc0\x80" and returns "\x00" instead of raising an error.
# All other illegal sequences will be handled strictly.
def relaxedutf8(exc):
if not isinstance(exc, "test.relaxedutf8")
def test_longstrings(self):
# test long strings to check for memory overflow problems
errors = [ "strict", err)
except UnicodeError:
pass
def test_incrementalencoder_error_callback(self):
inv = self.unmappedunicode
e = self.incrementalencoder()
self.assertRaises(UnicodeEncodeError, b'')
def test_decoding_callbacks(self):
# This is a test for a decoding callback handler
# that allows the decoding of the invalid sequence
# "\xc0\x80" and returns "\x00" instead of raising an error.
# All other illegal sequences will be handled strictly.
def relaxedutf8(exc):
if not isinstance(exc, "test.relaxedutf8")
def test_longstrings(self):
# test long strings to check for memory overflow problems
errors = [ "strict", err)
except UnicodeError:
pass
def test_badhandlerresults(self):
results = ( 42,
"test.badhandler"
)
def test_incrementalencoder_error_callback(self):
inv = self.unmappedunicode
e = self.incrementalencoder()
self.assertRaises(UnicodeEncodeError, '')
def test_decoding_callbacks(self):
# This is a test for a decoding callback handler
# that allows the decoding of the invalid sequence
# "\xc0\x80" and returns "\x00" instead of raising an error.
# All other illegal sequences will be handled strictly.
def relaxedutf8(exc):
if not isinstance(exc, "test.relaxedutf8")
def test_longstrings(self):
# test long strings to check for memory overflow problems
errors = [ "strict", err)
except UnicodeError:
pass
def __init__(self, name, base_encoding, mapping):
    """Build a codec that extends ``base_encoding`` with extra sequences.

    :param name: codec name; also used as the error-handler name
        registered with :func:`codecs.register_error`.
    :param base_encoding: underlying stdlib encoding, e.g. ``'ascii'``.
    :param mapping: dict mapping a character to the byte sequence that
        represents it in this codec -- TODO confirm key/value types
        against the callers (keys look like str, values like bytes).
    """
    self.name = name
    self.base_encoding = base_encoding
    self.mapping = mapping
    # Reverse map used when decoding: byte sequence -> character.
    self.reverse = {v: k for k, v in mapping.items()}
    # Longest byte sequence we may have to match while decoding.
    # Guarded so an empty mapping no longer raises ValueError from max().
    self.max_len = max(len(v) for v in mapping.values()) if mapping else 0
    self.info = codecs.CodecInfo(name=self.name, encode=self.encode,
                                 decode=self.decode)
    codecs.register_error(name, self.error)
def encode(self, input, errors='strict'):
    """Encode ``input`` with the base encoding, splicing in the byte
    sequences produced by this codec's error handler for characters the
    base encoding cannot represent.

    Returns ``(encoded_bytes, length_consumed)`` like a codecs encode
    function.  Only ``errors='strict'`` is supported.
    """
    assert errors == 'strict'
    # We cannot simply do
    #     codecs.encode(input, self.base_encoding, self.name)
    # and rely on the registered error handler: before Python 3.3 an
    # *encoding* error handler had to return a str replacement, which
    # then had to be encodable itself -- it could not return bytes.
    # https://docs.python.org/3.3/library/codecs.html#codecs.register_error
    # So we re-implement the loop codecs.encode() would run, but splice
    # the handler's bytes() straight into the output.
    length = len(input)
    out = b''
    while input:
        try:
            part = codecs.encode(input, self.base_encoding)
            out += part
            input = ''  # All converted
        except UnicodeEncodeError as e:
            # Convert the valid prefix before the offending character...
            out += codecs.encode(input[:e.start], self.base_encoding)
            # ...splice in the handler's replacement bytes...
            replacement, pos = self.error(e)
            out += replacement
            # ...and resume after the span the handler consumed.
            input = input[pos:]
    return out, length
def register_strwidth_error(strwidth):
    '''Create new encode errors handling method similar to ``replace``

    Like ``replace`` this method uses question marks in place of the
    characters that cannot be represented in the requested encoding.
    Unlike ``replace`` the amount of question marks is identical to the
    amount of display cells the offending character occupies.  Thus
    encoding ``…`` (U+2026, HORIZONTAL ELLIPSIS) to ``latin1`` will emit
    one question mark, but encoding ``Ａ`` (U+FF21, FULLWIDTH LATIN
    CAPITAL LETTER A) will emit two question marks.

    Since the width of some characters depends on the terminal settings
    and powerline knows how to respect them, a single error handling
    method cannot be used.  Instead this generator function takes a
    ``strwidth`` function (one that knows how to compute string width
    respecting all needed settings) and returns the name of a freshly
    registered error handling method.

    :param function strwidth:
        Function that computes string width measured in display cells
        the string occupies when displayed.

    :return: New error handling method name.
    '''
    global last_swe_idx
    # Unique suffix so every generated handler gets a distinct name.
    last_swe_idx += 1

    def powerline_encode_strwidth_error(e):
        if not isinstance(e, UnicodeEncodeError):
            raise NotImplementedError
        return ('?' * strwidth(e.object[e.start:e.end]), e.end)

    ename = 'powerline_encode_strwidth_error_{0}'.format(last_swe_idx)
    codecs.register_error(ename, powerline_encode_strwidth_error)
    return ename
def create_fb_format(lines_file, convo_file, outpath):
    """Convert Cornell-movie-dialog style corpus files to FB dialog format.

    ``lines_file`` maps utterance ids to text (fields separated by
    ' +++$+++ '); ``convo_file`` lists the utterance ids that make up
    each conversation.  Conversations are written to train/valid/test
    splits (8/1/1 by conversation index) under ``outpath``.
    """
    print('[building fbformat]')
    lines = {}
    # NOTE(review): this globally overrides the built-in 'strict' error
    # handler so undecodable bytes are silently ignored.  It affects every
    # codec user in the process, not just this function -- confirm that is
    # intended before reusing this module.
    codecs.register_error('strict', codecs.ignore_errors)
    with codecs.open(lines_file, 'r') as f:
        for line in f:
            l = line.split(' +++$+++ ')
            # Field 0 is the utterance id; fields 4+ are the text.
            lines[l[0]] = ' '.join(l[4:]).strip('\n').replace('\t', ' ')
    cnt = 0
    # Output files are context-managed so they are closed even if a
    # malformed line raises below (the originals leaked on error).
    with open(os.path.join(outpath, 'train.txt'), 'w') as ftrain, \
         open(os.path.join(outpath, 'valid.txt'), 'w') as fvalid, \
         open(os.path.join(outpath, 'test.txt'), 'w') as ftest:
        with codecs.open(convo_file, 'r') as f:
            for line in f:
                l = line.split(' ')
                # Fields 6+ hold the python-list-style "['L1', 'L2']" text.
                convo = ' '.join(l[6:]).strip('\n').strip('[').strip(']')
                c = convo.replace("'", '').replace(' ', '').split(',')
                s = ''
                index = 0
                # Pair consecutive utterances into "N prompt\treply" lines.
                for i in range(0, len(c), 2):
                    index = index + 1
                    s = (s + str(index) + ' ' + lines[c[i]])
                    if len(c) > i + 1:
                        s = s + '\t' + lines[c[i + 1]]
                    s = s + '\n'
                cnt = cnt + 1
                handle = ftrain
                if (cnt % 10) == 0:
                    handle = ftest
                if (cnt % 10) == 1:
                    handle = fvalid
                handle.write(s + '\n')
def replace_surrogate_encode(mystring):
    """
    Returns a (unicode) string, not the more logical bytes, because the
    codecs register_error functionality expects this.

    Maps each low surrogate U+DC00..U+DCFF back to the byte value it
    escaped; raises NotASurrogateError for any other character.
    """
    decoded = []
    for ch in mystring:
        # if PY3:
        #     code = ch
        # else:
        code = ord(ch)
        # The following magic comes from Py3.3's Python/codecs.c file:
        if not 0xD800 <= code <= 0xDCFF:
            # Not a surrogate.  (The scraped original raised the undefined
            # name ``exc`` here, which was always a NameError; signal the
            # condition with the dedicated exception instead.)
            raise NotASurrogateError
        # mybytes = [0xe0 | (code >> 12),
        #            0x80 | ((code >> 6) & 0x3f),
        #            0x80 | (code & 0x3f)]
        # Is this a good idea?
        if 0xDC00 <= code <= 0xDC7F:
            decoded.append(_unichr(code - 0xDC00))
        elif code <= 0xDCFF:
            # NOTE(review): this branch also admits D800..DBFF, for which
            # code - 0xDC00 is negative -- kept as upstream wrote it.
            decoded.append(_unichr(code - 0xDC00))
        else:
            raise NotASurrogateError
    return str().join(decoded)
def register_surrogateescape():
    """Install the ``surrogateescape`` error handler on Python 2 (only).

    Python 3 already ships this handler natively, so nothing is done
    there; the registration is also skipped when a handler is already
    installed under the ``FS_ERRORS`` name.
    """
    if not PY3:
        try:
            codecs.lookup_error(FS_ERRORS)
        except LookupError:
            codecs.register_error(FS_ERRORS, surrogateescape_handler)
def replace_surrogate_encode(mystring):
    """
    Returns a (unicode) string, not the more logical bytes, because the
    codecs register_error functionality expects this.

    Maps each low surrogate U+DC00..U+DCFF back to the byte value it
    escaped; raises NotASurrogateError for any other character.
    """
    decoded = []
    for ch in mystring:
        # if utils.PY3:
        #     code = ch
        # else:
        code = ord(ch)
        # The following magic comes from Py3.3's Python/codecs.c file:
        if not 0xD800 <= code <= 0xDCFF:
            # Not a surrogate.  (The scraped original raised the undefined
            # name ``exc`` here, which was always a NameError; signal the
            # condition with the dedicated exception instead.)
            raise NotASurrogateError
        # mybytes = [0xe0 | (code >> 12),
        #            0x80 | ((code >> 6) & 0x3f),
        #            0x80 | (code & 0x3f)]
        # Is this a good idea?
        if 0xDC00 <= code <= 0xDC7F:
            decoded.append(_unichr(code - 0xDC00))
        elif code <= 0xDCFF:
            # NOTE(review): this branch also admits D800..DBFF, for which
            # code - 0xDC00 is negative -- kept as upstream wrote it.
            decoded.append(_unichr(code - 0xDC00))
        else:
            raise NotASurrogateError
    return str().join(decoded)
def register_surrogateescape():
    """Make the ``surrogateescape`` error handler available on Python 2.

    On Python 3 the handler is built in, so this returns immediately.
    An already-registered handler under ``FS_ERRORS`` is left untouched.
    """
    if utils.PY3:
        return
    try:
        # A successful lookup means someone registered it already.
        codecs.lookup_error(FS_ERRORS)
    except LookupError:
        codecs.register_error(FS_ERRORS, surrogateescape_handler)
def test_decode_callback(self):
    # Exercise a registered error handler through the (deprecated,
    # removed in Python 3.12) unicode_internal codec on wide builds.
    if sys.maxunicode > 0xffff:
        codecs.register_error("UnicodeInternalTest", codecs.ignore_errors)
        decoder = codecs.getdecoder("unicode_internal")
        ab = "ab".encode("unicode_internal").decode()
        ignored = decoder(bytes("%s\x22\x22\x22\x22%s" % (ab[:4], ab[4:]),
                                "ascii"),
                          "UnicodeInternalTest")
        self.assertEqual(("ab", 12), ignored)
def test_uninamereplace(self):
# We're using the names from the unicode database this time,
# and we're doing "Syntax highlighting" here,i.e. we include
# the replaced text in ANSI escape sequences. For this it is
# useful that the error handler is not called for every single
# unencodable character,but for a complete sequence of
# unencodable characters,otherwise we would output many
# unnecessary escape sequences.
def uninamereplace(exc):
if not isinstance(exc, UnicodeEncodeError):
raise TypeError("don't kNow how to handle %r" % exc)
l = []
for c in exc.object[exc.start:exc.end]:
l.append(unicodedata.name(c, "0x%x" % ord(c)))
return ("\033[1m%s\033[0m" % ",".join(l), exc.end)
codecs.register_error(
"test.uninamereplace", uninamereplace)
sin = "\xac\u1234\u20ac\u8000"
sout = b"\033[1mNOT SIGN,ETHIOPIC SYLLABLE SEE,EURO SIGN,CJK UNIFIED IDEOGRAPH-8000\033[0m"
self.assertEqual(sin.encode("ascii", "test.uninamereplace"), sout)
sout = b"\xac\033[1mETHIOPIC SYLLABLE SEE,CJK UNIFIED IDEOGRAPH-8000\033[0m"
self.assertEqual(sin.encode("latin-1", sout)
sout = b"\xac\033[1mETHIOPIC SYLLABLE SEE\033[0m\xa4\033[1mCJK UNIFIED IDEOGRAPH-8000\033[0m"
self.assertEqual(sin.encode("iso-8859-15", sout)
def test_decodeunicodeinternal(self):
    # Decoding a truncated 5-byte unicode-internal input must fail with
    # the default handler; then check built-in and custom handlers on
    # wide (UCS-4) builds.  The unicode-internal codec is deprecated and
    # was removed in Python 3.12.
    self.assertRaises(
        UnicodeDecodeError,
        b"\x00\x00\x00\x00\x00".decode,
        "unicode-internal",
    )
    if sys.maxunicode > 0xffff:
        def handler_unicodeinternal(exc):
            if not isinstance(exc, UnicodeDecodeError):
                raise TypeError("don't know how to handle %r" % exc)
            # Replace the bad span with U+0001 and resume one byte on.
            return ("\x01", 1)
        self.assertEqual(
            b"\x00\x00\x00\x00\x00".decode("unicode-internal", "ignore"),
            "\u0000"
        )
        self.assertEqual(
            b"\x00\x00\x00\x00\x00".decode("unicode-internal", "replace"),
            "\u0000\ufffd"
        )
        codecs.register_error("test.hui", handler_unicodeinternal)
        self.assertEqual(
            b"\x00\x00\x00\x00\x00".decode("unicode-internal", "test.hui"),
            "\u0000\u0001\u0000"
        )
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。