Python cryptography.hazmat.primitives.asymmetric.padding 模块,PSS 实例源码
我们从Python开源项目中,提取了以下41个代码示例,用于说明如何使用cryptography.hazmat.primitives.asymmetric.padding.PSS。
def test_verify_signature_PSS(self, mock_get_pub_key):
    """Round-trip an RSA-PSS signature through ``signature_utils.get_verifier``.

    For every entry of ``signature_utils.HASH_METHODS`` a fixed payload is
    signed with ``TEST_RSA_PRIVATE_KEY`` (MGF1 over the same hash, maximum
    salt length) and the base64-encoded signature is handed to
    ``get_verifier``.  The test passes when ``verify()`` does not raise.
    """
    payload = b'224626ae19824466f2a7f39ab7b80f7f'
    cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693'
    # The mocked key lookup returns the public half of the signing key.
    mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key()

    for hash_name, hash_alg in signature_utils.HASH_METHODS.items():
        pss = padding.PSS(mgf=padding.MGF1(hash_alg),
                          salt_length=padding.PSS.MAX_LENGTH)
        signer = TEST_RSA_PRIVATE_KEY.signer(pss, hash_alg)
        signer.update(payload)
        encoded_signature = base64.b64encode(signer.finalize())

        verifier = signature_utils.get_verifier(
            None, cert_uuid, hash_name, encoded_signature,
            signature_utils.RSA_PSS)
        verifier.update(payload)
        verifier.verify()
def create_verifier_for_pss(signature, hash_method, public_key):
    """Build the verifier used when the key type is RSA-PSS.

    The padding always uses MGF1 over ``hash_method`` and the maximum
    salt length.

    :param signature: the decoded signature to use
    :param hash_method: the hash method to use, as a cryptography object
    :param public_key: the public key to use, as a cryptography object
    :returns: whatever ``public_key.verifier`` returns for these settings
    """
    pss_padding = padding.PSS(
        mgf=padding.MGF1(hash_method),
        salt_length=padding.PSS.MAX_LENGTH,
    )
    return public_key.verifier(signature, pss_padding, hash_method)
def signing(cls, message, private_key, algorithm='sha1'):
    """Sign ``message`` with RSA-PSS and return the raw signature.

    ``message`` is passed through ``.encode()`` when it is not already
    ``bytes``.  ``algorithm`` is looked up in ``cls.ALGORITHM_DICT`` and the
    result is used both for MGF1 and as the signature hash.
    """
    payload = message if isinstance(message, bytes) else message.encode()
    hash_alg = cls.ALGORITHM_DICT.get(algorithm)
    pss = padding.PSS(mgf=padding.MGF1(hash_alg),
                      salt_length=padding.PSS.MAX_LENGTH)
    signer = private_key.signer(pss, hash_alg)
    signer.update(payload)
    return signer.finalize()
def verification(cls, message, signature, public_key, algorithm='sha1'):
    """Verify an RSA-PSS signature over ``message``.

    The extracted snippet referenced ``message`` and ``signature`` without
    declaring them and had lost the ``salt_length`` line; both are restored
    here to mirror ``signing``.

    :param message: signed content; encoded with ``.encode()`` if not bytes
    :param signature: signature bytes to check
    :param public_key: public key object, or a zero-argument callable
        returning one (the extracted source called ``public_key()``)
    :param algorithm: key into ``cls.ALGORITHM_DICT``
    :returns: the result of ``verifier.verify()``
    """
    if not isinstance(message, bytes):
        message = message.encode()
    algorithm = cls.ALGORITHM_DICT.get(algorithm)
    # NOTE(review): the original invoked ``public_key()``; accept both a key
    # object and a factory so either calling convention keeps working.
    key = public_key() if callable(public_key) else public_key
    verifier = key.verifier(
        signature,
        padding.PSS(
            mgf=padding.MGF1(algorithm),
            # NOTE(review): restored to match ``signing`` -- confirm.
            salt_length=padding.PSS.MAX_LENGTH
        ),
        algorithm
    )
    verifier.update(message)
    return verifier.verify()
def verify(self, msg, key, sig):
    """Return True when ``sig`` is a valid RSA-PSS signature of ``msg``.

    The padding uses MGF1 over ``self.hash_alg()`` with a salt as long as
    the hash digest.  ``InvalidSignature`` is translated into ``False``.
    """
    pss = padding.PSS(
        mgf=padding.MGF1(self.hash_alg()),
        salt_length=self.hash_alg.digest_size,
    )
    checker = key.verifier(sig, pss, self.hash_alg())
    checker.update(msg)
    try:
        checker.verify()
    except InvalidSignature:
        return False
    return True
