6. Module Creation [In Progress]

WebbinRoot edited this page Mar 19, 2025 · 8 revisions

Important

Turns out it's hard to make an easily modular service everyone can add to :) At the moment, think of these as version 0.5 instructions that are subject to change as I try to improve quality of life/coding. Feel free to reach out with questions.

New Services/Enumeration Modules

This covers adding a new service/enumeration module in depth. If you want to leverage an existing service and data collected, you can skip to the exploit/misc modules. I have notes interspersed throughout this section to help provide some context/GCP knowledge to help in coding, and we walk through adding the enum_secrets.py Secrets Manager module via the examples.

Introduction:

GCPwn supports numerous "services" like Cloud Storage or Secret Manager. Within each service there are usually specific "resources". For example,

  • Cloud Storage --> buckets, blobs, HMAC keys
  • Cloud Compute --> instances
  • Secrets Manager --> secret, secret versions

Part 1: Research and Initial Setup

Note

For a given principal (user, service account, etc.) you can set the principal's permissions (i.e. IAM bindings) at the project-level. However, some resources allow you to be even more specific and set IAM bindings at the resource level. For example, you might grant "scott" roles/viewer permissions at the project-level, and then ALSO make "scott" the "storage admin" for a specific bucket. In this case, there are bindings describing scott's permissions on 2 separate resources: the project & the bucket. Looking at just the project bindings and not factoring in the bucket bindings might give you the wrong idea of scott's true permissions over the target bucket. To fully dive into IAM you can check out documentation here
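To make the note above concrete, here is a minimal sketch using plain dictionaries. The `bindings` layout, the resource paths, and the `effective_roles` helper are all made up for illustration; they are not GCPwn or GCP API objects. It only shows why looking at project bindings alone can understate what "scott" can do on the bucket.

```python
# Illustrative only: IAM bindings modeled as {resource: {role: [members]}}.
bindings = {
    "projects/my-project": {
        "roles/viewer": ["user:scott@example.com"],
    },
    "projects/_/buckets/my-bucket": {
        "roles/storage.admin": ["user:scott@example.com"],
    },
}


def effective_roles(member, resource_chain):
    """Collect every role a member holds along a chain of resources.

    resource_chain lists the target resource and its ancestors, because
    bindings set on a parent (the project) also apply to its children.
    """
    roles = set()
    for resource in resource_chain:
        for role, members in bindings.get(resource, {}).items():
            if member in members:
                roles.add(role)
    return roles


scott = "user:scott@example.com"
project_only = effective_roles(scott, ["projects/my-project"])
bucket_view = effective_roles(
    scott, ["projects/_/buckets/my-bucket", "projects/my-project"]
)
print(sorted(project_only))  # ['roles/viewer']
print(sorted(bucket_view))   # ['roles/storage.admin', 'roles/viewer']
```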

  1. Review Service Resource-Level Permissions: Review the service you are planning to add to GCPwn to see if it supports resource-level permissions. You can find the full list of GCP resources that support policies here.

    1. Example: Secret Manager does support resource-level permissions as it allows IAM bindings on "Secrets" as seen in the screenshot below. In other words, we could grant "scott" the "roles/viewer" permissions at the project-level, but then for specific secret(s) attach IAM binding(s) that grants "scott" different permissions. Not every resource within a service would support resource-level permissions however. Note “Secret Version” resources are not listed. image
  2. Review Service Resource Entities: You can try out the service (ex. Cloud Storage) in your own GCP account to get an idea of what resources are created/needed. For the source of truth, use Google to find the Python SDK for the target service you are planning to add. Looking at the SDK documentation, Google usually has a "Client" object from which you call list/get/describe methods to get back resources for the service (ex. buckets, blobs, secrets, functions, etc.). These list/get/describe methods can give you a good idea of what you will be enumerating.

    1. Example: Google the Python SDK for GCP Secret Manager. Note the "SecretManagerServiceClient" where we would eventually pass in our credentials to then call corresponding methods. Once you choose the client in the left-hand navigation menu, note the right-hand ribbon contains all the read/write/delete/etc. methods. We can see there are methods called "list_secrets" and "list_secret_versions", so we have a good idea that we will be gathering secrets and secret versions. Some Googling and trying out the service helped me understand that a "secret" is the overall entity, and it contains "secret versions" where the actual secret values can be gathered from. image image image
  3. Add Database Table Definitions for Resources: GCPwn stores data in SQLite tables with one table generally representing one resource (one table for Cloud Storage buckets, one table for Cloud Storage blobs, one table for Cloud Functions functions, etc.). GCPwn defines table definitions in utils/database.yaml. Behind the scenes GCPwn parses the YAML file to create all the tables on first startup. At this point you should have an idea of what resources you are going to enumerate (ex. buckets, blobs, secrets, functions, etc.). To add a table definition in utils/database.yaml:

    1. Table Names: Table names should be in the format [service_name]-[resource_name].
    2. Table Columns: GCP resources in Python are usually represented as "Objects" from a programming perspective. The SDK lists the return types and, in turn, the fields for data returned from the get/list/describe API calls we were looking at in the previous step. For example, the Cloud Functions SDK defines a "get_function" method which in turn specifies a return type defined here which has several "Attributes" like "name", "build_config", etc. The attributes in this object correlate to the fields in the table definitions. Note some of these attributes are JSONs and you can keep clicking into them to find nested relationships. For now, for simplicity's sake, only add the column names that match the attributes on the main object, and nested data will be saved as JSON (see ex. below for more info).
    3. Primary Keys: The primary key in the YAML definition will always inherently include your workspace ID, so that doesn't need to be added. You usually want to specify the project_id and/or name/id/etc. for whatever resource you are saving.
    4. Example: Look through the methods defined in the Google Python SDK for Secret Manager per the previous step. We know we are going to be saving/enumerating "secrets" and "secret versions". With this in mind, review the "get_secret" method. Note the "Returns" field has a hyperlink under "Type" to "google.cloud.secretmanager_v1.types.Secret". Click into that and you will see the "Secret" object and the top-level fields like name, replication, create_time, labels, etc. Remember we only want top-level keys, so fields like "replication" that are hyperlinked with a depth will be stored as JSON, and would just be "replication" in the YAML file. In this case the "name" field will always be unique so we can just set that as our primary key. image image image
  4. Set Up Folder Structure: Begin setting up folders for your module. Run git clone https://github.com/NetSPI/gcpwn.git to get the latest GCPwn code. If needed (service is new to the tool), create a folder structure within "Modules" for your service as seen in the example.

    1. Example: We are adding SecretsManager. Note the folder structure added below under the Modules folder. image

Part 2 - Setting up Base Module Features

Important

At this point it is assumed you have the tables you need defined in utils/database.yaml, have the basic folder structure setup, and have looked through the Python SDKs for your service/resources and understand what methods you will be needing.

  1. Add Module Details to Catalog: Add details for your specific module you want to create to utils/module_mappings.yaml.
    1. module_name: Should start with enum_[resource(s)] or exploit_[action_shortdescription]. Note you can have one enumeration module enumerate multiple resources, whatever makes sense. For example, enum_buckets enumerates both buckets and blobs.
    2. module_category: should be Enumeration, Exploit, or Unauthenticated.
    3. info_blurb: is up to whatever you want to add as a help menu. version is just the current version of your module and date.
    4. author is where you can put your alias.
    5. location is where your module is located in the folder structure and would be Modules.[Service].[module_category].[module_name]

Module Definition Layout

[service_name]_modules
-	module_name: <module_name_1>
-	module_category: <module_category>
-	info_blurb: <info_blurb>
-	version: version_num – Date
-	author:< Your name/handle>

Example: Review the module we added for enum_secrets. This will enumerate both secrets and secret versions. image

  1. Add Starting Module Template: Create a file in Modules/[Service]/Enumeration/[module_name].py using the template shown below. Note the incoming arguments for the default run_module function are described here.

Base Module Template

# Entrypoint; Try-Catch Exists on Caller
def run_module(user_args, session, first_run = False, last_run = False, output_format = ["table"]):

    # Set up Argparser to handle flag arguments
    parser = argparse.ArgumentParser(description="<Fill In>", allow_abbrev=False)
 
    # Debug/non-module specific
    parser.add_argument("-v","--debug",action="store_true",required=False,help="Get verbose data during the module run")

    args = parser.parse_args(user_args)
    
    debug = args.debug

Example: We added our base template for Modules/SecretsManager/Enumeration/enum_secrets.py. If you added your module successfully, you should be able to run python3 main.py to start GCPwn, run modules list or modules, and see your newly created module in the list of possible modules. Note "enum_secrets" now appears.

image
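As a quick sanity check of the template's argument handling, here is a small standalone sketch. It assumes user_args reaches run_module as a list of strings (much like sys.argv[1:]); the `parse_module_args` wrapper is a hypothetical name used only so the parser can be exercised without a GCPwn session.

```python
import argparse


def parse_module_args(user_args):
    # Same parser setup as the base template; allow_abbrev=False means
    # "--deb" is NOT accepted as shorthand for "--debug".
    parser = argparse.ArgumentParser(description="<Fill In>", allow_abbrev=False)
    parser.add_argument("-v", "--debug", action="store_true", required=False,
                        help="Get verbose data during the module run")
    return parser.parse_args(user_args)


# Assumption: user_args is a list of strings supplied by the caller
print(parse_module_args([]).debug)           # False
print(parse_module_args(["-v"]).debug)       # True
print(parse_module_args(["--debug"]).debug)  # True
```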

  1. Add Default Parameters & Service Client: Add some default arguments and the "Client" object that will be used for the module API calls:
    1. Default Parameters:
      1. debug, project_id = args.debug, session.project_id: This will set debug as True/False for our run that we can use to print more verbose output for debugging purposes. The project_id variable is set to session.project_id, or our current project_id when the module is running. The nice thing about GCPwn is if you run one module across many projects, session.project_id will always hold the project_id of the current project you are in so no need for you to manage the list.
      2. action_dict = {}: action_dict is used in all modules and is a dictionary that stores IAM permissions enumerated passively as we go through our process. We will dive into this in-depth later.
      3. [resource]_list = {}: A dictionary to store resources as we enumerate them. This is then passed into a function at the end that prints to STDOUT in nice table, txt, or csv output. To jump ahead, the format of this will usually be something like {project_id_1: [Object_1, Object_2]} or {project_id_1: {Object_1: [String1, String2], Object_2: [String1]}} depending on your service.
    2. Service Client: To call GCP methods, we have to create a GCP client for our specific service. If you're familiar with boto3 for AWS, this is like making a client in boto3. The code for defining this client can be seen throughout the Google Python SDKs for your specific service in the example code snippets. Usually it needs credentials, which you are already given in the session.credentials value. So in general you will add a line like [resource]_client = [resource_library].[resource_client_object](credentials = session.credentials). See the example below for more details.
    3. Example: Add the necessary default parameters and define the Client for the SecretManager service. Note you can find snippets showing how to define the client by looking at some of the example methods in the SDK documentation (although make sure to pass in "credentials" in our case). image

Add Default Parameters + Service Client

# Entrypoint; Try-Catch Exists on Caller
def run_module(user_args, session, first_run = False, last_run = False, output_format = ["table"]):

    # Set up Argparser to handle flag arguments
    parser = argparse.ArgumentParser(description="<Fill In>", allow_abbrev=False)
 
    # Debug/non-module specific
    parser.add_argument("-v","--debug",action="store_true",required=False,help="Get verbose data during the module run")

    args = parser.parse_args(user_args)
    
    debug, project_id = args.debug, session.project_id
    action_dict, secrets_list = {}, {}
    secret_client = secretmanager_v1.SecretManagerServiceClient(credentials = session.credentials)   

    # With our authenticated client, we can begin calling secret_client.list_secrets, secret_client.get_secret, etc.

Part 3 - Adding Service-Specific Functionality

Note

GCP resources tend to have common names and then full formal names (similar to an ARN from AWS). So a GCP function might be called scotts-function, but the full name might be projects/scotts_project_id/locations/europe-west1/functions/scotts-function. You will see this arise in API calls, as some return common names and others return full names. The Python SDK will also usually ask for name or parent parameters. In some cases name is the full name as just demonstrated (the exact format differs slightly by service), and parent usually references the parent project of the resource in the format projects/[project_id].
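A rough sketch of moving between the two name styles, using the Secret Manager format projects/[project_id]/secrets/[secret_name] referenced later on this page. The helper names (`build_parent`, `build_secret_name`, `split_secret_name`) are hypothetical and exist only for this illustration.

```python
def build_parent(project_id):
    # "parent" usually points at the project that owns the resource
    return f"projects/{project_id}"


def build_secret_name(project_id, secret_id):
    # Full ("formal") name built from the common name
    return f"{build_parent(project_id)}/secrets/{secret_id}"


def split_secret_name(full_name):
    # Reverse direction: recover the project ID and common name
    _, project_id, _, secret_id = full_name.split("/")
    return project_id, secret_id


full_name = build_secret_name("my-project", "db-password")
print(full_name)                     # projects/my-project/secrets/db-password
print(split_secret_name(full_name))  # ('my-project', 'db-password')
```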

  1. Add Conditional for Manual/Automated Listing of Resources: Thinking about our enumeration module, our first objective will be listing resources in a specific service. To generate an initial list of resources, we want to give the user two options: manually supply a list of resource names, or pass in no flags (and automatically pick up resources through list calls). For the manual process, we will add argparse arguments allowing users to pass in either a list of resource names directly on the command line or a file with resource names. This is a "mutually exclusive" group, as the user may supply one option or neither, but not both. Once argparse is set up, create the framework for your conditional to handle the two cases mentioned earlier.

    1. Example: Create an exclusive group and allow a user to pass in either a comma-separated list of secret names on the command line, or the name of a file containing the list of secret names. Add a conditional at the end saying if the user manually supplied secret names (if args.secret_names or args.secret_names_file:) go through Path 1, but if they supplied no flags, go through Path 2.

Added Base Module Template (Secret Manager Specific)

# Entrypoint; Try-Catch Exists on Caller
def run_module(user_args, session, first_run = False, last_run = False, output_format = ["table"]):

    # Set up Argparser to handle flag arguments
    parser = argparse.ArgumentParser(description="<Fill In>", allow_abbrev=False)
    exclusive_access_key_group = parser.add_mutually_exclusive_group(required=False)
    exclusive_access_key_group.add_argument("--secret-names", type=str, help="Secrets to check in the format projects/[project_id]/secrets/[secret_name]")
    exclusive_access_key_group.add_argument("--secret-names-file", type=str, help="File name to get secrets in format projects/[project_id]/secrets/[secret_name] per line")
    # Debug/non-module specific
    parser.add_argument("-v","--debug",action="store_true",required=False,help="Get verbose data during the module run")

    args = parser.parse_args(user_args)
    
    debug, project_id = args.debug, session.project_id
    action_dict, secrets_list = {}, {}
    secret_client = secretmanager_v1.SecretManagerServiceClient(credentials = session.credentials)   

    # Path 1: if the user supplies arguments
    if args.secret_names or args.secret_names_file:
        pass

    # Path 2: if the user supplies no arguments
    else:
        pass
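To see how the mutually exclusive group behaves, the following standalone sketch reuses the same two flags and reports which path a given argument list would take. The `choose_path` helper and its return strings are hypothetical; in a real run argparse prints a usage error and exits when both flags are supplied, which is caught here only so the behaviour can be observed.

```python
import argparse

parser = argparse.ArgumentParser(description="demo", allow_abbrev=False)
group = parser.add_mutually_exclusive_group(required=False)
group.add_argument("--secret-names", type=str)
group.add_argument("--secret-names-file", type=str)


def choose_path(user_args):
    """Return which branch of the conditional the arguments would take."""
    try:
        args = parser.parse_args(user_args)
    except SystemExit:
        # argparse rejects the input when both exclusive flags are given
        return "rejected"
    if args.secret_names or args.secret_names_file:
        return "path 1 (manual)"
    return "path 2 (automated)"


print(choose_path([]))                                          # path 2 (automated)
print(choose_path(["--secret-names", "projects/p/secrets/s"]))  # path 1 (manual)
```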

Note

Unlike AWS with boto3, GCP has individual python packages for EVERY service. You can find the pip package name for your specified GCP python service via the Google Python SDK page usually under the "Overview" tab as shown in the example

  1. Adding to Requirements.txt: If not already added to requirements.txt, add your pip package and specify the correct version number.

    1. Example: Observe the pypi package link for the Secret manager SDK. Add it to requirements.txt image image image image
  2. Create util_helpers: Each service relies on its local utils/util_helpers.py Python file for the libraries and functions shared amongst its modules. util_helpers.py should import, at the top, all libraries needed, including the library just added to requirements.txt, along with several error handling libraries. In order for enum_[resource].py to see util_helpers.py, add an import statement at the top of enum_[resource].py.

    1. Example: Set up utils/util_helpers.py and import util_helpers.py into enum_secrets.py.

Modules/SecretsManager/Enumeration/enum_secrets.py

from Modules.SecretsManager.utils.util_helpers import *

# Entrypoint; Try-Catch Exists on Caller
def run_module(user_args, session, first_run = False, last_run = False, output_format = ["table"]):
[TRUNCATED]

Modules/SecretsManager/utils/util_helpers.py

from google.cloud import secretmanager_v1
import argparse
from UtilityController import *
import pandas as pd
import os

from google.api_core.exceptions import PermissionDenied
from google.api_core.exceptions import NotFound
from google.api_core.exceptions import Forbidden
from google.api_core.exceptions import InvalidArgument

from google.iam.v1 import iam_policy_pb2

Note

In GCP Python you usually have to make a "request object" with the necessary parameters, and then pass that "request object" into the actual method you're calling from the client. For example, in the code below you make a "GetSecretRequest" object and pass in the "name" parameter, and then pass that GetSecretRequest object to "get_secret" as an argument. Usually you can click into the object name (in this case "google.cloud.secretmanager_v1.types.GetSecretRequest") to see what parameters are needed and what they mean. image

  1. Create List/Get Helper Functions: Begin populating utils/util_helpers.py with get/list/describe methods from the Google Python SDK for your specific service. Rewrite them each in utils/util_helpers.py with the following additions:
    1. Function Name & Parameters: Create a function definition that matches the expected action you will be performing (ex. list_secrets). Set the parameter values needed for the specific function. This should be of the format: def function_name([resource]_client, <parameters_needed>, debug = False). <parameters_needed> will depend on a case by case basis but it might be the project_id or perhaps the resource name. See example below.
    2. Variable Definition: If working with a list, set [resource]_list equal to [], or if writing a get_[resource] equivalent function, set the variable equal to None. This list/variable will be returned at the end of the function if it completes successfully.
    3. Add Try-Catch Exceptions: Wrap the function from the SDK code sample in a try-except block. Leverage the exceptions from the import statements to gracefully handle different conditions. These can be broken down as follows with examples shown below:
      1. If Forbidden occurs because the user does not have permissions to access the API, call UtilityTools.print_403_api_denied(<permission_name>, resource_name = <resource_name>) and return None. If the API itself is disabled, call UtilityTools.print_403_api_disabled(<service_name>, <resource_name>) and return "Not Enabled".
      2. Have a generic catch-all Exception at the end of the function that calls UtilityTools.print_500(<resource>,<permissions>,<error_code>)
    4. Add Debug Statements: Start and end the function with a debug statement stating the function has started and ended.
    5. Example: Look through the Python SDK documentation for Secret Manager. Note the list_secrets method with the corresponding code example here. In this case the "parent" would be the GCP path to where the resource exists. You can delve into what each request needs by looking at the GCP documentation for list_resource and get_resource, which have hyperlinks describing the Requests and usually what the required variables are. image

Modules/SecretsManager/utils/util_helpers.py

# List Method Template
def list_secrets(
        secret_client, 
        parent, 
        debug = False
    ):

    if debug: print(f"[DEBUG] Listing secrets for project {parent} ...")
    
    secrets_list = []

    try:
        request = secretmanager_v1.ListSecretsRequest(
            parent=parent,
        )

        secrets_list = list(secret_client.list_secrets(request=request))

    except Forbidden as e:

        if "Permission 'secretmanager.secrets.list' denied for resource" in str(e):
            
            UtilityTools.print_403_api_denied("secretmanager.secrets.list", resource_name = parent)
        
            return None
            
        elif "Secret Manager API has not been used in project" in str(e) and "before or it is disabled" in str(e):
            
            UtilityTools.print_403_api_disabled("Secrets Manager", parent)

            return "Not Enabled"

        elif "403 Permission denied on resource project" in str(e):

            UtilityTools.print_403_api_denied("secretmanager.secrets.list", resource_name = parent)

        return None

    except NotFound as e:

        if "was not found" in str(e):
            UtilityTools.print_404_resource(parent)

        # Do not fall through to the "Successfully called" debug message
        return None

    except Exception as e:
        
        UtilityTools.print_500(parent, "secretmanager.secrets.list", e)
        return None

    if debug: print(f"[DEBUG] Successfully called list_secrets for {parent} ...")
    
    return secrets_list

# Get Method Template
def get_secret(
        secret_client, 
        secret_name, 
        debug = False
    ):

    if debug: print(f"[DEBUG] Getting secret {secret_name} ...")
    
    secret_meta = None

    try:

        request = secretmanager_v1.GetSecretRequest(
            name=secret_name,
        )

        secret_meta = secret_client.get_secret(request=request)



    except Forbidden as e:
        
        if "does not have secretmanager.secrets.get access" in str(e):

            UtilityTools.print_403_api_denied("secretmanager.secrets.get", resource_name = secret_name)
                    
        elif "Secret Manager API has not been used in project" in str(e) and "before or it is disabled" in str(e):

            UtilityTools.print_403_api_disabled("Secrets Manager", secret_name)

            return "Not Enabled"

        return None

    except NotFound as e:
        if "404 Secret" in str(e):
            UtilityTools.print_404_resource(secret_name)

        return 404

    except Exception as e:

        UtilityTools.print_500(secret_name, "secretmanager.secrets.get", e)

        return None

    if debug: print(f"[DEBUG] Successfully called get_secret for {secret_name} ...")
    
    return secret_meta
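
The helper templates above signal their outcome through the return value: "Not Enabled" when the API is disabled, 404 when a named resource is missing, None for a denied or failed call, and the real data otherwise. As a rough sketch of how a caller might branch on those values, consider the following; `describe_result` and its labels are hypothetical names used only for this illustration.

```python
def describe_result(result):
    """Map a helper's return value to what the caller should do next."""
    if result == "Not Enabled":
        return "api-disabled"      # skip the service for this project
    if result == 404:
        return "not-found"         # the named resource does not exist
    if result is None:
        return "denied-or-error"   # a 403 or unexpected error was already printed
    return "ok"                    # a list (possibly empty) or a resource object


print(describe_result("Not Enabled"))  # api-disabled
print(describe_result(None))           # denied-or-error
print(describe_result([]))             # ok
```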
  1. Hashable Objects Explained: We will commonly use objects as keys in dictionaries and as members of sets. In Python, an object needs to be "hashable" to be used as a dictionary key or set member, and it needs a sensible equality check if you want to test whether Object A already exists in a collection alongside Object B, Object C, etc. GCP objects do not seem hashable by default. For example, you might call the get_secret() GCP method, get a GCP Secret object back, and want to set a dictionary entry like dictionary[secret_object] = "test". This will error out in Python. So I use a hacky approach of wrapping GCP objects in a class to make them hashable so they can then be used as keys in dictionaries (as will be seen in the next step). The object is essentially the same with the same attributes/function access, but now I can use the object in lists, sets, dictionary keys, etc. The wrapper adds one attribute called "validated", which is True or False depending on whether we can say for certain the resource actually exists (it defaults to True in the class below, and the manual-input path sets it to False). Don't worry if you don't understand this fully. Basically, make a class like the one below, change the names slightly for your resource, and specify what __eq__ uses to compare your objects (secrets, id, name?). Note, I have an open issue with Google regarding this: https://github.com/googleapis/google-cloud-python/issues/13045.

Examples: In my case, I know a secret's "name" attribute is good enough to serve as a comparer. So I make a "HashableSecret" class to essentially serve as a wrapper. I just pass "Secret" GCP objects into the class and then get an object that is essentially the same but that I can now use in dictionaries, lists, etc.

Modules/SecretsManager/Enumeration/enum_secrets.py

class HashableSecret:
    def __init__(self, secret, validated = True):
        self._secret = secret
        self.validated = validated

    def __hash__(self):
        # Hash based on the name or any other combination of unique attributes
        return hash(self._secret.name)

    def __eq__(self, other):
        # Compare based on the name or any other combination of unique attributes
        return isinstance(other, HashableSecret) and self._secret.name == other._secret.name

    def __getattr__(self, attr):
        # Delegate attribute access to the original secret object
        return getattr(self._secret, attr)

    def __repr__(self):
        # Optional: Make it easier to read the wrapped object
        return f"HashableSecret({self._secret.name})"

[TRUNCATED]
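
To try the wrapper without GCP credentials, here is a small self-contained sketch. `FakeSecret` is a made-up stand-in that is unhashable in the same way the guide describes GCP objects being unhashable (it defines __eq__ without __hash__); the wrapper is a trimmed copy of the class above.

```python
class FakeSecret:
    """Stand-in for a GCP Secret: defining __eq__ alone makes it unhashable."""
    def __init__(self, name):
        self.name = name

    def __eq__(self, other):
        return isinstance(other, FakeSecret) and self.name == other.name


class HashableSecret:
    def __init__(self, secret, validated=True):
        self._secret = secret
        self.validated = validated

    def __hash__(self):
        return hash(self._secret.name)

    def __eq__(self, other):
        return isinstance(other, HashableSecret) and self._secret.name == other._secret.name

    def __getattr__(self, attr):
        # Only called for attributes not found on the wrapper itself
        return getattr(self._secret, attr)


raw = FakeSecret("projects/p/secrets/db-password")

try:
    {raw: "test"}
    raw_is_hashable = True
except TypeError:
    raw_is_hashable = False

wrapped = HashableSecret(raw)
lookup = {wrapped: ["1"]}
duplicate = HashableSecret(FakeSecret("projects/p/secrets/db-password"))

print(raw_is_hashable)    # False
print(lookup[duplicate])  # ['1']
print(wrapped.name)       # projects/p/secrets/db-password
```

Because both __hash__ and __eq__ key off the name, two wrappers around the same secret name collapse to a single dictionary key or set member.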
  1. List Format Explained: Quick interruption to dive a bit deeper into the growing dictionary of resources as we enumerate them. As mentioned before, this is stored in a variable called something like [resource]_list. In our example, this is secrets_list (it's actually a dictionary, apologies for the confusing terminology at this time). These variables holding resource data must be dictionaries in one of the following two formats. The format depends on whether the resources you are enumerating are single dimension or nested. For example, if you enumerate Cloud Functions, that's just one object so you would use the single dimension option. However, if you're enumerating Google Cloud Storage buckets, buckets can have blobs in them, so you might have a dictionary with more depth as in the nested example. At this time the nested format needs the innermost list to be of a basic primitive type like a string.

Single Dimension

{
"project_id_1": [
  Object_1,
  Object_2,
  Object_3
],
"project_id_2": [
  Object_4
]
}

Nested

{
"project_id_1": {
  Object_1: [
    Sub_Resource_1
  ],
  Object_2: [
    Sub_Resource_2,
    Sub_Resource_3
  ],
  Object_3: []
},
"project_id_2": {
  Object_4: []
}
}

Example: We are going to be storing secrets which in turn can have secret versions inside of them. So an example of a final dictionary for secrets_list might be:

{
"project_id_1": {
  Secret_Object_1: [
    "secret_name_1_version_1"
  ],
  Secret_Object_2: [
    "secret_name_2_version_1",
    "secret_name_2_version_2"
  ],
  Secret_Object_3: ["secret_name_3_version_1"]
},
"project_id_2": {
  Secret_Object_4: ["secret_name_4_version_1"]
}
}
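
One way to grow a nested dictionary of this shape incrementally is with dict.setdefault. The sketch below is only an illustration: plain strings stand in for the hashable secret objects, and `add_secret` / `add_version` are hypothetical helper names, not GCPwn functions.

```python
secrets_list = {}


def add_secret(project_id, secret_key):
    # Ensure both the project entry and the secret entry exist
    secrets_list.setdefault(project_id, {}).setdefault(secret_key, [])


def add_version(project_id, secret_key, version_name):
    versions = secrets_list.setdefault(project_id, {}).setdefault(secret_key, [])
    if version_name not in versions:
        versions.append(version_name)


add_secret("project_id_1", "Secret_Object_3")
add_version("project_id_1", "Secret_Object_2", "secret_name_2_version_1")
add_version("project_id_1", "Secret_Object_2", "secret_name_2_version_2")
add_version("project_id_1", "Secret_Object_2", "secret_name_2_version_2")  # duplicate ignored

print(secrets_list["project_id_1"]["Secret_Object_2"])
# ['secret_name_2_version_1', 'secret_name_2_version_2']
```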
  1. Add to Initial List Workflows: Expand on the two paths (manually user supplied resources + automated list retrieval) for listing resources.
    1. Manual Approach: If the user supplies a list of specific resources either on the command line or via a filename, convert that into a list of resource names. Add some exception logic, as seen in the example below, for the case where the file does not exist. Note resources supplied have to be in the long-name format for GCP, which is dependent on your service (one ex. projects/[project_id]/secrets/[secret_name]). You can leverage UtilityTools.validate_input_format([list_value], field_count) as seen in the example to check the correct number of fields are supplied between the / delimiters. It is important to note no GCP API calls have been made at this point. We are just adding some input logic to get the user content into a list variable.
    2. Automated Approach: If the user supplies no resources, call the list_[resource] function we just made in utils/util_helpers.py. Note list_[resource] requires the parent parameter, which we can define as parent = f"projects/{project_id}". If the service is not enabled or an exception caused None to be returned, set the value for the project_id in our [resource]_list to an empty list or dictionary. If resources were returned, save them to the project_id key in [resource]_list while also wrapping the returned objects in our "Hashable" class.

Example: Note the current code in enum_secrets.py and util_helpers.py. As mentioned before, for Secrets Manager our final dictionary will look like {project_id_1: {SecretObject1: [version1, version2], SecretObject2: [version3]}}, so when we get resources back from our list_secrets API call we add the keys to the dictionary below. The full resource names supplied should also match projects/[project_id]/secrets/[secret_name], which is why we check for 4 fields in UtilityTools.validate_input_format().
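
The field-count check can be pictured with the stand-in below. This is not the real UtilityTools.validate_input_format; `check_field_count` is a hypothetical reimplementation that only assumes the (status, incorrect_input) return shape used in the example code, with 0 meaning success.

```python
def check_field_count(values, field_count, delimiter="/"):
    """Return (0, None) if every value has field_count non-empty fields,
    otherwise (-1, first_offending_value)."""
    for value in values:
        parts = value.split(delimiter)
        if len(parts) != field_count or any(part == "" for part in parts):
            return -1, value
    return 0, None


good = ["projects/my-project/secrets/db-password"]
bad = ["projects/my-project/secrets/db-password", "db-password"]

print(check_field_count(good, 4))  # (0, None)
print(check_field_count(bad, 4))   # (-1, 'db-password')
```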

Modules/SecretsManager/Enumeration/enum_secrets.py

from Modules.SecretsManager.utils.util_helpers import *

# Entrypoint; Try-Catch Exists on Caller
def run_module(user_args, session, first_run = False, last_run = False, output_format = ["table"]):

    # Set up Argparser to handle flag arguments
    parser = argparse.ArgumentParser(description="Enumerate Secrets Options", allow_abbrev=False)
    
    exclusive_access_key_group = parser.add_mutually_exclusive_group(required=False)
    exclusive_access_key_group.add_argument("--secret-names", type=str, help="Secrets to check in the format projects/[project_id]/secrets/[secret_name]")
    exclusive_access_key_group.add_argument("--secret-names-file", type=str, help="File name to get secrets in format projects/[project_id]/secrets/[secret_name] per line")

    parser.add_argument("-v","--debug",action="store_true",required=False,help="Get verbose data during the module run")

    args = parser.parse_args(user_args)
    
    debug, project_id = args.debug, session.project_id
    action_dict, secrets_list = {}, {}
    
    secret_client = secretmanager_v1.SecretManagerServiceClient(credentials = session.credentials)   

    if args.secret_names or args.secret_names_file:
        
        if args.secret_names:

            secrets_list_rudimentary = args.secret_names.split(",")

        elif args.secret_names_file:

            secrets_file = args.secret_names_file

            try:

                # Use a context manager so the file handle is always closed
                with open(secrets_file) as secrets_fh:
                    secrets_list_rudimentary = [line.strip() for line in secrets_fh]
                
            except FileNotFoundError:
                print(f"{UtilityTools.RED}[X] File {secrets_file} does not appear to exist. Exiting...{UtilityTools.RESET}")
                return -1

        # Check if input is valid
        status, incorrect_input = UtilityTools.validate_input_format(secrets_list_rudimentary, 4)
        if status != 0: 
            print(f"{UtilityTools.RED}[X] Value \"{incorrect_input}\" is incorrect. Must be 'projects/[project_id]/secrets/[secret_name]'. Please try again...{UtilityTools.RESET}")
            return -1

        for secret_name in secrets_list_rudimentary:

            _, secret_project_id, _, common_secret_name = secret_name.split("/")
            # Wrap a bare Secret carrying only the supplied name; it is not yet confirmed to exist
            secret_value = HashableSecret(secretmanager_v1.Secret(name = secret_name))
            secret_value.validated = False
            secrets_list.setdefault(secret_project_id, set([])).add(secret_value)

    else:

        parent = f"projects/{project_id}"

        every_secret = list_secrets(secret_client, parent, debug = debug)

        if every_secret == "Not Enabled" or every_secret is None:
            secrets_list.setdefault(project_id, {})

        else:

            # Handle case where every_secret is empty
            if not every_secret:
                secrets_list.setdefault(project_id, {})

            else:
                secrets_list.setdefault(project_id, {}).update({HashableSecret(secret): {} for secret in every_secret})
                for secret in every_secret:
                    # To add in later step
                    pass

Modules/SecretsManager/utils/util_helpers.py

from google.cloud import secretmanager_v1
import argparse
from UtilityController import *
import pandas as pd
import os
from Modules.IAM.utils.util_helpers import secret_get_iam_policy, secret_set_iam_policy


from google.api_core.exceptions import PermissionDenied
from google.api_core.exceptions import NotFound
from google.api_core.exceptions import Forbidden
from google.api_core.exceptions import InvalidArgument

from google.iam.v1 import iam_policy_pb2 

# Utilities
from UtilityController import *

def list_secrets(
        secret_client, 
        parent, 
        debug = False
    ):

    if debug: print(f"[DEBUG] Listing secrets for project {parent} ...")
    
    secrets_list = []

    try:
        request = secretmanager_v1.ListSecretsRequest(
            parent=parent,
        )

        secrets_list = list(secret_client.list_secrets(request=request))

    except Forbidden as e:

        if "Permission 'secretmanager.secrets.list' denied for resource" in str(e):
            
            UtilityTools.print_403_api_denied("secretmanager.secrets.list", resource_name = parent)
        
            return None
            
        elif "Secret Manager API has not been used in project" in str(e) and "before or it is disabled" in str(e):
            
            UtilityTools.print_403_api_disabled("Secrets Manager", parent)

            return "Not Enabled"

        elif "403 Permission denied on resource project" in str(e):

            UtilityTools.print_403_api_denied("secretmanager.secrets.list", resource_name = parent)

        return None

    except NotFound as e:

        if "was not found" in str(e):
            UtilityTools.print_404_resource(parent)

        # Return early so the success debug message below is not printed for a 404
        return None

    except Exception as e:
        
        UtilityTools.print_500(parent, "secretmanager.secrets.list", e)
        return None

    if debug: print(f"[DEBUG] Successfully called list_secrets for {parent} ...")
    
    return secrets_list

[TRUNCATED]
  1. Saving Data in Database: After the setup process, SQLite tables should exist for your resources. Create a function in utils/util_helpers.py called save_[resource_name]([resource_object], session, [project_id]). The function takes the incoming object, builds a dictionary (save_data) whose keys match the columns defined for the SQLite table, copies the object attributes into the corresponding keys, and passes the dictionary to a predefined function (session.insert_data()) to save the data. Some object attributes are nested rather than simple values (ex. replication for the Secret object). In that case, build a mini-dictionary from the nested fields, which is stored as a JSON string, and save it under the corresponding key in the outermost dictionary (see example below). Why do all this? Because I haven't found a reliable way to deserialize GCP objects without losing data 😢 but def working on simplifying this. Back in your main enumeration module, add the logic to pass each recently listed object to the save function. Note this is only done for the automated branch and not the manual branch, as we don't yet know for certain that the user-supplied resources exist.

Example: Add the save_secret function as described in this step. You can also add save_secret_version, since we will be saving secret versions as well. To find the attributes of each object, check the SDK object definitions for Secret and SecretVersion. Back in enum_secrets.py, add secrets_list.setdefault(project_id, {}).update({HashableSecret(secret): {} for secret in every_secret}) to save the returned list to secrets_list. Note we also wrap each secret in HashableSecret so it can be used as a dictionary key.
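To make the "nested attribute becomes a JSON string" idea concrete, here is a minimal sketch. It uses a stand-in object instead of a real SDK Secret, and the helper name build_save_data is hypothetical. Whether session.insert_data() serializes nested values itself is not shown in this guide, so the explicit json.dumps call is an assumption.

```python
import json
from types import SimpleNamespace

def build_save_data(secret, project_id):
    """Map object attributes to column values; nested data becomes a JSON string."""
    save_data = {"project_id": project_id}
    if secret.name:
        save_data["name"] = secret.name

    # Nested attribute: copy only the fields we care about into a plain dict first
    replication = {}
    user_managed = getattr(secret.replication, "user_managed", None)
    if user_managed and user_managed.replicas:
        replication["user_managed"] = {
            "replicas": [{"location": r.location} for r in user_managed.replicas]
        }

    # Assumption: serialize here so the value fits in a single text column
    save_data["replication"] = json.dumps(replication, sort_keys=True)
    return save_data

# Stand-in for an SDK Secret object (hypothetical data)
fake_secret = SimpleNamespace(
    name="projects/123/secrets/demo",
    replication=SimpleNamespace(
        user_managed=SimpleNamespace(replicas=[SimpleNamespace(location="us-east1")])
    ),
)

row = build_save_data(fake_secret, "my-project")
print(row["replication"])
```

Copying fields by hand is verbose, but it keeps control over exactly which attributes end up in the table.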

Modules/SecretsManager/Enumeration/enum_secrets.py

[TRUNCATED]

    else:

        parent = f"projects/{project_id}"

        every_secret = list_secrets(secret_client, parent, debug = debug)

        if every_secret == "Not Enabled" or every_secret is None:
            secrets_list.setdefault(project_id, {})

        else:

            # Handle case where every_secret is empty
            if not every_secret:
                secrets_list.setdefault(project_id, {})

            else:
                secrets_list.setdefault(project_id, {}).update({HashableSecret(secret): {} for secret in every_secret})
                for secret in every_secret:
                    # ADDED - Save secret function
                    save_secret(secret, session, project_id)

Modules/SecretsManager/utils/util_helpers.py

from google.cloud import secretmanager_v1
import argparse
from UtilityController import *
import pandas as pd
import os
from Modules.IAM.utils.util_helpers import secret_get_iam_policy, secret_set_iam_policy


from google.api_core.exceptions import PermissionDenied
from google.api_core.exceptions import NotFound
from google.api_core.exceptions import Forbidden
from google.api_core.exceptions import InvalidArgument

from google.iam.v1 import iam_policy_pb2 

# Utilities
from UtilityController import *

# ADDED - Save Secret Function
def save_secret(secret, session, secret_project_id):

    table_name = 'secretsmanager-secrets'

    project_name = secret.name.split("/")[1]

    save_data = {"project_id":secret_project_id, "project_name":f"projects/{project_name}"}   

    if secret.name: save_data["name"] = secret.name
    
    replication = {}
    if secret.replication:
        if secret.replication.automatic: 
            replication["automatic"] = {}
            if secret.replication.automatic.customer_managed_encryption:
                replication["automatic"]["customer_managed_encryption"] = secret.replication.automatic.customer_managed_encryption

        if secret.replication.user_managed: 
            replication["user_managed"] = {}
            if secret.replication.user_managed.replicas:
                replication["user_managed"]["replicas"] = []
                for replica in secret.replication.user_managed.replicas:
                    replica_starting = {"location": None, "customer_managed_encryption": {"kms_key_name": None}}
                    if replica.location:
                        replica_starting["location"] = replica.location
                    if replica.customer_managed_encryption and replica.customer_managed_encryption.kms_key_name:
                        replica_starting["customer_managed_encryption"]["kms_key_name"] = replica.customer_managed_encryption.kms_key_name

                    if replica_starting["location"] or replica_starting["customer_managed_encryption"]["kms_key_name"]:
                        replication["user_managed"]["replicas"].append(replica_starting)
                
    save_data["replication"] = replication

    if secret.create_time: save_data["create_time"] = secret.create_time
    if secret.labels: save_data["labels"] = dict(secret.labels)
    if secret.topics: save_data["topics"] = [topic.name for topic in secret.topics]
    if secret.expire_time: save_data["expire_time"] = str(secret.expire_time)
    if secret.ttl: save_data["ttl"] = str(secret.ttl)
    if secret.etag: save_data["etag"] = str(secret.etag)

    rotation = {}
    if secret.rotation:
        if secret.rotation.next_rotation_time: rotation["next_rotation_time"] = str(secret.rotation.next_rotation_time)
        if secret.rotation.rotation_period: rotation["rotation_period"] = str(secret.rotation.rotation_period)

    save_data["rotation"] = rotation
    if secret.version_aliases: save_data["version_aliases"] = dict(secret.version_aliases)
    if secret.annotations: save_data["annotations"] = dict(secret.annotations)
    if secret.version_destroy_ttl: save_data["version_destroy_ttl"] = str(secret.version_destroy_ttl)

    if secret.customer_managed_encryption: 
        if secret.customer_managed_encryption.kms_key_name: 
            save_data["customer_managed_encryption"] = {"kms_key_name": secret.customer_managed_encryption.kms_key_name}

    session.insert_data(table_name, save_data)

def save_secret_version(secret, session, secret_project_id):

    table_name = 'secretsmanager-secretversions'

    project_name = secret.name.split("/")[1]
    version_num = secret.name.split("/")[-1]

    save_data = {"project_id":secret_project_id, "project_name":f"projects/{project_name}", "version_num": version_num}   

    if secret.name: save_data["name"] = secret.name
    if secret.create_time: save_data["create_time"] = secret.create_time
    if secret.destroy_time: save_data["destroy_time"] = secret.destroy_time
    if secret.state: save_data["state"] = str(secret.state)
    if secret.etag: save_data["etag"] = str(secret.etag)
    if secret.client_specified_payload_checksum: save_data["client_specified_payload_checksum"] = str(secret.client_specified_payload_checksum)
    if secret.scheduled_destroy_time: save_data["scheduled_destroy_time"] = str(secret.scheduled_destroy_time)


    replication_status = {}
    if secret.replication_status:
        if secret.replication_status.automatic: 
            replication_status["automatic"] = {}
            if secret.replication_status.automatic.customer_managed_encryption:
                replication_status["automatic"]["customer_managed_encryption"] = secret.replication_status.automatic.customer_managed_encryption

        if secret.replication_status.user_managed: 
            replication_status["user_managed"] = {}
            if secret.replication_status.user_managed.replicas:
                replication_status["user_managed"]["replicas"] = []
                for replica in secret.replication_status.user_managed.replicas:
                    replica_starting = {"location": None, "customer_managed_encryption": {"kms_key_name": None}}
                    if replica.location:
                        replica_starting["location"] = replica.location
                    if replica.customer_managed_encryption and replica.customer_managed_encryption.kms_key_name:
                        replica_starting["customer_managed_encryption"]["kms_key_name"] = replica.customer_managed_encryption.kms_key_name

                    if replica_starting["location"] or replica_starting["customer_managed_encryption"]["kms_key_name"]:
                        replication_status["user_managed"]["replicas"].append(replica_starting)
                
    save_data["replication_status"] = replication_status

    if secret.customer_managed_encryption: 
        if secret.customer_managed_encryption.kms_key_name: 
            save_data["customer_managed_encryption"] = {"kms_key_name": secret.customer_managed_encryption.kms_key_name}

    session.insert_data(table_name, save_data)

Note

Having the project_id as the outermost key may seem cumbersome, but it lets users pass in resource lists containing resources from different projects. In most cases, though, there will probably be just 1 project_id.
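As a rough illustration of why the project ID is the outermost key, the sketch below groups user-supplied secret names by project. The function name group_by_project is hypothetical, and plain strings are used as keys instead of HashableSecret objects to keep the example self-contained.

```python
def group_by_project(resource_names):
    """Group names of the form projects/[project_id]/secrets/[secret_name] by project ID."""
    grouped = {}
    for full_name in resource_names:
        parts = full_name.split("/")
        if len(parts) != 4 or parts[0] != "projects" or parts[2] != "secrets":
            raise ValueError(f"Unexpected resource name: {full_name}")
        # Outermost key is the project ID; each secret maps to its (empty) versions dict
        grouped.setdefault(parts[1], {})[full_name] = {}
    return grouped

names = [
    "projects/proj-a/secrets/db",
    "projects/proj-b/secrets/api",
    "projects/proj-a/secrets/tls",
]
grouped = group_by_project(names)
for project_id, secrets in grouped.items():
    print(project_id, list(secrets))
```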

  1. Add GET Object API Per Object: At this point we have a list of objects manually/automatically supplied. So the next step is to iterate through each object and call the GET equivalent API endpoint. This will involve two loops as our list of resources has project IDs as the outermost key, with resource lists or dictionaries as the values. In the innermost loop, for each resource you would leverage the get_[resource]() API call to get the resource. Note this might be redundant, however. If we already listed all the resources, maybe we want to skip the GET check to be quieter. Thus, we introduce the --minimal-calls argparse flag. Wrap your code to GET the specific resource in this conditional so a user can skip it if needed.

Example: Add the --minimal-calls argparse flag. Begin looping through the secret objects for each project. Some items of note are:

  1. Add a debug statement when beginning the first for loop highlighting the number of resources found for the specific project
  2. Add a print statement in the innermost loop stating that you are Reviewing [resource_name]
  3. Add a conditional to run the GET API call on the resource if the user has not specified --minimal-calls
  4. Within the conditional call our GET request from util_helpers.py. If the GET request fails with a 404, return and don't save/change anything. If the GET request is successful, save the secret per our save_secret function. If the user supplied an argument (args.secret_names or args.secret_names_file) AND the object has validated set to False, switch validated to True as we know the resource is "valid" or exists per the successful API call. Use some pop/etc actions to swap out our "unvalidated" class with the "validated" class.
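The four points above can be sketched in isolation as follows. This is a simplified outline, not the module's actual code: review_secrets and fake_get_secret are hypothetical names, the GET helper is replaced by a stub, and saving to the database is left out.

```python
from types import SimpleNamespace

def review_secrets(secrets_by_project, get_secret, minimal_calls=False):
    """Loop over each project and secret; return names confirmed to exist via GET."""
    confirmed = []
    for project_id, secrets in secrets_by_project.items():
        print(f"[DEBUG] {len(secrets)} secrets were found for {project_id}")
        for secret in secrets:
            print(f"[**] [{project_id}] Reviewing {secret.name}")
            if minimal_calls:
                continue  # user asked for the quieter path: skip the GET call
            result = get_secret(secret.name)
            if result is None or result == 404:
                continue  # denied or missing: do not save or change anything
            secret.validated = True  # a successful GET proves the resource exists
            confirmed.append(secret.name)
    return confirmed

def fake_get_secret(name):
    # Hypothetical stand-in for the GET helper in util_helpers.py
    return 404 if name.endswith("/missing") else SimpleNamespace(name=name)

secrets_by_project = {
    "proj-a": [
        SimpleNamespace(name="projects/proj-a/secrets/db", validated=False),
        SimpleNamespace(name="projects/proj-a/secrets/missing", validated=False),
    ]
}
confirmed = review_secrets(secrets_by_project, fake_get_secret)
print(confirmed)
```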

enum_secrets.py

def run_module(user_args, session, first_run = False, last_run = False, output_format = ["table"]):

    # Set up Argparser to handle flag arguments
    parser = argparse.ArgumentParser(description="Enumerate Secrets Options", allow_abbrev=False)
    
    exclusive_access_key_group = parser.add_mutually_exclusive_group(required=False)
    exclusive_access_key_group.add_argument("--secret-names", type=str, help="Secrets to check in the format projects/[project_id]/secrets/[secret_name]")
    exclusive_access_key_group.add_argument("--secret-names-file", type=str, help="File name to get secrets in format projects/[project_id]/secrets/[secret_name] per line")

    # Debug/non-module specific
    parser.add_argument("--minimal-calls", action="store_true",  help="Perform just List calls or minimal set of API calls")

    parser.add_argument("-v","--debug",action="store_true",required=False,help="Get verbose data during the module run")

    args = parser.parse_args(user_args)
    
    debug, project_id = args.debug, session.project_id
    action_dict, secrets_list = {}, {}
    
    secret_client = secretmanager_v1.SecretManagerServiceClient(credentials = session.credentials)   

    if args.secret_names or args.secret_names_file:
        
        if args.secret_names:

            secrets_list_rudimentary = args.secret_names.split(",")

        elif args.secret_names_file:

            secrets_file = args.secret_names_file

            try:

                secrets_list_rudimentary = [line.strip() for line in open(secrets_file)]
                
            except FileNotFoundError:
                print(f"{UtilityTools.RED}[X] File {secrets_file} does not appear to exist. Exiting...{UtilityTools.RESET}")
                return -1

        # Check if input is valid
        status, incorrect_input = UtilityTools.validate_input_format(secrets_list_rudimentary, 4)
        if status != 0: 
            print(f"{UtilityTools.RED}[X] Value \"{incorrect_input}\" is incorrect. Must be 'projects/[project_id]/secrets/[secret_name] Please try again...{UtilityTools.RESET}")
            return -1

        for secret_name in secrets_list_rudimentary:

            _, secret_project_id, _, common_secret_name = secret_name.split("/")
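            # Wrap the user-supplied name in a Secret object; it stays unvalidated until a GET succeeds
            secret_value = HashableSecret(secretmanager_v1.Secret(name = secret_name))
            secret_value.validated = False
            secrets_list.setdefault(secret_project_id, {})[secret_value] = {}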

    else:

        parent = f"projects/{project_id}"

        every_secret = list_secrets(secret_client, parent, debug = debug)

        if every_secret == "Not Enabled" or every_secret is None:
            secrets_list.setdefault(project_id, {})

        else:
            # Handle case where every_secret is empty
            if not every_secret:
                secrets_list.setdefault(project_id, {})

            else:
                secrets_list.setdefault(project_id, {}).update({HashableSecret(secret): {} for secret in every_secret})
                for secret in every_secret:
                    save_secret(secret, session, project_id)

    for secret_project_id, secret_list in secrets_list.items():

        if debug: 

            if len(secret_list) != 0:
                num_of_secrets = len(secret_list)
                print(f"[DEBUG] {num_of_secrets} secrets were found")
            
            else:
                print("[DEBUG] No secrets were found")

        for secret in secret_list:

            validated = secret.validated
            secret_name = secret.name

            print(f"[**] [{secret_project_id}] Reviewing {secret_name}")

            if not args.minimal_calls:

                print(f"[***] GET Base Secret Entity")
                secret_get = get_secret(secret_client, secret_name, debug=debug)
                
                if secret_get:
                    if secret_get == 404:
                        continue

                    else:
                        save_secret(secret_get, session, secret_project_id)
                        
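                        # Parenthesized so the validated check applies to both user-supplied flags
                        if (args.secret_names or args.secret_names_file) and validated == False:

                            validated = True
                            # Swap the unvalidated placeholder for the object returned by GET
                            temp = secrets_list[secret_project_id].pop(secret)
                            secrets_list[secret_project_id][HashableSecret(secret_get)] = temp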
  1. Adding Layers: Some enumeration modules naturally enumerate several resources because the resources are nested within each other. For example, enum_buckets enumerates buckets but also adds another innermost for loop to enumerate each blob in each bucket. If your service is structured so that each resource can contain other resources (ex. each bucket can have blobs inside of it), you will need to replicate everything we have done inside an inner for loop, including handling manual/automated lists with corresponding save_[resource] functions/tables and adding relevant argparse arguments.

Example: In our case each "secret" can have "secret versions" in it. So as we iterate through each "secret" object, we need to add ANOTHER loop to go through each "secret version". Thus we are going to add another for loop at the end of our current code. If the resource were identified by a name (ex. blob name), we might need to add another exclusive argparse pair to supply resource names via STDIN or filename. Since secret versions are actually just integers, we can add parser.add_argument("--version-range", type=parse_range, help="Range of secret versions to try (ex. 1-100)") and just have the user specify an integer range. For more complex use cases review enum_buckets.py.
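The parse_range helper passed to argparse is not defined in this guide. A minimal sketch of what such a helper could look like is below; it is an assumption for illustration, and the real helper in GCPwn may differ (for example in whether the end of the range is inclusive).

```python
import argparse

def parse_range(value):
    """Hypothetical argparse type: turn 'START-END' into an inclusive range of ints."""
    try:
        start_text, end_text = value.split("-")
        start, end = int(start_text), int(end_text)
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"'{value}' is not in the form START-END (ex. 1-100)"
        )
    if start < 1 or end < start:
        raise argparse.ArgumentTypeError(f"'{value}' is not a valid version range")
    return range(start, end + 1)

parser = argparse.ArgumentParser(allow_abbrev=False)
parser.add_argument("--version-range", type=parse_range,
                    help="Range of secret versions to try (ex. 1-100)")

args = parser.parse_args(["--version-range", "1-3"])
print(list(args.version_range))  # [1, 2, 3]
```

Raising argparse.ArgumentTypeError lets argparse print a clean usage error instead of a traceback when the input is malformed.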

[TRUNCATED]
# Entrypoint; Try-Catch Exists on Caller
def run_module(user_args, session, first_run = False, last_run = False, output_format = ["table"]):

    # Set up Argparser to handle flag arguments
    parser = argparse.ArgumentParser(description="Enumerate Secrets Options", allow_abbrev=False)
    
    exclusive_access_key_group = parser.add_mutually_exclusive_group(required=False)
    exclusive_access_key_group.add_argument("--secret-names", type=str, help="Secrets to check in the format projects/[project_id]/secrets/[secret_name]")
    exclusive_access_key_group.add_argument("--secret-names-file", type=str, help="File name to get secrets in format projects/[project_id]/secrets/[secret_name] per line")
    parser.add_argument("--version-range", type=parse_range,  help="Range of secret versions to try (ex. 1-100)")
 
    # Debug/non-module specific
    parser.add_argument("--minimal-calls", action="store_true",  help="Perform just List calls or minimal set of API calls")

    parser.add_argument("-v","--debug",action="store_true",required=False,help="Get verbose data during the module run")

    args = parser.parse_args(user_args)
    
    debug, project_id = args.debug, session.project_id
    action_dict, secrets_list = {}, {}
    
    secret_client = secretmanager_v1.SecretManagerServiceClient(credentials = session.credentials)   

    if args.secret_names or args.secret_names_file:
        
        if args.secret_names:

            secrets_list_rudimentary = args.secret_names.split(",")

        elif args.secret_names_file:

            secrets_file = args.secret_names_file

            try:

                secrets_list_rudimentary = [line.strip() for line in open(secrets_file)]
                
            except FileNotFoundError:
                print(f"{UtilityTools.RED}[X] File {secrets_file} does not appear to exist. Exiting...{UtilityTools.RESET}")
                return -1

        # Check if input is valid
        status, incorrect_input = UtilityTools.validate_input_format(secrets_list_rudimentary, 4)
        if status != 0: 
            print(f"{UtilityTools.RED}[X] Value \"{incorrect_input}\" is incorrect. Must be 'projects/[project_id]/secrets/[secret_name] Please try again...{UtilityTools.RESET}")
            return -1

        for secret_name in secrets_list_rudimentary:

            _, secret_project_id, _, common_secret_name = secret_name.split("/")
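            # Wrap the user-supplied name in a Secret object; it stays unvalidated until a GET succeeds
            secret_value = HashableSecret(secretmanager_v1.Secret(name = secret_name))
            secret_value.validated = False
            secrets_list.setdefault(secret_project_id, {})[secret_value] = {}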

    else:

        parent = f"projects/{project_id}"

        every_secret = list_secrets(secret_client, parent, debug = debug)

        if every_secret == "Not Enabled" or every_secret is None:
            secrets_list.setdefault(project_id, {})

        else:

            # Handle case where every_secret is empty
            if not every_secret:
                secrets_list.setdefault(project_id, {})

            else:
                secrets_list.setdefault(project_id, {}).update({HashableSecret(secret): {} for secret in every_secret})
                for secret in every_secret:
                    save_secret(secret, session, project_id)

    for secret_project_id, secret_list in secrets_list.items():

        if debug: 

            if len(secret_list) != 0:
                num_of_secrets = len(secret_list)
                print(f"[DEBUG] {num_of_secrets} secrets were found")
            
            else:
                print("[DEBUG] No secrets were found")

        for secret in secret_list:

            validated = secret.validated
            secret_name = secret.name

            print(f"[**] [{secret_project_id}] Reviewing {secret_name}")

            if not args.minimal_calls:

                print(f"[***] GET Base Secret Entity")
                secret_get = get_secret(secret_client, secret_name, debug=debug)
                
                if secret_get:
                    if secret_get == 404:
                        continue

                    else:

                        save_secret(secret_get, session, secret_project_id)
                        
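                        # Parenthesized so the validated check applies to both user-supplied flags
                        if (args.secret_names or args.secret_names_file) and validated == False:

                            validated = True
                            # Swap the unvalidated placeholder for the object returned by GET
                            temp = secrets_list[secret_project_id].pop(secret)
                            secrets_list[secret_project_id][HashableSecret(secret_get)] = temp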

            print(f"[***] LIST Secret Versions")


            # ADDED - Note for innermost resources we only save a list of strings for secret_list per previous instructions. So while we save the whole "object" , for stdout purposes I'll just save the names.
            secret_versions_list = []

            # Manual vs Automated Approach
            if args.version_range:
                all_version_numbers = args.version_range
                for number in all_version_numbers:
                    secret_versions_list.append(f"{secret_name}/versions/{number}")
            
            else:
                secret_versions_list = list_secret_versions(secret_client, secret_name)

                if secret_versions_list:
                    if not args.secret_names and not args.secret_names_file:
                        version_nums = [path.name.split('/')[-1] for path in secret_versions_list]
                        for version in version_nums:
                            secrets_list.setdefault(secret_project_id, {}).setdefault(HashableSecret(secret), {}).setdefault(version, None)


            if secret_versions_list:

                for secret_version in secret_versions_list:
                    
                    if not args.version_range:
                        save_secret_version(secret_version, session, secret_project_id)
                        secret_version_name = secret_version.name
                    
                    else:
                        secret_version_name = secret_version
                    
                    version_num = secret_version_name.split('/')[5]


                    secret_version_condensed_name = secret_version_name.split("/")[3] + f" (Version: {version_num})"

                    #print(f"[**] [{secret_project_id}] Reviewing {secret_version_name}")

                    if not args.minimal_calls:

                        print(f"[****] GET Secret Version {version_num}")
                        secret_get_version = get_secret_version(secret_client, secret_version_name, debug=debug)
                        
                        if secret_get_version:
                            if secret_get_version == 404:
                                continue
                            else:
                                save_secret_version(secret_get_version, session, secret_project_id)
                                if args.version_range:
                                    secrets_list[secret_project_id][secret][version_num] = None
      
  1. Printing STDOUT: Once we have finished enumerating all our data, the [resources]_list variable should hold either the single-dimension or nested JSON referenced earlier. At this point you can use lambdas or any other functions to modify/sort the data before passing it to STDOUT. Otherwise, you can pass it directly into the UtilityTools function as shown below. See a breakdown of this [here](#shared-information).

Example: Add the call at the end of enum_secrets.py to print out the secrets from our secret_list. Note we enumerate through each project ID again as this is called once per project ID.

[TRUNCATED]
                    version_num = secret_version_name.split('/')[5]


                    secret_version_condensed_name = secret_version_name.split("/")[3] + f" (Version: {version_num})"

                    #print(f"[**] [{secret_project_id}] Reviewing {secret_version_name}")

                    if not args.minimal_calls:

                        print(f"[****] GET Secret Version {version_num}")
                        secret_get_version = get_secret_version(secret_client, secret_version_name, debug=debug)
                        
                        if secret_get_version:
                            if secret_get_version == 404:
                                continue
                            else:
                                save_secret_version(secret_get_version, session, secret_project_id)
                                if args.version_range:
                                    secrets_list[secret_project_id][secret][version_num] = None

    for secret_project_id, secret_only_info in secrets_list.items():
    
        UtilityTools.summary_wrapup(
            secret_project_id, 
            "Secrets Secrets/Versions", 
            secret_only_info, 
            ["name","expire_time"],
            primary_resource = "Secret Names",
            secondary_title_name = "versions",
            output_format = output_format
        ) 
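If you want to sort the data before handing it to the summary function, one option is a small lambda-based sort per project. The sketch below assumes the per-project structure is a dict keyed by objects with a name attribute; FakeSecret is a hypothetical stand-in for HashableSecret, and sort_by_secret_name is an illustrative helper rather than part of GCPwn.

```python
from collections import namedtuple

# Hashable stand-in for HashableSecret (hypothetical)
FakeSecret = namedtuple("FakeSecret", ["name"])

def sort_by_secret_name(secrets_for_project):
    """Return a new dict whose keys are ordered by secret name."""
    return dict(sorted(secrets_for_project.items(), key=lambda item: item[0].name))

unsorted = {
    FakeSecret("projects/p/secrets/zeta"): {"1": None},
    FakeSecret("projects/p/secrets/alpha"): {},
}
ordered = sort_by_secret_name(unsorted)
print([secret.name for secret in ordered])
```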

Part 4 - Adding IAM Tracking

You should now have a fully functional module that enumerates and saves data to SQLite tables. Next we add IAM tracking, both passively and via testIamPermissions.

  1. Resource IAM Setup: In utils/resource_perm_mappings.txt, add a row for your new resource in the form <resource_type>,<resource>_actions_allowed. <resource_type> is the name you give these resources when building your IAM dictionary, and the _actions_allowed value is used when syncing your collected permissions.

Example: Add resource type of "secrets" and "secret_actions_allowed" to our utils/resource_perm_mappings.txt file

utils/resource_perm_mappings.txt

secrets,secret_actions_allowed
  1. IAM Enumeration Overview: The action_dict dictionary is defined at the beginning of the module as an empty dictionary. This dictionary will hold ALL permissions identified at a granular level by the conclusion of the module. A successful permission can be tied to an organization, a folder, a project, or an individual resource. The final dictionary will have the following format:
{
    "organization_permissions": {
        [org_id_1]: set([permission1, permission2]),
        [org_id_2]: set([permission1, permission2])
    },
    "folder_permissions": {
        [folder_id_1]: set([permission1, permission2]),
        [folder_id_2]: set([permission1, permission2])
    },
    "project_permissions": {
        [project_id_1]: set([permission1, permission2]),
        [project_id_2]: set([permission1, permission2])
    },
    [project_id_1]: {
        [permission1]: {
            <resource_type>: set([resource_name_1, resource_name_2])
        },
        [permission2]: {
            <resource_type>: set([resource_name_1, resource_name_2])
        }
    }
}

Example: A possible final action_dict for a user running the enum_secrets module might look like the following. The user has the "secretmanager.secrets.list" permission at the project level. The user also has "secretmanager.secrets.get" on 3 secrets, meaning they can get back each object. However, they only have "secretmanager.secrets.delete" on 1 of the secrets.

{
    "project_permissions": {
        "random-project-id-3893729873927390273": ["secretmanager.secrets.list"]
    },
    "random-project-id-3893729873927390273": {
        "secretmanager.secrets.get": {
            "secrets": ["my_cool_secret_1", "my_cool_secret_2", "my_cool_secret_3"]
        },
        "secretmanager.secrets.delete": {
            "secrets": ["my_cool_secret_1"]
        }
    }
}
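A dictionary of this shape can be built up incrementally with chained setdefault calls. The two helper functions below are hypothetical wrappers written only for illustration; the project ID and secret names are made-up values.

```python
def record_project_permission(action_dict, project_id, permission):
    """Record a permission that was confirmed at the project level."""
    action_dict.setdefault("project_permissions", {}).setdefault(project_id, set()).add(permission)

def record_resource_permission(action_dict, project_id, permission, resource_type, resource_name):
    """Record a permission that was confirmed on one specific resource."""
    (action_dict.setdefault(project_id, {})
                .setdefault(permission, {})
                .setdefault(resource_type, set())
                .add(resource_name))

action_dict = {}
record_project_permission(action_dict, "proj-a", "secretmanager.secrets.list")
for name in ["my_cool_secret_1", "my_cool_secret_2"]:
    record_resource_permission(action_dict, "proj-a", "secretmanager.secrets.get", "secrets", name)
record_resource_permission(action_dict, "proj-a", "secretmanager.secrets.delete", "secrets", "my_cool_secret_1")

print(action_dict)
```

Using sets means recording the same permission or resource twice is harmless, which matters when LIST and GET both touch the same secret.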
  1. Passive IAM Tracking: As you successfully list and then get individual resources, add statements to populate action_dict. For projects this will generally be action_dict.setdefault('project_permissions', {}).setdefault(project_id, set()).add('secretmanager.secrets.list'), while for individual resources it will usually be action_dict.setdefault([project_id], {}).setdefault('[permission]', {}).setdefault('[resource_type]', set()).add([resource_name])

Example: In our enum_secrets module, we add "secretmanager.secrets.list" to the project permissions if the list API call was successful. We also add the specific resource name when an individual secret is successfully retrieved (i.e. "secretmanager.secrets.get" succeeded).

    secret_client = secretmanager_v1.SecretManagerServiceClient(credentials = session.credentials)   

    if args.secret_names or args.secret_names_file:
        
       [TRUNCATED]  

    else:

        parent = f"projects/{project_id}"

        every_secret = list_secrets(secret_client, parent, debug = debug)
    
        # If the service is not enabled or the list call was denied, leave this project ID empty
        if every_secret == "Not Enabled" or every_secret is None:
            secrets_list.setdefault(project_id, {})

        else:

            # API was successful and returned 0+ items. Populate ["project_permissions"][project_id] set with secretmanager.secrets.list
            action_dict.setdefault('project_permissions', {}).setdefault(project_id, set()).add('secretmanager.secrets.list')

            # Handle case where every_secret is empty
            if not every_secret:
                secrets_list.setdefault(project_id, {})

            else:
                secrets_list.setdefault(project_id, {}).update({HashableSecret(secret): {} for secret in every_secret})
                for secret in every_secret:
                    save_secret(secret, session, project_id)

    for secret_project_id, secret_list in secrets_list.items():

        if debug: 
       
        [TRUNCATED]

        for secret in secret_list:

            validated = secret.validated
            secret_name = secret.name

            print(f"[**] [{secret_project_id}] Reviewing {secret_name}")

            if not args.minimal_calls:

                print(f"[***] GET Base Secret Entity")
                secret_get = get_secret(secret_client, secret_name, debug=debug)
                
                if secret_get:

                    # If secret doesn't exist (ex. user supplied) continue
                    if secret_get == 404:
                        continue

                    else:

                        # Add secret name to [project_id]['secretmanager.secrets.get'] set
                        action_dict.setdefault(secret_project_id, {}).setdefault('secretmanager.secrets.get', {}).setdefault('secrets', set()).add(secret_get.name.split("/")[-1])
                        save_secret(secret_get, session, secret_project_id)
                        
                        # For user-supplied names, swap the unvalidated entry for the validated GET response
                        if (args.secret_names or args.secret_names_file) and not validated:

                            validated = True
                            temp = secrets_list[secret_project_id].pop(secret)
                            secrets_list[secret_project_id][secret_get] = temp

            [TRUNCATED]
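
The chained setdefault calls above are easier to follow once you see the structure they build. The following is a minimal sketch of that structure only; the project ID and secret name are made-up placeholders, not values from GCPwn.

```python
action_dict = {}
project_id = "my-project"  # hypothetical project ID

# Project-level permission: the list call succeeded for the whole project
action_dict.setdefault("project_permissions", {}).setdefault(project_id, set()).add(
    "secretmanager.secrets.list"
)

# Resource-level permission: a GET succeeded for one specific secret
secret_name = "projects/my-project/secrets/db-password"  # hypothetical secret
action_dict.setdefault(project_id, {}).setdefault(
    "secretmanager.secrets.get", {}
).setdefault("secrets", set()).add(secret_name.split("/")[-1])

# Resulting shape:
# action_dict["project_permissions"]["my-project"] is {"secretmanager.secrets.list"}
# action_dict["my-project"]["secretmanager.secrets.get"]["secrets"] is {"db-password"}
```

Because every level is created with setdefault, repeating the same call for another secret or permission adds to the existing sets instead of overwriting them.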

Note

Some permissions, like "secretmanager.secrets.list", can't be tested on individual resources; the project is the lowest level at which they apply. Passing "secretmanager.secrets.list" into something like testIamPermissions for a secret will throw an error. I usually pull all of the GCP service permissions from [here](https://cloud.google.com/iam/docs/permissions-reference) (this page might time out your browser) and remove them one by one to filter out those that don't belong in the function.
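
The one-by-one filtering described above can be scripted. This is a rough sketch under assumptions: split_testable_permissions is a hypothetical helper, and fake_test_iam_permissions is a stand-in for the real API call so the example runs without GCP credentials.

```python
def split_testable_permissions(candidates, test_fn):
    """Return (accepted, rejected) lists of permissions.

    test_fn is any callable that takes a list of permissions and raises
    an exception when the API refuses one of them.
    """
    accepted, rejected = [], []
    for permission in candidates:
        try:
            test_fn([permission])
            accepted.append(permission)
        except Exception:
            rejected.append(permission)
    return accepted, rejected


def fake_test_iam_permissions(permissions):
    """Stand-in for the real call (assumption): refuses project-only permissions."""
    project_only = {"secretmanager.secrets.list", "secretmanager.secrets.create"}
    if project_only & set(permissions):
        raise ValueError("permission not valid for this resource type")
    return permissions


candidates = [
    "secretmanager.secrets.get",
    "secretmanager.secrets.list",
    "secretmanager.secrets.delete",
    "secretmanager.secrets.create",
]
accepted, rejected = split_testable_permissions(candidates, fake_test_iam_permissions)
```

Against the real API you would probably want to catch only the invalid-argument style error, since a broad except also treats an unrelated failure (for example a 403) as a rejected permission.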

  1. IAM testIamPermissions Add: Resources that support IAM bindings also usually support an API call named testIamPermissions. You might have heard about this with respect to project-level bindings, but the same principle holds for resource-level bindings. This API call accepts an array of permissions and returns the subset of those permissions granted to the caller on the specified resource. You can find these API calls in the Google Python SDK. For example, the Secret Manager testIamPermissions API is defined here: https://cloud.google.com/python/docs/reference/secretmanager/latest/google.cloud.secretmanager_v1.services.secret_manager_service.SecretManagerServiceClient#google_cloud_secretmanager_v1_services_secret_manager_service_SecretManagerServiceClient_test_iam_permissions. If your service supports testIamPermissions, add the "--iam" command line flag and a conditional that calls the testIamPermissions API after you fetch each resource with GET. Also define the function that makes the call in your util_helpers.py file. Add every permission in the returned array to action_dict.

Example: Add the "--iam" command line argument and the code to call the testIamPermissions command in enum_secrets.py that is defined in util_helpers.py.

Modules/SecretsManager/Enumeration/enum_secrets.py

def run_module(user_args, session, first_run = False, last_run = False, output_format = ["table"]):

    # Set up Argparser to handle flag arguments
    parser = argparse.ArgumentParser(description="Enumerate Secrets Options", allow_abbrev=False)
    
    exclusive_access_key_group = parser.add_mutually_exclusive_group(required=False)
    exclusive_access_key_group.add_argument("--secret-names", type=str, help="Secrets to check in the format projects/[project_id]/secrets/[secret_name]")
    exclusive_access_key_group.add_argument("--secret-names-file", type=str, help="File name to get secrets in format projects/[project_id]/secrets/[secret_name] per line")

    # ADDED
    parser.add_argument("--iam",action="store_true",required=False,help="Call TestIAMPermissions on Secrets")
 
    parser.add_argument("-v","--debug",action="store_true",required=False,help="Get verbose data during the module run")

    [TRUNCATED]
    secret_client = secretmanager_v1.SecretManagerServiceClient(credentials = session.credentials)   

    if args.secret_names or args.secret_names_file:
        
        [TRUNCATED]  

    else:

        [TRUNCATED]

    for secret_project_id, secret_list in secrets_list.items():

        if debug: 

            if len(secret_list) != 0:
                num_of_secrets = len(secret_list)
                print(f"[DEBUG] {num_of_secrets} secrets were found")
            
            else:
                print(f"[DEBUG]  No secrets were found")

        for secret in secret_list:

            validated = secret.validated
            secret_name = secret.name

            print(f"[**] [{secret_project_id}] Reviewing {secret_name}")

            [TRUNCATED]
    
            # ADDED
            if args.iam:
                print(f"[***] TEST Secret Permissions")
                
                authenticated_permissions = check_secret_permissions(secret_client, secret_name, debug = debug)
                
                for permission in authenticated_permissions:
                    
                    if (args.secret_names or args.secret_names_file) and not validated:
                        validated = True
                        secrets_list[secret_project_id][secret].validated = True 

                    action_dict.setdefault(secret_project_id, {}).setdefault(permission, {}).setdefault('secrets', set()).add(secret.name.split("/")[-1])

Modules/SecretsManager/utils/util_helpers.py

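# ADDED
# Imports needed by the function below (skip them if util_helpers.py already has them)
from google.api_core.exceptions import Forbidden, NotFound
from google.iam.v1 import iam_policy_pb2

# Call this standalone to work out, by trial and error, which permissions can be tested on an individual resource.
# Permissions that can only be applied at the project level will trigger an error below.
def check_secret_permissions(secret_client, secret_name, debug = False):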
    
    authenticated_permissions = []

    try:
        
        request = iam_policy_pb2.TestIamPermissionsRequest(
            resource=secret_name,
            permissions=[
                    'secretmanager.secrets.delete',
                    'secretmanager.secrets.get',
                    'secretmanager.secrets.getIamPolicy',
                    'secretmanager.secrets.setIamPolicy',
                    'secretmanager.secrets.update'
            ] 
                
        )
        
        authenticated_permissions = secret_client.test_iam_permissions(
            request=request
        )

        # Get list of allowed permissions
        authenticated_permissions = list(authenticated_permissions.permissions)

    except NotFound as e:
        if "404 Secret" in str(e):
            print(f"{UtilityTools.RED}[X] 404 The secret is not found for {secret_name}{UtilityTools.RESET}")
        return 404

    except Forbidden as e:
        print(f"[-] 403 The user does not have testIamPermissions permissions on {secret_name} ")
    
    except Exception as e:
        print("An unknown exception occurred when trying to call test_iam_permissions as follows:\n" + str(e))

    return authenticated_permissions
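
As written, check_secret_permissions returns the integer 404 when the secret is missing, while the calling loop in enum_secrets.py iterates over the return value. One possible guard is sketched below; normalize_permissions is a hypothetical helper for illustration and is not part of GCPwn.

```python
def normalize_permissions(result):
    """Return a list of permission strings, or [] for sentinels such as 404 or None."""
    if isinstance(result, (list, tuple, set)):
        return list(result)
    return []


# Usage sketch: safe to loop over regardless of what the helper returned
for permission in normalize_permissions(404):
    print(permission)  # never reached for a 404 sentinel
```

Wrapping the call result this way keeps the "for permission in ..." loop from raising a TypeError on a non-iterable return value.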

  1. Sync IAM Permissions: With all your permissions collected passively and via testIamPermissions, pass action_dict to the predefined session.insert_actions function at the end of your code, once per project. The project ID is the second argument, and the _actions_allowed column added in the first step is passed as the column_name argument.

enum_secrets.py

[TRUNCATED]
   
    for secret_project_id, secret_list in secrets_list.items():

        [TRUNCATED]

        for secret in secret_list:

            [TRUNCATED]

        session.insert_actions(action_dict, secret_project_id, column_name = "secret_actions_allowed")

Note

This should get you to a stable spot for IAM. Still building out instructions/refining enumerating IAM policy bindings. To be added soon.

Part 5 - Misc. Remaining Items

  1. Download: If your service has resources that might be useful to download, add the download flag and a final conditional, inside the loop over your resources, that downloads the content. This might include bucket blobs, function source code, etc. There are no strict rules for this at the moment. In general, when deciding your final filename, please use destination_filename = UtilityTools.get_save_filepath(session.workspace_directory_name, "<filename>", "<resource>") to save items in the correct folder. It also makes sense to encapsulate most of your download logic in util_helpers.py. This is also where you might add custom flags. For example, in Secret Manager we are downloading versions, so I added the "--versions" flag so a user can specify which ones. Cloud Storage has several custom flags to help filter the content being downloaded.

Example: As described before, once we have found a secret we enumerate through each of its secret versions. For Secrets Manager it makes sense for "download" to fetch the secret version values and write them to disk. We add the corresponding logic under the download conditional below. Note that IAM permissions are also passively added whenever a call succeeds.

[TRUNCATED]

    parser.add_argument("--download", action="store_true",  help="Attempt to download secret values to a local file")

            [TRUNCATED]

            print(f"[***] LIST Secret Versions")

            secret_versions_list = []

            # User manual supply
            if args.version_range:
                all_version_numbers = args.version_range
                for number in all_version_numbers:
                    secret_versions_list.append(f"{secret_name}/versions/{number}")
            
            # Auto-calculate items
            else:
                secret_versions_list = list_secret_versions(secret_client, secret_name)

                if secret_versions_list:
                    if not args.secret_names and not args.secret_names_file:
                        version_nums = [path.name.split('/')[-1] for path in secret_versions_list]
                        for version in version_nums:
                            secrets_list.setdefault(secret_project_id, {}).setdefault(HashableSecret(secret), {}).setdefault(version, None)

            # If we have a list of secret versions, proceed with GET/TestIAMPermissions/possible download
            if secret_versions_list:

                for secret_version in secret_versions_list:
                    
                    if not args.version_range:
                        save_secret_version(secret_version, session, secret_project_id)
                        secret_version_name = secret_version.name
                    
                    else:
                        secret_version_name = secret_version
                    
                    version_num = secret_version_name.split('/')[5]


                    secret_version_condensed_name = secret_version_name.split("/")[3] + f" (Version: {version_num})"

                    #print(f"[**] [{secret_project_id}] Reviewing {secret_version_name}")

                    if not args.minimal_calls:

                        print(f"[****] GET Secret Version {version_num}")
                        secret_get_version = get_secret_version(secret_client, secret_version_name, debug=debug)
                        
                        if secret_get_version:
                            if secret_get_version == 404:
                                continue
                            else:
                                # Add permission to dictionary and save GET response
                                action_dict.setdefault(secret_project_id, {}).setdefault('secretmanager.versions.get', {}).setdefault('secret version', set()).add(secret_version_condensed_name)
                                save_secret_version(secret_get_version, session, secret_project_id)
                                if args.version_range:
                                    secrets_list[secret_project_id][secret][version_num] = None

                    if args.iam:
                        print(f"[****] TEST Secret Version Permissions")
                        
                        authenticated_permissions = check_secret_version_permissions(secret_client, secret_version_name, debug = debug)
                        
                        for permission in authenticated_permissions:
                            
                            if args.version_range:
                                secrets_list[secret_project_id][secret][version_num] = None

                            
                            action_dict.setdefault(secret_project_id, {}).setdefault(permission, {}).setdefault('secret version', set()).add(secret_version_condensed_name)

                    # Attempt to "access_secret_value". If successful save or "download" secret to a local file. Note the use of UtilityTools to find the final path.
                    print(f"[****] GETTING Secret Values For {version_num}")
                    secret_value = access_secret_value(secret_client, secret_version_name, debug = debug)

                    if secret_value:
                        print(f"{UtilityTools.GREEN}{UtilityTools.BOLD}[****] SECRET VALUE RETRIEVED FOR {version_num}{UtilityTools.RESET}")
                        action_dict.setdefault(secret_project_id, {}).setdefault("secretmanager.versions.access", {}).setdefault('secret version', set()).add(secret_version_condensed_name)

                        if secret_value.payload.data:
                            
                            # secret_project_id
                            # secret_version_condensed_name
                            secret_value_data = secret_value.payload.data
                            
                            secrets_list[secret_project_id][secret][version_num] = secret_value_data.decode('utf-8')
                            
                            entry = {
                                "primary_keys_to_match":{
                                    "name": secret_version_name
                                },
                                "data_to_insert":{
                                    "secret_value":secret_value_data
                                }
                            }
                            session.insert_data('secretsmanager-secretversions', entry, update_only = True )
                            if args.download:
                                destination_filename = UtilityTools.get_save_filepath(session.workspace_directory_name, "secrets_data_file.csv", "Secrets")
                                data = {
                                    "secret_project_id": [secret_project_id],
                                    "secret_name_version": [secret_version_condensed_name],
                                    "secret_value_data": [secret_value_data]
                                }                              
                                df = pd.DataFrame(data)
                                if not os.path.isfile(destination_filename):
                                    # File doesn't exist, so write (create) it
                                    df.to_csv(destination_filename, index=False)
                                else:
                                    # File exists, so append to it
                                    df.to_csv(destination_filename, mode='a', header=False, index=False)
        [TRUNCATED]
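
The create-or-append step at the end of the snippet above can also be written with only the standard library. This is a minimal sketch, not the module's actual code: append_secret_row is a hypothetical helper, and it decodes the payload before writing so the file holds text rather than a bytes representation.

```python
import csv
import os


def append_secret_row(csv_path, project_id, name_version, value_bytes):
    """Append one secret value to csv_path, writing the header only for a new file."""
    is_new_file = not os.path.isfile(csv_path)
    # Decode to text; undecodable bytes are replaced instead of raising
    value_text = value_bytes.decode("utf-8", errors="replace")
    with open(csv_path, "a", newline="", encoding="utf-8") as handle:
        writer = csv.writer(handle)
        if is_new_file:
            writer.writerow(["secret_project_id", "secret_name_version", "secret_value_data"])
        writer.writerow([project_id, name_version, value_text])
```

In a module, csv_path would come from UtilityTools.get_save_filepath as described above, and the function would be called once per retrieved secret version.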

Shared Information

  1. run_module(): The run_module function is the entrypoint for all modules. It has the following parameters passed in:

    1. user_args: User flags supplied when running the module. You can pass these into argparse to add as many flags as you want. As you will see throughout these instructions, there are some "standard" flags we would want to implement (--iam, --download, --minimal-calls), but you're free to add whatever you want on top of that.
    2. session: Probably the most important argument passed in and will be used a lot. Session contains a lot of information you will use regarding the current caller’s context. This includes notably:
      1. session.credentials – Your current user’s credentials. This is abstracted so you don't need to worry if they are OAuth creds, Service Account creds, etc. Just reference session.credentials and you can pass those into necessary arguments to authenticate. Overarching GCPwn framework handles all the auth management 😄
      2. session.project_id – Current project ID when the module is running. If the user is running a lot of automated checks, this is either the default project ID or whichever project ID in the list the run is currently on. If the user specified project IDs with --project-ids, it would be one of those project IDs. Again, you don’t really need to worry about this; just know session.project_id is the current project ID in your overall run.
    3. first_run/last_run: Still being developed but signals if the current module run is the first/last in a series of project IDs.
  2. Predefined Functions: There are some functions already created to help those who want to develop their own module. These are described here:

    1. UtilityTools.summary_wrapup(): Accepts data in one of the two forms listed below, each encapsulating a summary of the resources enumerated. The framework handles the output format for you, so you just need to pass your data in.

Input Format 1 (List of Resources):

[Object1, Object2, etc]

Input Format 2 (Nested Resources):

    {
        Object1: [
            String1,
            String2
        ],
        Object2: [
            String1
        ]
    }

Definition:

    UtilityTools.summary_wrapup(
        project_id: Project ID of the current project you are in
        service_account_name: Name of item that will be listed (ex. Buckets, Secrets, etc.)
        objects_list: List of objects to print from the enumeration module (ex. list of GCP Compute objects).
        properties_list: List of attributes/columns to display (ex. name, id, etc.). Aligns with what is present in utils/database.yaml
        primary_resource: Name of item that will be listed (ex. Buckets, Secrets, etc.)
        secondary_title_name: If using the nested resources, the name of the inner list (ex. blobs, secret versions, etc.)
        output_format: What output format to display the data in (txt, csv, table). Handled by the GCPwn framework; you can pass in the value from run_module()
    )

Secrets Manager Example:

        UtilityTools.summary_wrapup(
            secret_project_id, 
            "Secrets/Secret Versions", 
            secret_only_info, 
            ["name","expire_time"],
            primary_resource = "Secret Names",
            secondary_title_name = "versions",
            output_format = output_format
        ) 
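
To make the two input formats concrete, here is a small sketch that derives both from a nested dictionary shaped like the secrets_list used earlier. FakeSecret is a hypothetical stand-in for a hashable resource wrapper, and the way secret_only_info is actually built in the module is not shown in this page, so treat this as an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeSecret:
    """Hypothetical hashable stand-in for a secret object."""
    name: str
    expire_time: str = ""


db_password = FakeSecret("projects/my-project/secrets/db-password")
api_key = FakeSecret("projects/my-project/secrets/api-key")

# Shape used during enumeration: secret -> {version number: value or None}
secrets_for_project = {
    db_password: {"1": None, "2": "example-value"},
    api_key: {},
}

# Input Format 1: a flat list of resource objects
flat_format = list(secrets_for_project.keys())

# Input Format 2: each resource mapped to a list of strings (here, version numbers)
nested_format = {
    secret: list(versions.keys()) for secret, versions in secrets_for_project.items()
}
```

Either flat_format or nested_format could then be handed to summary_wrapup as objects_list, with secondary_title_name set only for the nested form.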


Exploit Modules

  1. Pre-Requisite & Verify Need: This assumes you are familiar with the coding logic above. In particular, review the IAM tracking steps and how to use the service's Google Python SDK to call specific methods. Determine a valid use case for your exploit module. If you are writing a module purely to enumerate resources, consider making an enum_[resource] module and following the "Enumeration Modules" steps above.

  2. Setup Folder Structure + Base Template: Once you have determined what you want the exploit module to do, review "Part 2 - Setting up Base Module Features" from the Enumeration Modules steps to add your module to the catalog. The module name should begin with "exploit_" and describe in general what your module does (ex. exploit_storage_setiampolicy sets the IAM policy for storage buckets). Add a basic template for the module that includes run_module (also covered in "Part 2 - Setting up Base Module Features") in the Exploit directory.

Note

You should now be able to run modules or modules list from inside GCPwn and see your exploit module name appear.
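
A bare-bones exploit module template might look like the sketch below. The run_module signature mirrors the one shown for enum_secrets.py; the --target flag, the returned dictionary, and the SimpleNamespace session are assumptions made so the example runs on its own, and parsing with parser.parse_args(user_args) is assumed rather than confirmed from the truncated code.

```python
import argparse
from types import SimpleNamespace


def run_module(user_args, session, first_run=False, last_run=False, output_format=["table"]):
    # Set up argparse to handle the flags passed in by the framework
    parser = argparse.ArgumentParser(description="Example exploit module", allow_abbrev=False)
    parser.add_argument("--target", type=str, required=True, help="Hypothetical resource name to act on")
    parser.add_argument("-v", "--debug", action="store_true", required=False, help="Get verbose data during the module run")
    args = parser.parse_args(user_args)

    project_id = session.project_id

    if args.debug:
        print(f"[DEBUG] Project ID is {project_id}")

    # Exploit logic would go here, e.g. building an SDK client with session.credentials

    # Returned only so this sketch is easy to check; not a GCPwn requirement
    return {"project_id": project_id, "target": args.target}


# Stand-in session for trying the template outside of GCPwn
fake_session = SimpleNamespace(project_id="my-project", credentials=None)
result = run_module(["--target", "example-bucket"], fake_session)
```

Starting from a template like this, the remaining steps are mostly a matter of adding the flags your exploit needs and the SDK call itself.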

  1. Gather Necessary Arguments:

  2. Execute Exploit: