import os
import shutil
from dotenv import load_dotenv
if not os.path.exists(os.path.expanduser("~/.cvexplore")):
os.mkdir(os.path.expanduser("~/.cvexplore"))
user_wd = os.path.expanduser("~/.cvexplore")
if not os.path.exists(os.path.join(user_wd, ".env")):
shutil.copyfile(
os.path.join(os.path.dirname(__file__), "common/.env_example"),
os.path.join(user_wd, ".env"),
)
load_dotenv(os.path.join(user_wd, ".env"))
if not os.path.exists(os.path.join(user_wd, ".sources.ini")):
shutil.copyfile(
os.path.join(os.path.dirname(__file__), "common/.sources.ini"),
os.path.join(user_wd, ".sources.ini"),
)
import functools
import json
import logging
import re
import warnings
from collections import defaultdict
from typing import List, Tuple, Union, Iterable, Type
from pymongo import DESCENDING
from CveXplore.api.connection.api_db import ApiDatabaseSource
from CveXplore.common.config import Configuration
from CveXplore.common.cpe_converters import create_cpe_regex_string
from CveXplore.common.db_mapping import database_mapping
from CveXplore.core.database_maintenance.main_updater import MainUpdater
from CveXplore.core.general.datasources import supported_datasources
from CveXplore.database.connection.database_connection import DatabaseConnection
from CveXplore.errors import DatabaseIllegalCollection
from CveXplore.errors.datasource import UnsupportedDatasourceException
from CveXplore.errors.validation import CveNumberValidationError
from CveXplore.objects.cvexplore_object import CveXploreObject
try:
from CveXplore.core.celery_task_handler.task_handler import TaskHandler
except ModuleNotFoundError:
# This is not a problem, because using the Celery backend is optional.
# This exception will be handled when loading the TaskHandler fails with
# a NameError, because we already have logging available there.
pass
def _version():
_PKG_DIR = os.path.dirname(__file__)
version_file = os.path.join(_PKG_DIR, "VERSION")
with open(version_file, "r") as fdsec:
VERSION = fdsec.read()
return VERSION
VERSION = __version__ = _version()
[docs]
class CveXplore(object):
"""
The CveXplore class is the main entry point for CveXplore. All functionality is available from straight from this
class or via the different subclasses / attributes.
Group:
Main
"""
[docs]
def __init__(self, **kwargs):
"""
Create a new instance of CveXplore
Examples:
>>> from CveXplore import CveXplore
>>> cvx = CveXplore(datasource_type="mongodb", datasource_connection_details={"host": "mongodb://127.0.0.1:27017"})
>>> cvx.version
'0.1.2'
>>> from CveXplore import CveXplore
>>> cvx = CveXplore(datasource_type="api", datasource_connection_details={"address": ("mylocal.cve-search.int", 443), "api_path": "api"})
>>> cvx.version
'0.1.2'
Args:
kwargs['datasource_type']: Which datasource to query.
kwargs['datasource_connection_details']: Provide the connection details needed to establish a connection \
to the datasource. The connection details should be in line with the datasource's documentation.
"""
self.__version = VERSION
self._config = Configuration
self.logger = logging.getLogger(__name__)
# adjust self.config with kwargs parameters; lower case parameters are converted to uppercase and then changed
# in the self.config object
for each in kwargs:
setattr(self.config, each.upper(), kwargs[each])
self._datasource_type = self.config.DATASOURCE_TYPE
self._datasource_connection_details = self.config.DATASOURCE_CONNECTION_DETAILS
self._mongodb_connection_details = self.config.MONGODB_CONNECTION_DETAILS
self._api_connection_details = self.config.API_CONNECTION_DETAILS
os.environ["DOC_BUILD"] = json.dumps({"DOC_BUILD": "NO"})
self.logger.info(
f"Using {self.datasource_type} as datasource, connection details: {self.datasource_connection_details}"
)
if self.datasource_type not in supported_datasources:
raise UnsupportedDatasourceException(
f"Unsupported datasource selected: '{self.datasource_type}'; currently supported: {supported_datasources}"
)
if self.datasource_type == "api" and self.datasource_connection_details is None:
raise ValueError(
"Missing datasource_connection_details for selected datasource ('api')"
)
elif self.datasource_type == "api":
self.datasource_connection_details["user_agent"] = (
f"CveXplore:{self.version}"
)
if (
self.mongodb_connection_details is not None
and self.datasource_type == "mongodb"
):
self.logger.warning(
"The use of mongodb_connection_details is deprecated and will be removed in the 0.4 release, please "
"use datasource_connection_details instead"
)
self._datasource_connection_details = self.mongodb_connection_details
elif self.api_connection_details is not None and self.datasource_type == "api":
self.logger.warning(
"The use of api_connection_details is deprecated and will be removed in the 0.4 release, please "
"use datasource_connection_details instead"
)
self._datasource_connection_details = self.api_connection_details
else:
if self.datasource_type == "mongodb":
self._datasource_connection_details = {
"host": (
f"{self.config.DATASOURCE_PROTOCOL}://{self.config.DATASOURCE_HOST}:{self.config.DATASOURCE_PORT}"
if self.config.DATASOURCE_USER is None
and self.config.DATASOURCE_PASSWORD is None
else f"{self.config.DATASOURCE_PROTOCOL}://{self.config.DATASOURCE_USER}:{self.config.DATASOURCE_PASSWORD}@{self.config.DATASOURCE_HOST}:{self.config.DATASOURCE_PORT}"
),
"database": self.config.DATASOURCE_DBNAME,
}
elif self.datasource_type == "mysql":
self._datasource_connection_details = {
"sql_uri": self.config.SQLALCHEMY_DATABASE_URI
}
elif self.datasource_type == "api":
self._datasource_connection_details = {
"address": (
f"{self.config.DATASOURCE_HOST}",
int(f"{self.config.DATASOURCE_PORT}"),
),
"api_path": "api",
}
setattr(
self.config,
"DATASOURCE_CONNECTION_DETAILS",
self.datasource_connection_details,
)
self.datasource = DatabaseConnection(
database_type=self.datasource_type,
database_init_parameters=self.datasource_connection_details,
).database_connection
if self.datasource_type != "api":
self.database = MainUpdater(
datasource=self.datasource, datasource_type=self.datasource_type
)
self._database_mapping = database_mapping
from CveXplore.database.helpers.specific_db import (
CvesDatabaseFunctions,
CpeDatabaseFunctions,
CapecDatabaseFunctions,
CWEDatabaseFunctions,
)
self.cves = CvesDatabaseFunctions(collection="cves")
self.cpe = CpeDatabaseFunctions(collection="cpe")
self.capec = CapecDatabaseFunctions(collection="capec")
self.cwe = CWEDatabaseFunctions(collection="cwe")
try:
self.task_handler = TaskHandler()
except NameError:
self.logger.info(
"Failed to load Celery TaskHandler; "
"continuing without the optional Celery backend."
)
self.logger.info(f"Initialized CveXplore version: {self.version}")
@property
def config(self) -> Type[Configuration]:
"""
Returns Configuration object
Group:
Properties
"""
return self._config
@property
def datasource_type(self) -> str:
"""
Returns Datasource type
Group:
Properties
"""
return self._datasource_type
@property
def datasource_connection_details(self) -> dict:
"""
Returns Datasource Connection Details
Group:
Properties
"""
return self._datasource_connection_details
@property
def database_mapping(self) -> dict:
"""
Returns database mapping
Group:
Properties
"""
return self._database_mapping
@property
def mongodb_connection_details(self) -> dict:
"""
Returns Mongodb Connection Details
Group:
Properties
"""
warnings.warn(
"The use of mongodb_connection_details is deprecated and will be removed in the 0.4 release, "
"please use datasource_connection_details instead",
DeprecationWarning,
)
return self._mongodb_connection_details
@property
def api_connection_details(self) -> dict:
"""
Returns Api Connection Details
Group:
Properties
"""
warnings.warn(
"The use of api_connection_details is deprecated and will be removed in the 0.4 release, please "
"use datasource_connection_details instead",
DeprecationWarning,
)
return self._api_connection_details
@property
def version(self) -> str:
"""
Returns CveXplore version
Group:
Properties
"""
return self.__version
[docs]
@staticmethod
def where() -> str:
"""
Request the path where CveXplore is installed
Returns:
Path where CveXplore is installed
"""
return os.path.dirname(__file__)
[docs]
def get_single_store_entry(
self, entry_type: str, dict_filter: dict = None
) -> CveXploreObject | None:
"""
Method to perform a query on a *single* collection in the data source and return a *single* result.
Examples:
>>> from CveXplore import CveXplore
>>> cvx = CveXplore()
>>> result = cvx.get_single_store_entry("capec", {"id": "1"})
>>> result
<< Capec:1 >>
Args:
entry_type: Which specific store are you querying? Choices are:
- capec;
- cpe;
- cwe;
- via4;
- cves;
dict_filter: a dictionary representing a filter according to pymongo documentation
Returns:
An instance of CveXploreObject or None
"""
if dict_filter is None:
dict_filter = {}
entry_type = entry_type.lower()
if entry_type not in self.database_mapping:
raise DatabaseIllegalCollection(
f"Illegal collection requested: only {self.database_mapping} are allowed!"
)
result = getattr(self.datasource, f"store_{entry_type}").find_one(dict_filter)
return result
[docs]
def get_single_store_entries(
self, query: Tuple[str, dict], limit: int = 10
) -> List[CveXploreObject] | None:
"""
Method to perform a query on a *single* collection in the data source and return all the results.
Examples:
>>> from CveXplore import CveXplore
>>> cvx = CveXplore()
>>> result = cvx.get_single_store_entries(("cves", {"cvss": {"$eq": 8}}))
>>> result
[<< Cves:CVE-2011-0387 >>,
<< Cves:CVE-2015-1935 >>,
<< Cves:CVE-2014-3053 >>,
<< Cves:CVE-2010-4031 >>,
<< Cves:CVE-2016-1338 >>,
<< Cves:CVE-2013-3633 >>,
<< Cves:CVE-2017-14444 >>,
<< Cves:CVE-2017-14446 >>,
<< Cves:CVE-2017-14445 >>,
<< Cves:CVE-2016-2354 >>]
>>> from CveXplore import CveXplore
>>> cvx = CveXplore()
>>> result = cvx.get_single_store_entries(("cves", {"cvss": {"$eq": 8}}), limit=0)
>>> len(result)
32
Args:
query: Tuple which contains the entry_type and the dict_filter in a tuple. Choices for entry_type:
- capec;
- cpe;
- cwe;
- via4;
- cves;
dict_filter is a dictionary representing a filter according to pymongo documentation.
limit: Limit the number of results to return.
Returns:
A list with CveXploreObject's or None
"""
if not isinstance(query, tuple):
raise ValueError(
f"Wrong parameter type, received: {type(query)} expected: tuple"
)
if len(query) != 2:
raise ValueError(
f"Query parameter does not consist of the expected amount of variables, "
f"expected: 2 received: {len(query)}"
)
entry_type, dict_filter = query
entry_type = entry_type.lower()
if entry_type not in self.database_mapping:
raise DatabaseIllegalCollection(
f"Illegal collection requested: only {self.database_mapping} are allowed!"
)
if entry_type == "cves":
if "id" in dict_filter:
if isinstance(dict_filter["id"], str):
dict_filter["id"] = self._validate_cve_id(dict_filter["id"])
elif isinstance(dict_filter["id"], dict):
if "$in" in dict_filter["id"]:
dict_filter["id"]["$in"] = [
self._validate_cve_id(x) for x in dict_filter["id"]["$in"]
]
if "cvss" in dict_filter:
if isinstance(dict_filter["cvss"], str):
dict_filter["cvss"] = float(dict_filter["cvss"])
if "cvss3" in dict_filter:
if isinstance(dict_filter["cvss3"], str):
dict_filter["cvss3"] = float(dict_filter["cvss3"])
if "cvss4" in dict_filter:
if isinstance(dict_filter["cvss4"], str):
dict_filter["cvss4"] = float(dict_filter["cvss4"])
if "exploitabilityScore" in dict_filter:
if isinstance(dict_filter["exploitabilityScore"], str):
dict_filter["exploitabilityScore"] = float(
dict_filter["exploitabilityScore"]
)
if "exploitabilityScore3" in dict_filter:
if isinstance(dict_filter["exploitabilityScore3"], str):
dict_filter["exploitabilityScore3"] = float(
dict_filter["exploitabilityScore3"]
)
if "impactScore" in dict_filter:
if isinstance(dict_filter["impactScore"], str):
dict_filter["impactScore"] = float(dict_filter["impactScore"])
if "impactScore3" in dict_filter:
if isinstance(dict_filter["impactScore3"], str):
dict_filter["impactScore3"] = float(dict_filter["impactScore3"])
if "epss" in dict_filter:
if isinstance(dict_filter["epss"], str):
dict_filter["epss"] = float(dict_filter["epss"])
results = (
getattr(self.datasource, f"store_{entry_type}")
.find(dict_filter)
.limit(limit)
)
the_results = list(results)
if len(the_results) != 0:
return the_results
else:
return None
[docs]
def get_multi_store_entries(
self, *queries: List[Tuple[str, dict]], limit: int = 10
) -> List[CveXploreObject] | None:
"""
Method to perform *multiple* queries on *a single* or *multiple* collections in the data source and return the
results.
Examples:
>>> from CveXplore import CveXplore
>>> cvx = CveXplore()
>>> result = cvx.get_multi_store_entries([("CWE", {"id": "78"}), ("cves", {"id": "CVE-2009-0018"})])
>>> result
[<< Cwe:78 >>, << Cves:CVE-2009-0018 >>]
Args:
queries: A list of tuples which contains the entry_type and the dict_filter. Choices for entry_type:
- capec;
- cpe;
- cwe;
- via4;
- cves;
dict_filter is a dictionary representing a filter according to pymongo documentation.
limit: Limit the number of results to return.
Returns:
A list with CveXploreObject's or None
"""
results = map(
functools.partial(self.get_single_store_entries, limit=limit), *queries
)
the_results = [
result_list for result_list in results if result_list is not None
]
# flatten results
the_results = [item for row in the_results for item in row]
if len(the_results) != 0:
return the_results
else:
return None
[docs]
def cves_for_cpe(self, cpe_string: str) -> List[CveXploreObject] | None:
"""
Method for retrieving Cves that match a single CPE string. By default, the search will be made matching
the configuration fields of the cves documents.
Examples:
>>> from CveXplore import CveXplore
>>> cvx = CveXplore()
>>> cves = cvx.cves_for_cpe("cpe:2.3:o:microsoft:windows_7:*:sp1:*:*:*:*:*:*")
Args:
cpe_string: CPE string to search for.
Returns:
A list with CveXploreObject's or None
"""
cpe_regex_string = create_cpe_regex_string(cpe_string)
cves = self.get_single_store_entries(
("cves", {"vulnerable_configuration": {"$regex": cpe_regex_string}}),
limit=0,
)
return cves
def _validate_cve_id(self, cve_id: str) -> str:
"""
Method for validating a CVE ID.
Args:
cve_id: The CVE ID to validate.
Returns:
A validated CVE ID.
"""
reg_match = re.compile(r"[cC][vV][eE]-\d{4}-\d{4,10}")
if reg_match.match(cve_id) is not None:
cve_id = cve_id.upper()
else:
part_match = re.compile(r"\d{4}-\d{4,10}")
if part_match.match(cve_id) is not None:
cve_id = f"CVE-{cve_id}"
else:
raise CveNumberValidationError(
f"Could not validate the CVE number: {cve_id}. The number format should be either "
"CVE-2000-0001, cve-2000-0001 or 2000-0001."
)
return cve_id
[docs]
def cve_by_id(self, cve_id: str) -> CveXploreObject | None:
"""
Method to retrieve a single CVE from the database by its CVE ID number.
Args:
cve_id: The number format should be either CVE-2000-0001, cve-2000-0001 or 2000-0001.
Returns:
A CveXploreObject or None
"""
cve_id = self._validate_cve_id(cve_id=cve_id)
return self.get_single_store_entry("cves", {"id": cve_id})
[docs]
def cves_by_id(self, *cve_ids: str) -> Union[Iterable[CveXploreObject], Iterable]:
"""
Method to retrieve multiple CVE's from the database by their CVE ID numbers.
Args:
cve_ids: The number format should be either CVE-2000-0001, cve-2000-0001 or 2000-0001.
Returns:
A list with CveXploreObject's or empty list
"""
ret_data = []
for cve_id in cve_ids:
ret_data.append(self.cve_by_id(cve_id=cve_id))
if len(ret_data) >= 1:
return sorted(ret_data, key=lambda x: x.id)
else:
return ret_data
[docs]
def capec_by_cwe_id(self, cwe_id: int) -> List[CveXploreObject] | None:
"""
Method to retrieve CAPEC's related to a specific CWE ID
Args:
cwe_id: The CWE ID to retrieve CAPEC for.
Returns:
A list with CveXploreObject's or None
"""
cwe = self.get_single_store_entry("cwe", {"id": cwe_id})
if cwe is not None:
return list(cwe.iter_related_capecs())
else:
return cwe
[docs]
def last_cves(self, limit: int = 10) -> List[CveXploreObject] | None:
"""
Method to retrieve the last entered / changed cves.
Args:
limit: The amount of results to return; defaults to 10.
Returns:
A list with CveXploreObject's or None
"""
results = (
getattr(self.datasource, "store_cves")
.find()
.sort("modified", DESCENDING)
.limit(limit)
)
the_results = list(results)
if len(the_results) != 0:
return the_results
else:
return None
[docs]
def get_db_content_stats(self) -> dict | str:
"""
This method request the statistics from the database and consist of the time last modified and the document count
per cvedb store in the database.
Returns:
The stats from the database.
"""
stats = defaultdict(dict)
if not isinstance(self.datasource, ApiDatabaseSource):
if hasattr(self.datasource, "store_info"):
results = self.datasource.store_info.find({})
for each in results:
each.pop("_id")
db = each["db"]
if db == "epss":
continue
each.pop("db")
if "sources" in each:
each.pop("sources")
each.pop("searchables")
each["lastModified"] = str(each["lastModified"])
each["document count"] = getattr(
self.datasource, f"store_{db}"
).count_documents({})
stats[db] = each
for mgmtlist in ["mgmt_blacklist", "mgmt_whitelist"]:
stats[mgmtlist] = {
"document count": getattr(
self.datasource, f"store_{mgmtlist}"
).count_documents({})
}
return dict(stats)
else:
return "Database info could not be retrieved"
return f"Using api endpoint: {self.datasource.baseurl}"
[docs]
def __repr__(self) -> str:
"""
String representation of object
Returns:
String representation of object
"""
return f"<< CveXplore:{self.version} >>"