def rsa(public_key, signature, message):
    """Verify an RSA-PSS (SHA-256) signature.

    The ``signature`` parameter and the ``salt_length`` line were missing
    from the extracted snippet; they are restored from the docstring and
    from the companion ``rsa_sign`` function.

    Args:
        public_key (str): Public key with BEGIN and END sections.
        signature (str): Hex value of the signature with its leading 0x stripped.
        message (str): Message that was signed, unhashed.

    Raises:
        Exception: with the text ``'Invalid signature'`` when verification
            fails.
    """
    try:
        public_rsa = load_pem_public_key(bytes(public_key), backend=default_backend())
        hashed = util.sha256(message)
        public_rsa.verify(
            binascii.unhexlify(signature),
            hashed,
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )
    except InvalidSignature:
        raise Exception('Invalid signature')
def rsa_sign(private_key, message):
    """Sign a message with the provided RSA private key using PSS/SHA-256.

    Args:
        private_key (str): RSA private key with BEGIN and END sections.
        message (str): Message to be hashed and signed.

    Returns:
        The hex encoding of the signature as produced by
        ``binascii.hexlify``.
    """
    key = load_pem_private_key(
        bytes(private_key), password=None, backend=default_backend())
    digest = sha256(message)
    pss = padding.PSS(mgf=padding.MGF1(hashes.SHA256()),
                      salt_length=padding.PSS.MAX_LENGTH)
    raw_signature = key.sign(digest, pss, hashes.SHA256())
    return binascii.hexlify(bytearray(raw_signature))
def verify(self, msg, key, sig):
    """Return True when ``sig`` is a valid RSA-PSS signature of ``msg``.

    This snippet was truncated during extraction (only the tail survived);
    the parameter list and the verifier construction are restored from the
    identical, complete ``verify(self, msg, key, sig)`` snippet that appears
    earlier on the page.
    """
    verifier = key.verifier(
        sig,
        padding.PSS(
            mgf=padding.MGF1(self.hash_alg()),
            salt_length=self.hash_alg.digest_size
        ),
        self.hash_alg()
    )
    verifier.update(msg)
    try:
        verifier.verify()
        return True
    except InvalidSignature:
        return False
def _sign_image(self, image_file):
    """Sign the contents of ``image_file`` with RSA-PSS/SHA-256.

    The file is streamed through the signer in 8192-byte chunks.

    :param image_file: path of the file to sign
    :returns: the base64-encoded signature
    """
    LOG.debug("Creating signature for image data")
    signer = self.private_key.signer(
        padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            # NOTE(review): this line was lost in extraction; MAX_LENGTH is
            # what the RSA-PSS verifier snippet on this page uses -- confirm.
            salt_length=padding.PSS.MAX_LENGTH
        ),
        hashes.SHA256()
    )
    chunk_bytes = 8192
    with open(image_file, 'rb') as f:
        # read() returns b'' at end of file, which stops the iteration.
        for chunk in iter(lambda: f.read(chunk_bytes), b''):
            signer.update(chunk)
    signature = signer.finalize()
    return base64.b64encode(signature)
def sign_and_upload_image(self):
    """Sign ``self.img_file`` and upload it together with its signature.

    The signature, the signing certificate UUID, the key type ``RSA-PSS``
    and the hash method ``SHA-256`` are attached as image properties.

    :returns: the value returned by ``self._image_create``
    """
    signature_metadata = {
        'img_signature': self._sign_image(self.img_file),
        'img_signature_certificate_uuid': self.signing_cert_uuid,
        'img_signature_key_type': 'RSA-PSS',
        'img_signature_hash_method': 'SHA-256',
    }
    LOG.debug("Uploading image with signature Metadata properties")
    uploaded_uuid = self._image_create(
        'signed_img',
        CONF.scenario.img_container_format,
        self.img_file,
        disk_format=CONF.scenario.img_disk_format,
        properties=signature_metadata,
    )
    LOG.debug("Uploaded image %s", uploaded_uuid)
    return uploaded_uuid
def _rsa_verify_sig(sig_value, formatted, public_key_jsonld):
    """Check a base64 RSA-PSS/SHA-256 signature against ``formatted``.

    The ``formatted`` argument and the opening of the ``padding.PSS(...)``
    expression were lost in the extracted snippet; they are restored so the
    parameters mirror ``_basic_rsa_signature``.

    - sig_value: base64 text of the signature to be verified
    - formatted: the data that was signed
    - public_key_jsonld: object from which the ``publicKeyPem`` value is read

    Returns True when the signature verifies, False on ``InvalidSignature``.
    """
    # Todo: Support other formats than just PEM
    public_key = serialization.load_pem_public_key(
        _get_value(public_key_jsonld, "publicKeyPem").encode("utf-8"),
        backend=default_backend())
    try:
        public_key.verify(
            base64.b64decode(sig_value.encode("utf-8")),
            formatted,
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH),
            hashes.SHA256())
        return True
    except InvalidSignature:
        return False
