Module bug_trail.data_code
Functions to fetch data from the SQLite database.
Expand source code
"""
Functions to fetch data from the SQLite database.
"""
import logging
import sqlite3
from typing import Any
from bug_trail_core.handlers import BaseErrorLogHandler
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Allow-list of table names. Functions in this module that interpolate a table
# name into SQL text check membership in this list first.
ALL_TABLES = [
    "exception_instance",
    "exception_type",
    "logs",
    "python_libraries",
    "system_info",
    "traceback_info",
]
# Query for the full log listing: every column of ``logs`` plus details from
# ``exception_instance`` and ``exception_type``. Both joins are LEFT OUTER joins,
# so log rows without a matching exception row are still returned, with NULL in
# the exception columns. Rows are ordered by ``created``, newest first.
ENTIRE_LOG_SET = (
    "SELECT logs.*, "
    # NOTE(review): this column is not aliased, so it presumably comes back named
    # "args", the same name as any "args" column delivered by logs.*. Code that
    # builds a dict from the row keeps only one of them -- confirm which is wanted.
    "exception_instance.args, "
    "exception_instance.args as exception_args, "
    # NOTE(review): exception_str and comments are both aliases of
    # exception_instance.args; this looks like a placeholder -- confirm intent.
    "exception_instance.args as exception_str, "
    "exception_instance.args as comments, "
    "exception_type.name as exception_name, "
    "exception_type.docstring as exception_docstring, "
    "exception_type.hierarchy as exception_hierarchy "
    "FROM logs "
    "left outer join exception_instance "
    "on logs.record_id = exception_instance.record_id "
    "left outer join exception_type "
    "on exception_instance.type_id = exception_type.id "
    "ORDER BY created DESC"
)
def table_row_count(conn: sqlite3.Connection, table_name: str) -> int:
    """
    Count the rows in one of the known tables.

    Args:
        conn (sqlite3.Connection): The connection to the database
        table_name (str): Table to count; must be listed in ALL_TABLES

    Returns:
        int: The number of rows, or 0 if the query yields no row

    Raises:
        TypeError: If table_name is not listed in ALL_TABLES
    """
    # The name is interpolated into the SQL text below, so only allow-listed names pass.
    if table_name not in ALL_TABLES:
        raise TypeError("Bad table name.")
    cursor = conn.cursor()
    cursor.execute(f"SELECT count(*) FROM {table_name};")  # nosec: table name restricted above
    # fetchone() returns None (not an empty tuple) when there is no row, so test the
    # row itself instead of catching IndexError from the subscript.
    row = cursor.fetchone()
    return row[0] if row else 0
def connect(db_path: str) -> sqlite3.Connection:
    """
    Open the SQLite database at db_path and return the connection.

    Tests can mock this function to prevent extra database files from being created.
    """
    connection = sqlite3.connect(db_path)
    return connection
def fetch_log_data(conn: sqlite3.Connection, db_path: str, limit: int = -1, offset: int = -1) -> list[dict[str, Any]]:
    """
    Fetch log records, joined with their exception details, from the database.

    The connection passed in is closed before this function returns, whether or
    not the query succeeds, so the caller must not reuse it afterwards.

    Args:
        conn (sqlite3.Connection): The connection to the database; closed by this call
        db_path (str): Path to the SQLite database (used for logging and for
            creating the table when it is missing)
        limit (int, optional): Limit the number of records returned. Defaults to -1,
            which means no LIMIT/OFFSET clause is added.
        offset (int, optional): Offset the records returned; only used when limit
            is given. Defaults to -1.

    Returns:
        list[dict[str, Any]]: One dictionary per record, keyed by column name
    """
    try:
        logger.debug(f"Connected to {db_path}")
        cursor = conn.cursor()
        query = ENTIRE_LOG_SET
        if limit != -1:
            # int() guarantees that only integers reach the SQL text. A negative
            # offset (the default) is treated by SQLite as an offset of zero.
            query += f" LIMIT {int(limit)} OFFSET {int(offset)}"
        execute_safely(cursor, query, db_path)
        # Column names come from the cursor so each row can be turned into a dictionary.
        columns = [description[0] for description in cursor.description]
        return [dict(zip(columns, row, strict=True)) for row in cursor.fetchall()]
    finally:
        # Close on the failure path too, so the connection is never leaked.
        conn.close()
