-
Notifications
You must be signed in to change notification settings - Fork 606
Expand file tree
/
Copy pathtest_nvd_api.py
More file actions
116 lines (101 loc) · 4.24 KB
/
test_nvd_api.py
File metadata and controls
116 lines (101 loc) · 4.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# Copyright (C) 2021 Intel Corporation
# SPDX-License-Identifier: GPL-3.0-or-later
import os
import shutil
import tempfile
from datetime import datetime, timedelta
from test.utils import EXTERNAL_SYSTEM, SYNCHRONOUS
from pathlib import Path
import pytest
from cve_bin_tool.cvedb import CVEDB
from cve_bin_tool.data_sources import nvd_source
from cve_bin_tool.nvd_api import NVD_API
class TestNVD_API:
@classmethod
def setup_class(cls):
cls.outdir = tempfile.mkdtemp(prefix="cvedb-api-")
@classmethod
def teardown_class(cls):
shutil.rmtree(cls.outdir)
@pytest.mark.asyncio
@pytest.mark.skipif(
not EXTERNAL_SYSTEM() or not os.getenv("nvd_api_key"),
reason="NVD tests run only when EXTERNAL_SYSTEM=1",
)
async def test_get_nvd_params(self):
"""Test NVD for a future date. It should be empty"""
nvd_api = NVD_API(api_key=os.getenv("nvd_api_key") or "")
await nvd_api.get_nvd_params(
time_of_last_update=(datetime.now() + timedelta(days=2))
)
await nvd_api.get()
assert nvd_api.total_results == 0 and nvd_api.all_cve_entries == []
@pytest.mark.asyncio
@pytest.mark.skipif(
not EXTERNAL_SYSTEM() or not os.getenv("nvd_api_key"),
reason="NVD tests run only when EXTERNAL_SYSTEM=1",
)
async def test_total_results_count(self):
"""Total results should be greater than or equal to the current fetched cves"""
nvd_api = NVD_API(api_key=os.getenv("nvd_api_key") or "")
await nvd_api.get_nvd_params(
time_of_last_update=datetime.now() - timedelta(days=2)
)
await nvd_api.get()
assert len(nvd_api.all_cve_entries) >= nvd_api.total_results
@pytest.mark.asyncio
@pytest.mark.skipif(
not EXTERNAL_SYSTEM() or not os.getenv("nvd_api_key"),
reason="NVD tests run only when EXTERNAL_SYSTEM=1",
)
async def test_nvd_incremental_update(self):
"""Test to check whether we are able to fetch and save the nvd entries using time_of_last_update"""
nvd_api = NVD_API(
incremental_update=True, api_key=os.getenv("nvd_api_key") or ""
)
await nvd_api.get_nvd_params(
time_of_last_update=datetime.now() - timedelta(days=4)
)
await nvd_api.get()
source_nvd = nvd_source.NVD_Source()
cvedb = CVEDB(cachedir=self.outdir)
cvedb.data = [(source_nvd.format_data(nvd_api.all_cve_entries), "NVD")]
cvedb.init_database()
cvedb.populate_db()
cvedb.check_cve_entries()
assert cvedb.cve_count == nvd_api.total_results
@pytest.mark.asyncio
@pytest.mark.skipif(
not EXTERNAL_SYSTEM() or not os.getenv("nvd_api_key"),
reason="NVD tests run only when EXTERNAL_SYSTEM=1",
)
async def test_empty_nvd_result(self):
"""Test to check nvd results non-empty result. Total result should be greater than 0"""
nvd_api = NVD_API(api_key=os.getenv("nvd_api_key") or "")
await nvd_api.get_nvd_params()
assert nvd_api.total_results > 0
@pytest.mark.asyncio
@pytest.mark.skip(reason="NVD does not return the Received count")
async def test_api_cve_count(self):
"""Test to match the totalResults and the total CVE count on NVD"""
nvd_api = NVD_API(api_key=os.getenv("nvd_api_key") or "")
await nvd_api.get_nvd_params()
await nvd_api.load_nvd_request(0)
cve_count = await nvd_api.nvd_count_metadata(nvd_api.session)
# Difference between the total and rejected CVE count on NVD should be equal to the total CVE count
# Received CVE count might be zero
assert (
abs(nvd_api.total_results - (cve_count["Total"] - cve_count["Rejected"]))
<= cve_count["Received"]
)
@pytest.mark.asyncio
@pytest.mark.skipif(not EXTERNAL_SYSTEM(), reason="Needs network connection.")
@pytest.mark.skipif(not SYNCHRONOUS(), reason="Database-modifying test, run synchronously")
async def test_fetch_api(self):
await self.nvd.fetch_cves()
assert Path(self.nvd.nvd_api_file).exists()
# Test extract to db
cvedb = CVEDB(cachedir=self.outdir)
cvedb.init_database()
cvedb.populate_db()
cvedb.db_close()