# In the future,we'll be doing a lot more work based on what suite is
# selected.
def test_verify_signature_PSS(self, mock_get_pub_key):
    """Round-trip an RSA-PSS signature through ``signature_utils.get_verifier``.

    Only the last lines of this snippet survived extraction; the body is
    restored from the complete snippet of the same name at the top of the
    page.
    """
    data = b'224626ae19824466f2a7f39ab7b80f7f'
    mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key()
    for hash_name, hash_alg in signature_utils.HASH_METHODS.items():
        signer = TEST_RSA_PRIVATE_KEY.signer(
            padding.PSS(
                mgf=padding.MGF1(hash_alg),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hash_alg
        )
        signer.update(data)
        signature = base64.b64encode(signer.finalize())
        img_sig_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693'
        verifier = signature_utils.get_verifier(None, img_sig_cert_uuid,
                                                hash_name, signature,
                                                signature_utils.RSA_PSS)
        verifier.update(data)
        verifier.verify()
def create_verifier_for_pss(signature, hash_method, public_key):
    """Create the verifier to use when the key type is RSA-PSS.

    Only the first and last lines of this snippet survived extraction; the
    body is restored from the complete snippet of the same name earlier on
    the page.

    :param signature: the decoded signature to use
    :param hash_method: the hash method to use, as a cryptography object
    :param public_key: the public key to use, as a cryptography object
    :returns: the verifier to use to verify the signature for RSA-PSS
    """
    # default to MGF1
    mgf = padding.MGF1(hash_method)
    # default to max salt length
    salt_length = padding.PSS.MAX_LENGTH
    return public_key.verifier(
        signature,
        padding.PSS(mgf=mgf, salt_length=salt_length),
        hash_method
    )
def test_signature_key_type_lookup_fail(self):
    """Looking up the misspelled key type 'RSB-PSS' must be rejected."""
    expected_message = 'Invalid signature key type: .*'
    with self.assertRaisesRegex(exception.SignatureVerificationError,
                                expected_message):
        signature_utils.SignatureKeyType.lookup('RSB-PSS')
def verify(self, signature: bytes, message: bytes):
    """Verify ``signature`` over ``message`` with RSA-PSS/SHA-256.

    The padding argument was missing from the extracted call (an RSA
    ``verify`` needs signature, data, padding and algorithm), so a PSS
    padding is restored here.

    :returns: whatever ``self._hazmat_public_key.verify`` returns
    """
    return self._hazmat_public_key.verify(
        signature,
        message,
        padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            # NOTE(review): the original salt length is not visible; it must
            # equal the one used by the matching ``sign`` -- confirm.
            salt_length=padding.PSS.MAX_LENGTH
        ),
        hashes.SHA256()
    )
def sign(self, message: bytes):
    """Sign ``message`` with RSA-PSS/SHA-256 and return the signature.

    The padding argument was missing from the extracted call (an RSA
    ``sign`` needs data, padding and algorithm), so a PSS padding is
    restored here.
    """
    return self._hazmat_private_key.sign(
        message,
        padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            # NOTE(review): the original salt length is not visible; it must
            # equal the one used by the matching ``verify`` -- confirm.
            salt_length=padding.PSS.MAX_LENGTH
        ),
        hashes.SHA256()
    )
def _generate_sign(self, pri_key, data):
    """Sign ``data`` with ``pri_key``, choosing the scheme from the key type.

    Elliptic-curve keys sign with ECDSA/SHA-256; RSA keys sign with
    RSA-PSS/SHA-256.  Any other key type is only logged.

    :param pri_key: private key object
    :param data: bytes to sign
    :return: the signature, or None for an unsupported key type
    """
    _signature = None
    # Dispatch on the private key type (RSA, ECC).
    if isinstance(pri_key, ec.EllipticCurvePrivateKeyWithSerialization):
        # ECDSA signature
        logging.debug("Sign ECDSA")
        signer = pri_key.signer(ec.ECDSA(hashes.SHA256()))
        signer.update(data)
        _signature = signer.finalize()
    elif isinstance(pri_key, rsa.RSAPrivateKeyWithSerialization):
        # RSA signature
        logging.debug("Sign RSA")
        _signature = pri_key.sign(
            data,
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                # Restored line: matches the salt length used by the
                # companion ``_verify_signature`` snippet.
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )
    else:
        logging.debug("UnkNown PrivateKey Type : %s", type(pri_key))
    return _signature