def fetch_table_as_list_of_dict(db_path: str, table: str) -> list[dict[str, Any]]:
    """
    Fetch every row of one known table from the database.

    Args:
        db_path (str): Path to the SQLite database
        table (str): Table to query; must be listed in ALL_TABLES

    Returns:
        list[dict[str, Any]]: One dictionary per row, keyed by column name

    Raises:
        TypeError: If table is not listed in ALL_TABLES
    """
    # The name is interpolated into the SQL text below, so only allow-listed names pass.
    if table not in ALL_TABLES:
        raise TypeError("Don't know that table.")

    conn = connect(db_path)
    try:
        logger.debug(f"Connected to {db_path}")
        cursor = conn.cursor()
        query = f"SELECT * FROM {table}"  # nosec: table name restricted above
        execute_safely(cursor, query, db_path)
        # Column names come from the cursor so each row can be turned into a dictionary.
        columns = [description[0] for description in cursor.description]
        return [dict(zip(columns, row, strict=True)) for row in cursor.fetchall()]
    finally:
        # Close on the failure path too, so the connection is never leaked.
        conn.close()
def fetch_log_data_grouped(db_path: str) -> Any:
    """
    Fetch all log records from the database and group each record's columns by topic.

    Args:
        db_path (str): Path to the SQLite database

    Returns:
        Any: A list with one nested dictionary per log record. Each dictionary has
            the keys MessageDetails, SourceContext, TemporalDetails,
            ProcessThreadContext, ExceptionDetails, StackDetails and UserData.
            UserData holds every column that belongs to none of the other groups.

    Raises:
        KeyError: If a row lacks a column of any group other than ExceptionDetails
    """
    # Groups whose columns are read with [] and therefore must exist in every row.
    leading_groups = {
        "MessageDetails": ("msg", "args", "levelname", "levelno"),
        "SourceContext": ("name", "pathname", "filename", "module", "funcName", "lineno"),
        "TemporalDetails": ("created", "msecs", "relativeCreated"),
        "ProcessThreadContext": ("process", "processName", "thread", "threadName"),
    }
    # Exception columns are read with .get(), so a missing column becomes None.
    exception_keys = (
        "exc_info",
        "exc_text",
        "exception_args",
        "exception_str",
        "comments",
        "exception_name",
        "exception_docstring",
        "exception_hierarchy",
    )
    stack_keys = ("stack_info",)
    # Every column that already has a group; built once instead of once per row.
    grouped_keys = {key for keys in leading_groups.values() for key in keys}
    grouped_keys.update(exception_keys)
    grouped_keys.update(stack_keys)

    conn = connect(db_path)
    try:
        logger.debug(f"Connected to {db_path}")
        cursor = conn.cursor()
        execute_safely(cursor, ENTIRE_LOG_SET, db_path)
        # Column names come from the cursor so each row can be turned into a dictionary.
        columns = [description[0] for description in cursor.description]

        log_data = []
        for row in cursor.fetchall():
            # NOTE(review): if two result columns share a name, the later one wins here.
            log_record = dict(zip(columns, row, strict=True))
            grouped_record = {
                group: {key: log_record[key] for key in keys} for group, keys in leading_groups.items()
            }
            grouped_record["ExceptionDetails"] = {key: log_record.get(key) for key in exception_keys}
            grouped_record["StackDetails"] = {key: log_record[key] for key in stack_keys}
            # Whatever is left over is treated as user-supplied data; column order is kept.
            grouped_record["UserData"] = {
                key: value for key, value in log_record.items() if key not in grouped_keys
            }
            log_data.append(grouped_record)
        return log_data
    finally:
        # Close on the failure path too, so the connection is never leaked.
        conn.close()
