diff --git a/prefect_huaweicloud/__init__.py b/prefect_huaweicloud/__init__.py index e5be0e0db4e10ff186c46c66f08dd528c7c057ec..df80bbbb6f7674f92a8d05ec19f32cf176085bc5 100644 --- a/prefect_huaweicloud/__init__.py +++ b/prefect_huaweicloud/__init__.py @@ -1,5 +1,7 @@ # ensure core blocks are registered from prefect_huaweicloud.obs_block import ObsBlock +from prefect_huaweicloud.dew_block import DewBlock +from prefect_huaweicloud.dew_client import DewClient +__all__ = ["ObsBlock","DewBlock","DewClient"] -__all__ = ["ObsBlock"] diff --git a/prefect_huaweicloud/dew_block.py b/prefect_huaweicloud/dew_block.py new file mode 100644 index 0000000000000000000000000000000000000000..c4e5e2d0590df6d173996b50a54b635c239cb1cd --- /dev/null +++ b/prefect_huaweicloud/dew_block.py @@ -0,0 +1,2495 @@ +from typing import Optional, Dict, Any +from logging import Logger +from pydantic import Field, SecretStr +from prefect.blocks.abstract import Block +from prefect.logging.loggers import get_logger, get_run_logger +from prefect.exceptions import MissingContextError +from prefect import task +from pydantic import VERSION as PYDANTIC_VERSION +import json + +if PYDANTIC_VERSION.startswith("2."): + from pydantic.v1 import Field +else: + from pydantic import Field +from dew_client import DewClient +from huaweicloudsdkkms.v2.model import * +from huaweicloudsdkcore.exceptions import exceptions +from huaweicloudsdkcsms.v1.model import * +from huaweicloudsdkkps.v3.model import * +from dataclasses import dataclass + + +class DewBlock(Block): + _logo_url = "https://res-static.hc-cdn.cn/cloudbu-site/public/new-product-icon/SecurityCompliance/DEW.png)" # noqa + _block_type_name = "HuaweiCloud DEW" + _documentation_url = ("https://support.huaweicloud.com/intl/zh-cn/sdkreference-dew/dew_02_0001.html") # noqa + + huawei_cloud_access_key_id: Optional[SecretStr] = Field( + default=None, + description = "A specific Huawei Cloud access key ID.", + title = "Huawei Cloud Access Key ID:", + ) + huawei_cloud_secret_access_key: Optional[SecretStr] = Field( + default=None, + description = "A specific Huawei Cloud secret access key.", + title = "Huawei Cloud Access Key Secret:", + ) + huawei_cloud_project_id: Optional[SecretStr] = Field( + default=None, + description = "A specific Huawei Cloud project id.", + title = "Huawei Cloud project id", + ) + huawei_cloud_security_token: Optional[SecretStr] = Field( + default=None, + description = "SecurityToken in the temporary access key, " + "You can select a temporary token or AK/SK for authentication", + title = "Huawei Cloud Security Token", + ) + region_id: Optional[str] = Field( + default=None, + title = "region id", + ) + huawei_cloud_dew_parameters: Dict = Field( + default=None, + description = "Extra parameters for using dew.", + title = "dew_parameters", + ) + + @dataclass + class CreateSecretOptions: + def __init__(self, name=None, kms_key_id=None, description=None, secret_binary=None, secret_string=None, + secret_type=None, auto_rotation=None, rotation_period=None, rotation_config=None, + event_subscriptions=None, enterprise_project_id=None): + self.name = name + self.kms_key_id = kms_key_id + self.description = description + self.secret_binary = secret_binary + self.secret_string = secret_string + self.secret_type = secret_type + self.auto_rotation = auto_rotation + self.rotation_period = rotation_period + self.rotation_config = rotation_config + self.event_subscriptions = event_subscriptions + self.enterprise_project_id = enterprise_project_id + + @dataclass + class CreatekeyOptions: + def __init__(self, key_alias=None, key_spec=None, key_usage=None, key_description=None, + origin=None, enterprise_project_id=None, sequence=None, keystore_id=None): + self.key_alias = key_alias + self.key_spec = key_spec + self.key_usage = key_usage + self.key_description = key_description + self.origin = origin + self.enterprise_project_id = enterprise_project_id + self.sequence = sequence + self.keystore_id = keystore_id + + @dataclass + class ImportkeyMaterialOptions: + def __init__(self, key_id=None, import_token=None, encrypted_key_material=None, encrypted_privatekey=None, expiration_time=None, sequence=None): + self.key_id=key_id + self.import_token=import_token + self.encrypted_key_material=encrypted_key_material, + self.encrypted_privatekey=encrypted_privatekey + self.expiration_time=expiration_time + self.sequence=sequence + + @dataclass + class CreateGrantOptions: + def __init__(self, key_id=None, grantee_principal=None, listOperationsbody=None, name=None, + grantee_principal_type=None, retiring_principal=None, sequence=None): + self.key_id = key_id + self.grantee_principal = grantee_principal + self.listOperationsbody = listOperationsbody + self.name = name + self.grantee_principal_type = grantee_principal_type + self.retiring_principal = retiring_principal + self.sequence = sequence + + @dataclass + class ValidateSignatureOptions: + def __init__(self, asymmetric_key_id=None, message=None, signing_algorithm=None, signature=None, + message_type=None, sequence=None): + self.asymmetric_key_id = asymmetric_key_id + self.message = message + self.signing_algorithm = signing_algorithm + self.signature = signature + self.message_type = message_type + self.sequence = sequence + + @dataclass + class ListKeysOptions: + def __init__(self, limit=None, marker=None, key_state=None, key_spec=None, enterprise_project_id=None, + sequence=None): + self.limit = limit + self.marker = marker + self.key_state = key_state + self.key_spec = key_spec + self.enterprise_project_id = enterprise_project_id + self.sequence = sequence + + @dataclass + class CreateKeypairOptions: + def __init__(self, name=None, type=None, public_key=None, scope=None, user_id=None, key_protection=None): + self.name = name + self.type = type + self.public_key = public_key + self.scope = scope + self.user_id = user_id + self.key_protection = key_protection + + @dataclass + class AssociateKeypairOptions: + def __init__(self, keypair_name=None, type=None, key=None, id =None, disable_password = False, port=None): + self.keypair_name = keypair_name + self.type = type + self.key = key + self.id = id + self.disable_password = disable_password + self.port = port + + @dataclass + class BatchAssociateKeypairOptions: + def __init__(self, keypair_name=None, type=None, key=None, id=None, disable_password = False, port=None): + self.keypair_name = keypair_name + self.type = type + self.key = key + self.id = id + self.disable_password = disable_password + self.port = port + + @dataclass + class UpdateSecretOptions: + def __init__(self, kms_key_id=None, secret_name=None, description=None, auto_rotation=None, + rotation_period=None, listEventSubscriptionsbody=None): + self.kms_key_id = kms_key_id + self.secret_name = secret_name + self.description = description + self.auto_rotation = auto_rotation + self.rotation_period = rotation_period + self.listEventSubscriptionsbody = listEventSubscriptionsbody + + @dataclass + class CreateSecretEventOptions: + def __init__(self, target_type=None, target_id=None, target_name=None, event_name=None, state=None, event_types=None): + self.target_type = target_type + self.target_id = target_id + self.target_name = target_name + self.event_name = event_name + self.state = state + self.event_types = event_types + + @dataclass + class ListResourceInstancesOptions: + def __init__(self, resource_instances=None, values=None, key=None, limit=None, offset=None, action=None, + matches=None, sequence=None): + self.resource_instances = resource_instances + self.values = values + self.key = key + self.limit = limit + self.offset = offset + self.action = action + self.matches = matches + self.sequence = sequence + + @dataclass + class ListKmsByTagsOptions: + def __init__(self, key=None, values=None, limit=None, offset=None, action=None, tags=None, matches=None, + sequence=None): + self.key = key + self.values = values + self.limit = limit + self.offset = offset + self.action = action + self.tags = tags + self.matches = matches + self.sequence = sequence + + def __init__(self, **kwargs): + self._dew_client = DewClient(region_id = kwargs.get('region_id'), + ak = kwargs.get('huawei_cloud_access_key_id'), + sk = kwargs.get('huawei_cloud_secret_access_key'), + project_id = kwargs.get('huawei_cloud_project_id')) + self.dew_parameters = kwargs.get('huawei_cloud_dew_parameters') + + @property + def logger(self) -> Logger: + """ + Returns a logger based on whether the ObjectStorageBlock + is called from within a flow or task run context. + If a run context is present, the logger property returns a run logger. + Else, it returns a default logger labeled with the class's name. + + Returns: + The run logger or a default logger with the class's name. + """ + try: + return get_run_logger() + except MissingContextError: + return get_logger(self.__class__.__name__) + + def create_key(self, options: CreatekeyOptions): + """ + Create a user master key, which can be a symmetric or asymmetric key. + + Args: + key_alias: Non default master key alias, with a value range of 1 to 255 characters, satisfying + regular matching "^ [a-zA-Z0-9:/_ -] {1255} $", and not having the same name as the default + master key alias created by the system service. + type key_alias: str + """ + try: + request = CreateKeyRequest() + request.body = CreateKeyRequestBody( + key_alias = options.key_alias, + key_spec = options.key_spec, + key_usage = options.key_usage, + key_description = options.key_description, + origin = options.origin, + enterprise_project_id = options.enterprise_project_id, + sequence = options.sequence, + keystore_id = options.keystore_id + ) + response = self._dew_client.kms_client.create_key(request) + self.logger.debug("Created key") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def delete_key(self, key_id=None, pending_days=None, sequence=None): + """ + How many days do you plan to delete the key? The key can be deleted within 7-1096 days. + + Args: + key_id: Key ID, 36 bytes, satisfies regular matching“^[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}$”. + For example: 0d0466b0-e727-4d9c-b35d-f84bb474a37f. + :type key_ id: str + :pending_days: How many days do you plan to delete the key, with values ranging from 7 to 1096. + :type pending_days: str + """ + try: + request = DeleteKeyRequest() + request.body = ScheduleKeyDeletionRequestBody( + key_id = key_id, + pending_days = pending_days, + sequence = sequence + ) + response = self._dew_client.kms_client.delete_key(request) + self.logger.debug("Successfully delete_key") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def disable_key(self, key_id=None, sequence=None): + """ + Disabled key, cannot be used after disabling the key. + + Args: + :key_id: Key ID, 36 bytes, satisfies regular matching“^[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}$”. + For example: 0d0466b0-e727-4d9c-b35d-f84bb474a37f. + :type key_id: str + """ + try: + request = DisableKeyRequest() + request.body = OperateKeyRequestBody( + key_id = key_id, + sequence = sequence + ) + response = self._dew_client.kms_client.disable_key(request) + self.logger.info("Successfully disable_key:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def enable_key(self, key_id=None, sequence=None): + """ + Enable the key, which can only be used after it is enabled. + + Args: + :key_id: Key ID, 36 bytes, satisfies regular matching“^[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}$”. + For example: 0d0466b0-e727-4d9c-b35d-f84bb474a37f. + :type key_id: str + """ + try: + request = EnableKeyRequest() + request.body = OperateKeyRequestBody( + key_id = key_id, + sequence = sequence + ) + response = self._dew_client.kms_client.enable_key(request) + self.logger.info("Successfully enable_key:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def update_key_alias(self, key_id=None, key_alias=None, sequence=None): + """ + Modify the user master key alias + + Args: + :key_id: Key ID, 36 bytes, satisfies regular matching“^[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}$”. + For example: 0d0466b0-e727-4d9c-b35d-f84bb474a37f. + :type key_id: str + :key_alias: Non default master key alias, ranging from 1 to 255 characters, satisfies regular matching + "^[a-zA-Z0-9:/_-]{1, 255}$"and cannot have the suffix "/default". + :type key_alias: str + """ + try: + request = UpdateKeyAliasRequest() + request.body = UpdateKeyAliasRequestBody( + key_alias = key_alias, + key_id = key_id, + sequence = sequence + ) + response = self._dew_client.kms_client.update_key_alias(request) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def update_key_description(self, key_id=None, key_description=None, sequence=None): + """ + Modify user master key description information. + + Args: + :key_id: Key ID, 36 bytes, satisfies regular matching“^[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}$”. + For example: 0d0466b0-e727-4d9c-b35d-f84bb474a37f. + :type key_id: str + :key_description: Key description, ranging from 0 to 255 characters. + :type key_description: str + """ + try: + request = UpdateKeyDescriptionRequest() + request.body = UpdateKeyDescriptionRequestBody( + key_id = key_id, + key_description = key_description, + sequence = sequence + ) + response = self._dew_client.kms_client.update_key_description(request) + self.logger.info("Successfully update_key_description:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def cancel_key_deletion(self, key_id=None, sequence=None): + """ + Cancel plan to delete key. + + Args: + :key_id: Key ID, 36 bytes, satisfies regular matching“^[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}$”. + For example: 0d0466b0-e727-4d9c-b35d-f84bb474a37f. + :type key_id: str + """ + try: + request = CancelKeyDeletionRequest() + request.body = OperateKeyRequestBody( + key_id = key_id, + sequence = sequence + ) + response = self._dew_client.kms_client.cancel_key_deletion(request) + self.logger.info("Successfully canccel key daletion:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def create_data_key(self, key_id, key_spec=None, datakey_length=None, sequence=None): + """ + Create a data key and return a result containing both plaintext and ciphertext. + + Args: + :key_id: Key ID, 36 bytes, satisfies regular matching“^[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}$”. + For example: 0d0466b0-e727-4d9c-b35d-f84bb474a37f. + :type key_id: str + """ + try: + request = CreateDatakeyRequest() + request.body = CreateDatakeyRequestBody( + key_id = key_id, + key_spec = key_spec, + datakey_length = datakey_length, + sequence = sequence + ) + response = self._dew_client.kms_client.create_datakey(request) + self.logger.info("Successfully created datakey:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def create_data_key_without_plaintext(self, key_id, key_spec=None, datakey_length=None, sequence=None): + """ + Create a data key and return a result that only contains ciphertext. + + Args: + :key_id: Key ID, 36 bytes, satisfies regular matching“^[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}$”. + For example: 0d0466b0-e727-4d9c-b35d-f84bb474a37f. + :type key_id: str + """ + try: + request = CreateDatakeyWithoutPlaintextRequest() + request.body = CreateDatakeyRequestBody( + key_id = key_id, + key_spec = key_spec, + datakey_length = datakey_length, + sequence = sequence + ) + response = self._dew_client.kms_client.create_datakey_without_plaintext(request) + self.logger.info("Successfully created datakey without plaintext:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def create_random(self, random_data_length=None, sequence=None): + """ + Generate random numbers within the range of 8-8192 bits. + + Args: + :param random_ Data_ Length: The bit length of a random number. The value is a multiple of 8, with a range of 8 to 8192. + The bit length of a random number, with a value of 512. + :type random_ Data_ Length: str + """ + try: + request = CreateRandomRequest() + request.body = GenRandomRequestBody( + random_data_length = random_data_length, + sequence = sequence + ) + response = self._dew_client.kms_client.create_random(request) + self.logger.info("Successfully created random:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def decrypt_data_key(self, key_id=None, cipher_text=None, datakey_cipher_length=None, sequence=None): + """ + Decrypt data key, decrypt data key with specified master key. + + Args: + :key_id: Key ID, 36 bytes, satisfies regular matching“^[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}$”. + For example: 0d0466b0-e727-4d9c-b35d-f84bb474a37f. + :type key_id: str + + :cipher_text: The hexadecimal string of DEK ciphertext and metadata. + The value is the cipher in the encrypted data key result_ The value of text. + :type cipher_text: str + + :datakey_cipher_length: The key byte length, with a range of 1 to 1024. Key byte length, with a value of "64". + :type datakey_cipher_length: str + """ + try: + request = DecryptDatakeyRequest() + request.body = DecryptDatakeyRequestBody( + key_id = key_id, + cipher_text = cipher_text, + datakey_cipher_length = datakey_cipher_length, + sequence = sequence + ) + response = self._dew_client.kms_client.decrypt_datakey(request) + self.logger.info("Successfully decrypted datakey:") + self.logger.info(response) + return response + + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def encrypt_data_key(self, key_id=None, plain_text=None, datakey_plain_length=None, sequence=None): + """ + Encrypt the data key with the specified master key. + + Args: + :key_id: Key ID, 36 bytes, satisfies regular matching“^[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}$”. + For example: 0d0466b0-e727-4d9c-b35d-f84bb474a37f. + :type key_id: str + + :param plain_ Text: When CMK is AES, SHA256 (32 bytes) for DEK plaintext and DEK plaintext; When CMK is SM4, + both DEK plaintext and SM3 (32 bytes) of DEK plaintext are represented as hexadecimal strings. + :type plain_ Text: str + + :param datakey_ Plain_ Length: DEK plaintext byte length, with a range of 1 to 1024. DEK plaintext byte length, with a value of "64". + :type datakey_ Plain_ Length: str + """ + try: + request = EncryptDatakeyRequest() + request.body = EncryptDatakeyRequestBody( + key_id = key_id, + plain_text = plain_text, + datakey_plain_length = datakey_plain_length, + sequence = sequence + ) + response = self._dew_client.kms_client.encrypt_datakey(request) + self.logger.info("Successfully encrypted datakey:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def create_parameters_for_import(self, key_id, wrapping_algorithm=None, sequence=None): + """ + Obtain the necessary parameters for importing keys, including key import tokens and key encryption public keys. + + Args: + :key_id: Key ID, 36 bytes, satisfies regular matching“^[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}$”. + For example: 0d0466b0-e727-4d9c-b35d-f84bb474a37f. + :type key_id: str + :wrapping_algorithm: Key material encryption algorithm, enumeration as follows: - RSAES_ OAEP_ SHA_ 256- SM2_ ENCRYPT, + some offices do not support this import type + :type wrapping_ algorithm: str + """ + try: + request = CreateParametersForImportRequest() + request.body = GetParametersForImportRequestBody( + key_id = key_id, + wrapping_algorithm = wrapping_algorithm, + sequence = sequence + ) + response = self._dew_client.kms_client.create_parameters_for_import(request) + self.logger.info("Successfully created parameters for import:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def delete_imported_key_material(self, key_id, sequence=None): + """ + Delete key material information. + + Args: + :key_id: Key ID, 36 bytes, satisfies regular matching“^[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}$”. + For example: 0d0466b0-e727-4d9c-b35d-f84bb474a37f. + :type key_id: str + """ + try: + request = DeleteImportedKeyMaterialRequest() + request.body = OperateKeyRequestBody( + key_id = key_id, + sequence = sequence + ) + response = self._dew_client.kms_client.delete_imported_key_material(request) + self.logger.info("Successfully deleted imported key material:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def import_key_material(self, options: ImportkeyMaterialOptions): + """ + Import key materials. + + Args: + : param key_id: Key ID, 36 bytes, satisfies regular matching + "^[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}$". + For example: 0d0466b0-e727-4d9c-b35d-f84bb474a37f. + :type key_id: str + + :param import_token: Key import token, in base64 format, that satisfies regular matching "^[0-9a-zA-Z+/ = ]{200, 6144}$". + :type import_ Token: str + + :param encrypted_key_material: The encrypted symmetric key material, in base64 format, satisfies regular matching + "^[0-9a-zA-Z+/ = ]{344, 360}$". If importing an asymmetric key, this parameter is the temporary intermediate key used to encrypt the private key. + :type encrypted_key_material: str + """ + try: + request = ImportKeyMaterialRequest() + request.body = ImportKeyMaterialRequestBody( + key_id = options.key_id, + import_token = options.import_token, + encrypted_key_material = options.encrypted_key_material, + encrypted_privatekey = options.encrypted_privatekey, + expiration_time = options.expiration_time, + sequence = options.sequence + ) + response = self._dew_client.kms_client.import_key_material(request) + self.logger.info("Successfully imported key material:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def create_grant(self, options: CreateGrantOptions): + """ + Create authorization, authorized users can operate on the authorization key. + + Args: + :param key_id: Key ID, 36 bytes, satisfies regular matching + "^[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}$". + For example: 0d0466b0-e727-4d9c-b35d-f84bb474a37f. + :type key_id: str + + :param grantee_principal:Authorized user ID, 1~64 bytes, satisfies regular matching "^[a-zA-Z0-9]{1,64}$". + For example:0d0466b00d0466b00d0466b00d0466b0 + :type grantee_principal: str + + :param operations: List of authorized and allowed operations. Valid values: "create datakey", "create datakey without plaintext", + "encrypt-datakey", "decrypt-datakey", "describe-key", "create-grant", "retain grant", "encrypt-data", "decrypt-data". The valid + values cannot be just "create-grant"- "create-datakey" Create data key - "Create datakey without plaintext" Create data key without + plaintext - "encrypt-datakey" Encrypt data key - "decrypt-datakey" Decrypt data key - "describe-key" Query key information - + "retain-grant" Retirement authorization - "encrypt-data" Encrypt data - "decrypt data" Decrypt data + :type operations: list[str] + """ + try: + request = CreateGrantRequest() + request.body = CreateGrantRequestBody( + key_id = options.key_id, + grantee_principal = options.grantee_principal, + operations = options.listOperationsbody, + grantee_principal_type = options.grantee_principal_type, + retiring_principal = options.retiring_principal, + name = options.name, + sequence = options.sequence + ) + response = self._dew_client.kms_client.create_grant(request) + self.logger.info("Successfully create grant:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def list_retirable_grants(self, limit=None, marker=None, sequence=None): + """ + Query the list of authorizations that users can retire from. + """ + try: + request = ListRetirableGrantsRequest() + request.body = ListRetirableGrantsRequestBody( + limit = limit, + marker = marker, + sequence = sequence + ) + response = self._dew_client.kms_client.list_retirable_grants(request) + self.logger.info("Successfully list retirable grants:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def list_grants(self, key_id=None, limit=None, marker=None, sequence=None): + """ + Authorization list for querying keys + + Args: + :param key_id: Key ID, 36 bytes, satisfies regular matching + "^[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}$". + For example: 0d0466b0-e727-4d9c-b35d-f84bb474a37f. + :type key_id: str + """ + try: + request = ListGrantsRequest() + request.body = ListGrantsRequestBody( + key_id = key_id, + limit = limit, + marker = marker, + sequence = sequence + ) + response = self._dew_client.kms_client.list_grants(request) + self.logger.info("Successfully list grants:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def cancel_self_grant(self, key_id=None, grant_id=None, sequence=None): + """ + Retirement authorization means that the authorized user no longer has the right to operate the authorization key. + For example, if user A authorizes user B to operate key A/key, and authorizes user C to revoke the authorization, + then users A, B, and C can all retire the authorization. After retiring the authorization, user B can no longer use key A. + + Args: + :param key_id: Key ID, 36 bytes, satisfies regular matching + "^[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}$". + For example: 0d0466b0-e727-4d9c-b35d-f84bb474a37f. + :type key_id: str + + :param grant_id: Authorization ID, 64 bytes, satisfies regular matching "^[A-Fa-f0-9]{64}$". + For example:7c9a3286af4fcca5f0a385ad13e1d21a50e27b6dbcab50f37f30f93b8939827d + :type grant_id: str + """ + try: + request = CancelSelfGrantRequest() + request.body = RevokeGrantRequestBody( + grant_id = grant_id, + key_id = key_id, + sequence = sequence + ) + response = self._dew_client.kms_client.cancel_self_grant(request) + self.logger.info("Successfully cancel self grant:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def cancel_grant(self, key_id=None, grant_id=None, sequence=None): + """ + Revoke authorization, authorize the user to revoke the authorized user's permission to operate the key + + Args: + :param key_id: Key ID, 36 bytes, satisfies regular matching + "^[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}$". + For example: 0d0466b0-e727-4d9c-b35d-f84bb474a37f. + :type key_id: str + + :param grant_id: Authorization ID, 64 bytes, satisfies regular matching "^[A-Fa-f0-9]{64}$". + For example:7c9a3286af4fcca5f0a385ad13e1d21a50e27b6dbcab50f37f30f93b8939827d + :type grant_id: str + """ + try: + request = CancelGrantRequest() + request.body = RevokeGrantRequestBody( + grant_id = grant_id, + key_id = key_id, + sequence = sequence + ) + response = self._dew_client.kms_client.cancel_grant(request) + self.logger.info("Successfully cancel grant:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def decrypt_data(self, cipher_text=None, encryption_algorithm=None, key_id=None, sequence=None): + """ + Decrypting data. + + Args: + :param cipher_text: Encrypted data ciphertext. The value is the cipher in the encrypted data result. + The value of text satisfies regular matching "^[0-9a-zA-Z+/ = ]{128, 5648}$". + :type cipher_text: str + + """ + try: + request = DecryptDataRequest() + request.body = DecryptDataRequestBody( + cipher_text = cipher_text, + encryption_algorithm = encryption_algorithm, + key_id = key_id, + sequence = sequence + ) + response = self._dew_client.kms_client.decrypt_data(request) + self.logger.info("Successfully decrypted data:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def encrypt_data(self, key_id=None, plain_text=None, encryption_algorithm=None, sequence=None): + """ + Encrypt data using the specified user master key. + + Args: + :param key_id: Key ID, 36 bytes, satisfies regular matching + "^[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}$". + For example: 0d0466b0-e727-4d9c-b35d-f84bb474a37f. + :type key_id: str + + :param plain_text: Clear text data, 1-4096 bytes, satisfies regular matching + "^.{14096}$", and the length range after converting to a byte array is 1-4096 bytes.. + :type plain_text: str + """ + try: + request = EncryptDataRequest() + request.body = EncryptDataRequestBody( + plain_text = plain_text, + key_id = key_id, + encryption_algorithm = encryption_algorithm, + sequence = sequence + ) + response = self._dew_client.kms_client.encrypt_data(request) + self.logger.info("Successfully encrypted data:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def sign(self, asymmetric_key_id=None, message=None, signing_algorithm=None, message_type=None, sequence=None): + """ + Digitally sign a message or message digest using a private key with an asymmetric key. + + Args: + :param key_id: Key ID, 36 bytes, satisfies regular matching + "^[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}$". + For example: 0d0466b0-e727-4d9c-b35d-f84bb474a37f. + :type key_id: str + + :param signing_algorithm: The signature algorithm is listed as follows: + - RSASSA_PSS_SHA_256 - RSASSA_PSS_SHA_384 - RSASSA_PSS_SHA_512 - RSASSA_PKCS1_V1_5_SHA_256 + - RSASSA_PKCS1_V1_5_SHA_384 - RSASSA_PKCS1_V1_5_SHA_512 - ECDSA_SHA_256 - ECDSA_SHA_384 + - ECDSA_SHA_512 - SM2DSA_SM3 + :type signing_algorithm: str + + :param message: The message digest or message to be signed must have a message length of less than 4096 bytes and be encoded using Base64. + :type message: str + """ + try: + request = SignRequest() + request.body = SignRequestBody( + signing_algorithm = signing_algorithm, + message = message, + key_id = asymmetric_key_id, + message_type = message_type, + sequence = sequence + ) + response = self._dew_client.kms_client.sign(request) + self.logger.info("Successfully sign:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def validate_signature(self, options: ValidateSignatureOptions): + """ + Verify the signature of a message or message digest using a public key with an asymmetric key. + + Args: + :param key_id: Key ID, 36 bytes, satisfies regular matching + "^[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}$". + For example: 0d0466b0-e727-4d9c-b35d-f84bb474a37f. + :type key_id: str + + :param signing_algorithm: The signature algorithm is listed as follows: + - RSASSA_PSS_SHA_256 - RSASSA_PSS_SHA_384 - RSASSA_PSS_SHA_512 - RSASSA_PKCS1_V1_5_SHA_256 + - RSASSA_PKCS1_V1_5_SHA_384 - RSASSA_PKCS1_V1_5_SHA_512 - ECDSA_SHA_256 - ECDSA_SHA_384 + - ECDSA_SHA_512 - SM2DSA_SM3 + :type signing_algorithm: str + + :param message: The message digest or message to be signed must have a message length of less than 4096 bytes and be encoded using Base64. + :type message: str + + :param signature: The signature value to be verified is encoded using Base64. + :type signature: str + """ + try: + request = ValidateSignatureRequest() + request.body = VerifyRequestBody( + key_id = options.asymmetric_key_id, + message = options.message, + signature = options.signature, + signing_algorithm = options.signing_algorithm, + message_type = options.message_type, + sequence = options.sequence + ) + response = self._dew_client.kms_client.validate_signature(request) + self.logger.info("Successfully list validate signature:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def list_key_detail(self, key_id=None, sequence=None): + """ + Query key details. + + Args: + :param key_id: Key ID, 36 bytes, satisfies regular matching + "^[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}$". + For example: 0d0466b0-e727-4d9c-b35d-f84bb474a37f. + :type key_id: str + """ + try: + request = ListKeyDetailRequest() + request.body = OperateKeyRequestBody( + key_id = key_id, + sequence = sequence + ) + response = self._dew_client.kms_client.list_key_detail(request) + self.logger.info("Successfully list key detail:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def list_keys(self, options: ListKeysOptions): + """ + Query the list of all user keys. + """ + try: + request = ListKeysRequest() + request.body = ListKeysRequestBody( + limit = options.limit, + marker = options.marker, + key_state = options.key_state, + key_spec = options.key_spec, + enterprise_project_id = options.enterprise_project_id, + sequence = options.sequence + ) + response = self._dew_client.kms_client.list_keys(request) + self.logger.info(json.dumps(response.to_dict()).replace('}', '}\n')) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def show_public_key(self, asymmetric_key_id=None, sequence=None): + """ + Query public key information for user specified asymmetric keys. + + Args: + :param key_id: Key ID, 36 bytes, satisfies regular matching + "^[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}$". + For example: 0d0466b0-e727-4d9c-b35d-f84bb474a37f. + :type key_id: str + """ + try: + request = ShowPublicKeyRequest() + request.body = OperateKeyRequestBody( + key_id = asymmetric_key_id, + sequence = sequence + ) + response = self._dew_client.kms_client.show_public_key(request) + self.logger.info("Successfully show public key:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def show_user_instances(self): + """ + Query the number of instances to obtain the number of user master keys that have already been created by the user + """ + try: + request = ShowUserInstancesRequest() + response = self._dew_client.kms_client.show_user_instances(request) + self.logger.info("Successfully show user instances:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def show_user_quotas(self): + """ + Query quotas, query the total number of user master key quotas that users can create and current usage information. + """ + try: + request = ShowUserQuotasRequest() + response = self._dew_client.kms_client.show_user_quotas(request) + self.logger.info("Successfully show user quotas:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def show_version(self, version_id=None): + """ + Check the specified API version information + + Args: + :param version_id: API version number + :type version_id: str + """ + try: + request = ShowVersionRequest(version_id = version_id) + response = self._dew_client.kms_client.show_version(request) + self.logger.info("Successfully show_version:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def show_versions(self): + """ + Query API version information list. + """ + try: + request = ShowVersionsRequest() + response = self._dew_client.kms_client.show_versions(request) + self.logger.info("Successfully show versions:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def disable_key_rotation(self, key_id=None, sequence=None): + """ + Turn off user master key rotation. + + Args: + :param key_id: Key ID, 36 bytes, satisfies regular matching + "^[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}$". + For example: 0d0466b0-e727-4d9c-b35d-f84bb474a37f. + :type key_id: str + """ + try: + request = DisableKeyRotationRequest() + request.body = OperateKeyRequestBody( + key_id = key_id, + sequence = sequence + ) + response = self._dew_client.kms_client.disable_key_rotation(request) + self.logger.info("Successfully disable key rotation:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def enable_key_rotation(self, key_id=None, sequence=None): + """ + Enable user master key rotation. + + Args: + :param key_id: Key ID, 36 bytes, satisfies regular matching + "^[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}$". + For example: 0d0466b0-e727-4d9c-b35d-f84bb474a37f. + :type key_id: str + """ + try: + request = EnableKeyRotationRequest() + request.body = OperateKeyRequestBody( + key_id = key_id, + sequence = sequence + ) + response = self._dew_client.kms_client.enable_key_rotation(request) + self.logger.info("Successfully enable key rotation:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def show_key_rotation_status(self, key_id=None, sequence=None): + """ + Query user master key rotation status + + Args: + :param key_id: Key ID, 36 bytes, satisfies regular matching + "^[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}$". + For example: 0d0466b0-e727-4d9c-b35d-f84bb474a37f. + :type key_id: str + """ + try: + request = ShowKeyRotationStatusRequest() + request.body = OperateKeyRequestBody( + key_id = key_id, + sequence = sequence + ) + response = self._dew_client.kms_client.show_key_rotation_status(request) + self.logger.info("Successfully show key rotation status:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def update_key_rotation_interval(self, key_id=None, rotation_interval=None, sequence=None): + """ + Modify the user master key rotation cycle. + + Args: + :param key_id: Key ID, 36 bytes, satisfies regular matching + "^[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}$". + For example: 0d0466b0-e727-4d9c-b35d-f84bb474a37f. + :type key_id: str + + :param rotation_interval: Rotation period, an integer with a range of values from 30 to 365. + The cycle range is set based on the frequency of key usage. If the frequency of key usage is high, + it is recommended to set it to a short cycle; On the contrary, set it to long cycle. + :type rotation_interval: int + """ + try: + request = UpdateKeyRotationIntervalRequest() + request.body = UpdateKeyRotationIntervalRequestBody( + rotation_interval = rotation_interval, + key_id = key_id, + sequence = sequence + ) + response = self._dew_client.kms_client.update_key_rotation_interval(request) + self.logger.info("Successfully update key rotation interval:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def create_keypair(self, options: CreateKeypairOptions): + """ + Create and import SSH key pairs. + + Args: + :param name: The name of the SSH key pair.-The name of a newly created key pair cannot be the same as the name of an existing key pair. + - The SSH key pair name consists of English letters, numbers, underscores, and midlines, and the length cannot exceed 64 bytes + :type name: str + """ + try: + request = CreateKeypairRequest() + key_pair_body = CreateKeypairAction( + name = options.name, + type = options.type, + public_key = options.public_key, + scope = options.scope, + user_id = options.user_id, + key_protection = options.key_protection + ) + request.body = CreateKeypairRequestBody( + keypair = key_pair_body + ) + response = self._dew_client.kps_client.create_keypair(request) + self.logger.info("Successfully created keypair:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def list_keypairs(self, limit=None, marker=None): + """ + Query SSH key pair list. + """ + try: + request = ListKeypairsRequest(limit = limit, marker = marker) + response = self._dew_client.kps_client.list_keypairs(request) + self.logger.info("Successfully list keypairs:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def clear_private_key(self, keypair_name=None): + """ + Clear SSH key pair private key + + Args: + :param keypair_name: Key pair name. + :type keypair_name: str + """ + try: + request = ClearPrivateKeyRequest(keypair_name = keypair_name) + response = self._dew_client.kps_client.clear_private_key(request) + self.logger.info("Successfully cleared private key:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def import_private_key(self, user_id=None, name=None, type=None, kms_key_name=None, private_key=None): + """ + Import the private key into the specified key pair. + + Args: + :param name: The name of the SSH key pair.- The name of a newly created key pair cannot be the same as the name of an existing key pair. + - The SSH key pair name consists of English letters, numbers, underscores, and midlines, and the length cannot exceed 64 bytes + :type name: str + + :param private_key: Import the private key of an SSH key pair. + :type private_key: str + + :param type: 取值范围: "kms" or "default"- "Default" is the default encryption method, suitable for offices without KMS services. + - "KMS" refers to the use of KMS service encryption method. If the station does not have KMS service, please fill in "default". + :type type: str + + :param kms_key_name: The name of the KMS key.- If "type" is "kms", the kms service key name must be filled in. + :type kms_key_name: str + """ + try: + request = ImportPrivateKeyRequest() + encryption_key_protection = Encryption( + type = type, + kms_key_name = kms_key_name + ) + key_protection_keypair = ImportPrivateKeyProtection( + private_key = private_key, + encryption = encryption_key_protection + ) + key_pair_body = ImportPrivateKeyKeypairBean( + name = name, + key_protection = key_protection_keypair, + user_id = user_id + ) + request.body = ImportPrivateKeyRequestBody(keypair = key_pair_body) + response = self._dew_client.kps_client.import_private_key(request) + self.logger.info("Successfully import_private_key:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def list_keypair_detail(self, keypair_name=None): + """ + Query SSH key pair details + + Args: + :param keypair_name: Key pair name. + :type keypair_name: str + """ + try: + request = ListKeypairDetailRequest(keypair_name = keypair_name) + response = self._dew_client.kps_client.list_keypair_detail(request) + self.logger.info("Successfully list_keypair_detail") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def update_keypair_description(self, keypair_name=None, description=None): + """ + Update SSH key pair description. + + Args: + :param keypair_name: Key pair name. + :type keypair_name: str + + :param description: Descriptive information + :type description: str + """ + try: + request = UpdateKeypairDescriptionRequest( + keypair_name = keypair_name + ) + key_pair_body = UpdateKeypairDescriptionReq( + description = description + ) + request.body = UpdateKeypairDescriptionRequestBody( + keypair = key_pair_body + ) + response = self._dew_client.kps_client.update_keypair_description(request) + self.logger.info("Successfully update keypair description:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def export_private_key(self, name=None): + """ + Export the private key of the specified key pair. + + :param name: SSH key pair name. + :type name: str + """ + try: + request = ExportPrivateKeyRequest() + key_pair_body = KeypairBean(name = name) + request.body = ExportPrivateKeyRequestBody(keypair = key_pair_body) + response = self._dew_client.kps_client.export_private_key(request) + self.logger.info("Successfully exported private key:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def delete_keypair(self, keypair_name=None): + """ + Delete SSH key pair. + + Args: + :param keypair_name: Key pair name. + :type keypair_name: str + """ + try: + request = DeleteKeypairRequest(keypair_name = keypair_name) + response = self._dew_client.kps_client.delete_keypair(request) + self.logger.info("Successfully deleted keypair") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def associate_keypair(self, options: AssociateKeypairOptions): + """ + Bind a new SSH key pair to the specified virtual machine (replace or reset, replacement requires providing + the configured SSH key pair private key of the virtual machine; reset does not require providing the SSH key + pair private key of the virtual machine). + + Args: + :param keypair_name: Key pair name. + :type keypair_name: str + + :param id: Virtual machine id that needs to bind (replace or reset) SSH key pairs. + :type id: str + + :param type: The value is of enumeration type. password or keypair. + :type type: str + + :param key: - When type is the enumeration value password, key represents the password- When type is the enumeration + value keypair, key represents the private key. + :type key: str + """ + try: + request = AssociateKeypairRequest() + auth_server = Auth(type = options.type, key = options.key) + server_body = EcsServerInfo( + auth = auth_server, + id = options.id, + disable_password = options.disable_password, + port = options.port + ) + request.body = AssociateKeypairRequestBody( + server = server_body, + keypair_name = options.keypair_name + ) + response = self._dew_client.kps_client.associate_keypair(request) + self.logger.info("Successfully associate_keypair:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.info(id) + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def batch_associate_keypair(self, options: BatchAssociateKeypairOptions): + """ + Batch bind new SSH key pairs to the specified virtual machine. + + Args: + :param keypair_name: The name of the SSH key pair + :type keypair_name: list[str] + + :param id: List of virtual machine IDs that require binding (replacing or resetting) SSH key pairs. + :type id: list[str] + """ + try: + request = BatchAssociateKeypairRequest() + list_batch_keypairs_body = [] + if isinstance(options.keypair_name, list): + for t, k, i, d, p, kn in zip(options.type, options.key, options.id, options.disable_password, + options.port, options.keypair_name): + auth_server = Auth( + type = t if t is not None else None, + key = k if k is not None else None + ) + server_satch_keypairs = EcsServerInfo( + id = i if i is not None else None, + auth = auth_server, + disable_password = d if d is not None else None, + port = p if p is not None else None + ) + list_batch_keypairs_body.append(AssociateKeypairRequestBody( + keypair_name = kn if kn is not None else None, + server = server_satch_keypairs + )) + else: + auth_server = Auth( + type = options.type, + key = options.key + ) + server_satch_keypairs = EcsServerInfo( + id = options.id, + auth = auth_server, + disable_password = options.disable_password, + port = options.port + ) + list_batch_keypairs_body.append(AssociateKeypairRequestBody( + keypair_name = options.keypair_name, + server = server_satch_keypairs + )) + + request.body = BatchAssociateKeypairRequestBody( + batch_keypairs = list_batch_keypairs_body + ) + response = self._dew_client.kps_client.batch_associate_keypair(request) + self.logger.info("Successfully batch_associate_keypair:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def delete_all_failed_task(self): + """ + Delete task information that failed the operation. + """ + try: + request = DeleteAllFailedTaskRequest() + response = self._dew_client.kps_client.delete_all_failed_task(request) + self.logger.info("Successfully delete_all_failed_task:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def delete_failed_task(self, task_id=None): + """ + Delete failed tasks. + + :param task_id: Task ID + :type task_id: str + """ + try: + request = DeleteFailedTaskRequest( + task_id = task_id, + ) + response = self._dew_client.kps_client.delete_failed_task(request) + self.logger.info("Successfully delete_failed_task:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def disassociate_keypair(self, id=None, type=None, key=None): + """ + Unbind SSH key pairs to the specified virtual machine and restore SSH password login. + :param id: Virtual machine ID that needs to bind (replace or reset) SSH key pairs + :type id: str + """ + try: + request = DisassociateKeypairRequest() + auth_server = Auth(type = type, key = key) + server_body = DisassociateEcsServerInfo(id = id, auth = auth_server) + request.body = DisassociateKeypairRequestBody(server = server_body) + response = self._dew_client.kps_client.disassociate_keypair(request) + self.logger.info("Successfully disassociate_keypair:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def list_failed_task(self, limit=None, offset=None): + """ + Query task information for failed binding, unbinding, and other operations. + """ + try: + request = ListFailedTaskRequest() + request.limit = limit + request.offset = offset + response = self._dew_client.kps_client.list_failed_task(request) + self.logger.info("Successfully list_failed_task:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def list_keypair_task(self, task_id=None): + """ + The task returned by the interface based on the SSH key_ ID, query the execution status of the SSH key for the current task. + + Args: + :param task_id: task ID + :type task_id: str + """ + try: + request = ListKeypairTaskRequest(task_id = task_id) + response = self._dew_client.kps_client.list_keypair_task(request) + self.logger.info("Successfully list_keypair_task:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def list_running_task(self, limit=None, offset=None): + """ + Query the task information being processed. + """ + try: + request = ListRunningTaskRequest() + request.limit = limit + request.offset = offset + response = self._dew_client.kps_client.list_running_task(request) + self.logger.info("Successfully list_running_task:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def create_secret(self, options: CreateSecretOptions): + """ + Create a new credential and store the credential value in the initial version of the credential. + The credential management service encrypts the credential values and stores them in the version under the credential object. + Each version can be associated with multiple credential version states, which are used to identify the stage in which the credential + version is in. A version without a version state marker is considered deprecated and can be automatically deleted using the credential + management service. + The initial version status is marked as SYSCURRENT. + + Args: + :param name: The name of the credential to be created. Constraint: The value range is 1 to 64 characters, satisfying the regular matching + "^[a-zA-Z0-9_-]{1, 64}$". + :type name: str + """ + try: + request = CreateSecretRequest() + request.body = CreateSecretRequestBody( + secret_string = options.secret_string, + kms_key_id = options.kms_key_id, + name = options.name, + description = options.description, + secret_binary = options.secret_binary, + secret_type = options.secret_type, + auto_rotation = options.auto_rotation, + rotation_period = options.rotation_period, + rotation_config = options.rotation_config, + event_subscriptions = options.event_subscriptions, + enterprise_project_id = options.enterprise_project_id + ) + response = self._dew_client.cms_client.create_secret(request) + self.logger.info("Successfully create_secret:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def download_secret_blob(self, secret_name=None): + """ + Download backup files for specified credentials. + + Args: + :param secret_name: The name of the credential. + :type secret_name: str + """ + try: + request = DownloadSecretBlobRequest(secret_name = secret_name) + response = self._dew_client.cms_client.download_secret_blob(request) + self.logger.info("Successfully download_secret_blob:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def list_secrets(self, limit=None, marker=None, event_name=None): + """ + Query all credentials created by the current user under this project. + """ + try: + request = ListSecretsRequest() + request.limit = limit + request.marker = marker + request.event_name = event_name + response = self._dew_client.cms_client.list_secrets(request) + self.logger.info("Successfully list_secrets:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def show_secret(self, secret_name): + """ + Query information for specified credentials. + + Args: + :param secret_name: The name of the credential. + :type secret_name: str + """ + try: + request = ShowSecretRequest(secret_name = secret_name) + response = self._dew_client.cms_client.show_secret(request) + self.logger.info("Successfully show_secret:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def update_secret(self, options:UpdateSecretOptions): + """ + Update metadata information for specified credentials. + + Args: + :param secret_name: The name of the credential. + :type secret_name: str + """ + try: + request = UpdateSecretRequest() + request.body = UpdateSecretRequestBody( + event_subscriptions = options.listEventSubscriptionsbody, + description = options.description, + kms_key_id = options.kms_key_id, + auto_rotation = options.auto_rotation, + rotation_period = options.rotation_period, + ) + request.secret_name = options.secret_name + response = self._dew_client.cms_client.update_secret(request) + self.logger.info("Successfully update_secret:") + + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def create_secret_event(self, options: CreateSecretEventOptions): + """ + Create an event that can be configured on one or more credential objects. When an event is enabled and the + underlying event type is triggered on the credential object, the cloud service will send the corresponding + event notification to the notification subject specified by the event. + + Args: + :param name: The name of the newly created event notification. Constraint: The value range is 1 to 64 + characters, satisfying the regular matching "^ [a-zA-Z0-9_ -] {1, 64} $". + :type name: str + + :param event_types:he basic event list for this event notification, with the following basic event types. + SECRET_ VERSION_ CREATED: Version Creation SECRET_ VERSION_ EXPIRED: Version expired SECRET_ ROTATED: credential + rotation SECRET_ DELETED: The credential deletion list cannot contain duplicate underlying event types. + :type event_types: list[str] + + :param state: controls whether an event takes effect. Only the enabled state can trigger the underlying event types + included.ENABLED: enable DisaBLED: disable + :type state: str + + :param target_ Type: The object type of the event notification. + :type target_ Type: str + + :param target_id: The object ID of the event notification. + :type target_id: str + + :param target_ Name: The name of the object notified by the event. + :type target_ Name: str + """ + try: + request = CreateSecretEventRequest() + notification_body = Notification( + target_type = options.target_type, + target_id = options.target_id, + target_name = options.target_name + ) + request.body = CreateSecretEventRequestBody( + notification = notification_body, + state = options.state, + event_types = options.event_types, + name = options.event_name, + ) + response = self._dew_client.cms_client.create_secret_event(request) + self.logger.info("Successfully create_secret_event:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def upload_secret_blob(self, secret_blob=None): + """ + Restore credential objects by uploading credential backup files. + + Args: + :param secret_blob: The credential backup file obtained by backing up the specified credential object contains + all current credential version information. The backup file is encrypted and encoded, and its content cannot be read directly. + :type secret_blob: str + """ + try: + request = UploadSecretBlobRequest() + request.body = UploadSecretBlobRequestBody(secret_blob = secret_blob) + response = self._dew_client.cms_client.upload_secret_blob(request) + self.logger.info("Successfully upload_secret_blob:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def rotate_secret(self, secret_name=None): + """ + Immediately execute the rotation of credentials. Create a new credential version within the specified credentials to encrypt + and store randomly generated credential values in the background. At the same time, mark the newly created credential version + as SYSCURRENT status. + + Args: + :param secret_name: The name of the credential. + :type secret_name: str + """ + try: + request = RotateSecretRequest(secret_name = secret_name) + response = self._dew_client.cms_client.rotate_secret(request) + self.logger.info("Successfully rotate_secret:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def restore_secret(self, secret_name): + """ + Cancel the scheduled deletion task of credentials and restore the usable state of the credential object. + + Args: + :param secret_name: The name of the credential. + :type secret_name: str + """ + try: + request = RestoreSecretRequest(secret_name = secret_name) + response = self._dew_client.cms_client.restore_secret(request) + self.logger.info("Successfully restore_secret:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def delete_secret_for_schedule(self, recovery_window_in_days=None, secret_name=None): + """ + Specify a delayed deletion time, create a scheduled task to delete credentials, and set a delayed deletion time of 7-30 days. + + Args: + :param secret_name: The name of the credential. + :type secret_name: str + + :param recovery_ Window_ In_ Days: Create a task to delete credentials on a scheduled basis, and specify the number of days + that can be recovered. Constraints: 7-30. Default value: 30. + :type recovery_ Window_ In_ Days: int + """ + try: + request = DeleteSecretForScheduleRequest(secret_name = secret_name) + request.body = DeleteSecretForScheduleRequestBody( + recovery_window_in_days = recovery_window_in_days + ) + response = self._dew_client.cms_client.delete_secret_for_schedule(request) + self.logger.info("Successfully delete_secret_for_schedule:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def delete_secret(self, secret_name): + """ + Immediately delete the specified credentials and cannot recover them. + + Args: + :param secret_name: The name of the credential. + :type secret_name: str + """ + try: + request = DeleteSecretRequest(secret_name = secret_name) + response = self._dew_client.cms_client.delete_secret(request) + self.logger.info("Successfully delete_secret:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def create_secret_version(self, secret_name=None, secret_binary=None, + secret_string=None, version_stages=None, expire_time=None): + """ + Create a new credential version within the specified credentials to encrypt and store the new credential values. + By default, newly created credential versions are marked as SYSCURRENT, while the previous credential version marked + with SYSCURRENT is marked as SYSPREVIOUS. You can override the default behavior by specifying the Version Stage parameter. + + Args: + :param secret_name: The name of the credential. + :type secret_name: str + """ + try: + request = CreateSecretVersionRequest(secret_name = secret_name) + request.body = CreateSecretVersionRequestBody( + secret_string = secret_string, + secret_binary = secret_binary, + version_stages = version_stages, + expire_time = expire_time + ) + response = self._dew_client.cms_client.create_secret_version(request) + self.logger.info("Successfully create_secret_version:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def list_secret_versions(self, secret_name=None, marker=None, limit=None): + """ + Query version list information under specified credentials. + + Args: + :param secret_name: The name of the credential. + :type secret_name: str + """ + try: + request = ListSecretVersionsRequest() + request.secret_name = secret_name + request.marker = marker + request.limit = limit + response = self._dew_client.cms_client.list_secret_versions(request) + self.logger.info("Successfully list_secret_versions:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def show_secret_version(self, secret_name=None, version_id=None): + """ + Query the information of the specified credential version and the plaintext credential value in the version, + only the credentials with ENABLED status can be queried. The latest version of the credentials can be accessed + through/v1/{project_id}/secrets/{secretname}/versions/latest (i.e. assigning {version _id} in the current interface URL as latest) + + Args: + :param secret_name: The name of the credential. + :type secret_name: str + + :param version_id: The version identifier of the credential. + :type version_id: str + """ + try: + request = ShowSecretVersionRequest() + request.secret_name = secret_name + request.version_id = version_id + response = self._dew_client.cms_client.show_secret_version(request) + self.logger.info("Successfully show_secret_version:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def update_version(self, secret_name=None, version_id=None, expire_time=None): + """ + Currently, it supports updating the validity period of specified credential versions, and can only update + credentials with ENABLED status. When the event associated with the subscription includes a basic event type + of "version expiration", only one event notification will be triggered after each version expiration update. + + Args: + :param secret_name: The name of the credential. + :type secret_name: str + + :param version_id: The version identifier of the credential. + :type version_id: str + + :param expire_time: The expiration time of the credential version, timestamp, is the total number of seconds + from January 1, 1970 to that time. The default is empty, and the value used to determine the validity period + when subscribing to the "version expiration" event type for credentials. + :type expire_time: int + """ + try: + request = UpdateVersionRequest() + request.version_id = version_id + request.secret_name = secret_name + request.body = UpdateVersionRequestBody( + expire_time = expire_time + ) + response = self._dew_client.cms_client.update_version(request) + self.logger.info("Successfully update_version:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def show_secret_stage(self, secret_name=None, stage_name=None): + """ + Query the version information of the specified credential version status marker. + + Args: + :param secret_name: The name of the credential. + :type secret_name: str + + :param stage_name: The name of the credential version status. + :type stage_name: str + """ + try: + request = ShowSecretStageRequest(secret_name = secret_name, stage_name = stage_name) + response = self._dew_client.cms_client.show_secret_stage(request) + self.logger.info("Successfully show_secret_stage:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def update_secret_stage(self, secret_name=None, stage_name=None, version_id=None): + """ + Update the version status of credentials. + + Args: + :param secret_name: The name of the credential. + :type secret_name: str + + :param stage_name: The name of the credential version status.matching '^[a-zA-Z0-9_-]{1, 64}$' + :type stage_name: str + """ + try: + request = UpdateSecretStageRequest(secret_name = secret_name, stage_name = stage_name) + request.body = UpdateSecretStageRequestBody(version_id = version_id) + response = self._dew_client.cms_client.update_secret_stage(request) + self.logger.info("Successfully update_secret_stage:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def delete_secret_stage(self, secret_name=None, stage_name=None): + """ + Delete the version status of credentials. + + Args: + :param secret_name: The name of the credential. + :type secret_name: str + + :param stage_name: The name of the credential version status. + :type stage_name: str + """ + try: + request = DeleteSecretStageRequest(secret_name = secret_name, stage_name = stage_name) + response = self._dew_client.cms_client.delete_secret_stage(request) + self.logger.info("Successfully delete_secret_stage:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def batch_create_or_delete_tags(self, secret_id=None, key=None, value=None, action=None, sequence=None): + """ + Batch add or remove credential labels. + + Args: + :param secret_id: Credential ID + :type secret_id: str + + :param key: The name of the tag. A label key can only correspond to one label value for the same credential; + Different credentials can use the same label key. Users can add up to 20 tags to a single credential. Constraint: + The value range is from 1 to 128 characters, which satisfies regular matching\"^((?!\\\\s)(?!_sys_)[\\\\p{L} + \\\\p{Z}\\\\p{N}_.:=+\\\\-@]*)(?<!\\\\s)$\" + :type key: list[str] + + :param action: Operation identifier: limited to "create" and "delete" only. + :type action: str + """ + try: + request = BatchCreateOrDeleteTagsRequest(secret_id = secret_id) + + listTagsbody = [] + if isinstance(key, list): + for k, v in zip(key, value): + listTagsbody.append(TagItem(key = k, + value = v if v is not None else None)) + else: + listTagsbody.append(TagItem(key = key, value = value)) + + request.body = BatchCreateOrDeleteTagsRequestBody( + action = action, + tags = listTagsbody, + sequence = sequence + ) + response = self._dew_client.cms_client.batch_create_or_delete_tags(request) + self.logger.info("Successfully batch_create_or_delete_tags:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def create_secret_tag(self, secret_id=None, key=None, value=None): + """ + Add credential labels. + + Args: + :param secret_id: Credential ID + :type secret_id: str + + :param key: The name of the tag. A label key can only correspond to one label value for the same credential; + Different credentials can use the same label key. Users can add up to 20 tags to a single credential. Constraint: + The value range is from 1 to 128 characters, which satisfies regular matching\"^((?!\\\\s)(?!_sys_)[\\\\p{L} + \\\\p{Z}\\\\p{N}_.:=+\\\\-@]*)(?<!\\\\s)$\" + :type key: str + """ + try: + request = CreateSecretTagRequest(secret_id = secret_id) + tagbody = TagItem( + key = key, + value = value, + ) + request.body = CreateSecretTagRequestBody( + tag = tagbody + ) + response = self._dew_client.cms_client.create_secret_tag(request) + self.logger.info("Successfully create_secret_tag:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def delete_secret_tag(self, secret_id=None, key=None): + """ + Delete credential labels. + + Args: + :param secret_id: Credential ID + :type secret_id: str + + :param key: The name of the tag. A label key can only correspond to one label value for the same credential; + Different credentials can use the same label key. Users can add up to 20 tags to a single credential. Constraint: + The value range is from 1 to 128 characters, which satisfies regular matching\"^((?!\\\\s)(?!_sys_)[\\\\p{L} + \\\\p{Z}\\\\p{N}_.:=+\\\\-@]*)(?<!\\\\s)$\" + :type key: str + """ + try: + request = DeleteSecretTagRequest(secret_id = secret_id, key = key) + response = self._dew_client.cms_client.delete_secret_tag(request) + self.logger.info("Successfully delete_secret_tag:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def list_project_secrets_tags(self): + """ + Query the collection of all credential labels for the user under the specified project. + """ + try: + request = ListProjectSecretsTagsRequest() + response = self._dew_client.cms_client.list_project_secrets_tags(request) + self.logger.info("Successfully list_project_secrets_tags:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def list_resource_instances(self, options: ListResourceInstancesOptions): + """ + Query credential instances. Filter user credentials through tag filtering and return a list of credentials. + + Args: + :param action: Operation identifier (can be set as "filter" or "count")- Filter: represents filtering + - Count: represents the total number of queries. + :type action: str + + :param resource_instances: Set the value to resource_instances. + :type resource_instances: str + """ + try: + request = ListResourceInstancesRequest(resource_instances = options.resource_instances) + listTagsbody = [] + if isinstance(options.key, list): + for k, v in zip(options.key, options.values): + listTagsbody.append(Tag(key = k if k is not None else None, + values = v if v is not None else None)) + else: + listTagsbody.append(Tag(key = options.key, values = options.values)) + request.body = ListResourceInstancesRequestBody( + tags = listTagsbody if options.key is not None else None, + action = options.action, + matches = options.matches, + sequence = options.sequence, + offset = options.offset, + limit = options.limit + ) + response = self._dew_client.cms_client.list_resource_instances(request) + self.logger.info("Successfully list_resource_instances:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def list_secret_tags(self, secret_id=None): + """ + Query credential labels. + + Args: + :param secret_id: Credential ID + :type secret_id: str + """ + try: + request = ListSecretTagsRequest(secret_id = secret_id) + response = self._dew_client.cms_client.list_secret_tags(request) + self.logger.info("Successfully list_secret_tags:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def delete_secret_event(self, event_name=None): + """ + Immediately delete the specified event and cannot be restored. If there is a credential reference in the event, + it cannot be deleted. Please dissociate it first. + + Args: + :param event_name: The name of the event notification. + :type event_name: str + """ + try: + request = DeleteSecretEventRequest(event_name = event_name) + response = self._dew_client.cms_client.delete_secret_event(request) + self.logger.info("Successfully delete_secret_event:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def list_secret_events(self, marker=None, limit=None): + """ + Query all events created by the current user under this project + """ + try: + request = ListSecretEventsRequest() + request.limit = limit + request.marker = marker + response = self._dew_client.cms_client.list_secret_events(request) + self.logger.info("Successfully list_secret_events:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def show_secret_event(self, event_name=None): + """ + Query information for a specified event. + + Args: + :param event_name: The name of the event notification. + :type event_name: str + """ + try: + request = ShowSecretEventRequest(event_name = event_name) + response = self._dew_client.cms_client.show_secret_event(request) + self.logger.info("Successfully show_secret_event:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def update_secret_event(self, event_name=None, target_type=None, target_id=None, target_name=None): + """ + Update metadata information for specified events. The metadata that supports updates includes event enable status, + basic type list, and notification topic. + + Args: + :param event_name: The name of the event notification. + :type event_name: str + """ + try: + request = UpdateSecretEventRequest(event_name = event_name) + notificationbody = Notification( + target_type = target_type, + target_id = target_id, + target_name = target_name + ) + request.body = UpdateSecretEventRequestBody( + notification = notificationbody + ) + response = self._dew_client.cms_client.update_secret_event(request) + self.logger.info("Successfully update_secret_event:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def list_notification_records(self, marker=None, limit=None): + """ + Query triggered event notification records. + """ + try: + request = ListNotificationRecordsRequest( + limit = limit, + marker = marker + ) + response = self._dew_client.cms_client.list_notification_records(request) + self.logger.info("Successfully list_notification_records:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def batch_create_kms_tags(self, key_id=None, key=None, value=None, action=None): + """ + + Args: + :param key_id: key ID + :type key_id: str + + :param action: Operation identifier: limited to "create" and "delete". + :type action: str + + :param key: The name of the label. A label key can only correspond to one label value for the same credential; + Different credentials can use the same label key. Users can add up to 20 tags to a single credential. Constraint: + Value range from 1 to 128 characters, satisfying regular matching\"^((?!\\\\s)(?!_sys_)[\\\\p{L}\\\\p{Z}\\\\p{N} + _.:=+\\\\-@]*)(?<!\\\\s)$\" + :type key: list[str] + """ + try: + request = BatchCreateKmsTagsRequest(key_id = key_id) + listTagsbody = [] + if isinstance(key, list): + if not isinstance(value, list): + value = [value] + for k, v in zip(key, value): + listTagsbody.append(TagItem(key = k, + value = v if v is not None else None)) + else: + listTagsbody.append(TagItem(key = key, value = value)) + request.body = BatchCreateKmsTagsRequestBody( + action = action, + tags = listTagsbody + ) + response = self._dew_client.kms_client.batch_create_kms_tags(request) + self.logger.info("Successfully batch_create_kms_tags:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def create_kms_tag(self, key_id=None, key=None, value=None): + """ + Add key labels. + + Args: + :param key_id: key ID + :type key_id: str + + :param key: The name of the label. A label key can only correspond to one label value for the same credential; + Different credentials can use the same label key. Users can add up to 20 tags to a single credential. Constraint: + Value range from 1 to 128 characters, satisfying regular matching\"^((?!\\\\s)(?!_sys_)[\\\\p{L}\\\\p{Z}\\\\p{N} + _.:=+\\\\-@]*)(?<!\\\\s)$\" + :type key: str + """ + try: + request = CreateKmsTagRequest(key_id = key_id) + tagbody = TagItem( + key = key, + value = value + ) + request.body = CreateKmsTagRequestBody( + tag = tagbody + ) + response = self._dew_client.kms_client.create_kms_tag(request) + self.logger.info("Successfully create_kms_tag:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def list_kms_tags(self): + """ + Query all label collections of the user under the specified project. + """ + try: + request = ListKmsTagsRequest() + response = self._dew_client.kms_client.list_kms_tags(request) + self.logger.info("Successfully list_kms_tags:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def delete_tag(self, key_id=None, key=None): + """ + Delete key label. + + Args: + :param key_id: Key ID + :type key_id: str + :param key: Value of label key + :type key: str + """ + try: + request = DeleteTagRequest(key_id = key_id, key = key) + response = self._dew_client.kms_client.delete_tag(request) + self.logger.info("Successfully delete_tag:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def list_kms_by_tags(self, options: ListKmsByTagsOptions): + """ + Query key instances. Filter through tags to query detailed information about the specified user's master key. + """ + try: + request = ListKmsByTagsRequest(resource_instances = "resource_instances") + listTagsbody = [] + if isinstance(options.key, list): + for k, v in zip(options.key, options.values): + listTagsbody.append(Tag(key = k if k is not None else None, + values = v if v is not None else None)) + else: + listTagsbody.append(Tag(key = options.key, values = options.values)) + request.body = ListKmsByTagsRequestBody( + tags = listTagsbody if options.key is not None else None, + action = options.action, + offset = options.offset, + limit = options.limit, + sequence = options.sequence, + matches = options.matches + ) + response = self._dew_client.kms_client.list_kms_by_tags(request) + self.logger.info("Successfully list_kms_by_tags:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def show_kms_tags(self, key_id=None): + """ + Query the key label. + + Args: + :param key_id: Key ID + :type key_id: str + """ + try: + request = ShowKmsTagsRequest(key_id = key_id) + response = self._dew_client.kms_client.show_kms_tags(request) + self.logger.info("Successfully show_kms_tags:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e + + def create_key_store(self, keystore_alias=None, hsm_cluster_id=None, hsm_ca_cert=None): + """" + Create a tenant exclusive key repository that uses DHSM instances as key storage. + + Args: + :param keystore_alias: Exclusive key repository alias, with a value range of 1 to 255 characters, satisfies regular matching + "^[a-zA-Z0-9:/_-]{1, 255}$", and does not have the same name as existing exclusive key repository aliases. + :type keystore_alias: str + :param hsm_cluster_id: The DHSM cluster ID requires that the cluster has not yet created a dedicated key repository. + :type hsm_cluster_id: str + :param hsm_ca_cert: CA certificate for DHSM cluster + :type hsm_ca_cert: str + """ + try: + request = CreateKeyStoreRequest() + request.body = CreateKeyStoreRequestBody( + hsm_ca_cert = hsm_ca_cert, + hsm_cluster_id = hsm_cluster_id, + keystore_alias = keystore_alias + ) + response = self._dew_client.kms_client.create_key_store(request) + self.logger.info("Successfully create_key_store:") + self.logger.info(response) + return response + except exceptions.ClientRequestException as e: + self.logger.error(f"status_code = {e.status_code}, request_id = {e.request_id}, \ + error_code = {e.error_code}, error_msg = {e.error_msg}") + return e diff --git a/prefect_huaweicloud/dew_client.py b/prefect_huaweicloud/dew_client.py new file mode 100644 index 0000000000000000000000000000000000000000..b933607ad95c49600eeaad9d4373933934c710f6 --- /dev/null +++ b/prefect_huaweicloud/dew_client.py @@ -0,0 +1,29 @@ +from huaweicloudsdkcore.auth.credentials import BasicCredentials +from huaweicloudsdkkms.v2.region.kms_region import KmsRegion +from huaweicloudsdkkms.v2 import KmsClient +from huaweicloudsdkkps.v3.region.kps_region import KpsRegion +from huaweicloudsdkkps.v3 import KpsClient +from huaweicloudsdkcsms.v1.region.csms_region import CsmsRegion +from huaweicloudsdkcsms.v1 import CsmsClient + + +class DewClient(): + def __init__(self,region_id = None, ak = None, sk = None, project_id = None, *args, **kwargs): + if not ak or not sk: + raise Exception("please input both huawei_cloud_access_key_id and huawei_cloud_secret_access_key") + self.region_id = region_id + self.__ak = ak + self.__sk = sk + self.__project_id=project_id + self.kms_client = KmsClient.new_builder() \ + .with_credentials(BasicCredentials(self.__ak, self.__sk, self.__project_id)) \ + .with_region(KmsRegion.value_of(self.region_id)) \ + .build() + self.kps_client = KpsClient.new_builder() \ + .with_credentials(BasicCredentials(self.__ak, self.__sk, self.__project_id)) \ + .with_region(KpsRegion.value_of(self.region_id)) \ + .build() + self.cms_client = CsmsClient.new_builder() \ + .with_credentials(BasicCredentials(self.__ak, self.__sk, self.__project_id)) \ + .with_region(CsmsRegion.value_of(self.region_id)) \ + .build() diff --git a/prefect_huaweicloud/test/test_dew_block.py b/prefect_huaweicloud/test/test_dew_block.py new file mode 100644 index 0000000000000000000000000000000000000000..1d33d1277950eaa3d11e6f36c2e65b0a3273af94 --- /dev/null +++ b/prefect_huaweicloud/test/test_dew_block.py @@ -0,0 +1,307 @@ +import sys +from pathlib import Path +sys.path.append(str(Path(__file__).resolve().parents[1])) +from dew_block import DewBlock +from prefect import flow + +KEY_ID='key_id' +KEY_ALIAS='key_alias' +GRANT_ID='grant_id' +KEYPAIR_NAME='keypair_name' +KEY='key' +TYPE='type' +ID='id' +SECRET_NAME='secret_name' +SECRET_ID='secret_id' +STAGE_NAME='stage_name' +STAGE_ID='stage_id' +EVENT_NAME='event_name' +def kms_parameters(): + test_key = None + key_alias = None + if "key_alias" in huaweicloud_dew_block.dew_parameters: + key_alias = huaweicloud_dew_block.dew_parameters["key_alias"] + test_key = huaweicloud_dew_block.create_key(DewBlock.CreatekeyOptions( key_alias = key_alias)) + return dict( + key_alias = key_alias if key_alias is not None else None, + key_id = test_key.key_info.key_id if test_key is not None else None, + key_description = None, + pending_days = huaweicloud_dew_block.dew_parameters["pending_days"] if "pending_days" in huaweicloud_dew_block.dew_parameters else None, + + random_data_length = huaweicloud_dew_block.dew_parameters["random_data_length"] if "random_data_length" in huaweicloud_dew_block.dew_parameters else None, + datakey_length = huaweicloud_dew_block.dew_parameters["datakey_length"] if "datakey_length" in huaweicloud_dew_block.dew_parameters else None, + datakey_plain_length = huaweicloud_dew_block.dew_parameters["datakey_plain_length"] if "datakey_plain_length" in huaweicloud_dew_block.dew_parameters else None, + datakey_cipher_length = huaweicloud_dew_block.dew_parameters["datakey_cipher_length"] if "datakey_cipher_length" in huaweicloud_dew_block.dew_parameters else None, + cipher_text = None, + plain_text = None, + datakey_dgst = None, + + asymmetric_key_id = huaweicloud_dew_block.dew_parameters["asymmetric_key_id"] if "asymmetric_key_id" in huaweicloud_dew_block.dew_parameters else None, + + grantee_principal = huaweicloud_dew_block.dew_parameters["grantee_principal"] if "grantee_principal" in huaweicloud_dew_block.dew_parameters else None, + listOperationsbody = huaweicloud_dew_block.dew_parameters["listOperationsbody"] if "listOperationsbody" in huaweicloud_dew_block.dew_parameters else None, + grant_id = None, + + rotation_interval = huaweicloud_dew_block.dew_parameters["rotation_interval"] if "rotation_interval" in huaweicloud_dew_block.dew_parameters else None, + + signing_algorithm = huaweicloud_dew_block.dew_parameters["signing_algorithm"] if "signing_algorithm" in huaweicloud_dew_block.dew_parameters else None, + message = huaweicloud_dew_block.dew_parameters["message"] if "message" in huaweicloud_dew_block.dew_parameters else None, + message_type = huaweicloud_dew_block.dew_parameters["message_type"] if "message_type" in huaweicloud_dew_block.dew_parameters else None, + signature = None, + + action = huaweicloud_dew_block.dew_parameters["action"] if "action" in huaweicloud_dew_block.dew_parameters else None, + key = huaweicloud_dew_block.dew_parameters[KEY] if KEY in huaweicloud_dew_block.dew_parameters else None, + version_id = huaweicloud_dew_block.dew_parameters["version_id"] if "version_id" in huaweicloud_dew_block.dew_parameters else None, + ) + + +def kps_parameters(): + key_pair_list = huaweicloud_dew_block.list_keypairs() + test_key = None + name = None + if "name" in huaweicloud_dew_block.dew_parameters: + name = huaweicloud_dew_block.dew_parameters.get("name") + if name in list(x.keypair.name for x in key_pair_list.keypairs[:]): + huaweicloud_dew_block.delete_keypair(keypair_name = name) + test_key = huaweicloud_dew_block.create_keypair(DewBlock.CreateKeypairOptions(name = name)) + return dict( + name = name, + keypair_name = name, + private_key = test_key.keypair.private_key, + description = None, + id = huaweicloud_dew_block.dew_parameters["id"] if "id" in huaweicloud_dew_block.dew_parameters else None, + kms_key_name = huaweicloud_dew_block.dew_parameters["kms_key_name"] if "kms_key_name" in huaweicloud_dew_block.dew_parameters else None, + type = huaweicloud_dew_block.dew_parameters["type"] if "type" in huaweicloud_dew_block.dew_parameters else None, + key = huaweicloud_dew_block.dew_parameters[KEY] if KEY in huaweicloud_dew_block.dew_parameters else None, + task_id = None, + disable_password = huaweicloud_dew_block.dew_parameters["disable_password"] if "disable_password" in huaweicloud_dew_block.dew_parameters else None, + ) + + +def csms_parameters(): + secret_list = huaweicloud_dew_block.list_secrets() + secret_string = None + event_name = None + test_secret = None + if "name" and "secret_string" in huaweicloud_dew_block.dew_parameters: + name = huaweicloud_dew_block.dew_parameters.get("name") + secret_string = huaweicloud_dew_block.dew_parameters["secret_string"] + + if name in list(x.name for x in secret_list.secrets[:]): + huaweicloud_dew_block.delete_secret(secret_name = name) + test_secret = huaweicloud_dew_block.create_secret(DewBlock.CreateSecretOptions(name = name, secret_string = secret_string)) + + secret_event = huaweicloud_dew_block.list_secret_events() + + if event_name in list(x.name for x in secret_event.events[:]): + huaweicloud_dew_block.delete_secret_event(event_name = event_name) + return dict( + secret_name = test_secret.secret.name, + version_id = huaweicloud_dew_block.dew_parameters["version_id"] if "version_id" in huaweicloud_dew_block.dew_parameters else None, + secret_blob = None, + expire_time = huaweicloud_dew_block.dew_parameters["expire_time"] if "expire_time" in huaweicloud_dew_block.dew_parameters else None, + secret_string = secret_string, + stage_name = huaweicloud_dew_block.dew_parameters[STAGE_NAME] if STAGE_NAME in huaweicloud_dew_block.dew_parameters else None, + update_stage_name = huaweicloud_dew_block.dew_parameters["update_stage_name"] if "update_stage_name" in huaweicloud_dew_block.dew_parameters else None, + update_version_id = huaweicloud_dew_block.dew_parameters["update_version_id"] if "update_version_id" in huaweicloud_dew_block.dew_parameters else None, + secret_id = test_secret.secret.id, + key = huaweicloud_dew_block.dew_parameters[KEY] if KEY in huaweicloud_dew_block.dew_parameters else None, + action = huaweicloud_dew_block.dew_parameters["action0"] if "action0" in huaweicloud_dew_block.dew_parameters else None, + action1 = huaweicloud_dew_block.dew_parameters["action1"] if "action1" in huaweicloud_dew_block.dew_parameters else None, + + value = huaweicloud_dew_block.dew_parameters["value"] if "value" in huaweicloud_dew_block.dew_parameters else None, + values = huaweicloud_dew_block.dew_parameters["values"] if "values" in huaweicloud_dew_block.dew_parameters else None, + resource_instances = huaweicloud_dew_block.dew_parameters["resource_instances"] if "resource_instances" in huaweicloud_dew_block.dew_parameters else None, + event_name = huaweicloud_dew_block.dew_parameters[EVENT_NAME] if EVENT_NAME in huaweicloud_dew_block.dew_parameters else None, + event_types = huaweicloud_dew_block.dew_parameters["event_types"] if "event_types" in huaweicloud_dew_block.dew_parameters else None, + state = huaweicloud_dew_block.dew_parameters["state"] if "state" in huaweicloud_dew_block.dew_parameters else None, + target_type = huaweicloud_dew_block.dew_parameters["target_type"] if "target_type" in huaweicloud_dew_block.dew_parameters else None, + target_id = huaweicloud_dew_block.dew_parameters["target_id"] if "target_id" in huaweicloud_dew_block.dew_parameters else None, + target_name = huaweicloud_dew_block.dew_parameters["target_name"] if "target_name" in huaweicloud_dew_block.dew_parameters else None, + recovery_window_in_days = huaweicloud_dew_block.dew_parameters["recovery_window_in_days"] if "recovery_window_in_days" in huaweicloud_dew_block.dew_parameters else None, + + ) + + +@flow(name = "test_key_lifecycle_management") +def test_key_lifecycle_management(kms_params): + row_key_alias = kms_params.get(KEY_ALIAS) + huaweicloud_dew_block.create_key(DewBlock.CreatekeyOptions(key_alias = kms_params.get(KEY_ALIAS))) + huaweicloud_dew_block.disable_key(key_id = kms_params.get(KEY_ID)) + huaweicloud_dew_block.enable_key(key_id = kms_params.get(KEY_ID)) + kms_params[KEY_ALIAS] = "test_alias" + huaweicloud_dew_block.update_key_alias(key_id = kms_params.get(KEY_ID), key_alias = kms_params.get(KEY_ALIAS)) + kms_params[KEY_ALIAS] = row_key_alias + huaweicloud_dew_block.update_key_alias(key_id = kms_params.get(KEY_ID), key_alias = kms_params.get(KEY_ALIAS)) + kms_params['key_description'] = "test_update_description" + huaweicloud_dew_block.update_key_description(key_id = kms_params.get(KEY_ID), key_description = kms_params.get('key_description')) + huaweicloud_dew_block.delete_key(key_id = kms_params.get(KEY_ID), pending_days = kms_params.get('pending_days')) + huaweicloud_dew_block.cancel_key_deletion(key_id = kms_params.get(KEY_ID)) + huaweicloud_dew_block.enable_key(key_id = kms_params.get(KEY_ID)) + + +@flow(name = 'test_data_key_management') +def test_data_key_management(kms_params): + huaweicloud_dew_block.create_data_key_without_plaintext(key_id = kms_params.get(KEY_ID)) + create_data_key_response = huaweicloud_dew_block.create_data_key(key_id = kms_params.get(KEY_ID), datakey_length = kms_params.get('datakey_length')) + huaweicloud_dew_block.create_random(random_data_length = kms_params.get('random_data_length')) + kms_params["cipher_text"] = create_data_key_response.cipher_text + decrypt_data_key_response = huaweicloud_dew_block.decrypt_data_key(key_id = kms_params.get(KEY_ID), cipher_text = kms_params.get('cipher_text'), datakey_cipher_length = kms_params.get('datakey_cipher_length')) + kms_params["datakey_dgst"] = decrypt_data_key_response.datakey_dgst + kms_params["plain_text"] = decrypt_data_key_response.data_key+decrypt_data_key_response.datakey_dgst + huaweicloud_dew_block.encrypt_data_key(key_id = kms_params.get(KEY_ID), plain_text = kms_params.get('plain_text'), datakey_plain_length = kms_params.get('datakey_plain_length')) + + +@flow(name = 'test_key_grant_management') +def test_key_grant_management(kms_params): + create_grant_response = huaweicloud_dew_block.create_grant(DewBlock.CreateGrantOptions(key_id = kms_params.get(KEY_ID), + grantee_principal = kms_params.get("grantee_principal"), listOperationsbody = kms_params.get("listOperationsbody"))) + kms_params[GRANT_ID] = create_grant_response.grant_id + huaweicloud_dew_block.list_retirable_grants() + huaweicloud_dew_block.cancel_self_grant(key_id = kms_params.get(KEY_ID), grant_id = kms_params.get(GRANT_ID)) + huaweicloud_dew_block.create_grant(DewBlock.CreateGrantOptions(key_id = kms_params.get(KEY_ID), + grantee_principal = kms_params.get("grantee_principal"), listOperationsbody = kms_params.get("listOperationsbody"))) + huaweicloud_dew_block.cancel_grant(key_id = kms_params.get(KEY_ID), grant_id = kms_params.get(GRANT_ID)) + huaweicloud_dew_block.list_grants(key_id = kms_params.get(KEY_ID)) + + +@flow(name = 'test_key_pair_management') +def test_key_pair_management(kps_params): + huaweicloud_dew_block.import_private_key(name = kps_params.get('name'), type = kps_params.get(TYPE), kms_key_name = kps_params.get('kms_key_name'), private_key = kps_params.get('private_key')) + huaweicloud_dew_block.export_private_key(name = kps_params.get('name')) + huaweicloud_dew_block.list_keypair_detail(keypair_name = kps_params.get(KEYPAIR_NAME)) + huaweicloud_dew_block.list_keypairs() + huaweicloud_dew_block.clear_private_key(keypair_name = kps_params.get(KEYPAIR_NAME)) + + kps_params['description'] = "test_update_description" + huaweicloud_dew_block.update_keypair_description(keypair_name = kps_params.get(KEYPAIR_NAME), description = kps_params.get('description')) + + huaweicloud_dew_block.delete_keypair(keypair_name = kps_params.get(KEYPAIR_NAME)) + + +@flow(name = 'test_key_pair_task_management') +def test_key_pair_task_management(kps_params): + kps_params["task_id"] = huaweicloud_dew_block.associate_keypair(DewBlock.AssociateKeypairOptions(keypair_name = kps_params.get(KEYPAIR_NAME), + id = kps_params.get(ID), type = kps_params.get(TYPE), key = kps_params.get(KEY), disable_password = kps_params.get('disable_password'))).task_id + huaweicloud_dew_block.batch_associate_keypair(DewBlock.BatchAssociateKeypairOptions(keypair_name = kps_params.get(KEYPAIR_NAME), + id = kps_params.get(ID), type = kps_params.get(TYPE), key = kps_params.get(KEY), disable_password = kps_params.get('disable_password'))) + huaweicloud_dew_block.delete_all_failed_task() + huaweicloud_dew_block.disassociate_keypair(id = kps_params.get(ID), type = kps_params.get(TYPE), key = kps_params.get(KEY)) + huaweicloud_dew_block.list_failed_task() + huaweicloud_dew_block.list_keypair_task(task_id = kps_params.get('task_id')) + huaweicloud_dew_block.list_running_task() + + +@flow(name = 'test_small_data_encryption') +def test_small_data_encryption(kms_params): + encrypt_data_response = huaweicloud_dew_block.encrypt_data(key_id = kms_params.get(KEY_ID), plain_text = kms_params.get("plain_text"), encryption_algorithm = kms_params.get("encryption_algorithm")) + kms_params["cipher_text"] = encrypt_data_response.cipher_text + huaweicloud_dew_block.decrypt_data(cipher_text = kms_params.get("cipher_text"), key_id = kms_params.get(KEY_ID), encryption_algorithm = kms_params.get("encryption_algorithm")) + + +@flow(name = 'test_list_key') +def test_list_key(kms_params): + huaweicloud_dew_block.list_keys(DewBlock.ListKeysOptions()) + huaweicloud_dew_block.list_key_detail(key_id = kms_params.get(KEY_ID)) + huaweicloud_dew_block.show_public_key(asymmetric_key_id = kms_params.get("asymmetric_key_id")) + huaweicloud_dew_block.show_user_instances() + huaweicloud_dew_block.show_user_quotas() + + +@flow(name = 'test_key_rotation_management') +def test_key_rotation_management(kms_params): + huaweicloud_dew_block.enable_key_rotation(key_id = kms_params.get(KEY_ID)) + huaweicloud_dew_block.show_key_rotation_status(key_id = kms_params.get(KEY_ID)) + huaweicloud_dew_block.update_key_rotation_interval(key_id = kms_params.get(KEY_ID), rotation_interval = kms_params.get("rotation_interval")) + huaweicloud_dew_block.disable_key_rotation(key_id = kms_params.get(KEY_ID)) + + +@flow(name = 'test_signature_verification') +def test_signature_verification(kms_params): + row_sign = huaweicloud_dew_block.sign(kms_params.get("asymmetric_key_id"), kms_params.get("message"), + kms_params.get("signing_algorithm"), kms_params.get("message_type")) + kms_params['signature'] = row_sign.signature + huaweicloud_dew_block.validate_signature(options = DewBlock.ValidateSignatureOptions(asymmetric_key_id = kms_params.get("asymmetric_key_id"), + signature = kms_params.get("signature"), message = kms_params.get("message"), signing_algorithm = kms_params.get("signing_algorithm"), message_type = kms_params.get("message_type"))) + + +@flow(name = 'test_lifecycle_management') +def test_lifecycle_management(csms_params): + huaweicloud_dew_block.show_secret(secret_name = csms_params.get(SECRET_NAME)) + csms_params["secret_blob"] = huaweicloud_dew_block.download_secret_blob(secret_name = csms_params.get(SECRET_NAME)).secret_blob + huaweicloud_dew_block.update_secret(DewBlock.UpdateSecretOptions(secret_name = csms_params.get(SECRET_NAME), )) + huaweicloud_dew_block.delete_secret_for_schedule(secret_name = csms_params.get(SECRET_NAME), + recovery_window_in_days = csms_params.get("recovery_window_in_days")) + huaweicloud_dew_block.restore_secret(secret_name = csms_params.get(SECRET_NAME)) + huaweicloud_dew_block.delete_secret(secret_name = csms_params.get(SECRET_NAME)) + huaweicloud_dew_block.upload_secret_blob(secret_blob = csms_params.get("secret_blob")) + + +@flow(name = 'test_credential_version_management') +def test_credential_version_management(csms_params): + huaweicloud_dew_block.create_secret_version(secret_name = csms_params.get(SECRET_NAME), secret_string = csms_params.get("secret_string")) + huaweicloud_dew_block.show_secret_version(secret_name = csms_params.get(SECRET_NAME), version_id = csms_params.get("version_id")) + huaweicloud_dew_block.list_secret_versions(secret_name = csms_params.get(SECRET_NAME)) + huaweicloud_dew_block.update_version(secret_name = csms_params.get(SECRET_NAME), version_id = csms_params.get("version_id"), expire_time = csms_params.get("expire_time")) + + +@flow(name = 'test_credential_version_status_management') +def test_credential_version_status_management(csms_params): + show_secret_stage_response = huaweicloud_dew_block.show_secret_stage(secret_name = csms_params.get(SECRET_NAME), + stage_name = csms_params.get(STAGE_NAME)) + csms_params[STAGE_NAME] = csms_params["update_stage_name"] + csms_params["version_id"] = show_secret_stage_response.stage.version_id + huaweicloud_dew_block.update_secret_stage(secret_name = csms_params.get(SECRET_NAME), + stage_name = csms_params.get(STAGE_NAME), + version_id = csms_params.get("version_id")) + huaweicloud_dew_block.delete_secret_stage(secret_name = csms_params.get(SECRET_NAME), + stage_name = csms_params.get(STAGE_NAME)) + + +@flow(name = 'test_voucher_label_management') +def test_voucher_label_management(csms_params): + huaweicloud_dew_block.batch_create_or_delete_tags(secret_id = csms_params.getSECRET_ID, + key = csms_params.get(KEY), action = csms_params.get("action")) + huaweicloud_dew_block.create_secret_tag(secret_id = csms_params.getSECRET_ID, key = csms_params.get(KEY)) + huaweicloud_dew_block.list_project_secrets_tags() + huaweicloud_dew_block.list_resource_instances(DewBlock.ListResourceInstancesOptions( + action = csms_params.get("action1"), resource_instances = csms_params.get("resource_instances"))) + huaweicloud_dew_block.list_secret_tags(secret_id = csms_params.getSECRET_ID) + huaweicloud_dew_block.delete_secret_tag(secret_id = csms_params.getSECRET_ID, key = csms_params.get(KEY)) + + +@flow(name = 'test_event_management') +def test_event_management(csms_params): + huaweicloud_dew_block.create_secret_event(DewBlock.CreateSecretEventOptions(target_type = csms_params.get("target_type"), + target_id = csms_params.get("target_id"), + target_name = csms_params.get("target_name"), + event_name = csms_params.get(EVENT_NAME), + event_types = csms_params.get("event_types"), + state = csms_params.get("state"))) + huaweicloud_dew_block.show_secret_event(event_name = csms_params.get(EVENT_NAME)) + huaweicloud_dew_block.update_secret_event(event_name = csms_params.get(EVENT_NAME), + target_type = csms_params.get("target_type"), + target_id = csms_params.get("target_id"), + target_name = csms_params.get("target_name")) + huaweicloud_dew_block.delete_secret_event(event_name = csms_params.get(EVENT_NAME)) + huaweicloud_dew_block.list_notification_records() + + +@flow(name = 'test_key_label_management') +def test_key_label_management(kms_params): + huaweicloud_dew_block.list_kms_tags() + huaweicloud_dew_block.batch_create_kms_tags(key_id = kms_params.get(KEY_ID), key = [kms_params.get(KEY)], action = kms_params.get("action")) + huaweicloud_dew_block.create_kms_tag(key_id = kms_params.get(KEY_ID), key = kms_params.get(KEY)) + huaweicloud_dew_block.list_kms_by_tags(DewBlock.ListKmsByTagsOptions(action = "count")) + huaweicloud_dew_block.show_kms_tags(key_id = kms_params.get(KEY_ID)) + huaweicloud_dew_block.delete_tag(key_id = kms_params.get(KEY_ID), key = kms_params.get(KEY)) + +@flow(name = 'test_query_key_api_version_information') +def test_query_key_api_version_information(kms_params): + huaweicloud_dew_block.show_versions() + huaweicloud_dew_block.show_version(version_id = kms_params.get("version_id")) + + +if __name__ == "__main__": + + huaweicloud_dew_block = DewBlock.load("test-dew-block") \ No newline at end of file