def sign(self, msg, key):
    """Return the RSA-PSS signature of ``msg`` made with ``key``.

    The extracted snippet used ``msg`` without declaring it and had lost the
    ``salt_length`` line.  Both are restored to mirror the companion
    ``verify(self, msg, key, sig)``, which uses a salt as long as the hash
    digest.
    """
    signer = key.signer(
        padding.PSS(
            mgf=padding.MGF1(self.hash_alg()),
            salt_length=self.hash_alg.digest_size
        ),
        self.hash_alg()
    )
    signer.update(msg)
    return signer.finalize()
def sign_message(privkey, message):
    """Sign a base64-encoded message and return a base64 text signature.

    ``message`` is base64-decoded before signing; the signature bytes are
    base64-encoded and decoded to ``str`` for the return value.  The padding
    argument was missing from the extracted call and is restored as PSS.
    """
    plaintext = b64decode(message.encode('utf-8'))
    signature = privkey.sign(
        plaintext,
        padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            # NOTE(review): original salt length not visible -- confirm
            # against the verifying side.
            salt_length=padding.PSS.MAX_LENGTH
        ),
        hashes.SHA256()
    )
    return b64encode(signature).decode('utf-8')
def __sign_content(self, content, user_sk):
    """ Sign ``content`` with the RSASSA-PSS scheme (SHA1, max salt length)

    @developer: vsmysle

    :param content: bytes content to sign
    :param user_sk: RSA private key object used for signing
    :return: bytes of the signature, or None when ``InvalidKey`` is raised
             while the signer is being created
    """
    # Todo: add exceptions
    self.logger.debug("generating a signature of an input content")

    # RSASSA-PSS padding, with MGF1 over the same hash
    pss_padding = asym_padding.PSS(
        mgf=asym_padding.MGF1(SHA1()),
        salt_length=asym_padding.PSS.MAX_LENGTH,
    )
    try:
        content_signer = user_sk.signer(pss_padding, SHA1())
    except InvalidKey:
        self.logger.warning("Invalid key!")
        return

    content_signer.update(content)
    result = content_signer.finalize()
    self.logger.info("signature generation finished")
    return result
def __verify_signature(self, signature, signer_pk, content):
    """ Verify RSASSA-PSS signature

    The ``signature`` parameter (documented and used, but missing from the
    extracted signature) and the ``salt_length`` line are restored; the salt
    length matches the companion ``__sign_content``.

    @developer: vsmysle

    :param signature: signature bytes to verify
    :param signer_pk: RSA public key object of the signer
    :param content: content to verify a signature of
    :return: bool verification result
    """
    self.logger.debug("starting signature verification routine")
    try:
        signer_pk.verify(
            signature,
            content,
            asym_padding.PSS(
                mgf=asym_padding.MGF1(SHA1()),
                salt_length=asym_padding.PSS.MAX_LENGTH
            ),
            SHA1()
        )
    except InvalidSignature:
        # ``warn`` is a deprecated alias of ``warning``.
        self.logger.warning("signature verification Failed")
        return False
    self.logger.info("signature OK")
    return True
def verify(self, msg, key, sig):
    """Return True when ``sig`` is a valid RSA-PSS signature of ``msg``.

    The extracted snippet used ``msg`` and ``key`` without declaring them;
    the parameter list is restored to match the other
    ``verify(self, msg, key, sig)`` snippet on this page.
    """
    try:
        key.verify(
            sig,
            msg,
            padding.PSS(
                mgf=padding.MGF1(self.hash_alg()),
                salt_length=self.hash_alg.digest_size
            ),
            self.hash_alg()
        )
        return True
    except InvalidSignature:
        return False
def sign(self, private_key):
    """Sign the UTF-8 encoding of ``self.string_to_sign`` with RSA-PSS.

    SHA-256 is used for both MGF1 and the signature hash, with a 32-byte
    salt.
    """
    payload = self.string_to_sign.encode("utf-8")
    pss = padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=32)
    return private_key.sign(payload, pss, hashes.SHA256())
