Merge pull request #286 from flavio/parallelize-everything
parallelize everything
flavio authored May 29, 2024
2 parents b853001 + 4ab0bea commit de424bb
Showing 7 changed files with 230 additions and 88 deletions.
25 changes: 11 additions & 14 deletions ARCHITECTURE.md
@@ -25,46 +25,43 @@ The map would look like this:
namespaced resources.

The code then starts to iterate over the keys of the map, hence over the types of cluster-wide Kubernetes resources that are targeted by the policies. This is done
[here](https://github.com/kubewarden/audit-scanner/blob/038da594f989f97420bf235979ae1e60335303e6/internal/scanner/scanner.go#L223).
inside of the `ScanClusterWideResources` method of `Scanner`.
The code will get all the resources of that type. The resources are fetched with pagination to reduce the memory usage and the load on the Kubernetes API server.

> Note: the order in which the keys are iterated is not deterministic.
The code processes each chunk of resources, and for each resource it invokes the [`auditClusterResource`](https://github.com/kubewarden/audit-scanner/blob/038da594f989f97420bf235979ae1e60335303e6/internal/scanner/scanner.go#L246)
method.
The code processes each chunk of resources, and for each resource it invokes the `auditClusterResource` method of `Scanner`.

> **Important:** this portion of the code is parallelized
>
> For example, assuming the code is auditing the `Namespace` resource kind, and there are 20k namespaces in the cluster,
> the pool of workers will evaluate `100` namespaces in parallel. The size of the worker pool is currently hard coded to
> [`here`](https://github.com/kubewarden/audit-scanner/blob/038da594f989f97420bf235979ae1e60335303e6/internal/scanner/scanner.go#L32).
> the pool of workers will evaluate `100` namespaces in parallel. The size of the worker pool is configured with the `--parallel-resources` flag.
The [`auditClusterResource`](https://github.com/kubewarden/audit-scanner/blob/038da594f989f97420bf235979ae1e60335303e6/internal/scanner/scanner.go#L325) function
takes as input a Kubernetes resource (e.g.: a specific `Namespace` object) and all the policies that target that kind of resource (e.g.: Kubernetes `Namespace` objects).
The `auditClusterResource` function takes as input a Kubernetes resource (e.g.: a specific `Namespace` object) and all the policies that target that kind of resource (e.g.: Kubernetes `Namespace` objects).
The code then iterates over the list of policies and, for each one, performs the following actions:

- Skip the policy if it doesn't target the specific object. This can happen because of label selectors set on the policy.
- Create a fake `CREATE` admission request object for that resource, send it to the Policy Server that hosts the policy, and get the response.

> **Note:** this part of the code is not concurrent. Each policy is evaluated sequentially, one at a time. This is something that could be improved in the future.
> **Important:** this portion of the code is parallelized. The number of parallel policies to be evaluated is configured with the `--parallel-policies` flag.
Once all the policies that target the specific Kubernetes object have been processed, a `ClusterPolicyReport` object is created.
Depending on how the `audit-scanner` process was started, the `ClusterPolicyReport` object is either written into etcd or is printed on the standard output.
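
The per-resource policy loop described above can be sketched as follows. This is a minimal illustration and not the scanner's actual code: the `policy` type, its `matchLabels` field, and the `auditResource` function are hypothetical, and the selector logic is reduced to exact label matching. The `parallel` argument plays the role of `--parallel-policies` and is assumed to be at least 1.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// policy is a hypothetical, simplified policy: it applies to an object only
// when every entry of matchLabels is present on that object.
type policy struct {
	name        string
	matchLabels map[string]string
}

// targets reports whether the policy's label selector matches the labels.
func (p policy) targets(labels map[string]string) bool {
	for k, want := range p.matchLabels {
		if got, ok := labels[k]; !ok || got != want {
			return false
		}
	}
	return true
}

// auditResource evaluates every matching policy, at most `parallel` at a
// time, and returns the sorted names of the policies that were evaluated.
func auditResource(labels map[string]string, policies []policy, parallel int, evaluate func(policy)) []string {
	sem := make(chan struct{}, parallel) // counting semaphore
	var wg sync.WaitGroup
	var mu sync.Mutex
	var evaluated []string

	for _, p := range policies {
		if !p.targets(labels) {
			continue // the selector does not match: skip this policy
		}
		sem <- struct{}{} // blocks while `parallel` evaluations are running
		wg.Add(1)
		go func(p policy) {
			defer wg.Done()
			defer func() { <-sem }()
			evaluate(p) // stand-in for the request sent to the Policy Server
			mu.Lock()
			evaluated = append(evaluated, p.name)
			mu.Unlock()
		}(p)
	}
	wg.Wait()
	sort.Strings(evaluated)
	return evaluated
}

func main() {
	policies := []policy{
		{name: "no-selector"},
		{name: "prod-only", matchLabels: map[string]string{"env": "prod"}},
		{name: "dev-only", matchLabels: map[string]string{"env": "dev"}},
	}
	labels := map[string]string{"env": "prod"}
	fmt.Println(auditResource(labels, policies, 2, func(policy) {})) // [no-selector prod-only]
}
```

A buffered channel is used as the limiter here only because it needs nothing beyond the standard library; the real implementation may bound concurrency differently.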

### Scanning namespaced resources

The code starts by getting a list of all the `Namespace` objects in the cluster, except the ones manually excluded by the user.
See [here](https://github.com/kubewarden/audit-scanner/blob/038da594f989f97420bf235979ae1e60335303e6/internal/scanner/scanner.go#L183).
This is done inside of the `ScanAllNamespaces` method of `Scanner`.

For each namespace, the code invokes the [`ScanNamespace`](https://github.com/kubewarden/audit-scanner/blob/038da594f989f97420bf235979ae1e60335303e6/internal/scanner/scanner.go#L120)
method.
For each namespace, the code invokes the `ScanNamespace` method.

> **Note:** this part of the code is not concurrent. Each Namespace is evaluated sequentially. This is something that could be improved in the future.
> **Important:** this portion of the code is parallelized. The number of Namespaces scanned in parallel is configured with the `--parallel-namespaces` flag.
The code uses the [`GetPoliciesForANamespace`](https://github.com/kubewarden/audit-scanner/blob/038da594f989f97420bf235979ae1e60335303e6/internal/policies/client.go#L61) method
The code uses the `GetPoliciesForANamespace` method
to build a map with the Kubernetes resource as key, and the policies targeting that resource as value.
This map is similar to the one created for the cluster-wide resources. However, in this case the types of policies associated with a Kubernetes
resource could be both `ClusterAdmissionPolicy` and `NamespaceAdmissionPolicy`.

The code then iterates over the keys of the map, hence over the types of namespaced Kubernetes resources that are targeted by the policies. This is done exactly as
when evaluating the cluster-wide resources. See [here](https://github.com/kubewarden/audit-scanner/blob/038da594f989f97420bf235979ae1e60335303e6/internal/scanner/scanner.go#L140-L170).
when evaluating the cluster-wide resources.
This is done inside of the `ScanNamespace` method of `Scanner`.
59 changes: 48 additions & 11 deletions README.md
@@ -23,17 +23,21 @@ See [Querying the reports](#querying-the-reports) for more information.
audit-scanner [flags]

Flags:
-c, --cluster scan cluster wide resources
--disable-store disable storing the results in the k8s cluster
-f, --extra-ca string File path to CA cert in PEM format of PolicyServer endpoints
-h, --help help for audit-scanner
-i, --ignore-namespaces strings comma separated list of namespace names to be skipped from scan. This flag can be repeated
--insecure-ssl skip SSL cert validation when connecting to PolicyServers endpoints. Useful for development
-k, --kubewarden-namespace string namespace where the Kubewarden components (e.g. PolicyServer) are installed (required) (default "kubewarden")
-l, --loglevel string level of the logs. Supported values are: [trace debug info warn error fatal] (default "info")
-n, --namespace string namespace to be evaluated
-o, --output-scan print result of scan in JSON to stdout
-u, --policy-server-url string URI to the PolicyServers the Audit Scanner will query. Example: https://localhost:3000. Useful for out-of-cluster debugging
-c, --cluster scan cluster wide resources
--disable-store disable storing the results in the k8s cluster
-f, --extra-ca string File path to CA cert in PEM format of PolicyServer endpoints
-h, --help help for audit-scanner
-i, --ignore-namespaces strings comma separated list of namespace names to be skipped from scan. This flag can be repeated
--insecure-ssl skip SSL cert validation when connecting to PolicyServers endpoints. Useful for development
-k, --kubewarden-namespace string namespace where the Kubewarden components (e.g. PolicyServer) are installed (required) (default "kubewarden")
-l, --loglevel string level of the logs. Supported values are: [trace debug info warn error fatal] (default "info")
-n, --namespace string namespace to be evaluated
-o, --output-scan print result of scan in JSON to stdout
--page-size int number of resources to fetch from the Kubernetes API server when paginating (default 100)
--parallel-namespaces int number of Namespaces to scan in parallel (default 1)
--parallel-policies int number of policies to evaluate for a given resource in parallel (default 5)
--parallel-resources int number of resources to scan in parallel (default 100)
-u, --policy-server-url string URI to the PolicyServers the Audit Scanner will query. Example: https://localhost:3000. Useful for out-of-cluster debugging
```

## Examples
@@ -56,6 +60,39 @@ Disable storing the results in etcd and print the reports to stdout in JSON form
audit-scanner --kubewarden-namespace kubewarden --disable-store --output-scan
```

## Tuning

The audit scanner works by entering each Namespace of the cluster and finding all the policies that are "looking" at the contents of the Namespace.
It then identifies all the resource types that are relevant to these policies (e.g. Deployments, Pods, etc.) and iterates over each resource type.

When looking into a specific type of resource, audit-scanner fetches these objects in chunks. The size of the chunk can be set using the `--page-size` flag.
The scanner fetches one chunk of resources, then iterates over each one of them, evaluating all the policies that are looking at that specific resource.
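
The chunked fetch can be pictured with the sketch below. It is a simplification under stated assumptions: `fetchPage` and `scanInChunks` are hypothetical helpers, and the page boundary is tracked with a plain offset, whereas the Kubernetes API uses an opaque continue token.

```go
package main

import "fmt"

// fetchPage is a hypothetical stand-in for a paginated list call. It returns
// up to pageSize items starting at offset, plus the offset of the next page
// (-1 when there are no more items).
func fetchPage(all []string, offset, pageSize int) (page []string, next int) {
	end := offset + pageSize
	if end >= len(all) {
		return all[offset:], -1
	}
	return all[offset:end], end
}

// scanInChunks visits every item one page at a time and returns the number
// of list calls that were needed.
func scanInChunks(all []string, pageSize int, visit func(string)) int {
	if pageSize < 1 {
		pageSize = 1 // guard against a non-advancing loop
	}
	calls := 0
	for offset := 0; offset != -1; {
		var page []string
		page, offset = fetchPage(all, offset, pageSize)
		calls++
		// Only the current page is held in memory while it is processed.
		for _, item := range page {
			visit(item)
		}
	}
	return calls
}

func main() {
	pods := make([]string, 1000)
	visited := 0
	calls := scanInChunks(pods, 200, func(string) { visited++ })
	fmt.Println(calls, visited) // 5 1000
}
```

A larger page size means fewer list calls but more objects held in memory at once.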

Each iteration step can be done in parallel. The number of Namespaces to be evaluated at the same time can be set using the `--parallel-namespaces` flag.
The number of resources to be evaluated at the same time can be set using the `--parallel-resources` flag.
When evaluating the policies for a specific resource, the number of policies to be evaluated at the same time can be set using the `--parallel-policies` flag.

A concrete example:

- We have 5 namespaces, each with 1000 Pods.
- We have 10 `ClusterAdmissionPolicy` resources that are looking at Pods.
- We have set `--page-size=200`, `--parallel-namespaces=2`, `--parallel-resources=100`, and `--parallel-policies=5`.

The scanner will:

- Work on 2 Namespaces at the same time.
- Inside of each Namespace:
  - Fetch Pods in chunks of 200 (`--page-size=200`).
  - Evaluate 100 Pods at the same time (`--parallel-resources=100`).
  - For each Pod, evaluate 5 policies at the same time (`--parallel-policies=5`).
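
The three levels of parallelism in this example can be modelled with nested, independently bounded loops. The sketch below is illustrative only: `forEachLimited` and `scan` are hypothetical names, pagination is left out, every limit is assumed to be at least 1, and the "evaluation" is just a counter increment.

```go
package main

import (
	"fmt"
	"sync"
)

// forEachLimited calls fn for every index in [0, n), with at most `limit`
// calls in flight at any time.
func forEachLimited(n, limit int, fn func(i int)) {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		sem <- struct{}{} // blocks while `limit` calls are running
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			defer func() { <-sem }()
			fn(i)
		}(i)
	}
	wg.Wait()
}

// scan nests one bounded loop per level (Namespaces, resources, policies)
// and returns the total number of policy evaluations performed.
func scan(namespaces, podsPerNs, policies, parallelNs, parallelRes, parallelPol int) int {
	var mu sync.Mutex
	evaluations := 0
	forEachLimited(namespaces, parallelNs, func(int) {
		forEachLimited(podsPerNs, parallelRes, func(int) {
			forEachLimited(policies, parallelPol, func(int) {
				mu.Lock()
				evaluations++ // stand-in for one request to a Policy Server
				mu.Unlock()
			})
		})
	})
	return evaluations
}

func main() {
	// 5 Namespaces, 1000 Pods each, 10 policies, with the limits used above.
	fmt.Println(scan(5, 1000, 10, 2, 100, 5)) // 50000
}
```

Because each level gets its own limiter, the limits multiply rather than add.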

Things to consider:

- The pagination size has a direct impact on:
  - The number of API calls that the scanner will make.
  - The amount of memory that the scanner will use.
- The maximum number of outgoing evaluation requests is the product of `--parallel-namespaces`, `--parallel-resources`, and `--parallel-policies`.
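
As a quick check of that product, the sketch below computes the upper bound for the example above and for the default flag values. The `tuning` type and its field names are illustrative and are not taken from the code base.

```go
package main

import "fmt"

// tuning mirrors the three parallelism flags; the field names are hypothetical.
type tuning struct {
	parallelNamespaces int
	parallelResources  int
	parallelPolicies   int
}

// maxInFlight returns the upper bound on simultaneous evaluation requests.
func (t tuning) maxInFlight() int {
	return t.parallelNamespaces * t.parallelResources * t.parallelPolicies
}

func main() {
	example := tuning{parallelNamespaces: 2, parallelResources: 100, parallelPolicies: 5}
	defaults := tuning{parallelNamespaces: 1, parallelResources: 100, parallelPolicies: 5}
	fmt.Println(example.maxInFlight())  // 1000
	fmt.Println(defaults.maxInFlight()) // 500
}
```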

# Querying the reports

Using the `kubectl` command line tool, you can query the results of the scan:
27 changes: 25 additions & 2 deletions cmd/root.go
@@ -66,6 +66,22 @@ There will be a ClusterPolicyReport with results for cluster-wide resources.`,
if err != nil {
return err
}
parallelNamespacesAudits, err := cmd.Flags().GetInt("parallel-namespaces")
if err != nil {
return err
}
parallelResourcesAudits, err := cmd.Flags().GetInt("parallel-resources")
if err != nil {
return err
}
parallelPoliciesAudit, err := cmd.Flags().GetInt("parallel-policies")
if err != nil {
return err
}
pageSize, err := cmd.Flags().GetInt("page-size")
if err != nil {
return err
}

config := ctrl.GetConfigOrDie()
dynamicClient := dynamic.NewForConfigOrDie(config)
@@ -83,13 +99,16 @@ There will be a ClusterPolicyReport with results for cluster-wide resources.`,
if err != nil {
return err
}
k8sClient, err := k8s.NewClient(dynamicClient, clientset, kubewardenNamespace, skippedNs)
k8sClient, err := k8s.NewClient(dynamicClient, clientset, kubewardenNamespace, skippedNs, int64(pageSize))
if err != nil {
return err
}
policyReportStore := report.NewPolicyReportStore(client)

scanner, err := scanner.NewScanner(policiesClient, k8sClient, policyReportStore, outputScan, disableStore, insecureSSL, caCertFile)
scanner, err := scanner.NewScanner(policiesClient, k8sClient, policyReportStore, outputScan, disableStore, insecureSSL, caCertFile,
parallelNamespacesAudits,
parallelResourcesAudits,
parallelPoliciesAudit)
if err != nil {
return err
}
@@ -145,4 +164,8 @@ func init() {
rootCmd.Flags().BoolVar(&insecureSSL, "insecure-ssl", false, "skip SSL cert validation when connecting to PolicyServers endpoints. Useful for development")
rootCmd.Flags().StringP("extra-ca", "f", "", "File path to CA cert in PEM format of PolicyServer endpoints")
rootCmd.Flags().BoolVar(&disableStore, "disable-store", false, "disable storing the results in the k8s cluster")
rootCmd.Flags().IntP("parallel-namespaces", "", 1, "number of Namespaces to scan in parallel")
rootCmd.Flags().IntP("parallel-resources", "", 100, "number of resources to scan in parallel")
rootCmd.Flags().IntP("parallel-policies", "", 5, "number of policies to evaluate for a given resource in parallel")
rootCmd.Flags().IntP("page-size", "", 100, "number of resources to fetch from the Kubernetes API server when paginating")
}
9 changes: 5 additions & 4 deletions internal/k8s/client.go
@@ -18,8 +18,6 @@ import (
"k8s.io/client-go/tools/pager"
)

const pageSize = 100

// A client to get resources and namespaces from a Kubernetes cluster
type Client struct {
// dynamicClient is used to get resource lists
@@ -28,16 +26,19 @@ type Client struct {
clientset kubernetes.Interface
// list of skipped namespaces from audit, by name. It includes kubewardenNamespace
skippedNs []string
// pageSize is the number of resources to fetch when paginating
pageSize int64
}

// NewClient returns a new client
func NewClient(dynamicClient dynamic.Interface, clientset kubernetes.Interface, kubewardenNamespace string, skippedNs []string) (*Client, error) {
func NewClient(dynamicClient dynamic.Interface, clientset kubernetes.Interface, kubewardenNamespace string, skippedNs []string, pageSize int64) (*Client, error) {
skippedNs = append(skippedNs, kubewardenNamespace)

return &Client{
dynamicClient,
clientset,
skippedNs,
pageSize,
}, nil
}

@@ -70,7 +71,7 @@ func (f *Client) GetResources(gvr schema.GroupVersionResource, nsName string) (*
return resources, nil
})

listPager.PageSize = pageSize
listPager.PageSize = f.pageSize
return listPager, nil
}

4 changes: 3 additions & 1 deletion internal/k8s/client_test.go
@@ -17,6 +17,8 @@ import (
"k8s.io/client-go/kubernetes/scheme"
)

const pageSize = 100

func TestGetResources(t *testing.T) {
var pods []runtime.Object
for i := 0; i < pageSize+5; i++ {
@@ -26,7 +28,7 @@ func TestGetResources(t *testing.T) {
dynamicClient := dynamicFake.NewSimpleDynamicClient(scheme.Scheme, pods...)
clientset := fake.NewSimpleClientset()

k8sClient, err := NewClient(dynamicClient, clientset, "kubewarden", nil)
k8sClient, err := NewClient(dynamicClient, clientset, "kubewarden", nil, pageSize)
require.NoError(t, err)

pager, err := k8sClient.GetResources(schema.GroupVersionResource{