-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #7 from seantis/import-risk-excel
Import risk excel
- Loading branch information
Showing
4 changed files
with
345 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
name: Docker | ||
|
||
# This workflow uses actions that are not certified by GitHub. | ||
# They are provided by a third-party and are governed by | ||
# separate terms of service, privacy policy, and support | ||
# documentation. | ||
|
||
on: | ||
schedule: | ||
- cron: '16 11 * * *' | ||
push: | ||
branches: [ "main" ] | ||
# Publish semver tags as releases. | ||
tags: [ 'v*.*.*' ] | ||
pull_request: | ||
branches: [ "main" ] | ||
|
||
env: | ||
# Use docker.io for Docker Hub if empty | ||
REGISTRY: ghcr.io | ||
# github.repository as <account>/<repo> | ||
IMAGE_NAME: ${{ github.repository }} | ||
|
||
|
||
jobs: | ||
build: | ||
|
||
runs-on: ubuntu-latest | ||
permissions: | ||
contents: read | ||
packages: write | ||
# This is used to complete the identity challenge | ||
# with sigstore/fulcio when running outside of PRs. | ||
id-token: write | ||
|
||
steps: | ||
- name: Checkout repository | ||
uses: actions/checkout@v3 | ||
|
||
# Install the cosign tool except on PR | ||
# https://github.com/sigstore/cosign-installer | ||
- name: Install cosign | ||
if: github.event_name != 'pull_request' | ||
uses: sigstore/cosign-installer@6e04d228eb30da1757ee4e1dd75a0ec73a653e06 #v3.1.1 | ||
with: | ||
cosign-release: 'v2.1.1' | ||
|
||
# Set up BuildKit Docker container builder to be able to build | ||
# multi-platform images and export cache | ||
# https://github.com/docker/setup-buildx-action | ||
- name: Set up Docker Buildx | ||
uses: docker/setup-buildx-action@f95db51fddba0c2d1ec667646a06c2ce06100226 # v3.0.0 | ||
|
||
# Login against a Docker registry except on PR | ||
# https://github.com/docker/login-action | ||
- name: Log into registry ${{ env.REGISTRY }} | ||
if: github.event_name != 'pull_request' | ||
uses: docker/login-action@343f7c4344506bcbf9b4de18042ae17996df046d # v3.0.0 | ||
with: | ||
registry: ${{ env.REGISTRY }} | ||
username: ${{ github.actor }} | ||
password: ${{ secrets.GITHUB_TOKEN }} | ||
|
||
# Extract metadata (tags, labels) for Docker | ||
# https://github.com/docker/metadata-action | ||
- name: Extract Docker metadata | ||
id: meta | ||
uses: docker/metadata-action@96383f45573cb7f253c731d3b3ab81c87ef81934 # v5.0.0 | ||
with: | ||
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} | ||
|
||
# Build and push Docker image with Buildx (don't push on PR) | ||
# https://github.com/docker/build-push-action | ||
- name: Build and push Docker image | ||
id: build-and-push | ||
uses: docker/build-push-action@0565240e2d4ab88bba5387d719585280857ece09 # v5.0.0 | ||
with: | ||
context: . | ||
push: ${{ github.event_name != 'pull_request' }} | ||
tags: ${{ steps.meta.outputs.tags }} | ||
labels: ${{ steps.meta.outputs.labels }} | ||
cache-from: type=gha | ||
cache-to: type=gha,mode=max | ||
|
||
# Sign the resulting Docker image digest except on PRs. | ||
# This will only write to the public Rekor transparency log when the Docker | ||
# repository is public to avoid leaking data. If you would like to publish | ||
# transparency data even for private images, pass --force to cosign below. | ||
# https://github.com/sigstore/cosign | ||
- name: Sign the published Docker image | ||
if: ${{ github.event_name != 'pull_request' }} | ||
env: | ||
# https://docs.github.com/en/actions/security-guides/security-hardening-for-github-actions#using-an-intermediate-environment-variable | ||
TAGS: ${{ steps.meta.outputs.tags }} | ||
DIGEST: ${{ steps.build-and-push.outputs.digest }} | ||
# This step uses the identity token to provision an ephemeral certificate | ||
# against the sigstore community Fulcio instance. | ||
run: echo "${TAGS}" | xargs -I {} cosign sign --yes {}@${DIGEST} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,244 @@ | ||
""" | ||
Import risk excel 🕸️ into RiskMatrix ✨ | ||
This script is specific to our sitation at seantis. The script is included | ||
anyway, you might adjust it to import the excel at your organization too. | ||
""" | ||
import argparse | ||
import sys | ||
import traceback | ||
from datetime import datetime | ||
from typing import TYPE_CHECKING | ||
from typing import Iterator | ||
|
||
try: | ||
from openpyxl import load_workbook | ||
except ImportError: | ||
print("Excel import requires openpyxl library. Install with:\n") | ||
print("$ pip install openpyxl") | ||
print() | ||
sys.exit(1) | ||
|
||
import sqlalchemy | ||
from pyramid.paster import bootstrap | ||
from pyramid.paster import get_appsettings | ||
from sqlalchemy import select | ||
|
||
from riskmatrix.models import Asset | ||
from riskmatrix.models import Organization | ||
from riskmatrix.models import Risk | ||
from riskmatrix.models import RiskAssessment | ||
from riskmatrix.models import RiskCatalog | ||
from riskmatrix.orm import Base | ||
from riskmatrix.orm import get_engine | ||
from riskmatrix.scripts.util import select_existing_organization | ||
|
||
if TYPE_CHECKING: | ||
|
||
from typing import TypedDict | ||
|
||
from sqlalchemy.orm import Session | ||
|
||
class RiskDetails(TypedDict): | ||
""" A risk extracted from the excel. """ | ||
name: str | ||
category: str | ||
asset_name: str | ||
desc: str | ||
likelihood: int | ||
impact: int | ||
|
||
|
||
def parse_args(argv: list[str]) -> argparse.Namespace: | ||
parser = argparse.ArgumentParser() | ||
parser.add_argument( | ||
'config_uri', | ||
help='Configuration file, e.g., development.ini', | ||
) | ||
parser.add_argument( | ||
'catalog', | ||
help='Risk catalog excel file, e.g., catalog.xlsx', | ||
) | ||
return parser.parse_args(argv[1:]) | ||
|
||
|
||
def get_or_create_asset( | ||
asset_name: str, | ||
organization: Organization, | ||
session: 'Session' | ||
) -> Asset: | ||
|
||
q = select(Asset).where( | ||
Asset.organization_id == organization.id, | ||
Asset.name == asset_name | ||
) | ||
|
||
if asset := session.scalars(q).one_or_none(): | ||
return asset | ||
|
||
asset = Asset(asset_name, organization) | ||
asset.organization_id = organization.id | ||
session.add(asset) | ||
return asset | ||
|
||
|
||
def get_or_create_risk( | ||
risk_name: str, | ||
catalog: RiskCatalog, | ||
session: 'Session' | ||
) -> Risk: | ||
|
||
q = select(Risk).where( | ||
Risk.organization_id == catalog.organization.id, | ||
Risk.name == risk_name | ||
) | ||
|
||
if risk := session.scalars(q).one_or_none(): | ||
return risk | ||
|
||
risk = Risk(risk_name, catalog) | ||
session.add(risk) | ||
return risk | ||
|
||
|
||
def get_or_create_risk_assessment( | ||
risk: Risk, | ||
asset: Asset, | ||
session: 'Session' | ||
) -> RiskAssessment: | ||
|
||
q = select(RiskAssessment).where( | ||
RiskAssessment.risk_id == risk.id, | ||
RiskAssessment.asset_id == asset.id, | ||
) | ||
|
||
if assessment := session.scalars(q).one_or_none(): | ||
return assessment | ||
|
||
assessment = RiskAssessment(risk=risk, asset=asset) | ||
session.add(assessment) | ||
return assessment | ||
|
||
|
||
def populate_catalog( | ||
catalog: RiskCatalog, | ||
risks: 'Iterator[RiskDetails]', | ||
session: 'Session' | ||
) -> None: | ||
|
||
for risk_details in risks: | ||
asset = get_or_create_asset( | ||
risk_details['asset_name'], catalog.organization, session | ||
) | ||
|
||
risk = get_or_create_risk( | ||
risk_details['name'], catalog, session | ||
) | ||
risk.category = risk_details['category'] | ||
risk.description = risk_details['desc'] | ||
|
||
assessment = get_or_create_risk_assessment(risk, asset, session) | ||
assessment.likelihood = risk_details['likelihood'] | ||
assessment.impact = risk_details['impact'] | ||
|
||
|
||
def risks_from_excel( | ||
excel_file: str, | ||
sheet_name: str = 'Risikokatalog' | ||
) -> 'Iterator[RiskDetails]': | ||
""" | ||
Load risks from excel. | ||
""" | ||
workbook = load_workbook(excel_file, read_only=True) | ||
|
||
sheet = workbook[sheet_name] | ||
|
||
# Rows are vertically grouped into sections by a category. A section begins | ||
# with a row that contains the category name but is otherwise empty. | ||
current_category = None | ||
|
||
# Header row sometimes spans over two rows (combined), sometimes only one. | ||
# Anyway, actual riks rows will start after row #2. | ||
start_after_row = 2 | ||
|
||
iterator = sheet.iter_rows( | ||
values_only=True, | ||
min_row=start_after_row | ||
) | ||
|
||
for row in iterator: | ||
nr = row[0] | ||
name = row[1] | ||
|
||
is_empty_row = not (nr or name) | ||
is_category_row = not nr and name | ||
|
||
if is_empty_row: | ||
continue | ||
elif is_category_row: | ||
current_category = name | ||
continue | ||
|
||
yield { | ||
'name': str(name), | ||
'category': str(current_category), | ||
'asset_name': str(row[2]), | ||
'desc': str(row[3]), | ||
'likelihood': int(str(row[7])), | ||
'impact': int(str(row[8])) | ||
} | ||
|
||
# readonly mode forces us to manually close the workbook, see also: | ||
# https://openpyxl.readthedocs.io/en/stable/optimized.html#read-only-mode | ||
workbook.close() | ||
|
||
|
||
def main(argv: list[str] = sys.argv) -> None: | ||
args = parse_args(argv) | ||
|
||
with bootstrap(args.config_uri) as env: | ||
settings = get_appsettings(args.config_uri) | ||
|
||
engine = get_engine(settings) | ||
Base.metadata.create_all(engine) | ||
|
||
with env['request'].tm: | ||
dbsession = env['request'].dbsession | ||
|
||
print('Organization to attach risk catalog to') | ||
|
||
org = select_existing_organization(dbsession) | ||
|
||
if not org: | ||
return | ||
|
||
today = datetime.today().strftime('%Y-%m-%d') | ||
|
||
catalog = RiskCatalog( | ||
'seantis risk register', | ||
organization=org, | ||
description=f'Imported from risk excel on {today}.' | ||
) | ||
|
||
catalog.organization_id = org.id | ||
|
||
try: | ||
populate_catalog( | ||
catalog, | ||
risks_from_excel(args.catalog), | ||
dbsession | ||
) | ||
except sqlalchemy.exc.IntegrityError: | ||
print('Failed to import excel, aborting.') | ||
print(traceback.format_exc()) | ||
dbsession.rollback() | ||
sys.exit(1) | ||
else: | ||
print( | ||
f'Successfully populated risk catalog "{catalog.name}" ' | ||
'from risk register excel.' | ||
) | ||
|
||
|
||
if __name__ == '__main__': | ||
main(sys.argv) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters