Skip to content

Instantly share code, notes, and snippets.

@Malayke
Created August 15, 2025 04:17
Show Gist options
  • Select an option

  • Save Malayke/434a0b2a1c4946f8540a8b376f4c6b82 to your computer and use it in GitHub Desktop.

Select an option

Save Malayke/434a0b2a1c4946f8540a8b376f4c6b82 to your computer and use it in GitHub Desktop.
aws secrets manager auto rotate lambda function
#!/bin/bash
# put policy.json to location secretsManager/policy.json
# Create inline policy directly on the role
aws iam put-role-policy \
--role-name $ROLE_NAME \
--policy-name SecretsManagerAccess \
--policy-document file://secretsManager/policy.json
# Extract values from your JSON file
LAMBDA_FUNCTION_NAME="LAMBDA_FUNCTION_NAME"
SECRET_ARN="arn:aws:secretsmanager:REGION:ACCOUNT_ID:secret:SECRET_NAME"
# Apply the permission using the values from your JSON policy
aws lambda add-permission \
--function-name $LAMBDA_FUNCTION_NAME \
--statement-id SecretsManagerInvokePermission \
--action lambda:InvokeFunction \
--principal secretsmanager.amazonaws.com \
--source-arn $SECRET_ARN
{
"Version": "2012-10-17",
"Id": "default",
"Statement": [
{
"Sid": "SecretsManagerInvokePermission",
"Effect": "Allow",
"Principal": {
"Service": "secretsmanager.amazonaws.com"
},
"Action": "lambda:InvokeFunction",
"Resource": "arn:aws:lambda:us-west-1:588656319433:function:secretManagerRotation",
"Condition": {
"StringEquals": {
"AWS:SourceArn": "arn:aws:secretsmanager:REGION:ACCOUNT_ID:secret:SECRET_NAME"
}
}
}
]
}
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
import boto3
import logging
import os
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
"""Secrets Manager Rotation Template
This is a template for creating an AWS Secrets Manager rotation lambda
Args:
event (dict): Lambda dictionary of event parameters. These keys must include the following:
- SecretId: The secret ARN or identifier
- ClientRequestToken: The ClientRequestToken of the secret version
- Step: The rotation step (one of createSecret, setSecret, testSecret, or finishSecret)
context (LambdaContext): The Lambda runtime information
Raises:
ResourceNotFoundException: If the secret with the specified arn and stage does not exist
ValueError: If the secret is not properly configured for rotation
KeyError: If the event parameters do not contain the expected keys
"""
arn = event['SecretId']
token = event['ClientRequestToken']
step = event['Step']
logger.info(f"SecretId: {arn}, ClientRequestToken: {token}, Step: {step}")
# Setup the client
service_client = boto3.client('secretsmanager')
# Make sure the version is staged correctly
metadata = service_client.describe_secret(SecretId=arn)
if not metadata['RotationEnabled']:
logger.error("Secret %s is not enabled for rotation" % arn)
raise ValueError("Secret %s is not enabled for rotation" % arn)
versions = metadata['VersionIdsToStages']
if token not in versions:
logger.error("Secret version %s has no stage for rotation of secret %s." % (token, arn))
raise ValueError("Secret version %s has no stage for rotation of secret %s." % (token, arn))
if "AWSCURRENT" in versions[token]:
logger.info("Secret version %s already set as AWSCURRENT for secret %s." % (token, arn))
return
elif "AWSPENDING" not in versions[token]:
logger.error("Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn))
raise ValueError("Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn))
if step == "createSecret":
create_secret(service_client, arn, token)
elif step == "setSecret":
set_secret(service_client, arn, token)
elif step == "testSecret":
test_secret(service_client, arn, token)
elif step == "finishSecret":
finish_secret(service_client, arn, token)
else:
raise ValueError("Invalid step parameter")
def create_secret(service_client, arn, token):
"""Create the secret
This method first checks if a secret exists by calling get_secret_value with the passed-in ClientRequestToken.
If there's no secret, it creates a new secret version with the token as the VersionId. Then it generates a new
secret value with get_random_password. Next it calls put_secret_value to store it with the staging label AWSPENDING.
Args:
service_client (client): The secrets manager service client
arn (string): The secret ARN or other identifier
token (string): The ClientRequestToken associated with the secret version
Raises:
ResourceNotFoundException: If the secret with the specified arn and stage does not exist
"""
# Make sure the current secret exists
try:
service_client.get_secret_value(SecretId=arn, VersionStage="AWSCURRENT")
except service_client.exceptions.ResourceNotFoundException:
logger.error("createSecret: Secret %s does not exist." % arn)
raise
# Now try to get the secret version with AWSPENDING stage, if that fails, put a new secret
try:
service_client.get_secret_value(SecretId=arn, VersionId=token, VersionStage="AWSPENDING")
logger.info("createSecret: Successfully retrieved AWSPENDING secret for %s." % arn)
except service_client.exceptions.ResourceNotFoundException:
# Get exclude characters from environment variable - ensure valid characters for the database/service
exclude_characters = os.environ['EXCLUDE_CHARACTERS'] if 'EXCLUDE_CHARACTERS' in os.environ else '/@"\'\\'
# Generate a random password with excluded characters to ensure compatibility
passwd = service_client.get_random_password(ExcludeCharacters=exclude_characters)
# Put the secret with AWSPENDING staging label for idempotency
service_client.put_secret_value(
SecretId=arn,
ClientRequestToken=token,
SecretString=passwd['RandomPassword'],
VersionStages=['AWSPENDING']
)
logger.info("createSecret: Successfully put secret for ARN %s and version %s." % (arn, token))
def set_secret(service_client, arn, token):
"""Set the secret
This method changes the credentials in the database or service to match the new secret value in the AWSPENDING version.
It includes security checks to prevent confused deputy attacks by validating that AWSCURRENT and AWSPENDING
credentials are for the same resource.
Args:
service_client (client): The secrets manager service client
arn (string): The secret ARN or other identifier
token (string): The ClientRequestToken associated with the secret version
Raises:
ValueError: If security validation fails
ResourceNotFoundException: If the secret versions don't exist
"""
logger.info("setSecret: Setting secret for ARN %s with token %s." % (arn, token))
def test_secret(service_client, arn, token):
"""Test the secret
This method tests the AWSPENDING version of the secret by using it to access the database or service.
Rotation functions based on templates test the new secret by using read access to validate that
the user can login with the new password and has the expected permissions.
Args:
service_client (client): The secrets manager service client
arn (string): The secret ARN or other identifier
token (string): The ClientRequestToken associated with the secret version
Raises:
ValueError: If the secret test fails
ResourceNotFoundException: If the AWSPENDING secret doesn't exist
"""
logger.info("testSecret: Testing secret for ARN %s with token %s." % (arn, token))
def finish_secret(service_client, arn, token):
"""Finish the secret
This method finalizes the rotation process by moving the AWSCURRENT label from the previous secret version
to the new version. This single API call also removes the AWSPENDING label. Secrets Manager automatically
adds the AWSPREVIOUS staging label to the previous version to retain the last known good version.
Args:
service_client (client): The secrets manager service client
arn (string): The secret ARN or other identifier
token (string): The ClientRequestToken associated with the secret version
Raises:
ResourceNotFoundException: If the secret with the specified arn does not exist
ValueError: If the token version doesn't have AWSPENDING stage
"""
# First describe the secret to get the current version and validate stages
metadata = service_client.describe_secret(SecretId=arn)
versions = metadata["VersionIdsToStages"]
# Validate that the token version has AWSPENDING stage
if token not in versions:
logger.error("finishSecret: Version %s not found in secret %s" % (token, arn))
raise ValueError("Version %s not found in secret" % token)
if "AWSPENDING" not in versions[token]:
logger.error("finishSecret: Version %s does not have AWSPENDING stage" % token)
raise ValueError("Version %s does not have AWSPENDING stage" % token)
# Check if this version is already AWSCURRENT
if "AWSCURRENT" in versions[token]:
logger.info("finishSecret: Version %s already marked as AWSCURRENT for %s" % (token, arn))
return
# Find the current version that has AWSCURRENT stage
current_version = None
for version in versions:
if "AWSCURRENT" in versions[version]:
current_version = version
logger.info("finishSecret: Found current version %s with AWSCURRENT stage" % version)
break
if current_version is None:
logger.error("finishSecret: No version found with AWSCURRENT stage")
raise ValueError("No version found with AWSCURRENT stage")
# Move AWSCURRENT stage to the new version and remove AWSPENDING
# This single API call:
# 1. Moves AWSCURRENT from current_version to token version
# 2. Removes AWSPENDING from token version
# 3. Secrets Manager automatically adds AWSPREVIOUS to the previous current_version
logger.info("finishSecret: Moving AWSCURRENT from version %s to version %s" % (current_version, token))
service_client.update_secret_version_stage(
SecretId=arn,
VersionStage="AWSCURRENT",
MoveToVersionId=token,
RemoveFromVersionId=current_version
)
logger.info("finishSecret: Successfully moved AWSCURRENT stage to version %s for secret %s." % (token, arn))
logger.info("finishSecret: AWSPENDING label removed from version %s and AWSPREVIOUS automatically added to version %s" % (token, current_version))
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"secretsmanager:DescribeSecret",
"secretsmanager:GetSecretValue",
"secretsmanager:PutSecretValue",
"secretsmanager:UpdateSecretVersionStage"
],
"Resource": "arn:aws:secretsmanager:REGION:ACCOUNT_ID:secret:SECRET_NAME"
},
{
"Effect": "Allow",
"Action": [
"secretsmanager:GetRandomPassword"
],
"Resource": "*"
}
]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment