Python cryptography.hazmat.primitives.asymmetric.padding 模块,MGF1 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用cryptography.hazmat.primitives.asymmetric.padding.MGF1。
def decrypt_pk(priv_key, ciphertext):
"""
Decrypt a b64encoded ciphertext string with the RSA private key priv_key,
using CryptoHash() as the OAEP/MGF1 padding hash.
Returns the plaintext.
Decryption failures result in an exception being raised.
"""
try:
plaintext = priv_key.decrypt(
b64decode(ciphertext),
padding.OAEP(
mgf=padding.MGF1(algorithm=CryptoHash()),
algorithm=CryptoHash(),
label=None
)
)
except UnsupportedAlgorithm as e:
# a failure to dencrypt someone else's data is not typically a fatal
# error,but in this particular case,the most likely cause of this
# error is an old cryptography library
logging.error("Fatal error: encryption hash {} unsupported,try upgrading to cryptography >= 1.4. Exception: {}".format(
CryptoHash, e))
# re-raise the exception for the caller to handle
raise e
return plaintext
def test_verify_signature_PSS(self, mock_get_pub_key):
data = b'224626ae19824466f2a7f39ab7b80f7f'
mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key()
for hash_name, hash_alg in signature_utils.HASH_METHODS.items():
signer = TEST_RSA_PRIVATE_KEY.signer(
padding.PSS(
mgf=padding.MGF1(hash_alg),
salt_length=padding.PSS.MAX_LENGTH
),
hash_alg
)
signer.update(data)
signature = base64.b64encode(signer.finalize())
img_sig_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693'
verifier = signature_utils.get_verifier(None, img_sig_cert_uuid,
hash_name, signature,
signature_utils.RSA_PSS)
verifier.update(data)
verifier.verify()
def create_verifier_for_pss(signature, hash_method, public_key):
"""Create the verifier to use when the key type is RSA-PSS.
:param signature: the decoded signature to use
:param hash_method: the hash method to use,as a cryptography object
:param public_key: the public key to use,as a cryptography object
:raises: SignatureVerificationError if the RSA-PSS specific properties
are invalid
:returns: the verifier to use to verify the signature for RSA-PSS
"""
# default to MGF1
mgf = padding.MGF1(hash_method)
# default to max salt length
salt_length = padding.PSS.MAX_LENGTH
# return the verifier
return public_key.verifier(
signature,
padding.PSS(mgf=mgf, salt_length=salt_length),
hash_method
)
def signing(cls, message, private_key, algorithm='sha1'):
"""signing."""
if not isinstance(message, bytes):
message = message.encode()
algorithm = cls.ALGORITHM_DICT.get(algorithm)
signer = private_key.signer(
padding.PSS(
mgf=padding.MGF1(algorithm),
salt_length=padding.PSS.MAX_LENGTH
),
algorithm
)
signer.update(message)
return signer.finalize()
def verification(cls, public_key, algorithm='sha1'):
"""Verification."""
if not isinstance(message, bytes):
message = message.encode()
algorithm = cls.ALGORITHM_DICT.get(algorithm)
verifier = public_key().verifier(
signature,
padding.PSS(
mgf=padding.MGF1(algorithm),
algorithm
)
verifier.update(message)
return verifier.verify()
def encrypt(cls, algorithm='sha1'):
"""Public key encrypt."""
if not isinstance(message, bytes):
message = message.encode()
algorithm = cls.ALGORITHM_DICT.get(algorithm)
ciphertext = public_key.encrypt(
message,
padding.OAEP(
mgf=padding.MGF1(algorithm=algorithm),
algorithm=algorithm,
label=None
)
)
return ciphertext
def create_decryptor(private_location, public_location):
try:
with open(private_location, "rb") as key_file:
private_key = serialization.load_pem_private_key(
key_file.read(),
password=None,
backend=default_backend()
)
except FileNotFoundError:
with open(private_location, "wb") as key_file:
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
key_file.write(private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()
))
with open(public_location, "wb") as public_file:
public_key = private_key.public_key()
pem = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
public_file.write(pem)
def decrypt(ciphertext):
return private_key.decrypt(
ciphertext,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA1()),
algorithm=hashes.SHA1(),
label=None
)
)
return decrypt
def verify(self, msg, key, sig):
verifier = key.verifier(
sig,
padding.PSS(
mgf=padding.MGF1(self.hash_alg()),
salt_length=self.hash_alg.digest_size
),
self.hash_alg()
)
verifier.update(msg)
try:
verifier.verify()
return True
except InvalidSignature:
return False
def encrypt_key_with_public_key(secret_key, public_encryption_key):
"""
Encrypts the given secret key with the public key.
:param bytes secret_key: the key to encrypt
:param public_encryption_key: the public encryption key
:type public_encryption_key: :class:`~rsa.RSAPublicKey`
:return: the encrypted key
:rtype: bytes
"""
return public_encryption_key.encrypt(
secret_key,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None))
def decrypt_with_private_key(secret_key, private_encryption_key):
"""
Decrypts the given secret key with the private key.
:param bytes secret_key: the secret key to decrypt
:param private_encryption_key: the private encryption key
:type private_encryption_key: :class:`~rsa.RSAPrivateKey`
:return: the decrypted key
:rtype: bytes
"""
return private_encryption_key.decrypt(
secret_key,
label=None))
def rsa(public_key, message):
"""Verifies an RSA signature.
Args:
public_key (str): Public key with BEGIN and END sections.
signature (str): Hex value of the signature with its leading 0x stripped.
message (str): Message that was signed,unhashed.
"""
try:
public_rsa = load_pem_public_key(bytes(public_key), backend=default_backend())
hashed = util.sha256(message)
public_rsa.verify(
binascii.unhexlify(signature),
hashed,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
hashes.SHA256()
)
except InvalidSignature:
raise Exception('Invalid signature')
def rsa_sign(private_key, message):
"""Signs a message with the provided Rsa private key.
Args:
private_key (str): Rsa private key with BEGIN and END sections.
message (str): Message to be hashed and signed.
Returns:
str: Hex signature of the message,with its leading 0x stripped.
"""
private_rsa = load_pem_private_key(bytes(private_key), password=None, backend=default_backend())
hashed = sha256(message)
signature = private_rsa.sign(
hashed,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
return binascii.hexlify(bytearray(signature))
def verify(self,
self.hash_alg()
)
verifier.update(msg)
try:
verifier.verify()
return True
except InvalidSignature:
return False
def _mgf1_sha256_supported():
wk = serialization.load_pem_private_key(
data=VALUES['raw'][b'asym1'][EncryptionKeyType.PRIVATE],
password=None,
backend=default_backend()
)
try:
wk.public_key().encrypt(
plaintext=b'aosdjfoiajfoiaj;foijae;rogijaerg',
padding=padding.OAEP(
mgf=padding.MGF1(hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
except cryptography.exceptions.UnsupportedAlgorithm:
return False
return True
def _sign_image(self, image_file):
LOG.debug("Creating signature for image data")
signer = self.private_key.signer(
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
hashes.SHA256()
)
chunk_bytes = 8192
with open(image_file, 'rb') as f:
chunk = f.read(chunk_bytes)
while len(chunk) > 0:
signer.update(chunk)
chunk = f.read(chunk_bytes)
signature = signer.finalize()
signature_b64 = base64.b64encode(signature)
return signature_b64
def _rsa_verify_sig(sig_value, formatted, public_key_jsonld):
"""
- sig_value: data to be verified
- public_key: creator of this document's public_key
- tbv: to be verified
"""
# Todo: Support other formats than just PEM
public_key = serialization.load_pem_public_key(
_get_value(public_key_jsonld, "publicKeyPem").encode("utf-8"),
backend=default_backend())
try:
public_key.verify(
base64.b64decode(sig_value.encode("utf-8")),
salt_length=padding.PSS.MAX_LENGTH),
hashes.SHA256())
return True
except InvalidSignature:
return False
# In the future,we'll be doing a lot more work based on what suite is
# selected.
def test_verify_signature_PSS(self,
signature_utils.RSA_PSS)
verifier.update(data)
verifier.verify()
def create_verifier_for_pss(signature,
hash_method
)
def encrypt_pk(pub_key, plaintext):
"""
Encrypt plaintext with the RSA public key pub_key,using CryptoHash()
as the OAEP/MGF1 padding hash.
plaintext is limited to the size of the RSA key,minus the padding,or a
few hundred bytes.
Returns a b64encoded ciphertext string.
Encryption failures result in an exception being raised.
"""
try:
ciphertext = pub_key.encrypt(
plaintext,
label=None
)
)
except UnsupportedAlgorithm as e:
# a failure to encrypt our own data is a Fatal error
# the most likely cause of this error is an old cryptography library
# although some newer binary cryptography libraries are linked with
# old OpenSSL versions,to fix,check 'openssl version' >= 1.0.2,then:
# pip install -I --no-binary cryptography cryptography
logging.error("Fatal error: encryption hash {} unsupported,try upgrading to cryptography >= 1.4 compiled with OpenSSL >= 1.0.2. Exception: {}".format(
CryptoHash, e))
# re-raise the exception for the caller to handle
raise e
return b64encode(ciphertext)
def encrypt_rsa(text, key):
private_key = load_key(key)
public_key = private_key.public_key()
return public_key.encrypt(
text,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA1()),
algorithm=hashes.SHA1(),
label=None
)
)
def decrypt_rsa(text, key):
private_key = load_key(key)
return private_key.decrypt(
text,
label=None
)
)
def __init__(self, key_pair):
self.key_pair = key_pair
self._padding = asym_padding.OAEP(
mgf=asym_padding.MGF1(algorithm=hashes.SHA1()),
label=None
)
def decrypt(cls, ciphertext, algorithm='sha1'):
"""Private key descrption."""
algorithm = cls.ALGORITHM_DICT.get(algorithm)
plaintext = private_key.decrypt(
ciphertext,
label=None
)
)
return plaintext.decode()
def verify(self, signature: bytes, message: bytes):
return self._hazmat_public_key.verify(
signature,
message,
hashes.SHA256()
)
def encrypt(self, message: bytes):
symkey = generate_sym_key()
ciphertext = symkey.encrypt(message)
ciphersymkey = self._hazmat_public_key.encrypt(
symkey.key,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
label=None
)
)
return struct.pack(">I", len(ciphersymkey)) + ciphersymkey + ciphertext
def sign(self, message: bytes):
return self._hazmat_private_key.sign(
message,
hashes.SHA256()
)
def decrypt(self, ciphertext: bytes):
lenciphersymkey, = struct.unpack(">I", ciphertext[:4])
ciphersymkey = ciphertext[4:4 + lenciphersymkey]
ciphertext = ciphertext[4 + lenciphersymkey:]
symkey = load_sym_key(self._hazmat_private_key.decrypt(
ciphersymkey,
label=None
)
))
return symkey.decrypt(ciphertext)
def _generate_sign(self, pri_key, data):
"""
?? ??? ??
:param pri_key: ??? ???
:param data: ?? ?? ???
:return: ??? ?? ???
"""
_signature = None
# ???? Type(RSA,ECC)? ?? ?? ?? ??
if isinstance(pri_key, ec.EllipticCurvePrivateKeyWithSerialization):
# ECDSA ??
logging.debug("Sign ECDSA")
signer = pri_key.signer(ec.ECDSA(hashes.SHA256()))
signer.update(data)
_signature = signer.finalize()
elif isinstance(pri_key, rsa.RSAPrivateKeyWithSerialization):
# RSA ??
logging.debug("Sign RSA")
_signature = pri_key.sign(
data,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
hashes.SHA256()
)
else:
logging.debug("UnkNown PrivateKey Type : %s", type(pri_key))
return _signature
def sign(self, key):
signer = key.signer(
padding.PSS(
mgf=padding.MGF1(self.hash_alg()),
self.hash_alg()
)
signer.update(msg)
return signer.finalize()
def test_unsupported_oaep_label(self):
private_key = RSA_KEY_512.private_key(backend)
with pytest.raises(ValueError):
private_key.decrypt(
b"0" * 64,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA1()),
algorithm=hashes.SHA1(),
label=b"label"
)
)
def test_unsupported_mgf1_hash_algorithm(self):
private_key = RSA_KEY_512.private_key(backend)
with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_HASH):
private_key.decrypt(
b"0" * 64,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
label=None
)
)
def test_unsupported_oaep_hash_algorithm(self):
private_key = RSA_KEY_512.private_key(backend)
with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_HASH):
private_key.decrypt(
b"0" * 64,
algorithm=hashes.SHA256(),
label=None
)
)
def decrypt(privkey, ciphertext):
plaintext = privkey.decrypt(
ciphertext,
label=None
)
)
return plaintext
def encrypt(pubkey, plaintext):
ciphertext= pubkey.encrypt(plaintext=plaintext,
padding=padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
label=None
)
)
return ciphertext
def sign_message(privkey, message):
plaintext = b64decode(message.encode('utf-8'))
signature = privkey.sign(
plaintext,
hashes.SHA256()
)
return b64encode(signature).decode('utf-8')
def wrap_rsa_key(public_key, private_key_bytes):
# Use the Google public key to encrypt the customer private key.
# This means that only the Google private key is capable of decrypting
# the customer private key.
wrapped_key = public_key.encrypt(
private_key_bytes,
label=None))
encoded_wrapped_key = base64.b64encode(wrapped_key)
return encoded_wrapped_key
def __encrypt_with_rsa(self, content, recipient_pk):
""" Encrypt content with RSAES-OAEP scheme
@developer: vsmysle
This method handles an encryption of a *single* RSA block with a
specified above scheme. It does not handle splitting of a header into
several blocks. It has to be done by other method that would use this
one only for single block encryption purpose.
Todo: what is a maximum size of a content that can be padded and
encrypted given a particular size of RSA key?
:param content: bytes content to encrypt (probably a part of
ASN.1 DER-encoded MPHeader block)
:param recipient_pk: instance of cryptography.hazmat.primitives.rsa
.RSAPublicKey to use for a content encryption
:return: string encryption of an input content
"""
# Todo: add exceptions
self.logger.debug("rsa encryption")
ciphertext = recipient_pk.encrypt(
content, asym_padding.OAEP(
mgf=asym_padding.MGF1(algorithm=SHA1()),
algorithm=SHA1(),
label=None
)
)
self.logger.info("encrypted")
return ciphertext
def __decrypt_with_rsa(self, user_sk):
""" Decrypt RSAES-OAEP encrypted content (single block)
@developer: vsmysle
This method decrypts a single RSA ciphertext block only
:param content: bytes content to decrypt
:param user_sk: instance of cryptography.hazmat.primitives.rsa
.RSAPrivateKey to use for a decryption
:return: string decryption of an input content
"""
# Todo: add exceptions
self.logger.debug("rsa decryption")
try:
plaintext = user_sk.decrypt(
content, asym_padding.OAEP(
mgf=asym_padding.MGF1(algorithm=SHA1()),
algorithm=SHA1(),
label=None
)
)
except InvalidKey:
self.logger.warning("Invalid key!")
return
return plaintext
def __sign_content(self, user_sk):
""" Produce a signature of an input content using RSASSA-PSS scheme
@developer: vsmysle
:param content: bytes content to sign
:param user_sk: instance of cryptography.hazmat.primitives.rsa.
RSAPrivateKey
:return: bytes of signature of the input content
"""
# Todo: add exceptions
self.logger.debug("generating a signature of an input content")
# creating signer that will sign our content
try:
signer = user_sk.signer(
# we use RSASSA-PSS padding for the signature scheme
asym_padding.PSS(
mgf=asym_padding.MGF1(SHA1()),
salt_length=asym_padding.PSS.MAX_LENGTH
),
SHA1()
)
except InvalidKey:
self.logger.warning("Invalid key!")
return
signer.update(content)
signature = signer.finalize()
self.logger.info("signature generation finished")
return signature
def __verify_signature(self, signer_pk, content):
""" Verify RSASSA-PSS signature
@developer: vsmysle
:param signature: signature bytes to verify
:param signer_pk: instance of cryptography.hazmat.primitives.
rsa.RSAPublicKey that is a public key of a signer
:param content: content to verify a signature of
:return: bool verification result
"""
self.logger.debug("starting signature verification routine")
try:
signer_pk.verify(
signature,
content,
asym_padding.PSS(
mgf=asym_padding.MGF1(SHA1()),
SHA1()
)
except InvalidSignature:
self.logger.warn("signature verification Failed")
return False
self.logger.info("signature OK")
return True
def verify(self, sig):
try:
key.verify(
sig,
msg,
padding.PSS(
mgf=padding.MGF1(self.hash_alg()),
salt_length=self.hash_alg.digest_size
),
self.hash_alg()
)
return True
except InvalidSignature:
return False
def sign(self, private_key):
return private_key.sign(
self.string_to_sign.encode("utf-8"),
padding.PSS(mgf=padding.MGF1(hashes.SHA256()),
salt_length=32),
hashes.SHA256())
def sign(self, private_key):
"""Sign the message.
This method will take the provided message and create a
signature using the provided RSA private key. The resulting
signature is stored in the fulfillment.
The key should be provided as a PEM encoded private key string.
The message is padded using RSA-PSS with SHA256.
Args:
message (bytes): Message to sign.
private_key (bytes): RSA private key.
"""
private_key_obj = serialization.load_pem_private_key(
private_key,
password=None,
backend=default_backend(),
)
if self.modulus is None:
m_int = private_key_obj.public_key().public_numbers().n
m_bytes = m_int.to_bytes(
(m_int.bit_length() + 7) // 8, 'big')
self._set_public_modulus(m_bytes)
signer = private_key_obj.signer(
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=SALT_LENGTH,
),
hashes.SHA256(),
)
signer.update(message)
self.signature = signer.finalize()
def validate(self, message):
"""Verify the signature of self RSA fulfillment.
The signature of self RSA fulfillment is verified against the
provided message and the condition's public modulus.
Args:
message (bytes): Message to verify.
Returns:
bool: Whether self fulfillment is valid.
"""
if not isinstance(message, bytes):
raise Exception(
'Message must be provided as bytes,was: ' + message)
public_numbers = RSAPublicNumbers(
PUBLIC_EXPONENT,
int.from_bytes(self.modulus, byteorder='big'),
)
public_key = public_numbers.public_key(default_backend())
verifier = public_key.verifier(
self.signature,
hashes.SHA256()
)
verifier.update(message)
try:
verifier.verify()
except InvalidSignature as exc:
raise ValidationError('Invalid RSA signature') from exc
return True
def encrypt(self, message):
if self._value is None:
raise ValueError("Can't Encrypt with empty key.")
try:
return self._value.encrypt(
message,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA1()),
algorithm=hashes.SHA1(),
label=None))
except ValueError as e:
raise CipherError(e)
def get_signer(self):
"""Gets an incremental verifier.
Must call .update() on the verifier and then .finalize() to check.
"""
hash_algorithm = hashes.SHA256()
padding_algorithm = padding.PSS(mgf=padding.MGF1(hash_algorithm),
salt_length=padding.PSS.MAX_LENGTH)
padding_algorithm = padding.PKCS1v15()
return self._value.signer(padding_algorithm, hash_algorithm)
def decrypt(self, message):
if self._value is None:
raise ValueError("Can't Decrypt with empty key.")
try:
return self._value.decrypt(
message,
label=None))
except ValueError as e:
raise CipherError(e)
def sign(self,
self.hash_alg()
)
signer.update(msg)
return signer.finalize()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。