feat(exporter): add a unified all.zip (#2731)
Add a ZIP file containing records from all ecosystems to solve #2423.
Write everything to the default work directory for reuse by the new
unified all.zip (instead of using temporary directories). Clean up the
work directory afterward.
hogo6002 authored Oct 16, 2024
1 parent 033ea44 commit 8edd2db
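
For reviewers who want to poke at the result: a minimal consumer-side sketch of fetching and reading the unified archive after an export run. The bucket name and the anonymous client are assumptions for illustration, not part of this change.

# Sketch only: reading the unified all.zip from an export bucket.
# 'osv-vulnerabilities' is an assumed bucket name for illustration.
import io
import json
import zipfile

from google.cloud import storage

def read_all_zip(bucket_name='osv-vulnerabilities'):
  """Download all.zip and parse every vulnerability record inside it."""
  client = storage.Client.create_anonymous_client()
  data = client.bucket(bucket_name).blob('all.zip').download_as_bytes()
  records = {}
  with zipfile.ZipFile(io.BytesIO(data)) as archive:
    for name in archive.namelist():  # entries are '<vuln-id>.json'
      with archive.open(name) as f:
        records[name] = json.load(f)
  return records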
Showing 2 changed files with 68 additions and 37 deletions.
55 changes: 44 additions & 11 deletions docker/exporter/export_runner.py
@@ -15,12 +15,16 @@
 """OSV Exporter."""
 import argparse
 import concurrent.futures
+import glob
 import logging
 import os
 import subprocess
+import tempfile
+import zipfile as z
 
-from google.cloud import ndb
+from google.cloud import ndb, storage
 
+from exporter import upload_single
 import osv
 import osv.logs
 
@@ -44,20 +48,24 @@ def main():
       default=os.cpu_count() or DEFAULT_EXPORT_PROCESSES)
   args = parser.parse_args()
 
-  query = osv.Bug.query(projection=[osv.Bug.ecosystem], distinct=True)
-  ecosystems = [bug.ecosystem[0] for bug in query if bug.ecosystem] + ['list']
-
   # Set TMPDIR to change the tempfile default directory
   tmp_dir = os.path.join(args.work_dir, 'tmp')
   os.makedirs(tmp_dir, exist_ok=True)
   os.environ['TMPDIR'] = tmp_dir
 
-  with concurrent.futures.ThreadPoolExecutor(
-      max_workers=args.processes) as executor:
-    for eco in ecosystems:
-      # Skip exporting data for child ecosystems (e.g., 'Debian:11').
-      if ':' in eco:
-        continue
-      executor.submit(spawn_ecosystem_exporter, args.work_dir, args.bucket, eco)
+  query = osv.Bug.query(projection=[osv.Bug.ecosystem], distinct=True)
+  ecosystems = [bug.ecosystem[0] for bug in query if bug.ecosystem] + ['list']
+
+  with tempfile.TemporaryDirectory() as export_dir:
+    with concurrent.futures.ThreadPoolExecutor(
+        max_workers=args.processes) as executor:
+      for eco in ecosystems:
+        # Skip exporting data for child ecosystems (e.g., 'Debian:11').
+        if ':' in eco:
+          continue
+        executor.submit(spawn_ecosystem_exporter, export_dir, args.bucket, eco)
+
+    # Upload a ZIP file containing records from all ecosystems.
+    aggregate_all_vulnerabilities(export_dir, args.bucket)
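
A note on the TMPDIR setup in the hunk above: tempfile honours $TMPDIR when picking its default directory, so the TemporaryDirectory() created here lands under the work directory rather than /tmp. A standalone sketch of that mechanism (the path is illustrative only):

# Sketch: redirecting tempfile's default directory via TMPDIR.
import os
import tempfile

# Set TMPDIR before the first tempfile call; tempfile caches its choice
# of default directory on first use.
os.environ['TMPDIR'] = '/srv/exporter-work/tmp'  # illustrative path
os.makedirs(os.environ['TMPDIR'], exist_ok=True)

with tempfile.TemporaryDirectory() as export_dir:
  # e.g. /srv/exporter-work/tmp/tmpa1b2c3d4
  assert export_dir.startswith(tempfile.gettempdir())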


@@ -74,6 +82,31 @@ def spawn_ecosystem_exporter(work_dir: str, bucket: str, eco: str):
     logging.error('Export of %s failed with Exit Code: %d', eco, return_code)
 
 
+def aggregate_all_vulnerabilities(work_dir: str, export_bucket: str):
+  """
+  Aggregates vulnerability records from each ecosystem into a single zip
+  file and uploads it to the export bucket.
+  """
+  logging.info('Generating unified all.zip archive.')
+  zip_file_name = 'all.zip'
+  output_zip = os.path.join(work_dir, zip_file_name)
+  all_vulns = {}
+
+  for file_path in glob.glob(
+      os.path.join(work_dir, '**/*.json'), recursive=True):
+    all_vulns[os.path.basename(file_path)] = file_path
+
+  with z.ZipFile(output_zip, 'a', z.ZIP_DEFLATED) as all_zip:
+    for vuln_filename in sorted(all_vulns):
+      file_path = all_vulns[vuln_filename]
+      all_zip.write(file_path, os.path.basename(file_path))
+
+  storage_client = storage.Client()
+  bucket = storage_client.get_bucket(export_bucket)
+  upload_single(bucket, output_zip, zip_file_name)
+  logging.info('Unified all.zip uploaded successfully.')
+
+
 if __name__ == '__main__':
   _ndb_client = ndb.Client()
   osv.logs.setup_gcp_logging('exporter-runner')
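One property of aggregate_all_vulnerabilities above worth calling out: all_vulns is keyed on the file basename, so a record exported under several ecosystem directories ends up in all.zip exactly once (the last path globbed wins). A self-contained sketch of that dedup behaviour, with throwaway names:

# Sketch: dedup-by-basename, run against a throwaway directory tree.
import glob
import os
import tempfile
import zipfile

with tempfile.TemporaryDirectory() as work_dir:
  # The same record exported under two ecosystem directories.
  for eco in ('PyPI', 'npm'):  # hypothetical ecosystems
    os.makedirs(os.path.join(work_dir, eco))
    with open(os.path.join(work_dir, eco, 'OSV-2024-0001.json'), 'w') as f:
      f.write('{}')

  all_vulns = {}
  for file_path in glob.glob(
      os.path.join(work_dir, '**/*.json'), recursive=True):
    all_vulns[os.path.basename(file_path)] = file_path  # later paths overwrite

  output_zip = os.path.join(work_dir, 'all.zip')
  with zipfile.ZipFile(output_zip, 'a', zipfile.ZIP_DEFLATED) as all_zip:
    for name in sorted(all_vulns):
      all_zip.write(all_vulns[name], name)

  with zipfile.ZipFile(output_zip) as check:
    print(check.namelist())  # ['OSV-2024-0001.json'] - one entry, not two
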
50 changes: 24 additions & 26 deletions docker/exporter/exporter.py
@@ -17,13 +17,13 @@
 import concurrent.futures
 import logging
 import os
-import tempfile
 import zipfile
 from typing import List
 
 from google.cloud import ndb
 from google.cloud import storage
 from google.cloud.storage import retry
+from google.cloud.storage.bucket import Bucket
 
 import osv
 import osv.logs
@@ -50,20 +50,9 @@ def run(self):
       #TODO(gongh@): remove all ecosystem releases from ecosystem.txt
       # after notifying users.
       ecosystems = [bug.ecosystem[0] for bug in query if bug.ecosystem]
-      with tempfile.TemporaryDirectory() as tmp_dir:
-        self._export_ecosystem_list_to_bucket(ecosystems, tmp_dir)
+      self._export_ecosystem_list_to_bucket(ecosystems, self._work_dir)
     else:
-      with tempfile.TemporaryDirectory() as tmp_dir:
-        self._export_ecosystem_to_bucket(self._ecosystem, tmp_dir)
-
-  def upload_single(self, bucket, source_path, target_path):
-    """Upload a single file to a GCS bucket."""
-    logging.info('Uploading %s', target_path)
-    try:
-      blob = bucket.blob(target_path)
-      blob.upload_from_filename(source_path, retry=retry.DEFAULT_RETRY)
-    except Exception as e:
-      logging.exception('Failed to export: %s', e)
+      self._export_ecosystem_to_bucket(self._ecosystem, self._work_dir)
 
   def _export_ecosystem_list_to_bucket(self, ecosystems: List[str],
                                        tmp_dir: str):
@@ -83,14 +72,14 @@ def _export_ecosystem_list_to_bucket(self, ecosystems: List[str],
     with open(ecosystems_file_path, "w") as ecosystems_file:
       ecosystems_file.writelines([e + "\n" for e in ecosystems])
 
-    self.upload_single(bucket, ecosystems_file_path, ECOSYSTEMS_FILE)
+    upload_single(bucket, ecosystems_file_path, ECOSYSTEMS_FILE)
 
-  def _export_ecosystem_to_bucket(self, ecosystem: str, tmp_dir: str):
+  def _export_ecosystem_to_bucket(self, ecosystem: str, work_dir: str):
     """Export the vulnerabilities in an ecosystem to GCS.
 
     Args:
       ecosystem: the ecosystem name
-      tmp_dir: temporary directory for scratch
+      work_dir: working directory for scratch
 
     This simultaneously exports every Bug for the given ecosystem to individual
     files in the scratch filesystem, and a zip file in the scratch filesystem.
@@ -102,7 +91,9 @@ def _export_ecosystem_to_bucket(self, ecosystem: str, tmp_dir: str):
     storage_client = storage.Client()
     bucket = storage_client.get_bucket(self._export_bucket)
 
-    zip_path = os.path.join(tmp_dir, 'all.zip')
+    ecosystem_dir = os.path.join(work_dir, ecosystem)
+    os.makedirs(ecosystem_dir, exist_ok=True)
+    zip_path = os.path.join(ecosystem_dir, 'all.zip')
     with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zip_file:
       files_to_zip = []
 
@@ -112,7 +103,7 @@ def _export_to_file_and_zipfile(bug):
       if not bug.public or bug.status == osv.BugStatus.UNPROCESSED:
         return
 
-      file_path = os.path.join(tmp_dir, bug.id() + '.json')
+      file_path = os.path.join(ecosystem_dir, bug.id() + '.json')
       vulnerability = yield bug.to_vulnerability_async(include_source=True)
       osv.write_vulnerability(vulnerability, file_path)
@@ -129,13 +120,23 @@
 
     with concurrent.futures.ThreadPoolExecutor(
         max_workers=_EXPORT_WORKERS) as executor:
-      # Note: all.zip is included here
-      for filename in os.listdir(tmp_dir):
-        executor.submit(self.upload_single, bucket,
-                        os.path.join(tmp_dir, filename),
+      # Note: the individual ecosystem all.zip is included here
+      for filename in os.listdir(ecosystem_dir):
+        executor.submit(upload_single, bucket,
+                        os.path.join(ecosystem_dir, filename),
                         f'{ecosystem}/{filename}')
 
 
+def upload_single(bucket: Bucket, source_path: str, target_path: str):
+  """Upload a single file to a GCS bucket."""
+  logging.info('Uploading %s', target_path)
+  try:
+    blob = bucket.blob(target_path)
+    blob.upload_from_filename(source_path, retry=retry.DEFAULT_RETRY)
+  except Exception as e:
+    logging.exception('Failed to export: %s', e)
+
+
 def main():
   parser = argparse.ArgumentParser(description='Exporter')
   parser.add_argument(
@@ -151,9 +152,6 @@ def main():
           'to export the ecosystem.txt file')
   args = parser.parse_args()
 
-  tmp_dir = os.path.join(args.work_dir, 'tmp')
-  os.environ['TMPDIR'] = tmp_dir
-
   exporter = Exporter(args.work_dir, args.bucket, args.ecosystem)
   exporter.run()
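A side note on the now module-level upload_single: retry.DEFAULT_RETRY is the stock exponential-backoff policy from google-cloud-storage. If a deployment wanted a bounded retry window instead, a hedged sketch (the 120-second deadline is an assumption, not something this change sets):

# Sketch: a bounded retry policy for uploads, assuming the standard
# google-cloud-storage retry helpers.
from google.cloud.storage import retry
from google.cloud.storage.bucket import Bucket

# DEFAULT_RETRY is a google.api_core.retry.Retry; with_deadline() (an
# alias of with_timeout() in newer api_core releases) returns a copy
# that stops retrying after the given number of seconds.
_BOUNDED_RETRY = retry.DEFAULT_RETRY.with_deadline(120.0)

def upload_single_bounded(bucket: Bucket, source_path: str, target_path: str):
  """Like upload_single, but gives up after the bounded retry window."""
  blob = bucket.blob(target_path)
  blob.upload_from_filename(source_path, retry=_BOUNDED_RETRY)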