def _set_signature(self, signature):
"""Set the signature manually.
The signature must be a valid RSA-PSS siganture.
Args:
signature (bytes): RSA signature.
"""
if not isinstance(signature, bytes):
raise TypeError('Signature must be bytes,was: ' + signature)
self.signature = signature
def sign(self, message, private_key):
    """Sign the message.

    This method will take the provided message and create a
    signature using the provided RSA private key. The resulting
    signature is stored in the fulfillment.

    The key should be provided as a PEM encoded private key string.
    The message is padded using RSA-PSS with SHA256.

    ``message`` was documented and used but missing from the extracted
    parameter list; it is restored in the order the docstring gives.

    Args:
        message (bytes): Message to sign.
        private_key (bytes): RSA private key.
    """
    private_key_obj = serialization.load_pem_private_key(
        private_key,
        password=None,
        backend=default_backend(),
    )

    if self.modulus is None:
        # Derive the public modulus from the key as big-endian bytes of
        # minimal length.
        m_int = private_key_obj.public_key().public_numbers().n
        m_bytes = m_int.to_bytes(
            (m_int.bit_length() + 7) // 8, 'big')
        self._set_public_modulus(m_bytes)

    signer = private_key_obj.signer(
        padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=SALT_LENGTH,
        ),
        hashes.SHA256(),
    )
    signer.update(message)
    self.signature = signer.finalize()
def validate(self, message):
    """Verify the signature of this RSA fulfillment.

    The signature is verified against the provided message and the public
    key rebuilt from ``PUBLIC_EXPONENT`` and ``self.modulus``.  The PSS
    padding argument was missing from the extracted verifier call; it is
    restored with the same parameters the companion ``sign`` uses.

    Args:
        message (bytes): Message to verify.

    Returns:
        bool: True when the signature is valid.

    Raises:
        Exception: if ``message`` is not ``bytes``.
        ValidationError: if the signature does not verify.
    """
    if not isinstance(message, bytes):
        # str() so that non-str inputs produce this message instead of a
        # string-concatenation TypeError.
        raise Exception(
            'Message must be provided as bytes,was: ' + str(message))

    public_numbers = RSAPublicNumbers(
        PUBLIC_EXPONENT,
        int.from_bytes(self.modulus, byteorder='big'),
    )
    public_key = public_numbers.public_key(default_backend())
    verifier = public_key.verifier(
        self.signature,
        padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=SALT_LENGTH,
        ),
        hashes.SHA256()
    )
    verifier.update(message)
    try:
        verifier.verify()
    except InvalidSignature as exc:
        raise ValidationError('Invalid RSA signature') from exc
    return True
def get_signer(self):
    """Gets an incremental signer using PKCS#1 v1.5 padding and SHA-256.

    Call .update() on the returned signer and then .finalize() to obtain
    the signature.
    """
    hash_algorithm = hashes.SHA256()
    # The original first built a PSS padding and immediately overwrote it
    # with PKCS1v15; the dead assignment is removed, the effective padding
    # is unchanged.
    padding_algorithm = padding.PKCS1v15()
    return self._value.signer(padding_algorithm, hash_algorithm)
def sign(self, msg, key):
    """Return the RSA-PSS signature of ``msg`` made with ``key``.

    Only the tail of this snippet survived extraction.  The parameter list
    and the padding are restored to mirror the complete
    ``verify(self, msg, key, sig)`` snippet on this page (MGF1 over
    ``self.hash_alg()``, salt as long as the hash digest).
    """
    signer = key.signer(
        padding.PSS(
            mgf=padding.MGF1(self.hash_alg()),
            salt_length=self.hash_alg.digest_size
        ),
        self.hash_alg()
    )
    signer.update(msg)
    return signer.finalize()
def sign(self, message):
    """Return the RSA-PSS/SHA-256 signature of ``message`` made with ``self._key``.

    The ``salt_length`` line and the closing parenthesis of the padding were
    lost in extraction and are restored here.
    """
    signer = self._key.signer(
        padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            # NOTE(review): original value not visible; must match the
            # verifying side -- confirm.
            salt_length=padding.PSS.MAX_LENGTH
        ),
        hashes.SHA256()
    )
    signer.update(message)
    return signer.finalize()
def verify(self, message, signature):
    """Verify ``signature`` over ``message`` with RSA-PSS/SHA-256.

    The extracted snippet used ``message`` without declaring it and had lost
    the padding argument; both are restored.

    :returns: True when ``verifier.verify()`` completes without raising
    """
    # NOTE(review): the parameter order and salt length are reconstructed,
    # not visible in the extracted source -- confirm against callers and the
    # signing side.
    verifier = self._key.verifier(
        signature,
        padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=padding.PSS.MAX_LENGTH
        ),
        hashes.SHA256()
    )
    verifier.update(message)
    verifier.verify()
    return True