def execute_safely(cursor: sqlite3.Cursor, query: str, db_path: str) -> None:
    """
    Run a query, creating the table first if SQLite reports it as missing.

    The query is logged at debug level and executed. If that fails with an
    OperationalError whose message contains "no such table", a
    BaseErrorLogHandler is built for db_path, asked to create its table, and
    the query is executed a second time. Any other OperationalError is re-raised.

    Args:
        cursor (sqlite3.Cursor): The cursor to use
        query (str): The query to execute
        db_path (str): The path to the database
    """
    try:
        logger.debug(query)
        cursor.execute(query)
    except sqlite3.OperationalError as error:
        table_is_missing = "no such table" in str(error)
        if not table_is_missing:
            raise
        # TODO: handle the possibility that the table is different for picologging
        # NOTE(review): the handler is not explicitly closed here -- confirm whether
        # it holds a connection of its own that should be released.
        table_creator = BaseErrorLogHandler(db_path)
        table_creator.create_table()
        cursor.execute(query)
Functions
def connect(db_path: str) ‑> sqlite3.Connection-
Open db, central code
Expand source code
def connect(db_path: str) -> sqlite3.Connection: """Open db, central code""" # Mock this to prevent extra dbs being created. return sqlite3.connect(db_path) def execute_safely(cursor: sqlite3.Cursor, query: str, db_path: str) ‑> None-
Execute a query safely, creating the table if it doesn't exist
Args
cursor:sqlite3.Cursor- The cursor to use
query:str- The query to execute
db_path:str- The path to the database
Expand source code
def execute_safely(cursor: sqlite3.Cursor, query: str, db_path: str) -> None: """ Execute a query safely, creating the table if it doesn't exist Args: cursor (sqlite3.Cursor): The cursor to use query (str): The query to execute db_path (str): The path to the database """ try: logger.debug(query) cursor.execute(query) except sqlite3.OperationalError as se: if "no such table" in str(se): # TODO: handle the possibility that the table is different for picologging handler = BaseErrorLogHandler(db_path) handler.create_table() cursor.execute(query) else: raise def fetch_log_data(conn: sqlite3.Connection, db_path: str, limit: int = -1, offset: int = -1) ‑> list[dict[str, typing.Any]]-
Fetch all log records from the database.
Args
conn:sqlite3.Connection- The connection to the database
db_path:str- Path to the SQLite database
limit:int, optional- Limit the number of records returned. Defaults to -1.
offset:int, optional- Offset the records returned. Defaults to -1.
Returns
list[dict[str, Any]]- A list of dictionaries containing all log records
Expand source code
def fetch_log_data(conn: sqlite3.Connection, db_path: str, limit: int = -1, offset: int = -1) -> list[dict[str, Any]]: """ Fetch all log records from the database. Args: conn (sqlite3.Connection): The connection to the database db_path (str): Path to the SQLite database limit (int, optional): Limit the number of records returned. Defaults to -1. offset (int, optional): Offset the records returned. Defaults to -1. Returns: list[dict[str, Any]]: A list of dictionaries containing all log records """ # Connect to the SQLite database logger.debug(f"Connected to {db_path}") cursor = conn.cursor() # Query to fetch all rows from the logs table query = ENTIRE_LOG_SET if limit != -1: query += f" LIMIT {limit} OFFSET {offset}" execute_safely(cursor, query, db_path) # Fetching column names from the cursor columns = [description[0] for description in cursor.description] # Fetch all rows, and convert each row to a dictionary rows = cursor.fetchall() log_data = [] for row in rows: log_record = dict(zip(columns, row, strict=True)) log_data.append(log_record) # Close the connection conn.close() return log_data def fetch_log_data_grouped(db_path: str) ‑> Any-
Fetch all log records from the database, and group them into a nested dictionary.
Args
db_path:str- Path to the SQLite database
Returns
Any- A list of nested dictionaries, one per log record
Expand source code
def fetch_log_data_grouped(db_path: str) -> Any: """ Fetch all log records from the database, and group them into a nested dictionary. Args: db_path (str): Path to the SQLite database Returns: Any: A nested dictionary containing all log records """ # Connect to the SQLite database conn = connect(db_path) logger.debug(f"Connected to {db_path}") cursor = conn.cursor() # Query to fetch all rows from the logs table query = ENTIRE_LOG_SET execute_safely(cursor, query, db_path) # Fetching column names from the cursor columns = [description[0] for description in cursor.description] # Fetch all rows, and convert each row to a grouped dictionary rows = cursor.fetchall() log_data = [] for row in rows: log_record = dict(zip(columns, row, strict=True)) # Grouping the log record grouped_record = { "MessageDetails": {key: log_record[key] for key in ["msg", "args", "levelname", "levelno"]}, "SourceContext": { key: log_record[key] for key in ["name", "pathname", "filename", "module", "funcName", "lineno"] }, "TemporalDetails": {key: log_record[key] for key in ["created", "msecs", "relativeCreated"]}, "ProcessThreadContext": { key: log_record[key] for key in ["process", "processName", "thread", "threadName"] }, "ExceptionDetails": { key: log_record.get(key) for key in [ "exc_info", "exc_text", "exception_args", "exception_str", "comments", "exception_name", "exception_docstring", "exception_hierarchy", ] }, "StackDetails": {key: log_record[key] for key in ["stack_info"]}, "UserData": { key: log_record[key] for key in log_record.keys() - { "msg", "args", "levelname", "levelno", "name", "pathname", "filename", "module", "funcName", "lineno", "created", "msecs", "relativeCreated", "process", "processName", "thread", "threadName", "exc_info", "exc_text", "stack_info", # exception table "exception_args", "exception_str", "comments", "exception_name", "exception_docstring", "exception_hierarchy", } }, } log_data.append(grouped_record) # Close the connection conn.close() return log_data 
def fetch_table_as_list_of_dict(db_path: str, table: str) ‑> list[dict[str, typing.Any]]-
Fetch all rows of the given table from the database.
Args
db_path:str- Path to the SQLite database
table:str- Table to query
Returns
list[dict[str, Any]]- A list of dictionaries, one per row of the table
Expand source code
def fetch_table_as_list_of_dict(db_path: str, table: str) -> list[dict[str, Any]]: """ Fetch all log records from the database. Args: db_path (str): Path to the SQLite database table (str): Table to query Returns: list[dict[str, Any]]: A list of dictionaries containing all log records """ if table not in ALL_TABLES: raise TypeError("Don't know that table.") # Connect to the SQLite database conn = connect(db_path) logger.debug(f"Connected to {db_path}") cursor = conn.cursor() # Query to fetch all rows from the logs table query = f"SELECT * FROM {table}" # nosec: table name restricted above execute_safely(cursor, query, db_path) # Fetching column names from the cursor columns = [description[0] for description in cursor.description] # Fetch all rows, and convert each row to a dictionary rows = cursor.fetchall() data = [] for row in rows: record = dict(zip(columns, row, strict=True)) data.append(record) # Close the connection conn.close() return data def table_row_count(conn: sqlite3.Connection, table_name: str) ‑> int-
Expand source code
def table_row_count(conn: sqlite3.Connection, table_name: str) -> int: if table_name not in ALL_TABLES: raise TypeError("Bad table name.") cursor = conn.cursor() cursor.execute(f"SELECT count(*) FROM {table_name};") # nosec: table name restricted above try: return cursor.fetchone()[0] except IndexError: return 0