Encrypting EBS volumes programmatically with Python
Encrypting an attached AWS EBS volume involves a number of steps. This article will show you how to encrypt your volumes using Python.
Let’s set the scene: you have an environment hosting a number of AWS EC2 instances, and now security have said, “Hey, these EBS volumes should be encrypted!” No argument from me. So how do we go about this programmatically?
You can enable default volume encryption in the management console. Check this link out on how to do it. That option was not always there, and it only applies to newly created volumes, hence the need for this process for existing volumes.
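For completeness, default encryption can also be switched on per region with boto3. This is a one-off sketch and separate from the main script below:

import boto3

# Turn on account-level default EBS encryption for a single region
ec2 = boto3.client('ec2', region_name='eu-west-2')
ec2.enable_ebs_encryption_by_default()

# Confirm the setting took effect
print(ec2.get_ebs_encryption_by_default()['EbsEncryptionByDefault'])  # True once enabled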
A word on reuse of code
Just popping this in: if you looked at my article on terminating EC2 instances by tag, you will notice there is a fair bit of code reused here. A phrase I have heard quite a lot over the years is “let’s not re-invent the wheel!” Whilst this is fantastic advice, it sometimes takes longer finding where the wheel was last left! In my own development work I have found that sometimes I could not find a snippet of code I once used. Thankfully these days we have things like online git repositories.
Getting back on topic….
Method
Unfortunately (at the time of writing) there is no easy method of changing an EBS volume from unencrypted to encrypted. The process is along these lines:
Shutdown the instance (if running)
Create a snapshot of the EBS volume (this will be unencrypted)
Copy the snapshot and enable encryption of the copy
Create a new EBS volume from the encrypted snapshot
Detach the unencrypted EBS volume and attach the encrypted volume
Start the instance (if it was running)
Optional: Delete the unencrypted EBS volumes
Hopefully you thought the same as me: you definitely do not want to be doing this manually! Before we get to the full script, the sketch below shows the key API calls for a single volume.
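This is a rough sketch only; waiters, tags, detaching and attaching, and error handling are all left out, and the resource ids are made-up examples:

import boto3

ec2 = boto3.client('ec2', region_name='eu-west-2')

# 1. Snapshot the unencrypted volume (example id)
snap = ec2.create_snapshot(VolumeId='vol-0123456789abcdef0')

# 2. Copy the snapshot, turning encryption on for the copy
enc_snap = ec2.copy_snapshot(
    SourceSnapshotId=snap['SnapshotId'],
    SourceRegion='eu-west-2',
    Encrypted=True,
)

# 3. Create a new, encrypted volume from the encrypted snapshot
enc_vol = ec2.create_volume(
    AvailabilityZone='eu-west-2a',
    SnapshotId=enc_snap['SnapshotId'],
    Encrypted=True,
)

In practice you have to wait for the snapshot and the copy to complete before moving on, which is exactly what the waiters in the full script are for.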
Encryption Key
The example in this guide uses the AWS account’s default EBS encryption key. Should you for any reason want to share the encrypted volumes or snapshots with another account, you cannot use the default key; it is the default key only for the specific AWS account in which it resides. You could instead create your own key and encrypt using that. A key you create can then be shared with other accounts.
The image below shows the default (AWS managed) EBS encryption key.
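If you do go down the customer managed key route, the key is passed in when the encrypted copy is made. A rough sketch, where the snapshot id and key alias are made-up examples:

import boto3

ec2 = boto3.client('ec2', region_name='eu-west-2')

# Copy an unencrypted snapshot, encrypting the copy with a customer managed KMS key.
# 'alias/my-shared-ebs-key' is a made-up example - use your own key alias or ARN.
copy = ec2.copy_snapshot(
    SourceSnapshotId='snap-0123456789abcdef0',
    SourceRegion='eu-west-2',
    Encrypted=True,
    KmsKeyId='alias/my-shared-ebs-key',
)
print(copy['SnapshotId'])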
Process Flow
This is another one of those scripts that was originally developed in Bash for a GoCD pipeline and which I am again rewriting in Python.
We already have a feel for the steps involved from the method section above, so let’s take a look at the flowchart.
Flowchart - Full
You will notice this is the same flow diagram used in the article Terminate EC2 instance by tag. The overall flow is the same; the “Process Instances” procedure is where all the interesting stuff will be done. Let’s dive into that process.
Flowchart - Process Instances
As you can see, not a straightforward process.
Script
Similar to my previous scripts, we break it down into functions. I will also include remarks to elaborate on some of the code. Not everyone will want or need the remarks, so there will be a link at the end to the public GitHub repository, which is a lot cleaner.
Imports
This first block of code sets the scene by importing libraries, or only parts of a library by using from.
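The import block is short; something along these lines, where the exact date format for dtstr is an assumption on my part (check the repo for the version actually used):

import time                    # used for the short sleep after the volume_available waiter
from datetime import datetime  # used to build the date string for the snapshot description

import boto3                   # AWS SDK for Python
import botocore                # used to catch ClientError / WaiterError exceptions

dtstr = datetime.now().strftime("%Y-%m-%d")  # date string used in snap_prefix below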
These are the variables we can use in any CD package
search_tag = "some_key_name"     # Tag to search for instances
search_value = "some_key_value"  # value in tag to search for instances
snap_prefix = dtstr + "-post_encryption-snapshot"  # snapshot description
arole = "AddARoleHere"           # role to assume across accounts
accounts = ['AWS Accounts Here', 'Another AWS Account here']  # list of accounts e.g. ['0000000000000','1111111111111','2222222222222222','333333333333333333']
total_acc = len(accounts)
region = "eu-west-2"
verbose = True                   # False will suppress most of the output
Functions
Main
This is the function which ties it all together. It handles the calls to the other functions.
def main():
    global ec2
    global instances
    global az2
    global Iid
    global inst
    global FailedIid
    global SuccessIid
    processing_acc = 0
    FailedIid = []
    SuccessIid = []
    client = boto3.client("sts")
    account_id = client.get_caller_identity()["Account"]
    if verbose:
        print(f"script is executing in {account_id}")
    for acc in accounts:
        processing_acc += 1
        if verbose:
            print(f"Processing account : {processing_acc}")
        if acc != account_id:
            assume_roles(acc, accounts, arole)
            ec2 = boto3.client('ec2',
                               aws_access_key_id=acc_key,
                               aws_secret_access_key=sec_key,
                               aws_session_token=sess_tok,
                               region_name='eu-west-1')
        else:
            if verbose:
                print(f"Execution account, no assume required")
            ec2 = boto3.client('ec2')
        instances = get_instances(processing_acc)
        for inst in instances:
            try:
                az = inst.get("Placement")
                az2 = az.get("AvailabilityZone")
                Iid = []
                Iid.append(inst.get('InstanceId'))
                # The for and first if can be removed, used to retrieve the name tag for verbose output
                for tags in inst.get('Tags'):
                    if tags["Key"] == 'Name':
                        Iname = tags["Value"]
                process_instance(Iname)
                print("-------------------------------------------------------------------------")
            except botocore.exceptions.ClientError as er:
                print("error on main")
                print(er.response['Error']['Message'])
                FailedIid.append(Iid[0])
                continue
    print(f"Errors encountered with these instances: {FailedIid}")
    print(f"Successfully processed these instances: {SuccessIid}")

if __name__ == "__main__":
    main()
assume_roles
This is exactly the same as my reference script. Roles are good. Use roles! Bit like cake, I like cake!
If the AWS account being processed is not the AWS account the script is being executed from, you will need to assume a role defined in the target account. The role must have sufficient access to perform the necessary EC2 operations. In the source AWS account, the instance, person or service performing the execution will need access to use the AWS Security Token Service (STS). AWS STS will grant temporary, short-lived credentials to perform the necessary activities. This is the recommended approach and removes the need to have credentials stored in code.
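A sketch of the function, assuming the temporary credentials are exposed through the acc_key, sec_key and sess_tok globals that main uses to build its ec2 client (the repo version is the definitive one):

def assume_roles(acc, accounts, arole):
    # Expose the temporary credentials as globals for the calling function
    global acc_key
    global sec_key
    global sess_tok
    sts_client = boto3.client('sts')
    response = sts_client.assume_role(
        RoleArn=f"arn:aws:iam::{acc}:role/{arole}",  # role defined in the target account
        RoleSessionName="ebs_encryption_session",    # example session name
    )
    creds = response['Credentials']
    acc_key = creds['AccessKeyId']
    sec_key = creds['SecretAccessKey']
    sess_tok = creds['SessionToken']

get_instances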
Another blatant re-use, handy this ;) Once you have worked out the AWS account security settings, you can start retrieving the EC2 instances for processing. The EC2 instances retrieved are filtered to only those matching the search_tag and search_value. These are then added to a list for further processing.
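A sketch of the function, assuming a straightforward describe_instances filter on the tag key and value (again, the repo holds the version actually used):

def get_instances(processing_acc):
    # Collect all instances carrying the search tag/value into a single list
    instance_list = []
    try:
        paginator = ec2.get_paginator('describe_instances')
        pages = paginator.paginate(
            Filters=[
                {
                    'Name': 'tag:' + search_tag,
                    'Values': [search_value],
                },
            ],
        )
        for page in pages:
            for reservation in page.get('Reservations'):
                for instance in reservation.get('Instances'):
                    instance_list.append(instance)
        if verbose:
            print(f"Instances found in account {processing_acc} : {len(instance_list)}")
    except botocore.exceptions.ClientError as er:
        print("error on get_instances")
        print(er.response['Error']['Message'])
    return instance_list

process_instance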
This is the function which justified its own flowchart above. All of the processing of the EC2 instance is performed within this function and it makes all the calls to the functions which handle the snapshots, detaching, attaching etc.
def process_instance(Iname):
    global volatt
    global volid
    global tags_list
    global Istate
    global IstateCode
    processed_status = "no"  # set a value to establish if the instance state has been retrieved
    Istate = inst.get('State')
    IstateCode = Istate.get('Code')
    get_status(Iid)
    if verbose:
        print(f"Instance Name : {Iname} ; Instance Id : {Iid[0]} ; Instance state : {IstateCode}")
        print(f"Checking volumes attached to {Iid[0]} for encryption settings")
    vols = ec2.describe_volumes(
        Filters=[
            {
                'Name': 'attachment.instance-id',
                'Values': [str(Iid[0]), ],
            },
        ],
    )
    x = 0
    for dev in vols.get('Volumes'):  # loop through each EBS volume attached to the EC2 instance
        x = x + 1  # used to increment and show the volume number being processed
        if verbose:
            print(f"processing volume : {x}")
        try:
            att = dev.get("Attachments")      # retrieve the attachment list in the volume json
            encstatus = dev.get('Encrypted')  # retrieve the encryption state
            volid = dev.get('VolumeId')
            if verbose:
                print(f"volumeid : {volid}")
                print(f"attachment : {att}")
            volatt = att[0].get('Device')
            if encstatus == False:  # it's not encrypted, so let's sort that out - it's why we are here :)
                if verbose:
                    print(f"Volume will need to be encrypted")
                if initial_status == "running" and processed_status == "no":
                    # if the instance is running, shut it down. You only want to do this once,
                    # not iterate through it with each volume
                    if verbose:
                        print(f"shutting down {Iid[0]}")
                    shutdown_instance(Iid)     # call the shutdown instance function
                    processed_status = "yes"   # set it as processed so we don't try to shut it down again for the next volume
                tags_list = dev.get('Tags', [])  # we need the tags to add them to the new encrypted volume
                moveon = "no"
                while not Iid[0] in FailedIid and moveon == "no":
                    # step through this section until one of two things happens:
                    # the instance is added to the failure list or we set moveon to yes
                    detach_old_ebs()        # function to detach the old EBS volume
                    snapshot_volumes()      # function to create the unencrypted snapshot
                    snapshot_copy()         # function to create an encrypted copy of the unencrypted snapshot and delete the unencrypted snapshot
                    create_ebs()            # function to create an encrypted EBS volume from the encrypted snapshot
                    attach_new_ebs()        # attach the new encrypted EBS volume
                    set_delete_terminate()  # for good measure set the DeleteOnTermination flag to true
                    delete_ebs()            # delete the old unencrypted EBS volume
                    moveon = "yes"          # flag to exit the while loop
            else:
                if verbose:
                    print(f"{volid} is already encrypted")
        except botocore.exceptions.ClientError as er:  # capture and output any exceptions
            print("error on check_volumes")
            print(er.response['Error']['Message'])
            FailedIid.append(Iid[0])
    if initial_status == "running":
        if verbose:
            print(f"starting instance {Iid[0]}")
        start_instance(Iid)  # if the instance was running when we started processing, start it back up
    else:
        # the EC2 instance was stopped when we started. We got this far so let's mark it as a success.
        if not Iid[0] in FailedIid:
            SuccessIid.append(Iid[0])
The next few functions are all called by the preceding function, so I will try to keep them in a logical order, starting off with retrieving the EC2 instance’s run state.
get_status
As this script will be stopping and starting instances, you want to know what the starting state was. Why, you might ask? Well, let’s say you were using the cloud in the correct manner; it’s possible that the instances are already in a stopped state for cost-saving purposes. You want to be able to leave them like that when you are finished. This function will work out the state based on the instance state codes. The variable initial_status will be returned to the calling function.
def get_status(Iid):
    global initial_status
    if IstateCode == 16 or IstateCode == 32 or IstateCode == 64 or IstateCode == 80:
        if IstateCode == 16:
            initial_status = "running"
        elif IstateCode == 32 or IstateCode == 64:
            initial_status = "shutting_down"
        elif IstateCode == 80:
            initial_status = "stopped"
    else:
        print(f"Warning : Instance {Iid[0]} is not running, stopping or stopped. Please perform a manual check")
        initial_status = "not_sure"
        FailedIid.append(Iid[0])
    return initial_status  # capture and return the status to the calling function
shutdown_instance
Shutdown the EC2 instance being processed.
You might notice a number of try and except statements in the functions throughout this example. Try something; if it fails, capture and output the error. All part of improving my code and learning something new. Good link here on this very subject.
def shutdown_instance(Iid):
    try:
        ec2.stop_instances(InstanceIds=Iid)
        shutdown_instance_wait(Iid)  # the waiter only needs the instance id list
    except botocore.exceptions.ClientError as er:
        print("error on shutdown_instance")
        print(er.response['Error']['Message'])
        FailedIid.append(Iid[0])
shutdown_instance_wait
Call the waiter to ensure you do not try to detach or attach disks whilst the EC2 instance is running. That will not work!
For example, the class EC2.Waiter.InstanceStopped polls EC2.Client.describe_instances() every 15 seconds until a successful state is reached. An error is returned after 40 failed checks (10 minutes if you were working it out).
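If those defaults do not suit, the delay and retry count can be overridden when calling the waiter; a quick sketch with example values:

shutdown_instance_waiter = ec2.get_waiter('instance_stopped')
shutdown_instance_waiter.wait(
    InstanceIds=Iid,
    WaiterConfig={
        'Delay': 15,        # seconds between polls
        'MaxAttempts': 40,  # give up after 40 checks (10 minutes)
    },
)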
def shutdown_instance_wait(Iid):
    shutdown_instance_waiter = ec2.get_waiter('instance_stopped')
    try:
        shutdown_instance_waiter.wait(InstanceIds=Iid)
        if verbose:
            print(f"Instance {Iid[0]} has shutdown successfully")
    except botocore.exceptions.WaiterError as er:
        if "Max attempts exceeded" in str(er):
            print("error on shutdown_instance_wait")
            print(f"Instance {Iid[0]} did not shutdown in 600 seconds")
            FailedIid.append(Iid[0])
        else:
            print("error on shutdown_instance_wait_else")
            print(str(er))
            FailedIid.append(Iid[0])
    return initial_status
detach_old_ebs
Now that the instance is safely stopped we can detach the EBS volume being processed.
def detach_old_ebs():
    try:
        global ebs_wait
        detach_ebs = ec2.detach_volume(
            Device=volatt,
            InstanceId=Iid[0],
            VolumeId=volid,
        )
        if verbose:
            print(f"Waiting for volume {volid} to be detached")
        ebs_wait = volid
        create_ebs_wait()
    except botocore.exceptions.ClientError as er:
        print("error on detach_old_ebs")
        print(er.response['Error']['Message'])
        FailedIid.append(Iid[0])
create_ebs_wait
We need to ensure the volume is detached before we proceed with either trying to attach another volume to its device attachment or trying to delete it. This waiter turned into a bit of a pain: the subsequent steps would not always work as they were seeing the original EBS volume as still attached. In the end the introduction of a 5 second sleep resolved the issue, which implies the waiter is not that reliable for the “volume_available” wait! No tip for him! (Yes, I was dying to say that :) ). We will use this waiter again after creating the new encrypted EBS volume.
def create_ebs_wait():
    global ebs_check
    ebs_check = []
    ebs_check.append(ebs_wait)
    try:
        create_ebs_available_waiter = ec2.get_waiter('volume_available')
        create_ebs_available_waiter.wait(VolumeIds=ebs_check)
        time.sleep(5)  # Not impressed that I needed to use this. Googling proved I was not alone in this
    except botocore.exceptions.WaiterError as er:
        if "Max attempts exceeded" in str(er):
            print(f"Volume {ebs_check[0]} was not available in the max wait time")
        else:
            print("error on create_ebs_wait")
            print(str(er))
        FailedIid.append(Iid[0])
snapshot_volumes
Create an unencrypted snapshot of the now detached unencrypted EBS volume. Add the tags we retrieved from the EBS volume to the snapshot.
def snapshot_volumes():
    global snap_shot
    global unenc_snapshot
    try:
        snap_shot = ec2.create_snapshot(
            VolumeId=volid,
            Description=snap_prefix,
            TagSpecifications=[
                {
                    'ResourceType': 'snapshot',
                    'Tags': tags_list,
                }
            ],
        )
        create_snapshots_wait(snap_shot)
        unenc_snapshot = snap_shot.get('SnapshotId')
        if verbose:
            print(f"Unencrypted snapshot : {unenc_snapshot}")
    except botocore.exceptions.ClientError as er:
        print("error on snapshot_volumes")
        print(er.response['Error']['Message'])
        FailedIid.append(Iid[0])
create_snapshots_wait
This function will be called by the snapshot_volumes function and also the snapshot_copy function. Both of these rely upon the same waiter, snapshot_completed.
def create_snapshots_wait(snap_shot):
    global snap_check
    snap_check = []
    snap_check.append(snap_shot.get('SnapshotId'))
    try:
        create_snapshot_waiter = ec2.get_waiter('snapshot_completed')
        if verbose:
            print(f"Waiting for {snap_check[0]}")
        create_snapshot_waiter.wait(SnapshotIds=snap_check)
    except botocore.exceptions.WaiterError as er:
        if "Max attempts exceeded" in str(er):
            print("error on create_snapshots_wait")
            print(f"Snapshot {snap_check[0]} did not complete in the max wait time")
            FailedIid.append(Iid[0])
        else:
            print("error on create_snapshots_wait_else")
            print(str(er))
            FailedIid.append(Iid[0])
snapshot_copy
Now we make use of the unencrypted snapshot and create a copy with encryption turned on. Again we make use of the original EBS volume tags, and we call the preceding wait function once more. Once the encrypted copy is complete, the unencrypted snapshot is deleted.
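A sketch of the function, assuming ec2.copy_snapshot and ec2.delete_snapshot are used and that the encrypted snapshot id is stored in the enc_snapshot global read by create_ebs (the repo version is the definitive one):

def snapshot_copy():
    global enc_snapshot
    try:
        snap_copy = ec2.copy_snapshot(
            SourceSnapshotId=unenc_snapshot,
            SourceRegion=region,
            Description=snap_prefix,
            Encrypted=True,  # the copy is encrypted with the account's default EBS key
            TagSpecifications=[
                {
                    'ResourceType': 'snapshot',
                    'Tags': tags_list,
                }
            ],
        )
        enc_snapshot = snap_copy.get('SnapshotId')
        create_snapshots_wait({'SnapshotId': enc_snapshot})  # reuse the snapshot waiter
        ec2.delete_snapshot(SnapshotId=unenc_snapshot)       # remove the unencrypted snapshot
        if verbose:
            print(f"Encrypted snapshot : {enc_snapshot}")
    except botocore.exceptions.ClientError as er:
        print("error on snapshot_copy")
        print(er.response['Error']['Message'])
        FailedIid.append(Iid[0])

create_ebs
Create a new encrypted EBS volume from the encrypted snapshot, in the same availability zone as the instance, re-applying the original volume's tags.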
def create_ebs():
    global enc_ebs_id
    global ebs_wait
    try:
        enc_ebs = ec2.create_volume(
            AvailabilityZone=az2,
            Encrypted=True,
            SnapshotId=enc_snapshot,
            TagSpecifications=[
                {
                    'ResourceType': 'volume',
                    'Tags': tags_list,
                }
            ],
        )
        enc_ebs_id = enc_ebs.get('VolumeId')
        ebs_wait = enc_ebs_id
        create_ebs_wait()
    except botocore.exceptions.ClientError as er:
        print("error on create_ebs")
        print(er.response['Error']['Message'])
        FailedIid.append(Iid[0])
We call the previously described waiter, the one we used for the EBS volume detach function.
attach_new_ebs
We are as sure as we can be (without adding a further check) that the device attachment is now free, so we can attach the new EBS volume.
def attach_new_ebs():
    try:
        if verbose:
            print(f"Attaching volume {enc_ebs_id} to {volatt}")
        attach_ebs = ec2.attach_volume(
            Device=volatt,
            InstanceId=Iid[0],
            VolumeId=enc_ebs_id,
        )
    except botocore.exceptions.ClientError as er:
        print("error on attach_new_ebs")
        print(er.response['Error']['Message'])
        FailedIid.append(Iid[0])
set_delete_terminate
Ensures that the EBS volumes have DeleteOnTermination set to true. Without this you can end up with a lot of orphaned EBS volumes. It’s not required for volume encryption, just some housekeeping.
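A sketch of the function, assuming ec2.modify_instance_attribute is used to flip the flag on the newly attached device (check the repo for the exact version):

def set_delete_terminate():
    try:
        ec2.modify_instance_attribute(
            InstanceId=Iid[0],
            BlockDeviceMappings=[
                {
                    'DeviceName': volatt,
                    'Ebs': {
                        'DeleteOnTermination': True,  # clean up the volume when the instance is terminated
                    },
                },
            ],
        )
    except botocore.exceptions.ClientError as er:
        print("error on set_delete_terminate")
        print(er.response['Error']['Message'])
        FailedIid.append(Iid[0])

delete_ebs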
This one depends on how much confidence you have. It will delete the original unencrypted EBS volume… gulp! (You should already have a backup/snapshot lifecycle in place to give you that warm feeling.)
def delete_ebs():
    try:
        delete_ebs = ec2.delete_volume(
            VolumeId=volid
        )
    except botocore.exceptions.ClientError as er:
        print("error on delete_ebs")
        print(er.response['Error']['Message'])
        FailedIid.append(Iid[0])
start_instance
And finally, you want to start the instance back up if that was its original state.
def start_instance(Iid):
    try:
        ec2.start_instances(InstanceIds=Iid)
        SuccessIid.append(Iid[0])
    except botocore.exceptions.ClientError as er:
        print("error on start_instance")
        print(er.response['Error']['Message'])
        FailedIid.append(Iid[0])
Conclusion
There we go: a Python script which retrieves AWS EC2 instances across multiple AWS accounts, assuming roles where required. The instances are checked for unencrypted EBS volumes; if any are found, they are processed to provide the end result of encrypted EBS volumes.
The script will output a list of successful and failed instance IDs for checking.
As usual, here is a link to my GitHub repo containing the source code. Feel free to ask any questions or suggest any improvements.
Till next time.
Disclaimer: This script will delete EBS volumes. Use at your own risk!