def sign(self, caller: NodeId, message: bytes) -> str:
    """Sign ``message`` with the private key registered for ``caller``.

    The padding argument was missing from the extracted call and is restored
    as RSA-PSS with MGF1/SHA-256.

    :returns: the base64 encoding of the signature
    """
    signature = self._private_keys[caller].sign(
        message,
        padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            # NOTE(review): original salt length not visible; must match
            # the companion ``verify`` -- confirm.
            salt_length=padding.PSS.MAX_LENGTH
        ),
        hashes.SHA256()
    )
    # NOTE(review): b64encode returns bytes although the annotation says
    # str; left unchanged because callers may rely on it.
    return base64.b64encode(signature)
def verify(self, signature: str, message: bytes, sender_id: NodeId) -> bool:
    """Check a base64 signature over ``message`` for ``sender_id``.

    The public key is derived from the private key stored for
    ``sender_id``.  The padding argument was missing from the extracted call
    and is restored as RSA-PSS with MGF1/SHA-256.

    :returns: True if the signature verifies, False on ``InvalidSignature``
    """
    public_key = self._private_keys[sender_id].public_key()
    try:
        public_key.verify(
            base64.b64decode(signature),
            message,
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                # NOTE(review): original salt length not visible; must
                # match the companion ``sign`` -- confirm.
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )
        return True
    except InvalidSignature:
        return False
def _basic_rsa_signature(formatted, options):
    """Sign ``formatted`` with the PEM key in ``options["privateKeyPem"]``.

    RSA-PSS with MGF1/SHA-256 and the maximum salt length is used; the
    signature is returned as base64 text.
    """
    signing_key = serialization.load_pem_private_key(
        options["privateKeyPem"],
        password=None,
        backend=default_backend())
    # I'm guessing this is the right padding function...?
    pss = padding.PSS(
        mgf=padding.MGF1(hashes.SHA256()),
        salt_length=padding.PSS.MAX_LENGTH)
    raw_signature = signing_key.sign(formatted, pss, hashes.SHA256())
    return base64.b64encode(raw_signature).decode("utf-8")
def test_signature_key_type_lookup_fail(self):
    """Looking up the misspelled key type 'RSB-PSS' must be rejected.

    Two arguments of the ``assertRaisesRegex`` call were lost in extraction;
    they are restored from the complete snippet of the same name earlier on
    the page.
    """
    self.assertRaisesRegex(exception.SignatureVerificationError,
                           'Invalid signature key type: .*',
                           signature_utils.SignatureKeyType.lookup,
                           'RSB-PSS')
def pkcs_emsa_pss_encode(M, emBits, h, mgf, sLen):
    """
    Implements EMSA-PSS-ENCODE() function described in Sect. 9.1.1 of RFC 3447

    Input:
       M     : message to be encoded, an octet string
       emBits: maximal bit length of the integer resulting of pkcs_os2ip(EM),
               where EM is the encoded message, output of the function.
       h     : hash function name (in 'md2', 'md4', 'md5', 'sha1', 'tls',
               'sha256', 'sha384'). hLen denotes the length in octets of
               the hash function output.
       mgf   : the mask generation function f : seed, maskLen -> mask
       sLen  : intended length in octets of the salt

    Output:
       encoded message, an octet string of length emLen = ceil(emBits/8)

    On error, None is returned.
    """
    # NOTE(review): the '\x00'-style literals assume byte strings are ``str``
    # (Python 2 style) -- confirm before running on Python 3.

    # 1) is not done
    hLen = _hashFuncParams[h][0]                         # 2)
    hFunc = _hashFuncParams[h][1]
    mHash = hFunc(M)
    emLen = int(math.ceil(emBits/8.))
    if emLen < hLen + sLen + 2:                          # 3)
        warning("encoding error (emLen < hLen + sLen + 2)")
        return None
    salt = randstring(sLen)                              # 4)
    MPrime = '\x00'*8 + mHash + salt                     # 5)
    H = hFunc(MPrime)                                    # 6)
    PS = '\x00'*(emLen - sLen - hLen - 2)                # 7)
    DB = PS + '\x01' + salt                              # 8)
    dbMask = mgf(H, emLen - hLen - 1)                    # 9)
    maskedDB = strxor(DB, dbMask)                        # 10)
    # Floor division keeps ``l`` an int on every Python version.
    l = (8*emLen - emBits)//8                            # 11)
    rem = 8*emLen - emBits - 8*l  # additional bits
    andMask = l*'\x00'
    if rem:
        # Keep the low (8 - rem) bits: 2**(8-rem) - 1 equals the sum of
        # 1 << x for x in range(8 - rem) used by the original reduce().
        andMask += chr((1 << (8 - rem)) - 1)
        l += 1
    maskedDB = strand(maskedDB[:l], andMask) + maskedDB[l:]
    EM = maskedDB + H + '\xbc'                           # 12)
    return EM                                            # 13)
