17_perm/explore_permissions.py
# Imports
from pypika import AliasedQuery, Query, Table
from pypika.functions import Count
import sqlite3
# Create tables and insert values.
# 'permission' shows who can do what: ahmed can view things, other people cannot.
# 'samples' contains the data people want to view.
SETUP = """\
CREATE TABLE permission (
person TEXT,
capability TEXT
);
INSERT INTO permission VALUES
('ahmed', 'view')
;
CREATE TABLE samples (
label TEXT
);
INSERT INTO samples VALUES
('first'),
('second')
;
"""
# Create database.
connection = sqlite3.connect(":memory:")
connection.executescript(SETUP)
# Represent tables.
permission = Table("permission")
samples = Table("samples")
# Build a permission query.
# Result is a single row (name, 0/1), e.g., ('ahmed', 1) or ('zephyr', 0).
def make_permission_query(person, capability):
return Query.from_(permission)\
.where((permission.person == person) & (permission.capability == capability))\
.select(permission.person, Count(permission.capability).as_("allowed"))
# Demonstrate that 'make_permission_query' works as intended.
ahmed_view = make_permission_query("ahmed", "view")
print("ahmed view:", str(ahmed_view))
print(connection.execute(str(ahmed_view)).fetchall())
zephyr_view = make_permission_query("zephyr", "view")
print("zephyr view:", str(zephyr_view))
print(connection.execute(str(zephyr_view)).fetchall())
# And now the problem: get the labels a person is allowed to view.
# Desired result is a list of (allowed, label) pairs, e.g.:
#
# for ahmed: [(1, 'first'), (1, 'second')]
# for zephyr: [(0, 'first'), (0, 'second')]
#
# Note that we do _not_ want an empty list for someone who can't view
# anything, since that would make it impossible to distinguish "no data"
# from "no permission".
perm = Table("perm") # so that subquery fields can be named with dot notation
# 1: add the CTE to the query but don't use it
query = Query.with_(ahmed_view, "perm")\
.from_(AliasedQuery("perm"))\
.select(samples.label)
print(f"\n1. query with unused CTE tries to get 'label' from 'perm'\n{str(query)}")
# 2: select from samples and then add CTE
query = Query.from_(samples)\
.with_(ahmed_view, "perm")\
.select(samples.label)
print(f"\n2. query starting with samples but not joining\n{str(query)}")
# 3: select from both tables without explicit join
query = Query.from_(samples)\
.with_(ahmed_view, "perm")\
.select(samples.label, perm.person, perm.allowed)
print(f"\n3. select from both tables without explicit join tries to get everything from samples\n{str(query)}")
# 4: try joining without .on
try:
query = Query.with_(ahmed_view, "perm")\
.from_(AliasedQuery("perm"))\
.join(samples)\
.select(samples.label, perm.person, perm.allowed)
except Exception as exc:
print(f"\n4. attempt to join without .on raises exception {exc}")
# 5: try joining with True
try:
query = Query.with_(ahmed_view, "perm")\
.from_(AliasedQuery("perm"))\
.join(samples)\
.on(True)\
.select(samples.label, perm.person, perm.allowed)
except Exception as exc:
print(f"\n5. attempt to join with True raises exception {exc}")
# 6: try joining with True
try:
query = Query.with_(ahmed_view, "perm")\
.from_(AliasedQuery("perm"))\
.join(samples)\
.on(True)\
.select(samples.label, perm.person, perm.allowed)
except Exception as exc:
print(f"\n6. attempt to join with True raises exception {exc}")
# 7: try joining with 'label==label'
query = Query.with_(ahmed_view, "perm")\
.from_(AliasedQuery("perm"))\
.join(samples)\
.on(samples.label == samples.label)\
.select(samples.label, perm.person, perm.allowed)
print(f"\n7a. joining with 'label == label' for ahmed\n{str(query)}")
print(connection.execute(str(query)).fetchall())
query = Query.with_(zephyr_view, "perm")\
.from_(AliasedQuery("perm"))\
.join(samples)\
.on(samples.label == samples.label)\
.select(samples.label, perm.person, perm.allowed)
print(f"\n7b. joining with 'label == label' for zephyr\n{str(query)}")
print(connection.execute(str(query)).fetchall())