Skip to content

Commit

Permalink
Merge pull request #1 from regevnoam1/StaticScan
Browse files Browse the repository at this point in the history
Static scan
  • Loading branch information
regevnoam1 authored Aug 29, 2024
2 parents 741ea1f + 59d4567 commit 6d85e7c
Show file tree
Hide file tree
Showing 34 changed files with 10,771 additions and 11 deletions.
14 changes: 14 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "Python: KubiScan",
"type": "debugpy",
"request": "launch",
"program": "${workspaceFolder}/KubiScan.py",
"args": ["-rs","-r"],
"console": "integratedTerminal",
"pythonPath": "python3"
}
]
}
44 changes: 37 additions & 7 deletions KubiScan.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from misc import constants
import datetime
from api.api_client import api_init, running_in_container
from api.client_factory import ApiClientFactory
from api.config import set_api_client

json_filename = ""
output_file = ""
Expand Down Expand Up @@ -274,18 +276,35 @@ def print_all_risky_containers(priority=None, namespace=None, read_token_from_co

print_table_aligned_left(t)

def print_all_risky_subjects(priority=None, namespace=None):

def get_rules_by_namespace(namespace=None):
namespace_risky_roles = []
risky_roles = engine.utils.get_risky_roles()
for role in risky_roles:
if role.namespace == namespace:
return role
return None

def print_all_risky_subjects(show_rules=False, priority=None, namespace=None):
subjects = engine.utils.get_all_risky_subjects()
if priority:
subjects = filter_objects_by_priority(priority, subjects)
global curr_header
curr_header = "|Risky Users|"
print("+-----------+")
print("|Risky Users|")
t = PrettyTable(['Priority', 'Kind', 'Namespace', 'Name'])
for subject in subjects:
if subject.user_info.namespace == namespace or namespace is None:
t.add_row([get_color_by_priority(subject.priority)+subject.priority.name+WHITE, subject.user_info.kind, subject.user_info.namespace, subject.user_info.name])
if show_rules:
t = PrettyTable(['Priority', 'Kind', 'Namespace', 'Name', 'Rules'])
for subject in subjects:
if subject.user_info.namespace == namespace or namespace is None:
subject_role = get_rules_by_namespace(subject.user_info.namespace)
rules = subject_role.rules if subject_role else None
t.add_row([get_color_by_priority(subject.priority)+subject.priority.name+WHITE, subject.user_info.kind, subject.user_info.namespace, subject.user_info.name,get_pretty_rules(rules)])
else:
t = PrettyTable(['Priority', 'Kind', 'Namespace', 'Name'])
for subject in subjects:
if subject.user_info.namespace == namespace or namespace is None:
t.add_row([get_color_by_priority(subject.priority)+subject.priority.name+WHITE, subject.user_info.kind, subject.user_info.namespace, subject.user_info.name])

print_table_aligned_left(t)

Expand Down Expand Up @@ -607,6 +626,8 @@ def main():
- Prettytable
pip3 install PTable
""")
opt.add_argument('-s', '--static', action='store_true', help='Use static API client with predefined data', required=False)
opt.add_argument('-f', '--file', type=str, help='File path for static API client', required='--static' in sys.argv)

opt.add_argument('-rr', '--risky-roles', action='store_true', help='Get all risky Roles (can be used with -r to view rules)', required=False)
opt.add_argument('-rcr', '--risky-clusterroles', action='store_true', help='Get all risky ClusterRoles (can be used with -r to view rules)',required=False)
Expand Down Expand Up @@ -723,7 +744,16 @@ def main():
exit()


api_init(kube_config_file=args.kube_config, host=args.host, token_filename=args.token_filename, cert_filename=args.cert_filename, context=args.context)
if args.static:
if not args.file:
print("Error: File path must be provided with --file when using --static")
exit(1)
api_client = ApiClientFactory.get_client(use_static=True, input_file=args.file)
else:
api_client = ApiClientFactory.get_client(use_static=False)
api_init(kube_config_file=args.kube_config, host=args.host, token_filename=args.token_filename, cert_filename=args.cert_filename, context=args.context)
set_api_client(api_client)


if args.cve:
print_cve(args.cert_filename, args.client_certificate, args.client_key, args.host)
Expand All @@ -740,7 +770,7 @@ def main():
if args.risky_any_rolebindings:
print_all_risky_rolebindings(days=args.less_than, priority=args.priority, namespace=args.namespace)
if args.risky_subjects:
print_all_risky_subjects(priority=args.priority, namespace=args.namespace)
print_all_risky_subjects(show_rules=args.rules,priority=args.priority, namespace=args.namespace)
if args.risky_pods:
print_all_risky_containers(priority=args.priority, namespace=args.namespace, read_token_from_container=args.deep)
if args.all:
Expand Down
Binary file added __pycache__/static_risky_roles.cpython-310.pyc
Binary file not shown.
Binary file added api/__pycache__/__init__.cpython-310.pyc
Binary file not shown.
Binary file added api/__pycache__/api_client.cpython-310.pyc
Binary file not shown.
Binary file added api/__pycache__/api_client_temp.cpython-310.pyc
Binary file not shown.
Binary file added api/__pycache__/base_client_api.cpython-310.pyc
Binary file not shown.
Binary file added api/__pycache__/client_factory.cpython-310.pyc
Binary file not shown.
Binary file added api/__pycache__/config.cpython-310.pyc
Binary file not shown.
Binary file added api/__pycache__/static_api_client.cpython-310.pyc
Binary file not shown.
12 changes: 11 additions & 1 deletion api/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
from shutil import move
from kubernetes.client.configuration import Configuration
from kubernetes.client.api_client import ApiClient
from .base_client_api import BaseApiClient

# TODO: Should be removed after the bug will be solved:
# https://github.com/kubernetes-client/python/issues/577
from api.api_client_temp import ApiClientTemp
from .api_client_temp import ApiClientTemp

# The following variables have been commented as it resulted a bug when running `kubiscan -h`
# Exception ignored in: <bound method ApiClient.__del__ of <kubernetes.client.api_client.ApiClient object ...
Expand Down Expand Up @@ -142,3 +143,12 @@ def _set_config(self):
configuration.api_key['authorization'] = "bearer " + self.token
client.Configuration.set_default(configuration)
return configuration


class RegularApiClient(BaseApiClient):
def __init__(self):
config.load_kube_config()
self.client = client.RbacAuthorizationV1Api()

def list_roles_for_all_namespaces(self):
return self.client.list_role_for_all_namespaces()
2 changes: 1 addition & 1 deletion api/api_client_temp.py
Original file line number Diff line number Diff line change
Expand Up @@ -690,4 +690,4 @@ def list_cluster_role(self):
cluster_role = V1ClusterRole(kind='ClusterRole', metadata=metadata, rules=rules)
cluster_roles.append(cluster_role)

return V1ClusterRoleList(items=cluster_roles)
return V1ClusterRoleList(items=cluster_roles)
9 changes: 9 additions & 0 deletions api/base_client_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from abc import ABC, abstractmethod


class BaseApiClient(ABC):

@abstractmethod
def list_roles_for_all_namespaces(self):
pass

15 changes: 15 additions & 0 deletions api/client_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from .static_api_client import StaticApiClient
from .api_client import RegularApiClient

class ApiClientFactory:
@staticmethod
def get_client(use_static=False, input_file=None):
if use_static:
return StaticApiClient(input_file=input_file)
else:
return RegularApiClient()


#api_client = ApiClientFactory.get_client(use_static=True, input_file="/home/noamr/Documents/KubiScan/combined.json")
#api_client = ApiClientFactory.get_client()
#print(api_client.list_roles_for_all_namespaces())
10 changes: 10 additions & 0 deletions api/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# config.py

class Config:
api_client = None

def set_api_client(client):
Config.api_client = client

def get_api_client():
return Config.api_client
61 changes: 61 additions & 0 deletions api/static_api_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import json
import yaml
import os
from .base_client_api import BaseApiClient

class StaticApiClient(BaseApiClient):
def __init__(self, input_file):
self.combined_data = self._load_combined_file(input_file)
self.all_roles =self.get_resources('Role')
self.all_cluster_roles = self.get_resources('ClusterRole')
self.ll_role_bindings = self.get_resources('RoleBinding')
self.all_cluster_role_bindings = self.get_resources('ClusterRoleBinding')
self.all_secrets =self.get_resources('Secret')
self.all_pods = self.get_resources('Pod')

def _load_combined_file(self, input_file):
# Determine the file format based on the file extension
_, file_extension = os.path.splitext(input_file)
file_format = 'json' if file_extension.lower() == '.json' else 'yaml' if file_extension.lower() == '.yaml' else None

if not file_format:
print("Unsupported file extension. Only '.yaml' and '.json' are supported.")
return None

try:
with open(input_file, 'r') as file:
if file_format == "yaml":
return yaml.safe_load(file)
elif file_format == "json":
return json.load(file)
except FileNotFoundError:
print(f"File not found: {input_file}")
return None
except Exception as e:
print(f"Error reading file: {e}")
return None

def get_resources(self, kind):
try:
# Initialize an empty list to collect the resources of the specified kind.
resources = []

# Since combined_data is a list of dictionaries, each containing an 'items' key.
for entry in self.combined_data:
# Check if 'items' key exists and it contains a list of dictionaries.
if 'items' in entry and isinstance(entry['items'], list):
# Extend the list of resources with those that match the specified 'kind'.
resources.extend(item for item in entry['items'] if item.get('kind') == kind)
return resources

except TypeError: # Catch type errors if data structures are not as expected
print("Error processing data. Check the structure of the JSON file.")
return []

def list_roles_for_all_namespaces(self):
return self.all_roles


# Example usage
#static_api_client = StaticApiClient(input_file="C:\\Users\\noamr\\Documents\\GitHub\\KubiScan\\combined.json")
#print(len(static_api_client.all_secrets))
Loading

0 comments on commit 6d85e7c

Please sign in to comment.