def pkcs_emsa_pss_verify(M, EM, emBits, h, mgf, sLen):
    """
    Implements EMSA-PSS-VERIFY() function described in Sect. 9.1.2 of RFC 3447

    The ``emBits``, ``h`` and ``mgf`` parameters are documented and used by
    the body but were missing from the extracted signature; they are
    restored in the order the docstring lists them.  Two garbled bit-mask
    expressions are rebuilt to match ``pkcs_emsa_pss_encode``.

    Input:
       M     : message to be encoded, an octet string
       EM    : encoded message, an octet string of length emLen = ceil(emBits/8)
       emBits: maximal bit length of the integer resulting of pkcs_os2ip(EM)
       h     : hash function name (in 'md2', 'md4', 'md5', 'sha1', 'tls',
               'sha256', 'sha384'). hLen denotes the length in octets of
               the hash function output.
       mgf   : the mask generation function f : seed, maskLen -> mask
       sLen  : intended length in octets of the salt

    Output:
       True if the verification is ok, False otherwise.
    """
    # NOTE(review): the '\x00'-style literals assume byte strings are ``str``
    # (Python 2 style) -- confirm before running on Python 3.

    # 1) is not done
    hLen = _hashFuncParams[h][0]                         # 2)
    hFunc = _hashFuncParams[h][1]
    mHash = hFunc(M)
    emLen = int(math.ceil(emBits/8.))                    # 3)
    if emLen < hLen + sLen + 2:
        return False
    if EM[-1] != '\xbc':                                 # 4)
        return False
    l = emLen - hLen - 1                                 # 5)
    maskedDB = EM[:l]
    H = EM[l:l+hLen]
    l = (8*emLen - emBits)//8                            # 6)
    rem = 8*emLen - emBits - 8*l  # additional bits
    andMask = l*'\xff'
    if rem:
        # Select the top ``rem`` bits, which must all be zero in maskedDB.
        val = (1 << (8 - rem)) - 1
        j = chr(~val & 0xff)
        andMask += j
        l += 1
    if strand(maskedDB[:l], andMask) != '\x00'*l:
        return False
    dbMask = mgf(H, emLen - hLen - 1)                    # 7)
    DB = strxor(maskedDB, dbMask)                        # 8)
    l = (8*emLen - emBits)//8                            # 9)
    rem = 8*emLen - emBits - 8*l  # additional bits
    andMask = l*'\x00'
    if rem:
        # Same low-bits mask as in pkcs_emsa_pss_encode().
        j = chr((1 << (8 - rem)) - 1)
        andMask += j
        l += 1
    DB = strand(DB[:l], andMask) + DB[l:]
    l = emLen - hLen - sLen - 1                          # 10)
    if DB[:l] != '\x00'*(l-1) + '\x01':
        return False
    salt = DB[-sLen:]                                    # 11)
    MPrime = '\x00'*8 + mHash + salt                     # 12)
    HPrime = hFunc(MPrime)                               # 13)
    return H == HPrime                                   # 14)
def _rsassa_pss_sign(self, M, h=None, mgf=None, sLen=None):
    """
    Implements RSASSA-PSS-SIGN() function described in Sect. 8.1.1 of
    RFC 3447.

    Input:
       M   : message to be signed, an octet string
       h   : hash function name; 'sha1' when None
       mgf : mask generation function; MGF1 over ``h`` when None
       sLen: salt length in octets; the hash output length when None

    Output:
       signature, an octet string of length k, where k is the length in
       octets of the RSA modulus n.

    On error, None is returned.
    """
    # Set default parameters if not provided
    if h is None:  # By default, sha1
        h = "sha1"
    if h not in _hashFuncParams:
        warning("Key._rsassa_pss_sign(): unkNown hash function "
                "provided (%s)" % h)
        return None
    if mgf is None:  # use mgf1 with underlying hash function
        mgf = lambda x, y: pkcs_mgf1(x, y, h)
    if sLen is None:  # use Hash output length (A.2.3 of RFC 3447)
        hLen = _hashFuncParams[h][0]
        sLen = hLen

    # 1) EMSA-PSS encoding
    modBits = self.modulusLen
    # Floor division keeps ``k`` an int on every Python version.
    k = modBits // 8
    # ``h`` and ``mgf`` were dropped from this call in the extracted source;
    # pkcs_emsa_pss_encode() takes (M, emBits, h, mgf, sLen).
    EM = pkcs_emsa_pss_encode(M, modBits - 1, h, mgf, sLen)
    if EM is None:
        warning("Key._rsassa_pss_sign(): unable to encode")
        return None

    # 2) RSA signature
    m = pkcs_os2ip(EM)                                   # 2.a)
    s = self._rsasp1(m)                                  # 2.b)
    S = pkcs_i2osp(s, k)                                 # 2.c)
    return S                                             # 3)
