49 changes: 49 additions & 0 deletions db_setup.sql
@@ -0,0 +1,49 @@
-- Create Claim table
CREATE TABLE IF NOT EXISTS "Claim" (
    id SERIAL PRIMARY KEY,
    subject TEXT,
    claim TEXT,
    object TEXT,
    statement TEXT,
    "effectiveDate" TIMESTAMP,
    "sourceURI" TEXT,
    "howKnown" TEXT,
    "dateObserved" TIMESTAMP,
    "digestMultibase" TEXT,
    author TEXT,
    curator TEXT,
    aspect TEXT,
    score FLOAT,
    stars INTEGER,
    amt FLOAT,
    unit TEXT,
    "howMeasured" TEXT,
    "intendedAudience" TEXT,
    "respondAt" TEXT,
    confidence FLOAT,
    "issuerId" TEXT,
    "issuerIdType" TEXT,
    "claimAddress" TEXT,
    proof TEXT
);

-- Create Node table
CREATE TABLE IF NOT EXISTS "Node" (
    id SERIAL PRIMARY KEY,
    "nodeUri" TEXT UNIQUE,
    name TEXT,
    "entType" TEXT,
    descrip TEXT,
    image TEXT,
    thumbnail TEXT
);

-- Create Edge table
CREATE TABLE IF NOT EXISTS "Edge" (
    id SERIAL PRIMARY KEY,
    "startNodeId" INTEGER REFERENCES "Node"(id),
    "endNodeId" INTEGER REFERENCES "Node"(id),
    label TEXT,
    thumbnail TEXT,
    "claimId" INTEGER REFERENCES "Claim"(id)
);

Review comment (Contributor): always separate out migrations into a separate PR.
Review comment (Contributor): though this doesn't actually look like a migration; is it for manual testing locally?
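
If this schema file is indeed meant for local manual testing, as the review question above asks, one way to apply it is with a few lines of psycopg2 against the same DATABASE_URI that lib/db.py reads. This is only a sketch, not part of the PR, and assumes DATABASE_URI points at a disposable local Postgres database.

import os

import psycopg2

# Load the schema for manual testing; db_setup.sql is plain semicolon-separated DDL,
# which psycopg2 sends as a single multi-statement query.
conn = psycopg2.connect(os.getenv("DATABASE_URI"))
with conn, conn.cursor() as cur:
    cur.execute(open("db_setup.sql").read())
conn.close()
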
211 changes: 109 additions & 102 deletions lib/db.py
@@ -1,94 +1,103 @@
+import os
+from dotenv import load_dotenv
import psycopg2
+from contextlib import contextmanager

-from lib.config import DB_NAME, DB_USER, DB_PASSWORD, DB_HOST, DB_PORT
+load_dotenv()

# Connect to the PostgreSQL database
+@contextmanager
def get_conn():
-    global conn
+    """Context manager for database connections"""
+    conn = None
    try:
-        # check if conn is open
-        conn.status
-    except (NameError, AttributeError, psycopg2.OperationalError):
-        # conn is closed or doesn't exist yet
-        conn = psycopg2.connect(
-            database=DB_NAME,
-            user=DB_USER,
-            password=DB_PASSWORD,
-            host=DB_HOST,
-            port=DB_PORT
-        )
-    return conn
+        conn = psycopg2.connect(os.getenv('DATABASE_URI'))
+        yield conn
+    except Exception as e:
+        print(f"Database connection error: {str(e)}")
+        raise
+    finally:
+        if conn is not None:
+            conn.close()
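
For reference, a minimal usage sketch of the reworked get_conn() context manager; it assumes DATABASE_URI is set and the tables from db_setup.sql exist, and the COUNT query is only an example.

from lib.db import get_conn

# Open a connection, run one query, and let the context manager close the connection.
with get_conn() as conn:
    with conn.cursor() as cur:
        cur.execute('SELECT COUNT(*) FROM "Claim"')
        print(cur.fetchone()[0])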

def get_claim(claim_id):
-    with get_conn().cursor() as cur:
-        # Read data from the Claim model
-        cur.execute("SELECT id, subject, claim, object, statement, \"effectiveDate\", \"sourceURI\", \"howKnown\", \"dateObserved\", \"digestMultibase\", author, curator, aspect, score, stars, amt, unit, \"howMeasured\", \"intendedAudience\", \"respondAt\", confidence, \"issuerId\", \"issuerIdType\", \"claimAddress\", proof FROM \"Claim\" WHERE id = {}".format(claim_id))
-        columns = [desc[0] for desc in cur.description]
-        row = cur.fetchone()
-        return dict(zip(columns, row))
+    with get_conn() as conn:
+        with conn.cursor() as cur:
+            cur.execute("""
+                SELECT
+                    id, subject, claim, object, statement,
+                    "effectiveDate", "sourceURI", "howKnown",
+                    "dateObserved", "digestMultibase", author,
+                    curator, aspect, score, stars, amt, unit,
+                    "howMeasured", "intendedAudience", "respondAt",
+                    confidence, "issuerId", "issuerIdType",
+                    "claimAddress", proof
+                FROM "Claim"
+                WHERE id = %s
+            """, (claim_id,))
+
+            columns = [desc[0] for desc in cur.description]
+            row = cur.fetchone()
+
+            if row is None:
+                return None
+
+            return dict(zip(columns, row))

def unprocessed_claims_generator():
-    with get_conn().cursor() as cur:
-        # find latest processed claim
-        QUERY_LATEST_CLAIMID = 'SELECT MAX("claimId") FROM "Edge"'
-        cur.execute(QUERY_LATEST_CLAIMID)
-        latest_claimid = cur.fetchone()[0]
-        # manually set to backfill
-        #latest_claimid = 118498
-        # Read data from the Claim model
-        cur.execute("SELECT id, subject, claim, object, statement, \"effectiveDate\", \"sourceURI\", \"howKnown\", \"dateObserved\", \"digestMultibase\", author, curator, aspect, score, stars, amt, unit, \"howMeasured\", \"intendedAudience\", \"respondAt\", confidence, \"issuerId\", \"issuerIdType\", \"claimAddress\", proof FROM \"Claim\" WHERE id > {}".format(latest_claimid))
-
-        columns = [desc[0] for desc in cur.description]
-        while True:
-            rows = cur.fetchmany()
-            if not rows:
-                break
-            for row in rows:
-                yield dict(zip(columns, row))
-
-def unpublished_claims_generator():
-    with get_conn().cursor() as cur:
-        # Read data from the Claim model
-        # TODO track last date and only process new claims
-        cur.execute("SELECT id, subject, claim, object, statement, \"effectiveDate\", \"sourceURI\", \"howKnown\", \"dateObserved\", \"digestMultibase\", author, curator, aspect, score, stars, amt, unit, \"howMeasured\", \"intendedAudience\", \"respondAt\", confidence, \"issuerId\", \"issuerIdType\", \"claimAddress\", proof FROM \"Claim\" WHERE \"claimAddress\" is NULL or \"claimAddress\" = ''")
-        # could refactor this section with above function
-        columns = [desc[0] for desc in cur.description]
-        while True:
-            rows = cur.fetchmany()
-            if not rows:
-                break
-            for row in rows:
-                yield dict(zip(columns, row))
-
+    with get_conn() as conn:
+        with conn.cursor() as cur:
+            # find latest processed claim
+            QUERY_LATEST_CLAIMID = 'SELECT MAX("claimId") FROM "Edge"'
+            cur.execute(QUERY_LATEST_CLAIMID)
+            latest_claimid = cur.fetchone()[0]
+
+            # Read data from the Claim model
+            cur.execute("""
+                SELECT id, subject, claim, object, statement, \"effectiveDate\",
+                \"sourceURI\", \"howKnown\", \"dateObserved\", \"digestMultibase\",
+                author, curator, aspect, score, stars, amt, unit, \"howMeasured\",
+                \"intendedAudience\", \"respondAt\", confidence, \"issuerId\",
+                \"issuerIdType\", \"claimAddress\", proof
+                FROM \"Claim\" WHERE id > %s
+            """, (latest_claimid,))
+
+            columns = [desc[0] for desc in cur.description]
+            while True:
+                rows = cur.fetchmany()
+                if not rows:
+                    break
+                for row in rows:
+                    yield dict(zip(columns, row))

def execute_sql_query(query, params):
-    with get_conn().cursor() as cur:
-        cur.execute(query, params)
-        result = cur.fetchone()
-        conn.commit()
-        if result is not None:
-            col_names = [desc[0] for desc in cur.description]
-            return dict(zip(col_names, result))
-        else:
+    with get_conn() as conn:
+        with conn.cursor() as cur:
+            cur.execute(query, params)
+            result = cur.fetchone()
+            conn.commit()
+            if result is not None:
+                col_names = [desc[0] for desc in cur.description]
+                return dict(zip(col_names, result))
            return None

Review comment (Contributor): please make linter changes separately and actual changes in a smaller PR; too hard to review a large PR with many lint-only changes and some real ones.

def update_claim_address(claim_id, claim_address):
""" Update the claimAddress field of a claim """
if not claim_id:
raise Exception("Cannot update without a claim id")
query = f"UPDATE \"Claim\" set \"claimAddress\" = '{claim_address}' where id = {claim_id}"
with get_conn().cursor() as cur:
cur.execute(query)
conn.commit()
with get_conn() as conn:
with conn.cursor() as cur:
query = f"UPDATE \"Claim\" set \"claimAddress\" = %s where id = %s"
cur.execute(query, (claim_address, claim_id))
conn.commit()

def insert_data(table, data):
-    conn = get_conn()
    quoted_keys = ['\"' + key + '\"' for key in data.keys()]
    query = f"INSERT INTO \"{table}\" ({', '.join(quoted_keys)}) VALUES ({', '.join(['%s']*len(data))}) RETURNING id;"
    try:
        return execute_sql_query(query, tuple(data.values()))['id']
-    except:
-        import pdb; pdb.set_trace()
+    except Exception as e:
+        print(f"Error inserting data: {str(e)}")
+        raise
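
For reference, a hypothetical insert_data call; the table and column names come from db_setup.sql, while the URI and name values are made up.

# insert_data builds an INSERT ... RETURNING id query, so the new row's id comes back.
node_id = insert_data("Node", {"nodeUri": "https://example.org/acme", "name": "Acme"})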

def insert_node(node):
"""Insert a Node into the database."""
Expand All @@ -107,21 +116,23 @@ def get_node_by_uri(node_uri):
"""
try:
row = execute_sql_query(select_node_sql, (node_uri,))
except:
import pdb ; pdb.set_trace()
except Exception as e:
print(f"Error getting node: {str(e)}")
raise

if row is None:
print("{} not found in db".format(node_uri))
print(f"{node_uri} not found in db")
return None
node_dict = {
'id': row['id'],
'nodeUri': row['nodeUri'],
'name': row['name'],
'entType': row['entType'],
'descrip': row['descrip'],
'image': row['image'],
'thumbnail': row['thumbnail']
}
return node_dict
return {
'id': row['id'],
'nodeUri': row['nodeUri'],
'name': row['name'],
'entType': row['entType'],
'descrip': row['descrip'],
'image': row['image'],
'thumbnail': row['thumbnail']
}

def get_edge_by_endpoints(start_node_id, end_node_id, claim_id):
"""Retrieve an Edge from the database by the IDs of its start and end Nodes."""
Expand All @@ -134,27 +145,23 @@ def get_edge_by_endpoints(start_node_id, end_node_id, claim_id):
if row is None:
return None

edge_dict = {
'id': row['id'],
'startNodeId': row['startNodeId'],
'endNodeId': row['endNodeId'],
'label': row['label'],
'thumbnail': row['thumbnail'],
'claimId': row['claimId']
}
return edge_dict
return {
'id': row['id'],
'startNodeId': row['startNodeId'],
'endNodeId': row['endNodeId'],
'label': row['label'],
'thumbnail': row['thumbnail'],
'claimId': row['claimId']
}

def del_claim(claim_id):
    if not claim_id:
-        raise("A non-zero non-null claim id is required")
-
-    conn = get_conn()
-    with conn.cursor() as cur:
-        # delete the edges related to the claim
-        cur.execute('delete from "Edge" where "claimId" = {}'.format(claim_id))
-
-        cur.execute('delete from "Claim" where id = {}'.format(claim_id))
-    conn.commit()
-
-    return
-
+        raise ValueError("A non-zero non-null claim id is required")
+
+    with get_conn() as conn:
+        with conn.cursor() as cur:
+            # delete the edges related to the claim
+            cur.execute('DELETE FROM "Edge" WHERE "claimId" = %s', (claim_id,))
+            # delete the claim
+            cur.execute('DELETE FROM "Claim" WHERE id = %s', (claim_id,))
+            conn.commit()