From 0018e719a28116694e825ab5a1a27b57f1640dcc Mon Sep 17 00:00:00 2001 From: PankajJoshi Date: Mon, 1 Apr 2024 18:13:45 +0530 Subject: [PATCH 1/3] refactoring to move CVM specific code from DiskUtil to CVMDiskUtil --- VMEncryption/main/CVMDiskUtil.py | 312 +++++++++++++++++++ VMEncryption/main/CryptMountConfigUtil.py | 14 +- VMEncryption/main/DiskUtil.py | 293 +---------------- VMEncryption/main/OnlineEncryptionResumer.py | 5 +- VMEncryption/main/handle.py | 29 +- VMEncryption/main/test/test_disk_util.py | 26 +- 6 files changed, 365 insertions(+), 314 deletions(-) create mode 100644 VMEncryption/main/CVMDiskUtil.py diff --git a/VMEncryption/main/CVMDiskUtil.py b/VMEncryption/main/CVMDiskUtil.py new file mode 100644 index 000000000..f984a65c2 --- /dev/null +++ b/VMEncryption/main/CVMDiskUtil.py @@ -0,0 +1,312 @@ +#!/usr/bin/env python +# +# VMEncryption extension +# +# Copyright 2015 Microsoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import os +import os.path +import tempfile +import io + +from CommandExecutor import CommandExecutor, ProcessCommunicator +from Common import CommonVariables, LvmItem, DeviceItem +from io import open + +class CVMDiskUtil(object): + ''' CVM disk util class has cvm specific util function for disk encryption.''' + + def __init__(self, disk_util, logger): + '''initialize to cvm disk util class''' + self.disk_util = disk_util + self.logger = logger + self.command_executor = CommandExecutor(self.logger) + + def _isnumeric(self, chars): + '''check if chars is numeric type''' + try: + int(chars) + return True + except ValueError: + return False + + def _get_skr_exe_path(self): + '''getting cvm secure key release (SKR) app, i.e., AzureAttestSRK binary path''' + abs_file_path=os.path.abspath(__file__) + current_dir = os.path.dirname(abs_file_path) + return os.path.normpath(os.path.join(current_dir,"..")) + + def _secure_key_release_operation(self,protector_base64,kek_url,operation,attestation_url=None): + '''This is private function, used for releasing key and wrap/unwrap operation on protector''' + self.logger.log("secure_key_release_operation {0} started.".format(operation)) + skr_app_dir = self._get_skr_exe_path() + skr_app = os.path.join(skr_app_dir,CommonVariables.secure_key_release_app) + if not os.path.isdir(skr_app_dir): + self.logger.log("secure_key_release_operation app directory {0} is not valid.".format(skr_app_dir)) + return None + if not os.path.isfile(skr_app): + self.logger.log("secure_key_release_operation app {0} is not present.".format(skr_app)) + return None + if attestation_url: + cmd = "{0} -a {1} -k {2} -s {3}".format(skr_app,attestation_url,kek_url,protector_base64) + else: + cmd = "{0} -k {1} -s {2}".format(skr_app,kek_url,protector_base64) + cmd = "{0} {1}".format(cmd,operation) + process_comm = ProcessCommunicator() + #suppressing logging enabled due to password use. + ret = self.command_executor.Execute(cmd,communicator=process_comm,suppress_logging=True) + if ret!=CommonVariables.process_success: + msg = "" + if process_comm.stderr: + msg = process_comm.stderr.strip() + elif process_comm.stdout: + msg = process_comm.stdout.strip() + else: + pass + self.logger.log("secure_key_release_operation {0} unsuccessful.".format(operation)) + self.logger.log(msg=msg) + return None + self.logger.log("secure_key_release_operation {0} end.".format(operation)) + return process_comm.stdout.strip() + + def import_token_data(self, device_path, token_data, token_id): + '''Updating token_data json object to LUKS2 header's Tokens field. + token data is as follow for version 1.0. + "version": "1.0", + "type": "Azure_Disk_Encryption", + "keyslots": [], + "KekVaultResourceId": "", + "KeyEncryptionKeyURL": "", + "KeyVaultResourceId": "", + "KeyVaultURL": "https://.vault.azure.net/", + "AttestationURL": null, + "PassphraseName": "LUKSPasswordProtector", + "Passphrase": "M53XE09n7O9r2AdKa7FYRYe..." + ''' + self.logger.log(msg="import_token_data for device: {0} started.".format(device_path)) + if not token_data or not isinstance(token_data, dict): + self.logger.log(level=CommonVariables.WarningLevel, msg="import_token_data: token_data: {0} for device: {1} is not valid.".format(token_data,device_path)) + return False + if not token_id: + self.logger.log(level= CommonVariables.WarningLevel, msg = "import_token_data: token_id: {0} for device: {1} is not valid.".format(token_id,device_path) ) + return False + temp_file = tempfile.NamedTemporaryFile(delete=False,mode='w+') + json.dump(token_data,temp_file,indent=4) + temp_file.close() + cmd = "cryptsetup token import --json-file {0} --token-id {1} {2}".format(temp_file.name,token_id,device_path) + process_comm = ProcessCommunicator() + status = self.command_executor.Execute(cmd,communicator=process_comm) + self.logger.log(msg="import_token_data: device: {0} status: {1}".format(device_path,status)) + os.unlink(temp_file.name) + return status==CommonVariables.process_success + + def import_token(self, device_path, passphrase_file, public_settings, + passphrase_name_value=CommonVariables.PassphraseNameValueProtected): + '''This function reads passphrase from passphrase_file, do SKR and wrap passphrase with securely + released key. Then it updates metadata (required encryption settings for SKR + wrapped passphrase) + to primary token id: 5 type: Azure_Disk_Encryption in Tokens field of LUKS2 header.''' + self.logger.log(msg="import_token for device: {0} started.".format(device_path)) + self.logger.log(msg="import_token for passphrase file path: {0}.".format(passphrase_file)) + if not passphrase_file or not os.path.exists(passphrase_file): + self.logger.log(level=CommonVariables.WarningLevel,msg="import_token for passphrase file path: {0} not exists.".format(passphrase_file)) + return False + protector= "" + with open(passphrase_file,"rb") as protector_file: + #passphrase stored in keyfile is base64 + protector = protector_file.read().decode('utf-8') + kek_vault_resource_id=public_settings.get(CommonVariables.KekVaultResourceIdKey) + key_encryption_key_url=public_settings.get(CommonVariables.KeyEncryptionKeyURLKey) + attestation_url = public_settings.get(CommonVariables.AttestationURLKey) + if passphrase_name_value == CommonVariables.PassphraseNameValueProtected: + protector = self._secure_key_release_operation(protector_base64=protector, + kek_url=key_encryption_key_url, + operation=CommonVariables.secure_key_release_wrap, + attestation_url=attestation_url) + else: + self.logger.log(msg="import_token passphrase is not wrapped, value of passphrase name key: {0}".format(passphrase_name_value)) + + if not protector: + self.logger.log("import_token protector wrapping is unsuccessful for device {0}".format(device_path)) + return False + data={ + "version":CommonVariables.ADEEncryptionVersionInLuksToken_1_0, + "type":"Azure_Disk_Encryption", + "keyslots":[], + CommonVariables.KekVaultResourceIdKey:kek_vault_resource_id, + CommonVariables.KeyEncryptionKeyURLKey:key_encryption_key_url, + CommonVariables.KeyVaultResourceIdKey:public_settings.get(CommonVariables.KeyVaultResourceIdKey), + CommonVariables.KeyVaultURLKey:public_settings.get(CommonVariables.KeyVaultURLKey), + CommonVariables.AttestationURLKey:attestation_url, + CommonVariables.PassphraseNameKey:passphrase_name_value, + CommonVariables.PassphraseKey:protector + } + status = self.import_token_data(device_path=device_path, + token_data=data, + token_id=CommonVariables.cvm_ade_vm_encryption_token_id) + self.logger.log(msg="import_token: device: {0} end.".format(device_path)) + return status + + def export_token(self,device_name): + '''This function reads wrapped passphrase from LUKS2 Tokens for + token id:5, which belongs to primary token type: Azure_Disk_Encryption + and do SKR and returns unwrapped passphrase''' + self.logger.log("export_token: for device_name: {0} started.".format(device_name)) + device_path = self.disk_util.get_device_path(device_name) + if not device_path: + self.logger.log(level= CommonVariables.WarningLevel, msg="export_token Input is not valid. device name: {0}".format(device_name)) + return None + protector = None + cvm_ade_vm_encryption_token_id = self.get_token_id(header_or_dev_path=device_path,token_name=CommonVariables.AzureDiskEncryptionToken) + if not cvm_ade_vm_encryption_token_id: + self.logger.log("export_token token id {0} not found in device {1} LUKS header".format(cvm_ade_vm_encryption_token_id,device_name)) + return None + disk_encryption_setting=self.read_token(device_name=device_name,token_id=cvm_ade_vm_encryption_token_id) + if disk_encryption_setting['version'] != CommonVariables.ADEEncryptionVersionInLuksToken_1_0: + self.logger.log("export_token token version {0} is not a vaild version.".format(disk_encryption_setting['version'])) + return None + key_encryption_key_url=disk_encryption_setting[CommonVariables.KeyEncryptionKeyURLKey] + wrapped_protector = disk_encryption_setting[CommonVariables.PassphraseKey] + attestation_url = disk_encryption_setting[CommonVariables.AttestationURLKey] + if disk_encryption_setting[CommonVariables.PassphraseNameKey] != CommonVariables.PassphraseNameValueProtected: + self.logger.log(level=CommonVariables.WarningLevel, msg="passphrase is not Protected. No need to do SKR.") + return wrapped_protector if wrapped_protector else None + if wrapped_protector: + #unwrap the protector. + protector=self._secure_key_release_operation(attestation_url=attestation_url, + kek_url=key_encryption_key_url, + protector_base64=wrapped_protector, + operation=CommonVariables.secure_key_release_unwrap) + self.logger.log("export_token to device {0} end.".format(device_name)) + return protector + + def remove_token(self, device_name, token_id): + '''this function remove the token''' + device_path = self.disk_util.get_device_path(dev_name=device_name) + if not device_path or not token_id: + self.logger.log(level=CommonVariables.WarningLevel, + msg="remove_token: Inputs are not valid. device name: {0}, token_id: {1}".format(device_name,token_id)) + return False + cmd = "cryptsetup token remove --token-id {0} {1}".format(token_id,device_path) + process_comm = ProcessCommunicator() + status = self.command_executor.Execute(cmd, communicator=process_comm) + if status != 0: + self.logger.log(level=CommonVariables.WarningLevel, + msg="remove_token: token id: {0} is not found for device_name: {1} in LUKS header".format(token_id,device_name)) + return False + return True + + def read_token(self, device_name, token_id): + '''this functions reads tokens from LUKS2 header.''' + device_path = self.disk_util.get_device_path(dev_name=device_name) + if not device_path or not token_id: + self.logger.log(level=CommonVariables.WarningLevel, + msg="read_token: Inputs are not valid. device_name: {0}, token id: {1}".format(device_name,token_id)) + return None + cmd = "cryptsetup token export --token-id {0} {1}".format(token_id,device_path) + process_comm = ProcessCommunicator() + status = self.command_executor.Execute(cmd, communicator=process_comm) + if status != 0: + self.logger.log(level=CommonVariables.WarningLevel, + msg="read_token: token id: {0} is not found for device_name: {1} in LUKS header".format(token_id,device_name)) + return None + token = process_comm.stdout + return json.loads(token) + + def get_token_id(self, header_or_dev_path, token_name): + '''if LUKS2 header has token name return the id else return none.''' + if not header_or_dev_path or not os.path.exists(header_or_dev_path) or not token_name: + self.logger.log("get_token_id: invalid input, header_or_dev_path:{0} token_name:{1}".format(header_or_dev_path,token_name)) + return None + luks_dump_out = self.disk_util.luks_get_header_dump(header_or_dev_path) + tokens = self.extract_luks2_token(luks_dump_out) + for token in tokens: + if len(token) == 2 and token[1] == token_name: + return token[0] + return None + + def restore_luks2_token(self, device_name=None): + '''this function restores token + type:Azure_Disk_Encryption_BackUp, id:6 to type:Azure_Disk_Encryption id:5, + this function acts on 4 scenarios. + 1. both token id: 5 and 6 present in LUKS2 Tokens field, due to reboot/interrupt during + KEK rotation, such case remove token id 5 has latest data so remove token id 6. + 2. token id 5 present but 6 is not present in LUKS2 Tokens field. do nothing. + 3. token id 5 not present but 6 present in LUKS2 Tokens field, restore token id 5 using + token id 6, then remove token id 6. + 4. no token ids 5 or 6 present in LUKS2 Tokens field, do nothing.''' + device_path = self.disk_util.get_device_path(device_name) + if not device_path: + self.logger.log(level=CommonVariables.WarningLevel,msg="restore_luks2_token invalid input. device_name = {0}".format(device_name)) + return + ade_token_id_primary = self.get_token_id(header_or_dev_path=device_path,token_name=CommonVariables.AzureDiskEncryptionToken) + ade_token_id_backup = self.get_token_id(header_or_dev_path=device_path,token_name=CommonVariables.AzureDiskEncryptionBackUpToken) + if not ade_token_id_backup: + #do nothing + return + if ade_token_id_primary: + #remove backup token id + self.remove_token(device_name=device_name,token_id=ade_token_id_backup) + return + #ade_token_id_backup having value but ade_token_id_primary is none + self.logger.log("restore luks2 token for device {0} is started.".format(device_name)) + #read from backup and update AzureDiskEncryptionToken + data = self.read_token(device_name=device_name,token_id=ade_token_id_backup) + data['type']=CommonVariables.AzureDiskEncryptionToken + self.import_token_data(device_path=device_path,token_data=data,token_id=CommonVariables.cvm_ade_vm_encryption_token_id) + #remove backup + self.remove_token(device_name=device_name,token_id=ade_token_id_backup) + self.logger.log("restore luks2 token id {0} to {1} for device {2} is successful.".format(ade_token_id_backup,CommonVariables.cvm_ade_vm_encryption_token_id,device_name)) + + def extract_luks2_token(self, luks_dump_out): + """ + A luks v2 luksheader looks kind of like this: (inessential stuff removed) + + LUKS header information + Version: 2 + Data segments: + 0: crypt + offset: 0 [bytes] + length: 5539430400 [bytes] + cipher: aes-xts-plain64 + sector: 512 [bytes] + Keyslots: + 1: luks2 + Key: 512 bits + 3: reencrypt (unbound) + Key: 8 bits + Tokens: + 6: Azure_Disk_Encryption_BackUp + 5: Azure_Disk_Encryption + ... + """ + if not luks_dump_out: + return [] + lines = luks_dump_out.split("\n") + token_segment = False + token_lines = [] + for line in lines: + parts = line.split(":") + if len(parts)<2: + continue + if token_segment and parts[1].strip() == '': + break + if "tokens" in parts[0].strip().lower(): + token_segment = True + continue + if token_segment and self._isnumeric(parts[0].strip()): + token_lines.append([int(parts[0].strip()),parts[1].strip()]) + continue + return token_lines \ No newline at end of file diff --git a/VMEncryption/main/CryptMountConfigUtil.py b/VMEncryption/main/CryptMountConfigUtil.py index 4d3de3440..4e2dc750b 100644 --- a/VMEncryption/main/CryptMountConfigUtil.py +++ b/VMEncryption/main/CryptMountConfigUtil.py @@ -25,10 +25,11 @@ import io from datetime import datetime import sys +import threading from CommandExecutor import CommandExecutor, ProcessCommunicator from Common import CryptItem, CommonVariables - +from CVMDiskUtil import CVMDiskUtil class CryptMountConfigUtil(object): """ @@ -221,16 +222,17 @@ def _restore_backup_crypttab_info(self,crypt_item,passphrase_file): def _device_unlock_using_luks2_header(self,device_name,device_item_path,azure_device_path,lock): device_item_real_path = os.path.realpath(device_item_path) + cmv_disk_util = CVMDiskUtil(disk_util=self.disk_util, logger=self.logger) if self.disk_util.is_device_locked(device_item_real_path,None): self.logger.log("Found an encrypted device {0} in locked state.".format(device_name)) #read protector from device luks header. - protector = self.disk_util.export_token(device_name=device_name) + protector = cmv_disk_util.export_token(device_name=device_name) if not protector: self.logger.log("device_unlock_using_luks2_header {0} device does not have token field containing wrapped protector.".format(device_name)) return crypt_item = CryptItem() crypt_item.dev_path = azure_device_path - #using UUID of crypted device as mappers name. + #using UUID of encrypted device as mappers name. crypt_item.mapper_name = self.disk_util.get_device_items_property(dev_name=device_name, property_name='UUID') crypt_item.uses_cleartext_key = False @@ -257,17 +259,17 @@ def _device_unlock_using_luks2_header(self,device_name,device_item_path,azure_de lock.release() def device_unlock_using_luks2_header(self): - """Reads vm setting properties of blcock devices that have wrapped passphrase in tokens in LUKS header.""" + """Reads vm setting properties of block devices that have wrapped passphrase in tokens in LUKS header.""" self.logger.log("device_unlock_using_luks2_header Start") device_items = self.disk_util.get_device_items(None) azure_name_table = self.disk_util.get_block_device_to_azure_udev_table() - import threading + cmv_disk_util = CVMDiskUtil(disk_util=self.disk_util, logger=self.logger) threads = [] lock = threading.Lock() for device_item in device_items: if device_item.file_system == "crypto_LUKS": #restore LUKS2 token using BackUp. - self.disk_util.restore_luks2_token(device_name=device_item.name) + cmv_disk_util.restore_luks2_token(device_name=device_item.name) device_item_path = self.disk_util.get_device_path(device_item.name) azure_item_path = azure_name_table[device_item_path] if device_item_path in azure_name_table else device_item_path thread = threading.Thread(target=self._device_unlock_using_luks2_header,args=(device_item.name,device_item_path,azure_item_path,lock)) diff --git a/VMEncryption/main/DiskUtil.py b/VMEncryption/main/DiskUtil.py index f15ff1eff..961342700 100644 --- a/VMEncryption/main/DiskUtil.py +++ b/VMEncryption/main/DiskUtil.py @@ -24,7 +24,7 @@ from subprocess import Popen import traceback import glob -import tempfile +import io from EncryptionConfig import EncryptionConfig from DecryptionMarkConfig import DecryptionMarkConfig @@ -54,13 +54,7 @@ def __init__(self, hutil, patching, logger, encryption_environment): self.command_executor = CommandExecutor(self.logger) self._LUN_PREFIX = "lun" self._SCSI_PREFIX = "scsi" - - def _get_SKR_exe_path(self): - '''getting cvm_secure_key_release_app path AzureSttestSRK''' - absFilePath=os.path.abspath(__file__) - currentDir = os.path.dirname(absFilePath) - return os.path.normpath(os.path.join(currentDir,"..")) - + def get_osmapper_path(self): return os.path.join(CommonVariables.dev_mapper_root, CommonVariables.osmapper_name) @@ -109,7 +103,7 @@ def is_luks_device(self, device_path, device_header_path): path_var = device_header_path if device_header_path else device_path cmd = 'cryptsetup isLuks ' + path_var return (int)(self.command_executor.Execute(cmd, suppress_logging=True)) == CommonVariables.process_success - + def is_device_locked(self, device_path, device_header_path): '''Checks if device is locked or unlocked''' if not self.is_luks_device(device_path=device_path, device_header_path=device_header_path): @@ -119,15 +113,14 @@ def is_device_locked(self, device_path, device_header_path): if self.command_executor.ExecuteInBash(cmd,suppress_logging=True) == CommonVariables.process_success: self.logger.log("is_device_locked device path {0} is opened.".format(device_path)) return False - #test command is failed for multiple mappers for same crypted device. + #test command is failed for multiple mappers for same crypted device. #if any of mapper status is valid. - #luksClose to non valid mapper status + #luksClose to non valid mapper status cmd="ls -C /dev/disk/by-id/dm-uuid-*$(cryptsetup luksUUID {0} | tr -d -)*".format(path_var) comm = ProcessCommunicator() locked = True if self.command_executor.ExecuteInBash(cmd,communicator=comm,suppress_logging=True) == CommonVariables.process_success: - import io - buf = io.StringIO(comm.stdout) + buf = io.StringIO(comm.stdout) for line in buf: file = os.path.basename(line.strip()) sp = "-".join(file.split('-')[:-5]) @@ -146,7 +139,7 @@ def is_device_locked(self, device_path, device_header_path): self.command_executor.Execute(cmd) self.logger.log("is_device_locked device path {0} is locked.".format(device_path)) return locked - + def create_luks_header(self, mapper_name): luks_header_file_path = self.encryption_environment.luks_header_base_path + mapper_name if not os.path.exists(luks_header_file_path): @@ -161,182 +154,6 @@ def create_cleartext_key(self, mapper_name): self.command_executor.ExecuteInBash(dd_command, raise_exception_on_failure=True) return cleartext_key_file_path - def secure_key_release_operation(self,protectorbase64,kekUrl,operation,attestationUrl=None): - '''This function release key and does wrap/unwrap operation on protector''' - self.logger.log("secure_key_release_operation {0} started.".format(operation)) - skr_app_dir = self._get_SKR_exe_path() - skr_app = os.path.join(skr_app_dir,CommonVariables.secure_key_release_app) - if not os.path.isdir(skr_app_dir): - self.logger.log("secure_key_release_operation app directory {0} is not valid.".format(skr_app_dir)) - return None - if not os.path.isfile(skr_app): - self.logger.log("secure_key_release_operation app {0} is not present.".format(skr_app)) - return None - if attestationUrl: - cmd = "{0} -a {1} -k {2} -s {3}".format(skr_app,attestationUrl,kekUrl,protectorbase64) - else: - cmd = "{0} -k {1} -s {2}".format(skr_app,kekUrl,protectorbase64) - cmd = "{0} {1}".format(cmd,operation) - process_comm = ProcessCommunicator() - #needed to subpress logic for this execute command. run this command silently due to password. - ret = self.command_executor.Execute(cmd,communicator=process_comm,suppress_logging=True) - if ret!=CommonVariables.process_success: - msg = "" - if process_comm.stderr: - msg = process_comm.stderr.strip() - elif process_comm.stdout: - msg = process_comm.stdout.strip() - else: - pass - self.logger.log("secure_key_release_operation {0} unsuccessful.".format(operation)) - self.logger.log(msg=msg) - return None - self.logger.log("secure_key_release_operation {0} end.".format(operation)) - return process_comm.stdout.strip() - - def import_token_data(self, device_path, token_data, token_id): - '''Updating token_data json object to LUKS2 header's Tokens field. - token data is as follow for version 1.0. - "version": "1.0", - "type": "Azure_Disk_Encryption", - "keyslots": [], - "KekVaultResourceId": "", - "KeyEncryptionKeyURL": "", - "KeyVaultResourceId": "", - "KeyVaultURL": "https://.vault.azure.net/", - "AttestationURL": null, - "PassphraseName": "LUKSPasswordProtector", - "Passphrase": "M53XE09n7O9r2AdKa7FYRYe..." - ''' - self.logger.log(msg="import_token_data for device: {0} started.".format(device_path)) - if not token_data or not isinstance(token_data, dict): - self.logger.log(level=CommonVariables.WarningLevel, msg="import_token_data: token_data: {0} for device: {1} is not valid.".format(token_data,device_path)) - return False - if not token_id: - self.logger.log(level= CommonVariables.WarningLevel, msg = "import_token_data: token_id: {0} for device: {1} is not valid.".format(token_id,device_path) ) - return False - temp_file = tempfile.NamedTemporaryFile(delete=False,mode='w+') - json.dump(token_data,temp_file,indent=4) - temp_file.close() - cmd = "cryptsetup token import --json-file {0} --token-id {1} {2}".format(temp_file.name,token_id,device_path) - process_comm = ProcessCommunicator() - status = self.command_executor.Execute(cmd,communicator=process_comm) - self.logger.log(msg="import_token_data: device: {0} status: {1}".format(device_path,status)) - os.unlink(temp_file.name) - return status==CommonVariables.process_success - - def import_token(self, device_path, passphrase_file, public_settings, - passphrase_name_value=CommonVariables.PassphraseNameValueProtected): - '''This function reads passphrase from passphrase_file, do SKR and wrap passphrase with securely - released key. Then it updates metadata (required encryption settings for SKR + wrapped passphrase) - to primary token id: 5 type: Azure_Disk_Encryption in Tokens field of LUKS2 header.''' - self.logger.log(msg="import_token for device: {0} started.".format(device_path)) - self.logger.log(msg="import_token for passphrase file path: {0}.".format(passphrase_file)) - if not passphrase_file or not os.path.exists(passphrase_file): - self.logger.log(level=CommonVariables.WarningLevel,msg="import_token for passphrase file path: {0} not exists.".format(passphrase_file)) - return False - protector= "" - with open(passphrase_file,"rb") as protector_file: - #passphrase stored in keyfile is base64 - protector = protector_file.read().decode('utf-8') - kek_vault_resource_id=public_settings.get(CommonVariables.KekVaultResourceIdKey) - key_encryption_key_url=public_settings.get(CommonVariables.KeyEncryptionKeyURLKey) - attestation_url = public_settings.get(CommonVariables.AttestationURLKey) - if passphrase_name_value == CommonVariables.PassphraseNameValueProtected: - protector = self.secure_key_release_operation(protectorbase64=protector, - kekUrl=key_encryption_key_url, - operation=CommonVariables.secure_key_release_wrap, - attestationUrl=attestation_url) - else: - self.logger.log(msg="import_token passphrase is not wrapped, value of passphrase name key: {0}".format(passphrase_name_value)) - - if not protector: - self.logger.log("import_token protector wrapping is unsuccessful for device {0}".format(device_path)) - return False - data={ - "version":CommonVariables.ADEEncryptionVersionInLuksToken_1_0, - "type":"Azure_Disk_Encryption", - "keyslots":[], - CommonVariables.KekVaultResourceIdKey:kek_vault_resource_id, - CommonVariables.KeyEncryptionKeyURLKey:key_encryption_key_url, - CommonVariables.KeyVaultResourceIdKey:public_settings.get(CommonVariables.KeyVaultResourceIdKey), - CommonVariables.KeyVaultURLKey:public_settings.get(CommonVariables.KeyVaultURLKey), - CommonVariables.AttestationURLKey:attestation_url, - CommonVariables.PassphraseNameKey:passphrase_name_value, - CommonVariables.PassphraseKey:protector - } - status = self.import_token_data(device_path=device_path, - token_data=data, - token_id=CommonVariables.cvm_ade_vm_encryption_token_id) - self.logger.log(msg="import_token: device: {0} end.".format(device_path)) - return status - - def read_token(self, device_name, token_id): - '''this functions reads tokens from LUKS2 header.''' - device_path = self.get_device_path(dev_name=device_name) - if not device_path or not token_id: - self.logger.log(level=CommonVariables.WarningLevel, - msg="read_token: Inputs are not valid. device_name: {0}, token id: {1}".format(device_name,token_id)) - return None - cmd = "cryptsetup token export --token-id {0} {1}".format(token_id,device_path) - process_comm = ProcessCommunicator() - status = self.command_executor.Execute(cmd, communicator=process_comm) - if status != 0: - self.logger.log(level=CommonVariables.WarningLevel, - msg="read_token: token id: {0} is not found for device_name: {1} in LUKS header".format(token_id,device_name)) - return None - token = process_comm.stdout - return json.loads(token) - - def remove_token(self, device_name, token_id): - '''this function remove the token''' - device_path = self.get_device_path(dev_name=device_name) - if not device_path or not token_id: - self.logger.log(level=CommonVariables.WarningLevel, - msg="remove_token: Inputs are not valid. device name: {0}, token_id: {1}".format(device_name,token_id)) - return False - cmd = "cryptsetup token remove --token-id {0} {1}".format(token_id,device_path) - process_comm = ProcessCommunicator() - status = self.command_executor.Execute(cmd, communicator=process_comm) - if status != 0: - self.logger.log(level=CommonVariables.WarningLevel, - msg="remove_token: token id: {0} is not found for device_name: {1} in LUKS header".format(token_id,device_name)) - return False - return True - - def export_token(self,device_name): - '''This function reads wrapped passphrase from LUKS2 Tokens for - token id:5, which belongs to primary token type: Azure_Disk_Encryption - and do SKR and returns unwrapped passphrase''' - self.logger.log("export_token: for device_name: {0} started.".format(device_name)) - device_path = self.get_device_path(device_name) - if not device_path: - self.logger.log(level= CommonVariables.WarningLevel, msg="export_token Input is not valid. device name: {0}".format(device_name)) - return None - protector = None - cvm_ade_vm_encryption_token_id = self.get_token_id(header_or_dev_path=device_path,token_name=CommonVariables.AzureDiskEncryptionToken) - if not cvm_ade_vm_encryption_token_id: - self.logger.log("export_token token id {0} not found in device {1} LUKS header".format(cvm_ade_vm_encryption_token_id,device_name)) - return None - disk_encryption_setting=self.read_token(device_name=device_name,token_id=cvm_ade_vm_encryption_token_id) - if disk_encryption_setting['version'] != CommonVariables.ADEEncryptionVersionInLuksToken_1_0: - self.logger.log("export_token token version {0} is not a vaild version.".format(disk_encryption_setting['version'])) - return None - key_encryption_key_url=disk_encryption_setting[CommonVariables.KeyEncryptionKeyURLKey] - wrapped_protector = disk_encryption_setting[CommonVariables.PassphraseKey] - attestation_url = disk_encryption_setting[CommonVariables.AttestationURLKey] - if disk_encryption_setting[CommonVariables.PassphraseNameKey] != CommonVariables.PassphraseNameValueProtected: - self.logger.log(level=CommonVariables.WarningLevel, msg="passphrase is not Protected. No need to do SKR.") - return wrapped_protector if wrapped_protector else None - if wrapped_protector: - #unwrap the protector. - protector=self.secure_key_release_operation(attestationUrl=attestation_url, - kekUrl=key_encryption_key_url, - protectorbase64=wrapped_protector, - operation=CommonVariables.secure_key_release_unwrap) - self.logger.log("export_token to device {0} end.".format(device_name)) - return protector - def encrypt_disk(self, dev_path, passphrase_file, mapper_name, header_file): return_code = self.luks_format(passphrase_file=passphrase_file, dev_path=dev_path, header_file=header_file) if return_code != CommonVariables.process_success: @@ -452,7 +269,7 @@ def luks_add_cleartext_key(self, passphrase_file, dev_path, mapper_name, header_ return self.luks_add_key(passphrase_file, dev_path, mapper_name, header_file, cleartext_key_file_path) - def _luks_get_header_dump(self, header_or_dev_path): + def luks_get_header_dump(self, header_or_dev_path): cryptsetup_cmd = "{0} luksDump {1}".format(self.distro_patcher.cryptsetup_path, header_or_dev_path) proc_comm = ProcessCommunicator() @@ -461,7 +278,7 @@ def _luks_get_header_dump(self, header_or_dev_path): return proc_comm.stdout def luks_get_uuid(self, header_or_dev_path): - luks_dump_out = self._luks_get_header_dump(header_or_dev_path) + luks_dump_out = self.luks_get_header_dump(header_or_dev_path) lines = filter(lambda l: "uuid" in l.lower(), luks_dump_out.split("\n")) @@ -471,51 +288,6 @@ def luks_get_uuid(self, header_or_dev_path): return splits[1] return None - def get_token_id(self, header_or_dev_path, token_name): - '''if LUKS2 header has token name return the id else return none.''' - if not header_or_dev_path or not os.path.exists(header_or_dev_path) or not token_name: - self.logger.log("get_token_id: invalid input, header_or_dev_path:{0} token_name:{1}".format(header_or_dev_path,token_name)) - return None - luks_dump_out = self._luks_get_header_dump(header_or_dev_path) - tokens = self._extract_luksv2_token(luks_dump_out) - for token in tokens: - if len(token) == 2 and token[1] == token_name: - return token[0] - return None - - def restore_luks2_token(self, device_name=None): - '''this function restores token - type:Azure_Disk_Encryption_BackUp, id:6 to type:Azure_Disk_Encryption id:5, - this function acts on 4 scenarios. - 1. both token id: 5 and 6 present in LUKS2 Tokens field, due to reboot/interrupt during - KEK rotation, such case remove token id 5 has latest data so remove token id 6. - 2. token id 5 present but 6 is not present in LUKS2 Tokens field. do nothing. - 3. token id 5 not present but 6 present in LUKS2 Tokens field, restore token id 5 using - token id 6, then remove token id 6. - 4. no token ids 5 or 6 present in LUKS2 Tokens field, do nothing.''' - device_path = self.get_device_path(device_name) - if not device_path: - self.logger.log(level=CommonVariables.WarningLevel,msg="restore_luks2_token invalid input. device_name = {0}".format(device_name)) - return - ade_token_id_primary = self.get_token_id(header_or_dev_path=device_path,token_name=CommonVariables.AzureDiskEncryptionToken) - ade_token_id_backup = self.get_token_id(header_or_dev_path=device_path,token_name=CommonVariables.AzureDiskEncryptionBackUpToken) - if not ade_token_id_backup: - #do nothing - return - if ade_token_id_primary: - #remove backup token id - self.remove_token(device_name=device_name,token_id=ade_token_id_backup) - return - #ade_token_id_backup having value but ade_token_id_primary is none - self.logger.log("restore luks2 token for device {0} is started.".format(device_name)) - #read from backup and update AzureDiskEncryptionToken - data = self.read_token(device_name=device_name,token_id=ade_token_id_backup) - data['type']=CommonVariables.AzureDiskEncryptionToken - self.import_token_data(device_path=device_path,token_data=data,token_id=CommonVariables.cvm_ade_vm_encryption_token_id) - #remove backup - self.remove_token(device_name=device_name,token_id=ade_token_id_backup) - self.logger.log("restore luks2 token id {0} to {1} for device {2} is successful.".format(ade_token_id_backup,CommonVariables.cvm_ade_vm_encryption_token_id,device_name)) - def _get_cryptsetup_version(self): # get version of currently installed cryptsetup cryptsetup_cmd = "{0} --version".format(self.distro_patcher.cryptsetup_path) @@ -529,47 +301,6 @@ def _extract_luks_version_from_dump(self, luks_dump_out): if "version:" in line.lower(): return line.split()[-1] - def _extract_luksv2_token(self, luks_dump_out): - """ - A luks v2 luksheader looks kind of like this: (inessential stuff removed) - - LUKS header information - Version: 2 - Data segments: - 0: crypt - offset: 0 [bytes] - length: 5539430400 [bytes] - cipher: aes-xts-plain64 - sector: 512 [bytes] - Keyslots: - 1: luks2 - Key: 512 bits - 3: reencrypt (unbound) - Key: 8 bits - Tokens: - 6: Azure_Disk_Encryption_BackUp - 5: Azure_Disk_Encryption - ... - """ - if not luks_dump_out: - return [] - lines = luks_dump_out.split("\n") - token_segment = False - token_lines = [] - for line in lines: - parts = line.split(":") - if len(parts)<2: - continue - if token_segment and parts[1].strip() == '': - break - if "tokens" in parts[0].strip().lower(): - token_segment = True - continue - if token_segment and self._isnumeric(parts[0].strip()): - token_lines.append([int(parts[0].strip()),parts[1].strip()]) - continue - return token_lines - def _extract_luksv2_keyslot_lines(self, luks_dump_out): """ A luks v2 luksheader looks kind of like this: (inessential stuff removed) @@ -627,7 +358,7 @@ def _isnumeric(self, chars): return False def luks_dump_keyslots(self, dev_path, header_file): - luks_dump_out = self._luks_get_header_dump(header_file or dev_path) + luks_dump_out = self.luks_get_header_dump(header_file or dev_path) luks_version = self._extract_luks_version_from_dump(luks_dump_out) @@ -652,7 +383,7 @@ def luks_check_reencryption(self, dev_path, header_file): else: device_header = header_file - luks_dump_out = self._luks_get_header_dump(device_header) + luks_dump_out = self.luks_get_header_dump(device_header) luks_version = self._extract_luks_version_from_dump(luks_dump_out) @@ -1551,7 +1282,7 @@ def get_luks_header_size(self, device_path=None): # https://gitlab.com/cryptsetup/cryptsetup/-/wikis/FrequentlyAskedQuestions#2-setup # Dump the in place LUKS header and version - luksDump = self._luks_get_header_dump(device_path) + luksDump = self.luks_get_header_dump(device_path) luksVer = self._extract_luks_version_from_dump(luksDump) if luksVer and int(luksVer) == 1: diff --git a/VMEncryption/main/OnlineEncryptionResumer.py b/VMEncryption/main/OnlineEncryptionResumer.py index 2a77e95f8..2cb899d50 100644 --- a/VMEncryption/main/OnlineEncryptionResumer.py +++ b/VMEncryption/main/OnlineEncryptionResumer.py @@ -23,7 +23,7 @@ from threading import Lock from Common import CommonVariables - +from CVMDiskUtil import CVMDiskUtil class OnlineEncryptionResumer: def __init__(self, crypt_item, disk_util, bek_file_path, logger, hutil): @@ -89,7 +89,8 @@ def begin_resume(self, log_status=True, lock=None, import_token = False, public_ message = "Background encryption finished for {0}".format(self.crypt_item.dev_path) if import_token and public_setting: self.update_log("Background token update to device {0}".format(self.crypt_item.dev_path),lock) - self.disk_util.import_token(device_path=self.crypt_item.dev_path, + cvm_disk_util = CVMDiskUtil(disk_util=self.disk_util, logger=self.logger) + cvm_disk_util.import_token(device_path=self.crypt_item.dev_path, passphrase_file=self.bek_file_path, public_settings=public_setting) if log_status: diff --git a/VMEncryption/main/handle.py b/VMEncryption/main/handle.py index a2bd07fed..542030d1a 100644 --- a/VMEncryption/main/handle.py +++ b/VMEncryption/main/handle.py @@ -36,6 +36,7 @@ from Common import CommonVariables, CryptItem from ExtensionParameter import ExtensionParameter from DiskUtil import DiskUtil +from CVMDiskUtil import CVMDiskUtil from CryptMountConfigUtil import CryptMountConfigUtil from ResourceDiskUtil import ResourceDiskUtil from BackupLogger import BackupLogger @@ -342,8 +343,7 @@ def remove_passphrase_luks2_key_slot(disk_util,\ return ret def update_encryption_settings_luks2_header(extra_items_to_encrypt=None): - '''This function is used for CMK passphrase wrapping with new KEK URL and update - metadata in LUKS2 header.''' + '''This function is used for updating metadata information in LUKS2 header.''' if extra_items_to_encrypt is None: extra_items_to_encrypt=[] hutil.do_parse_context('UpdateEncryptionSettingsLuks2Header') @@ -364,7 +364,8 @@ def update_encryption_settings_luks2_header(extra_items_to_encrypt=None): encryption_config = EncryptionConfig(encryption_environment, logger) extension_parameter = ExtensionParameter(hutil, logger, DistroPatcher, encryption_environment, get_protected_settings(), public_setting) disk_util = DiskUtil(hutil=hutil, patching=DistroPatcher, logger=logger, encryption_environment=encryption_environment) - bek_util = BekUtil(disk_util, logger,encryption_environment) + bek_util = BekUtil(disk_util, logger, encryption_environment) + cvm_disk_util = CVMDiskUtil(disk_util=disk_util, logger=logger) device_items = disk_util.get_device_items(None) if extension_parameter.passphrase is None or extension_parameter.passphrase == "": extension_parameter.passphrase = bek_util.generate_passphrase() @@ -376,26 +377,26 @@ def update_encryption_settings_luks2_header(extra_items_to_encrypt=None): continue #restoring the token data to type Azure_Disk_Encryption #It is necessary to restore if we are resuming from previous attempt, otherwise its no-op. - disk_util.restore_luks2_token(device_name=device_item.name) + cvm_disk_util.restore_luks2_token(device_name=device_item.name) logger.log("Reading passphrase from LUKS2 header, device name: {0}".format(device_item.name)) #copy primary token to backup token for recovery, if reboot or interrupt happened during KEK rotation. - ade_primary_token_id = disk_util.get_token_id(header_or_dev_path=device_item_path,token_name=CommonVariables.AzureDiskEncryptionToken) + ade_primary_token_id = cvm_disk_util.get_token_id(header_or_dev_path=device_item_path,token_name=CommonVariables.AzureDiskEncryptionToken) if not ade_primary_token_id: logger.log("primary token type: Azure_Disk_Encryption not found for device {0}".format(device_item.name)) continue - data = disk_util.read_token(device_name=device_item.name,token_id=ade_primary_token_id) + data = cvm_disk_util.read_token(device_name=device_item.name,token_id=ade_primary_token_id) #writing primary token data to backup token, update token type to back up. data['type']=CommonVariables.AzureDiskEncryptionBackUpToken #update backup token data to backup token id:6. - disk_util.import_token_data(device_path=device_item_path,token_data=data,token_id=CommonVariables.cvm_ade_vm_encryption_backup_token_id) + cvm_disk_util.import_token_data(device_path=device_item_path,token_data=data,token_id=CommonVariables.cvm_ade_vm_encryption_backup_token_id) #get the unwrapped passphrase from LUKS2 header. - passphrase=disk_util.export_token(device_name=device_item.name) + passphrase=cvm_disk_util.export_token(device_name=device_item.name) if not passphrase: logger.log(level=CommonVariables.WarningLevel, msg="No passphrase found in LUKS2 header, device name: {0}".format(device_item.name)) continue #remove primary token from Tokens field of LUKS2 header. - disk_util.remove_token(device_name=device_item.name,token_id=ade_primary_token_id) + cvm_disk_util.remove_token(device_name=device_item.name,token_id=ade_primary_token_id) logger.log("Updating wrapped passphrase to LUKS2 header with current public setting. device name {0}".format(device_item.name)) #add new slot with new passphrase. is_added = add_new_passphrase_luks2_key_slot(disk_util=disk_util, @@ -410,7 +411,7 @@ def update_encryption_settings_luks2_header(extra_items_to_encrypt=None): #protect passphrase before updating to LUKS2 is done in import_token new_passphrase_file = create_temp_file(extension_parameter.passphrase) #save passphrase to LUKS2 header with PassphraseNameValueProtected - ret = disk_util.import_token(device_path=device_item_path, + ret = cvm_disk_util.import_token(device_path=device_item_path, passphrase_file=new_passphrase_file, public_settings=public_setting, passphrase_name_value=CommonVariables.PassphraseNameValueProtected) @@ -428,7 +429,7 @@ def update_encryption_settings_luks2_header(extra_items_to_encrypt=None): logger.log(level=CommonVariables.WarningLevel, msg="old passphrase is not removed from LUKS2 slot. Skip operation for device: {0}".format(device_item.name)) #removing backup token as KEK rotation is successful here. - disk_util.remove_token(device_name=device_item.name, + cvm_disk_util.remove_token(device_name=device_item.name, token_id=CommonVariables.cvm_ade_vm_encryption_backup_token_id) #update passphrase file for auto unlock key_file_name = CommonVariables.encryption_key_file_name @@ -1495,7 +1496,8 @@ def enable_encryption_format(passphrase, encryption_format_items, disk_util, cry logger.log(msg="update crypt item failed", level=CommonVariables.ErrorLevel) #update luks2 header token field if security_Type == CommonVariables.ConfidentialVM: - disk_util.import_token(device_path=crypt_item_to_update.dev_path, + cvm_disk_util = CVMDiskUtil(disk_util=disk_util, logger=logger) + cvm_disk_util.import_token(device_path=crypt_item_to_update.dev_path, passphrase_file=passphrase, public_settings=get_public_settings()) else: @@ -1657,7 +1659,8 @@ def encrypt_inplace_without_separate_header_file(passphrase_file, else: #update luks2 header token field if security_Type == CommonVariables.ConfidentialVM: - disk_util.import_token(device_path=ongoing_item_config.original_dev_path, + cvm_disk_util = CVMDiskUtil(disk_util=disk_util, logger=logger) + cvm_disk_util.import_token(device_path=ongoing_item_config.original_dev_path, passphrase_file=passphrase_file, public_settings=get_public_settings()) ongoing_item_config.current_slice_index = 0 diff --git a/VMEncryption/main/test/test_disk_util.py b/VMEncryption/main/test/test_disk_util.py index 59ace6f10..eb74ba4aa 100644 --- a/VMEncryption/main/test/test_disk_util.py +++ b/VMEncryption/main/test/test_disk_util.py @@ -3,6 +3,7 @@ import json from DiskUtil import DiskUtil +from CVMDiskUtil import CVMDiskUtil from EncryptionEnvironment import EncryptionEnvironment from Common import DeviceItem from Common import CommonVariables @@ -20,6 +21,7 @@ class Test_Disk_Util(unittest.TestCase): def setUp(self): self.logger = ConsoleLogger() self.disk_util = DiskUtil(None, MockDistroPatcher('Ubuntu', '14.04', '4.15'), self.logger, EncryptionEnvironment(None, self.logger)) + self.cvm_disk_util = CVMDiskUtil(disk_util=self.disk_util,logger=self.logger) try: self.assertCountEqual([1],[1]) except AttributeError: @@ -443,12 +445,12 @@ def test_import_token(self,cmd_exc_mock,path_exit): '''read passphrase from Passphrase_file, and update to LUKS2 header''' cmd_exc_mock.return_value = 0 path_exit.return_value=False - result = self.disk_util.import_token(device_path="/dev/sdc", + result = self.cvm_disk_util.import_token(device_path="/dev/sdc", passphrase_file=None, public_settings=None) self.assertEqual(result,False) path_exit.return_value=False - result = self.disk_util.import_token(device_path="/dev/sdc", + result = self.cvm_disk_util.import_token(device_path="/dev/sdc", passphrase_file="/var/lib/file_path", public_settings=None) self.assertEqual(result,False) @@ -459,15 +461,15 @@ def test_read_token(self,cmd_exc_mock,path_exists): '''read token from LUKS2 header token''' cmd_exc_mock.return_value = 1 path_exists.return_value = False - result = self.disk_util.read_token(device_name="",token_id=None) + result = self.cvm_disk_util.read_token(device_name="",token_id=None) self.assertEqual(result,None) cmd_exc_mock.return_value = 1 path_exists.return_value = True - result = self.disk_util.read_token(device_name="sda",token_id=None) + result = self.cvm_disk_util.read_token(device_name="sda",token_id=None) self.assertEqual(result,None) cmd_exc_mock.return_value = 1 path_exists.return_value = True - result = self.disk_util.read_token(device_name="sda",token_id=1) + result = self.cvm_disk_util.read_token(device_name="sda",token_id=1) self.assertEqual(result,None) @mock.patch("DiskUtil.DiskUtil.get_token_id") @@ -511,20 +513,20 @@ def test_get_token_id(self,luks_dump,header_or_dev_path_exist): luks_dump.return_value="Tokens:\n\ 1: Azure_Disk_Encryption_BackUp\n\ 5: Azure_Disk_Encryption" - token_id = self.disk_util.get_token_id("/dev/sda","Azure_Disk_Encryption_BackUp") + token_id = self.cvm_disk_util.get_token_id("/dev/sda","Azure_Disk_Encryption_BackUp") self.assertEqual(token_id,1) - token_id = self.disk_util.get_token_id("/dev/sda","Azure_Disk_Encryption") + token_id = self.cvm_disk_util.get_token_id("/dev/sda","Azure_Disk_Encryption") self.assertEqual(token_id,5) luks_dump.return_value="Tokens:\n\ 1: Azure_Disk_Encryption_BackUp" - token_id = self.disk_util.get_token_id("/dev/sda","Azure_Disk_Encryption_BackUp") + token_id = self.cvm_disk_util.get_token_id("/dev/sda","Azure_Disk_Encryption_BackUp") self.assertEqual(token_id,1) - token_id = self.disk_util.get_token_id("/dev/sda","Azure_Disk_Encryption") + token_id = self.cvm_disk_util.get_token_id("/dev/sda","Azure_Disk_Encryption") self.assertEqual(token_id,None) - token_id = self.disk_util.get_token_id("","Azure_Disk_Encryption_BackUp") + token_id = self.cvm_disk_util.get_token_id("","Azure_Disk_Encryption_BackUp") self.assertEqual(token_id,None) - token_id = self.disk_util.get_token_id("/dev/sda","") + token_id = self.cvm_disk_util.get_token_id("/dev/sda","") self.assertEqual(token_id,None) header_or_dev_path_exist.return_value = False - token_id = self.disk_util.get_token_id("/dev/sda","Azure_Disk_Encryption") + token_id = self.cvm_disk_util.get_token_id("/dev/sda","Azure_Disk_Encryption") self.assertEqual(token_id,None) \ No newline at end of file From b49cd224de3a49293c657d53daf91958c8845893 Mon Sep 17 00:00:00 2001 From: PankajJoshi Date: Mon, 1 Apr 2024 22:20:45 +0530 Subject: [PATCH 2/3] removing CVM specific code from handle.py to CVMLuksUtil.py --- VMEncryption/main/CVMLuksUtil.py | 250 +++++++++++++++++++++++++++++++ VMEncryption/main/handle.py | 228 ++-------------------------- 2 files changed, 261 insertions(+), 217 deletions(-) create mode 100644 VMEncryption/main/CVMLuksUtil.py diff --git a/VMEncryption/main/CVMLuksUtil.py b/VMEncryption/main/CVMLuksUtil.py new file mode 100644 index 000000000..764caa807 --- /dev/null +++ b/VMEncryption/main/CVMLuksUtil.py @@ -0,0 +1,250 @@ +#!/usr/bin/env python +# +# VMEncryption extension +# +# Copyright 2015 Microsoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import sys +import re +import os +import tempfile +import traceback + +from Common import CommonVariables, CryptItem +from EncryptionConfig import EncryptionConfig +from ExtensionParameter import ExtensionParameter +from DiskUtil import DiskUtil +from BekUtil import BekUtil +from CVMDiskUtil import CVMDiskUtil +class CVMLuksUtil(object): + '''This class contains CVM utils used in handle.py''' + + @staticmethod + def create_temp_file(file_content): + '''This function is creating a temp file of file_content. returning a file name.''' + temp_keyfile = tempfile.NamedTemporaryFile(delete=False) + if isinstance(file_content, bytes): + temp_keyfile.write(file_content) + else: + temp_keyfile.write(file_content.encode("utf-8")) + temp_keyfile.close() + return temp_keyfile.name + + @staticmethod + def is_confidential_temp_disk_encryption(public_settings,logger): + '''this function reads cvm public setting NoConfidentialEncryptionTempDisk for cvm temp disk encryption. + function returns true/false for for temp disk encryption. by default return is True.''' + no_confidential_encryption_tempdisk = public_settings.get("NoConfidentialEncryptionTempDisk") + no_confidential_encryption_tempdisk_flag = False + msg = "" + if no_confidential_encryption_tempdisk.__class__.__name__ in ['str','bool']: + if no_confidential_encryption_tempdisk.__class__.__name__ == 'str' and no_confidential_encryption_tempdisk.lower() == "true": + no_confidential_encryption_tempdisk_flag=True + else: + no_confidential_encryption_tempdisk_flag=no_confidential_encryption_tempdisk + msg="NoConfidentialEncryptionTempDisk: {0}".format(no_confidential_encryption_tempdisk_flag) + else: + if no_confidential_encryption_tempdisk: + msg="Invalid input {0}. NoConfidentialEncryptionTempDisk is set an invalid value by customer.".format(no_confidential_encryption_tempdisk) + else: + msg="NoConfidentialEncryptionTempDisk is not set,default value is false." + logger.log(msg=msg) + return not no_confidential_encryption_tempdisk_flag + + @staticmethod + def add_new_passphrase_luks2_key_slot(disk_util,\ + existing_passphrase,\ + new_passphrase,\ + device_path,\ + luks_header_path,\ + logger): + '''This function is used to add a new passphrase in luks key slot.''' + logger.log("add_new_passphrase_luks2_key_slot: start!") + ret = False + try: + before_key_slots = disk_util.luks_dump_keyslots(device_path, luks_header_path) + logger.log("Before key addition, key slots for {0}: {1}".format(device_path, before_key_slots)) + logger.log("Adding new key for {0}".format(device_path)) + existing_passphrase_file_name = CVMLuksUtil.create_temp_file(existing_passphrase) + new_passphrase_file_name = CVMLuksUtil.create_temp_file(new_passphrase) + luks_add_result = disk_util.luks_add_key(passphrase_file=existing_passphrase_file_name, + dev_path=device_path, + mapper_name=None, + header_file=luks_header_path, + new_key_path=new_passphrase_file_name) + logger.log("luks add result is {0}".format(luks_add_result)) + after_key_slots = disk_util.luks_dump_keyslots(device_path, luks_header_path) + logger.log("After key addition, key slots for {0}: {1}".format(device_path, after_key_slots)) + new_key_slot = list([x[0] != x[1] for x in zip(before_key_slots, after_key_slots)]).index(True) + logger.log("New key was added in key slot {0}".format(new_key_slot)) + os.unlink(existing_passphrase_file_name) + os.unlink(new_passphrase_file_name) + ret = True + except Exception as e: + msg="add_new_passphrase_luks2_key_slot failed with error {0}, stack trace: {1}".format(e, traceback.format_exc()) + logger.log(msg=msg,level=CommonVariables.WarningLevel) + logger.log("add_new_passphrase_luks2_key_slot: end!") + return ret + + @staticmethod + def remove_passphrase_luks2_key_slot(disk_util,\ + passphrase,\ + device_path,\ + luks_header_path,\ + logger): + '''This function is used for removing a passphrase from luks key slot.''' + logger.log("remove_passphrase_luks2_key_slot: start!") + ret = False + try: + before_key_slots = disk_util.luks_dump_keyslots(device_path, luks_header_path) + logger.log("Before key removal, key slots for {0}: {1}".format(device_path, before_key_slots)) + logger.log("Removing new key for {0}".format(device_path)) + passphrase_file_name = CVMLuksUtil.create_temp_file(passphrase) + luks_remove_result = disk_util.luks_remove_key(passphrase_file=passphrase_file_name, + dev_path=device_path, + header_file=luks_header_path) + logger.log("luks remove result is {0}".format(luks_remove_result)) + after_key_slots = disk_util.luks_dump_keyslots(device_path, luks_header_path) + logger.log("After key removal, key slots for {0}: {1}".format(device_path, after_key_slots)) + os.unlink(passphrase_file_name) + ret = True + except Exception as e: + msg="remove_passphrase_luks2_key_slot failed with error {0}, stack trace: {1}".format(e, traceback.format_exc()) + logger.log(msg=msg,level=CommonVariables.WarningLevel) + logger.log("remove_passphrase_luks2_key_slot: end!") + return ret + + @staticmethod + def update_encryption_settings_luks2_header(hutil,logger,public_setting,encryption_environment,protected_settings,extra_items_to_encrypt=None): + '''This function is used for updating metadata information in LUKS2 header.''' + if extra_items_to_encrypt is None: + extra_items_to_encrypt=[] + hutil.do_parse_context('UpdateEncryptionSettingsLuks2Header') + logger.log('Updating encryption settings LUKS-2 header') + # ensure cryptsetup package is still available in case it was for some reason removed after enable + try: + hutil.patching.install_cryptsetup() + except Exception as e: + hutil.save_seq() + message = "Failed to update encryption settings with error: {0}, stack trace: {1}".format(e, traceback.format_exc()) + hutil.do_exit(exit_code=CommonVariables.missing_dependency, + operation='UpdateEncryptionSettingsLuks2Header', + status=CommonVariables.extension_error_status, + code=str(CommonVariables.missing_dependency), + message=message) + try: + encryption_config = EncryptionConfig(encryption_environment, logger) + extension_parameter = ExtensionParameter(hutil, logger, hutil.patching, encryption_environment, protected_settings, public_setting) + disk_util = DiskUtil(hutil=hutil, patching=hutil.patching, logger=logger, encryption_environment=encryption_environment) + bek_util = BekUtil(disk_util, logger, encryption_environment) + cvm_disk_util = CVMDiskUtil(disk_util=disk_util, logger=logger) + device_items = disk_util.get_device_items(None) + if extension_parameter.passphrase is None or extension_parameter.passphrase == "": + extension_parameter.passphrase = bek_util.generate_passphrase() + + for device_item in device_items: + device_item_path = disk_util.get_device_path(device_item.name) + if not disk_util.is_luks_device(device_item_path,None): + logger.log("Not a LUKS device, device path: {0}".format(device_item_path)) + continue + #restoring the token data to type Azure_Disk_Encryption + #It is necessary to restore if we are resuming from previous attempt, otherwise its no-op. + cvm_disk_util.restore_luks2_token(device_name=device_item.name) + logger.log("Reading passphrase from LUKS2 header, device name: {0}".format(device_item.name)) + #copy primary token to backup token for recovery, if reboot or interrupt happened during KEK rotation. + ade_primary_token_id = cvm_disk_util.get_token_id(header_or_dev_path=device_item_path,token_name=CommonVariables.AzureDiskEncryptionToken) + if not ade_primary_token_id: + logger.log("primary token type: Azure_Disk_Encryption not found for device {0}".format(device_item.name)) + continue + data = cvm_disk_util.read_token(device_name=device_item.name,token_id=ade_primary_token_id) + #writing primary token data to backup token, update token type to back up. + data['type']=CommonVariables.AzureDiskEncryptionBackUpToken + #update backup token data to backup token id:6. + cvm_disk_util.import_token_data(device_path=device_item_path,token_data=data,token_id=CommonVariables.cvm_ade_vm_encryption_backup_token_id) + #get the unwrapped passphrase from LUKS2 header. + passphrase=cvm_disk_util.export_token(device_name=device_item.name) + if not passphrase: + logger.log(level=CommonVariables.WarningLevel, + msg="No passphrase found in LUKS2 header, device name: {0}".format(device_item.name)) + continue + #remove primary token from Tokens field of LUKS2 header. + cvm_disk_util.remove_token(device_name=device_item.name,token_id=ade_primary_token_id) + logger.log("Updating wrapped passphrase to LUKS2 header with current public setting. device name {0}".format(device_item.name)) + #add new slot with new passphrase. + is_added = CVMLuksUtil.add_new_passphrase_luks2_key_slot(disk_util=disk_util, + existing_passphrase=passphrase, + new_passphrase=extension_parameter.passphrase, + device_path= device_item_path, + luks_header_path=None, + logger= logger) + if not is_added: + logger.log(level=CommonVariables.WarningLevel, + msg="new passphrase is not added to LUKS2 slot. Skip operation for device: {0}".format(device_item.name)) + continue + #protect passphrase before updating to LUKS2 is done in import_token + new_passphrase_file = CVMLuksUtil.create_temp_file(extension_parameter.passphrase) + #save passphrase to LUKS2 header with PassphraseNameValueProtected + ret = cvm_disk_util.import_token(device_path=device_item_path, + passphrase_file=new_passphrase_file, + public_settings=public_setting, + passphrase_name_value=CommonVariables.PassphraseNameValueProtected) + if not ret: + logger.log(level=CommonVariables.WarningLevel, + msg="Update passphrase with current public setting to LUKS2 header is not successful. device path {0}".format(device_item_path)) + return None + os.unlink(new_passphrase_file) + #removing old password form key slot. + is_removed = CVMLuksUtil.remove_passphrase_luks2_key_slot(disk_util=disk_util, + passphrase=passphrase, + device_path=device_item_path, + luks_header_path=None, + logger=logger) + if not is_removed: + logger.log(level=CommonVariables.WarningLevel, + msg="old passphrase is not removed from LUKS2 slot. Skip operation for device: {0}".format(device_item.name)) + #removing backup token as KEK rotation is successful here. + cvm_disk_util.remove_token(device_name=device_item.name, + token_id=CommonVariables.cvm_ade_vm_encryption_backup_token_id) + #update passphrase file for auto unlock + key_file_name = CommonVariables.encryption_key_file_name + scsi_lun_numbers = disk_util.get_azure_data_disk_controller_and_lun_numbers([os.path.realpath(device_item_path)]) + if len(scsi_lun_numbers) != 0: + scsi_controller, lun_number = scsi_lun_numbers[0] + key_file_name = "{0}_{1}_{2}".format(key_file_name,str(scsi_controller),str(lun_number)) + bek_util.store_bek_passphrase_file_name(encryption_config=encryption_config, + passphrase=extension_parameter.passphrase, + key_file_name=key_file_name) + #committing the extension parameter if KEK rotation is successful. + extension_parameter.commit() + + if len(extra_items_to_encrypt) > 0: + hutil.do_status_report(operation='UpdateEncryptionSettingsLuks2Header', + status=CommonVariables.extension_success_status, + status_code=str(CommonVariables.success), + message='Encryption settings updated in LUKS2 header') + else: + hutil.do_exit(exit_code=0, + operation='UpdateEncryptionSettingsLuks2Header', + status=CommonVariables.extension_success_status, + code=str(CommonVariables.success), + message='Encryption settings updated in LUKS2 header') + except Exception as e: + hutil.save_seq() + message = "Failed to update encryption settings Luks2 header with error: {0}, stack trace: {1}".format(e, traceback.format_exc()) + logger.log(msg=message, level=CommonVariables.ErrorLevel) + hutil.do_exit(exit_code=CommonVariables.unknown_error, + operation='UpdateEncryptionSettingsLuks2Header', + status=CommonVariables.extension_error_status, + code=str(CommonVariables.unknown_error), + message=message) diff --git a/VMEncryption/main/handle.py b/VMEncryption/main/handle.py index 542030d1a..a5f0ebcb4 100644 --- a/VMEncryption/main/handle.py +++ b/VMEncryption/main/handle.py @@ -37,6 +37,7 @@ from ExtensionParameter import ExtensionParameter from DiskUtil import DiskUtil from CVMDiskUtil import CVMDiskUtil +from CVMLuksUtil import CVMLuksUtil from CryptMountConfigUtil import CryptMountConfigUtil from ResourceDiskUtil import ResourceDiskUtil from BackupLogger import BackupLogger @@ -273,196 +274,6 @@ def get_protected_settings(): else: return protected_settings_str -def create_temp_file(file_content): - '''This function is creating a temp file of file_content. returning a file name.''' - temp_keyfile = tempfile.NamedTemporaryFile(delete=False) - if isinstance(file_content, bytes): - temp_keyfile.write(file_content) - else: - temp_keyfile.write(file_content.encode("utf-8")) - temp_keyfile.close() - return temp_keyfile.name - -def add_new_passphrase_luks2_key_slot(disk_util,\ - existing_passphrase,\ - new_passphrase,\ - device_path,\ - luks_header_path): - '''This function is used to add a new passphrase in luks key slot.''' - logger.log("add_new_passphrase_luks2_key_slot: start!") - ret = False - try: - before_key_slots = disk_util.luks_dump_keyslots(device_path, luks_header_path) - logger.log("Before key addition, key slots for {0}: {1}".format(device_path, before_key_slots)) - logger.log("Adding new key for {0}".format(device_path)) - existing_passphrase_file_name = create_temp_file(existing_passphrase) - new_passphrase_file_name = create_temp_file(new_passphrase) - luks_add_result = disk_util.luks_add_key(passphrase_file=existing_passphrase_file_name, - dev_path=device_path, - mapper_name=None, - header_file=luks_header_path, - new_key_path=new_passphrase_file_name) - logger.log("luks add result is {0}".format(luks_add_result)) - after_key_slots = disk_util.luks_dump_keyslots(device_path, luks_header_path) - logger.log("After key addition, key slots for {0}: {1}".format(device_path, after_key_slots)) - new_key_slot = list([x[0] != x[1] for x in zip(before_key_slots, after_key_slots)]).index(True) - logger.log("New key was added in key slot {0}".format(new_key_slot)) - os.unlink(existing_passphrase_file_name) - os.unlink(new_passphrase_file_name) - ret = True - except Exception as e: - msg="add_new_passphrase_luks2_key_slot failed with error {0}, stack trace: {1}".format(e, traceback.format_exc()) - logger.log(msg=msg,level=CommonVariables.WarningLevel) - logger.log("add_new_passphrase_luks2_key_slot: end!") - return ret - -def remove_passphrase_luks2_key_slot(disk_util,\ - passphrase,\ - device_path,\ - luks_header_path): - '''This function is used for removing a passphrase from luks key slot.''' - logger.log("remove_passphrase_luks2_key_slot: start!") - ret = False - try: - before_key_slots = disk_util.luks_dump_keyslots(device_path, luks_header_path) - logger.log("Before key removal, key slots for {0}: {1}".format(device_path, before_key_slots)) - logger.log("Removing new key for {0}".format(device_path)) - passphrase_file_name = create_temp_file(passphrase) - luks_remove_result = disk_util.luks_remove_key(passphrase_file=passphrase_file_name, - dev_path=device_path, - header_file=luks_header_path) - logger.log("luks remove result is {0}".format(luks_remove_result)) - after_key_slots = disk_util.luks_dump_keyslots(device_path, luks_header_path) - logger.log("After key removal, key slots for {0}: {1}".format(device_path, after_key_slots)) - os.unlink(passphrase_file_name) - ret = True - except Exception as e: - msg="remove_passphrase_luks2_key_slot failed with error {0}, stack trace: {1}".format(e, traceback.format_exc()) - logger.log(msg=msg,level=CommonVariables.WarningLevel) - logger.log("remove_passphrase_luks2_key_slot: end!") - return ret - -def update_encryption_settings_luks2_header(extra_items_to_encrypt=None): - '''This function is used for updating metadata information in LUKS2 header.''' - if extra_items_to_encrypt is None: - extra_items_to_encrypt=[] - hutil.do_parse_context('UpdateEncryptionSettingsLuks2Header') - logger.log('Updating encryption settings LUKS-2 header') - # ensure cryptsetup package is still available in case it was for some reason removed after enable - try: - DistroPatcher.install_cryptsetup() - except Exception as e: - hutil.save_seq() - message = "Failed to update encryption settings with error: {0}, stack trace: {1}".format(e, traceback.format_exc()) - hutil.do_exit(exit_code=CommonVariables.missing_dependency, - operation='UpdateEncryptionSettingsLuks2Header', - status=CommonVariables.extension_error_status, - code=str(CommonVariables.missing_dependency), - message=message) - try: - public_setting = get_public_settings() - encryption_config = EncryptionConfig(encryption_environment, logger) - extension_parameter = ExtensionParameter(hutil, logger, DistroPatcher, encryption_environment, get_protected_settings(), public_setting) - disk_util = DiskUtil(hutil=hutil, patching=DistroPatcher, logger=logger, encryption_environment=encryption_environment) - bek_util = BekUtil(disk_util, logger, encryption_environment) - cvm_disk_util = CVMDiskUtil(disk_util=disk_util, logger=logger) - device_items = disk_util.get_device_items(None) - if extension_parameter.passphrase is None or extension_parameter.passphrase == "": - extension_parameter.passphrase = bek_util.generate_passphrase() - - for device_item in device_items: - device_item_path = disk_util.get_device_path(device_item.name) - if not disk_util.is_luks_device(device_item_path,None): - logger.log("Not a LUKS device, device path: {0}".format(device_item_path)) - continue - #restoring the token data to type Azure_Disk_Encryption - #It is necessary to restore if we are resuming from previous attempt, otherwise its no-op. - cvm_disk_util.restore_luks2_token(device_name=device_item.name) - logger.log("Reading passphrase from LUKS2 header, device name: {0}".format(device_item.name)) - #copy primary token to backup token for recovery, if reboot or interrupt happened during KEK rotation. - ade_primary_token_id = cvm_disk_util.get_token_id(header_or_dev_path=device_item_path,token_name=CommonVariables.AzureDiskEncryptionToken) - if not ade_primary_token_id: - logger.log("primary token type: Azure_Disk_Encryption not found for device {0}".format(device_item.name)) - continue - data = cvm_disk_util.read_token(device_name=device_item.name,token_id=ade_primary_token_id) - #writing primary token data to backup token, update token type to back up. - data['type']=CommonVariables.AzureDiskEncryptionBackUpToken - #update backup token data to backup token id:6. - cvm_disk_util.import_token_data(device_path=device_item_path,token_data=data,token_id=CommonVariables.cvm_ade_vm_encryption_backup_token_id) - #get the unwrapped passphrase from LUKS2 header. - passphrase=cvm_disk_util.export_token(device_name=device_item.name) - if not passphrase: - logger.log(level=CommonVariables.WarningLevel, - msg="No passphrase found in LUKS2 header, device name: {0}".format(device_item.name)) - continue - #remove primary token from Tokens field of LUKS2 header. - cvm_disk_util.remove_token(device_name=device_item.name,token_id=ade_primary_token_id) - logger.log("Updating wrapped passphrase to LUKS2 header with current public setting. device name {0}".format(device_item.name)) - #add new slot with new passphrase. - is_added = add_new_passphrase_luks2_key_slot(disk_util=disk_util, - existing_passphrase=passphrase, - new_passphrase=extension_parameter.passphrase, - device_path= device_item_path, - luks_header_path=None) - if not is_added: - logger.log(level=CommonVariables.WarningLevel, - msg="new passphrase is not added to LUKS2 slot. Skip operation for device: {0}".format(device_item.name)) - continue - #protect passphrase before updating to LUKS2 is done in import_token - new_passphrase_file = create_temp_file(extension_parameter.passphrase) - #save passphrase to LUKS2 header with PassphraseNameValueProtected - ret = cvm_disk_util.import_token(device_path=device_item_path, - passphrase_file=new_passphrase_file, - public_settings=public_setting, - passphrase_name_value=CommonVariables.PassphraseNameValueProtected) - if not ret: - logger.log(level=CommonVariables.WarningLevel, - msg="Update passphrase with current public setting to LUKS2 header is not successful. device path {0}".format(device_item_path)) - return None - os.unlink(new_passphrase_file) - #removing old password form key slot. - is_removed = remove_passphrase_luks2_key_slot(disk_util=disk_util, - passphrase=passphrase, - device_path=device_item_path, - luks_header_path=None) - if not is_removed: - logger.log(level=CommonVariables.WarningLevel, - msg="old passphrase is not removed from LUKS2 slot. Skip operation for device: {0}".format(device_item.name)) - #removing backup token as KEK rotation is successful here. - cvm_disk_util.remove_token(device_name=device_item.name, - token_id=CommonVariables.cvm_ade_vm_encryption_backup_token_id) - #update passphrase file for auto unlock - key_file_name = CommonVariables.encryption_key_file_name - scsi_lun_numbers = disk_util.get_azure_data_disk_controller_and_lun_numbers([os.path.realpath(device_item_path)]) - if len(scsi_lun_numbers) != 0: - scsi_controller, lun_number = scsi_lun_numbers[0] - key_file_name = "{0}_{1}_{2}".format(key_file_name,str(scsi_controller),str(lun_number)) - bek_util.store_bek_passphrase_file_name(encryption_config=encryption_config, - passphrase=extension_parameter.passphrase, - key_file_name=key_file_name) - #committing the extension parameter if KEK rotation is successful. - extension_parameter.commit() - - if len(extra_items_to_encrypt) > 0: - hutil.do_status_report(operation='UpdateEncryptionSettingsLuks2Header', - status=CommonVariables.extension_success_status, - status_code=str(CommonVariables.success), - message='Encryption settings updated in LUKS2 header') - else: - hutil.do_exit(exit_code=0, - operation='UpdateEncryptionSettingsLuks2Header', - status=CommonVariables.extension_success_status, - code=str(CommonVariables.success), - message='Encryption settings updated in LUKS2 header') - except Exception as e: - hutil.save_seq() - message = "Failed to update encryption settings Luks2 header with error: {0}, stack trace: {1}".format(e, traceback.format_exc()) - logger.log(msg=message, level=CommonVariables.ErrorLevel) - hutil.do_exit(exit_code=CommonVariables.unknown_error, - operation='UpdateEncryptionSettingsLuks2Header', - status=CommonVariables.extension_error_status, - code=str(CommonVariables.unknown_error), - message=message) def update_encryption_settings(extra_items_to_encrypt=[]): hutil.do_parse_context('UpdateEncryptionSettings') @@ -754,7 +565,8 @@ def mount_encrypted_disks(disk_util, crypt_mount_config_util, bek_util, passphra resource_disk_util.automount() else: logger.log("Format resource disk if unusable. security type {0}".format(security_Type)) - resource_disk_util.automount(is_confidential_temp_disk_encryption()) + is_cmv_temp_disk_encryption = CVMLuksUtil.is_confidential_temp_disk_encryption(public_settings=get_public_settings(),logger=logger) + resource_disk_util.automount(is_cmv_temp_disk_encryption) logger.log("mounted resource disk") else: # Probably a re-image scenario: Just do a best effort @@ -788,6 +600,7 @@ def mount_encrypted_disks(disk_util, crypt_mount_config_util, bek_util, passphra encryption_environment.enable_se_linux() def is_vns_call(): + '''checking if the call is vns call.''' for a in sys.argv[1:]: if re.match("^([-/]*)(vnscall)", a): return True @@ -836,7 +649,6 @@ def main(): elif re.match("^([-/]*)(daemon)", a): daemon() - def mark_encryption(command, volume_type, disk_format_query, encryption_mode=None, encryption_phase=None): encryption_marker = EncryptionMarkConfig(logger, encryption_environment) encryption_marker.command = command @@ -893,7 +705,6 @@ def is_daemon_running(): return False - def enable(): lock = ProcessLock(logger=logger, lock_file_path=encryption_environment.enable_lock_file_path) if not lock.try_lock(): @@ -1145,7 +956,12 @@ def handle_encryption(public_settings, encryption_status, disk_util, bek_util, e if security_Type==CommonVariables.ConfidentialVM: logger.log('Calling Update Encryption Setting in LUKS2 header.') if extension_parameter.cmk_changed(): - update_encryption_settings_luks2_header(items_to_encrypt) + CVMLuksUtil.update_encryption_settings_luks2_header(hutil=hutil, + logger=logger, + public_setting=get_public_settings(), + encryption_environment=encryption_environment, + protected_settings=get_protected_settings(), + extra_items_to_encrypt=items_to_encrypt) if not are_devices_encrypted: enable_encryption() else: @@ -1177,28 +993,6 @@ def handle_encryption(public_settings, encryption_status, disk_util, bek_util, e logger.log('Calling enable for volume type {0}.'.format(volume_type)) enable_encryption() -def is_confidential_temp_disk_encryption(): - '''this function reads cvm public setting NoConfidentialEncryptionTempDisk for cvm temp disk encryption. - function returns true/false for for temp disk encryption. by default return is True.''' - public_settings = get_public_settings() - no_confidential_encryption_tempdisk = public_settings.get("NoConfidentialEncryptionTempDisk") - no_confidential_encryption_tempdisk_flag = False - msg = "" - if no_confidential_encryption_tempdisk.__class__.__name__ in ['str','bool']: - if no_confidential_encryption_tempdisk.__class__.__name__ == 'str' and no_confidential_encryption_tempdisk.lower() == "true": - no_confidential_encryption_tempdisk_flag=True - else: - no_confidential_encryption_tempdisk_flag=no_confidential_encryption_tempdisk - msg="NoConfidentialEncryptionTempDisk: {0}".format(no_confidential_encryption_tempdisk_flag) - else: - if no_confidential_encryption_tempdisk: - msg="Invalid input {0}. NoConfidentialEncryptionTempDisk is set an invalid value by customer.".format(no_confidential_encryption_tempdisk) - else: - msg="NoConfidentialEncryptionTempDisk is not set,default value is false." - logger.log(msg=msg) - return not no_confidential_encryption_tempdisk_flag - - def enable_encryption(): hutil.do_parse_context('EnableEncryption') # we need to start another subprocess to do it, because the initial process @@ -1310,7 +1104,7 @@ def enable_encryption(): #Temp disk encryption will happen for CVM type encryptResourceDisk = False if security_Type==CommonVariables.ConfidentialVM: - encryptResourceDisk = is_confidential_temp_disk_encryption() + encryptResourceDisk = CVMLuksUtil.is_confidential_temp_disk_encryption(public_settings=get_public_settings(),logger=logger) elif extension_parameter.command == CommonVariables.EnableEncryptionFormatAll: encryptResourceDisk = True From d31a0af099e7008077d43d10195fa4c876687daf Mon Sep 17 00:00:00 2001 From: PankajJoshi Date: Tue, 2 Apr 2024 10:03:11 +0530 Subject: [PATCH 3/3] correction to test cases. --- VMEncryption/main/test/test_disk_util.py | 39 ++++++++++++------------ 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/VMEncryption/main/test/test_disk_util.py b/VMEncryption/main/test/test_disk_util.py index eb74ba4aa..e74c609fb 100644 --- a/VMEncryption/main/test/test_disk_util.py +++ b/VMEncryption/main/test/test_disk_util.py @@ -8,6 +8,7 @@ from Common import DeviceItem from Common import CommonVariables from CommandExecutor import CommandExecutor +from CVMDiskUtil import CVMDiskUtil from .console_logger import ConsoleLogger from .test_utils import mock_dir_structure, MockDistroPatcher @@ -21,7 +22,7 @@ class Test_Disk_Util(unittest.TestCase): def setUp(self): self.logger = ConsoleLogger() self.disk_util = DiskUtil(None, MockDistroPatcher('Ubuntu', '14.04', '4.15'), self.logger, EncryptionEnvironment(None, self.logger)) - self.cvm_disk_util = CVMDiskUtil(disk_util=self.disk_util,logger=self.logger) + self.cvm_disk_util = CVMDiskUtil(disk_util=self.disk_util, logger=self.logger) try: self.assertCountEqual([1],[1]) except AttributeError: @@ -231,7 +232,7 @@ def test_make_sure_path_exists(self, cmd_exc_mock, exists_mock): self.assertEqual(path_exists, 0) self.assertEqual(cmd_exc_mock.call_count, 1) - @mock.patch("DiskUtil.DiskUtil._luks_get_header_dump") + @mock.patch("DiskUtil.DiskUtil.luks_get_header_dump") def test_luks_dump_keyslots(self, get_luks_header_mock): get_luks_header_mock.return_value = """ LUKS header information @@ -310,7 +311,7 @@ def test_get_luks_header_size_v222(self, ver_mock): header_size = self.disk_util.get_luks_header_size() self.assertEqual(header_size, CommonVariables.luks_header_size_v2) - @mock.patch("DiskUtil.DiskUtil._luks_get_header_dump") + @mock.patch("DiskUtil.DiskUtil.luks_get_header_dump") def test_get_luks_header_size_luks1(self, lghd_mock): lghd_mock.return_value = """ LUKS header information for /dev/sdd @@ -343,7 +344,7 @@ def test_get_luks_header_size_luks1(self, lghd_mock): header_size = self.disk_util.get_luks_header_size("/mocked/device/path") self.assertEqual(header_size, CommonVariables.luks_header_size) - @mock.patch("DiskUtil.DiskUtil._luks_get_header_dump") + @mock.patch("DiskUtil.DiskUtil.luks_get_header_dump") def test_get_luks_header_size_luks2(self, lghd_mock): lghd_mock.return_value = """ LUKS header information @@ -392,7 +393,7 @@ def test_get_luks_header_size_luks2(self, lghd_mock): header_size = self.disk_util.get_luks_header_size("/mocked/device/path") self.assertEqual(header_size, CommonVariables.luks_header_size_v2) - @mock.patch("DiskUtil.DiskUtil._luks_get_header_dump") + @mock.patch("DiskUtil.DiskUtil.luks_get_header_dump") @mock.patch("DiskUtil.DiskUtil._extract_luks_version_from_dump") def test_get_luks_header_size_bad_version(self, ver_mock, dump_mock): # log error, return None if LUKS version is outside of supported {1,2} @@ -401,7 +402,7 @@ def test_get_luks_header_size_bad_version(self, ver_mock, dump_mock): header_size = self.disk_util.get_luks_header_size("/mocked/device/path") self.assertEqual(header_size, None) - @mock.patch("DiskUtil.DiskUtil._luks_get_header_dump") + @mock.patch("DiskUtil.DiskUtil.luks_get_header_dump") @mock.patch("DiskUtil.DiskUtil._extract_luks_version_from_dump") def test_get_luks_header_size_luks1_badoffset(self, ver_mock, dump_mock): # log error, return None if LUKS1 offset is not found in header dump @@ -410,7 +411,7 @@ def test_get_luks_header_size_luks1_badoffset(self, ver_mock, dump_mock): header_size = self.disk_util.get_luks_header_size("/mocked/device/path") self.assertEqual(header_size, None) - @mock.patch("DiskUtil.DiskUtil._luks_get_header_dump") + @mock.patch("DiskUtil.DiskUtil.luks_get_header_dump") @mock.patch("DiskUtil.DiskUtil._extract_luks_version_from_dump") def test_get_luks_header_size_luks2_badoffset(self, ver_mock, dump_mock): # log error, return None if LUKS2 offset is not found in header dump @@ -424,19 +425,19 @@ def test_import_token_data(self,cmd_exc_mock): '''update dict type data to luks2 header''' cmd_exc_mock.return_value = 0 data = {} - result = self.disk_util.import_token_data(device_path="/dev/sda",token_data=data,token_id=1) + result = self.cvm_disk_util.import_token_data(device_path="/dev/sda",token_data=data,token_id=1) self.assertEqual(result,False) data = None - result = self.disk_util.import_token_data(device_path="/dev/sda",token_data=data,token_id=1) + result = self.cvm_disk_util.import_token_data(device_path="/dev/sda",token_data=data,token_id=1) self.assertEqual(result,False) data = [1,3] - result = self.disk_util.import_token_data(device_path="/dev/sda",token_data=data,token_id=1) + result = self.cvm_disk_util.import_token_data(device_path="/dev/sda",token_data=data,token_id=1) self.assertEqual(result,False) data = "test data" - result = self.disk_util.import_token_data(device_path="/dev/sda",token_data=data,token_id=1) + result = self.cvm_disk_util.import_token_data(device_path="/dev/sda",token_data=data,token_id=1) self.assertEqual(result,False) data = {'version':'1.0','type':'ADE'} - result = self.disk_util.import_token_data(device_path="/dev/sda",token_data=data,token_id=1) + result = self.cvm_disk_util.import_token_data(device_path="/dev/sda",token_data=data,token_id=1) self.assertEqual(result,True) @mock.patch("os.path.exists", return_value=True) @@ -472,7 +473,7 @@ def test_read_token(self,cmd_exc_mock,path_exists): result = self.cvm_disk_util.read_token(device_name="sda",token_id=1) self.assertEqual(result,None) - @mock.patch("DiskUtil.DiskUtil.get_token_id") + @mock.patch("CVMDiskUtil.CVMDiskUtil.get_token_id") @mock.patch("os.path.exists") @mock.patch("CommandExecutor.CommandExecutor.Execute", return_value=0) def test_export_token(self,cmd_exc_mock,path_exists,ade_encryption_token): @@ -480,25 +481,25 @@ def test_export_token(self,cmd_exc_mock,path_exists,ade_encryption_token): ade_encryption_token.return_value = 5 cmd_exc_mock.return_value = 1 path_exists.return_value = False - protector = self.disk_util.export_token("sda") + protector = self.cvm_disk_util.export_token("sda") self.assertEqual(protector, None) path_exists.return_value = True ade_encryption_token.return_value = None - protector = self.disk_util.export_token("sda") + protector = self.cvm_disk_util.export_token("sda") self.assertEqual(protector, None) def test_extract_luksv2_token(self): '''extracting Tokens field to get token from LUKS2 header. returns list''' luks_dump_out = None - tokens = self.disk_util._extract_luksv2_token(luks_dump_out) + tokens = self.cvm_disk_util.extract_luks2_token(luks_dump_out) self.assertEqual(len(tokens), 0) luks_dump_out = "" - tokens = self.disk_util._extract_luksv2_token(luks_dump_out) + tokens = self.cvm_disk_util.extract_luks2_token(luks_dump_out) self.assertEqual(len(tokens), 0) luks_dump_out = "Tokens:\n\ 1: Azure_Disk_Encryption_BackUp\n\ 5: Azure_Disk_Encryption" - tokens = self.disk_util._extract_luksv2_token(luks_dump_out) + tokens = self.cvm_disk_util.extract_luks2_token(luks_dump_out) self.assertEqual(len(tokens), 2) self.assertEqual(tokens[0][0], 1) self.assertEqual(tokens[0][1], "Azure_Disk_Encryption_BackUp") @@ -506,7 +507,7 @@ def test_extract_luksv2_token(self): self.assertEqual(tokens[1][1], "Azure_Disk_Encryption") @mock.patch("os.path.exists") - @mock.patch("DiskUtil.DiskUtil._luks_get_header_dump") + @mock.patch("DiskUtil.DiskUtil.luks_get_header_dump") def test_get_token_id(self,luks_dump,header_or_dev_path_exist): '''getting the token id from LUKS2 header, if not present return None''' header_or_dev_path_exist.return_value = True