def sign(self, M, t=None, h=None, mgf=None, sLen=None):
    """
    Sign message 'M' using 't' signature scheme where 't' can be:

    - None: the message 'M' is directly applied the RSASP1 signature
            primitive, as described in PKCS#1 v2.1, i.e. RFC 3447 Sect
            5.2.1. Simply put, the message undergo a modular exponentiation
            using the private key. Additional method parameters are just
            ignored.

    - 'pkcs': the message 'M' is applied RSASSA-PKCS1-v1_5-SIGN signature
            scheme as described in Sect. 8.2.1 of RFC 3447. In that context,
            the hash function name is passed using 'h'. Possible values are
            "md2", "md4", "md5", "sha1", "tls", "sha224", "sha256", "sha384"
            and "sha512". If none is provided, sha1 is used. Other additional
            parameters are ignored.

    - 'pss' : the message 'M' is applied RSASSA-PSS-SIGN signature scheme as
            described in Sect. 8.1.1. of RFC 3447. In that context,

            o 'h' parameter provides the name of the hash method to use.
              If none is provided, sha1 is used.

            o 'mgf' is the mask generation function. By default, mgf
              is derived from the provided hash function using the
              generic MGF1 (see pkcs_mgf1() for details).

            o 'sLen' is the length in octet of the salt. You can overload the
              default value (the octet length of the hash value for provided
              algorithm) by providing another one with that parameter.

    The 'M', 'h' and 'mgf' parameters are used by the body and described
    above but were missing from the extracted signature; they are restored.
    None is returned on error or for an unrecognised 't'.
    """
    if t is None:  # RSASP1
        M = pkcs_os2ip(M)
        n = self.modulus
        if M > n-1:
            warning("Message to be signed is too long for key modulus")
            return None
        s = self._rsasp1(M)
        if s is None:
            return None
        # Floor division keeps the length an int on every Python version.
        return pkcs_i2osp(s, self.modulusLen//8)
    elif t == "pkcs":  # RSASSA-PKCS1-v1_5-SIGN
        if h is None:
            h = "sha1"
        return self._rsassa_pkcs1_v1_5_sign(M, h)
    elif t == "pss":  # RSASSA-PSS-SIGN
        # _rsassa_pss_sign() takes (M, h, mgf, sLen); the extracted call
        # had lost ``h`` and ``mgf``.
        return self._rsassa_pss_sign(M, h, mgf, sLen)
    else:
        warning("Key.sign(): UnkNown signature type (%s) provided" % t)
        return None
def _verify_signature(self, pub_key, data, signature):
    """Verify ``signature`` over ``data``, choosing the scheme from the key type.

    Elliptic-curve keys are checked with ECDSA/SHA-256; RSA keys with
    RSA-PSS/SHA-256 (MGF1, maximum salt length).  Any other key type is only
    logged.

    :param pub_key: public key object
    :param data: bytes that were signed
    :param signature: signature bytes to check
    :return: verification result (True/False)
    """
    validation_result = False
    # Dispatch on the public key type (RSA, ECC).
    if isinstance(pub_key, ec.EllipticCurvePublicKeyWithSerialization):
        # ECDSA verification
        logging.debug("Verify ECDSA")
        try:
            pub_key.verify(
                signature,
                data,
                ec.ECDSA(hashes.SHA256())
            )
            validation_result = True
        except InvalidSignature:
            logging.debug("InvalidSignature_ECDSA")
    elif isinstance(pub_key, rsa.RSAPublicKeyWithSerialization):
        # RSA verification
        logging.debug("Verify RSA")
        try:
            pub_key.verify(
                signature,
                # ``data`` was missing from this call in the extracted
                # source; the ECDSA branch above shows the intended order.
                data,
                padding.PSS(
                    mgf=padding.MGF1(hashes.SHA256()),
                    salt_length=padding.PSS.MAX_LENGTH
                ),
                hashes.SHA256()
            )
            validation_result = True
        except InvalidSignature:
            logging.debug('InvalidSignature_RSA')
    else:
        logging.debug("UnkNown PublicKey Type : %s", type(pub_key))
    return validation_result
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。