-
Notifications
You must be signed in to change notification settings - Fork 91
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reverse lookup #599
base: master
Are you sure you want to change the base?
Reverse lookup #599
Changes from 5 commits
30acf3c
5940304
c74e89e
2839931
dfc1bb0
292186b
8f47af2
f98e0b3
9a5ba3e
d6c6453
c4f2777
5f5221b
b3a0e27
a7405b7
4b58c8c
b42daf7
bacb08d
70b183a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
@@ -0,0 +1,226 @@ | ||||
from __future__ import annotations | ||||
|
||||
import inspect | ||||
import typing as t | ||||
from dataclasses import dataclass | ||||
|
||||
from piccolo.columns.base import Selectable | ||||
from piccolo.columns.column_types import ( | ||||
JSON, | ||||
JSONB, | ||||
Column, | ||||
LazyTableReference, | ||||
) | ||||
|
||||
if t.TYPE_CHECKING: # pragma: no cover | ||||
from piccolo.table import Table | ||||
|
||||
|
||||
class ReverseLookupSelect(Selectable): | ||||
""" | ||||
This is a subquery used within a select to fetch reverse lookup data. | ||||
""" | ||||
|
||||
def __init__( | ||||
self, | ||||
*columns: Column, | ||||
reverse_lookup: ReverseLookup, | ||||
as_list: bool = False, | ||||
load_json: bool = False, | ||||
table: t.Type[Table], | ||||
): | ||||
""" | ||||
:param columns: | ||||
Which columns to include from the related table. | ||||
:param as_list: | ||||
If a single column is provided, and ``as_list`` is ``True`` a | ||||
flattened list will be returned, rather than a list of objects. | ||||
:param load_json: | ||||
If ``True``, any JSON strings are loaded as Python objects. | ||||
:param table: | ||||
Parent table for reverse lookup. | ||||
|
||||
""" | ||||
self.as_list = as_list | ||||
self.columns = columns | ||||
self.reverse_lookup = reverse_lookup | ||||
self.load_json = load_json | ||||
self.table = table | ||||
|
||||
safe_types = [int, str] | ||||
|
||||
# If the columns can be serialised / deserialise as JSON, then we | ||||
# can fetch the data all in one go. | ||||
self.serialisation_safe = all( | ||||
(column.__class__.value_type in safe_types) | ||||
and (type(column) not in (JSON, JSONB)) | ||||
for column in columns | ||||
) | ||||
|
||||
@property | ||||
def foreign_key_columns_index(self) -> int: | ||||
reverse_lookup_table = ( | ||||
self.reverse_lookup._meta.resolved_reverse_joining_table | ||||
) | ||||
fk_columns = [ | ||||
i._meta.name | ||||
for i in reverse_lookup_table._meta.foreign_key_columns | ||||
] | ||||
return fk_columns.index(self.table._meta.tablename) | ||||
|
||||
def get_select_string(self, engine_type: str, with_alias=True) -> str: | ||||
reverse_lookup_table = ( | ||||
self.reverse_lookup._meta.resolved_reverse_joining_table | ||||
) | ||||
reverse_lookup_table_name = reverse_lookup_table._meta.tablename | ||||
reverse_lookup_pk = reverse_lookup_table._meta.primary_key._meta.name | ||||
reverse_lookup_fk_table_name = ( | ||||
reverse_lookup_table._meta.foreign_key_columns[ | ||||
self.foreign_key_columns_index | ||||
]._meta.name | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think instead of |
||||
) | ||||
reverse_lookup_fk = reverse_lookup_table._meta.foreign_key_columns[ | ||||
self.foreign_key_columns_index | ||||
]._meta.db_column_name | ||||
|
||||
reverse_select = f""" | ||||
"{reverse_lookup_table_name}" | ||||
JOIN "{reverse_lookup_fk_table_name}" "inner_{reverse_lookup_fk_table_name}" ON ( | ||||
"{reverse_lookup_table_name}"."{reverse_lookup_fk}" | ||||
= "inner_{reverse_lookup_fk_table_name}"."{reverse_lookup_pk}" | ||||
) WHERE "{reverse_lookup_table_name}"."{reverse_lookup_fk}" | ||||
= "{reverse_lookup_fk_table_name}"."{reverse_lookup_pk}" | ||||
""" # noqa: E501 | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need this reverse_select = f"""
"{reverse_lookup_table_name}"
WHERE "{reverse_lookup_table_name}"."{reverse_lookup_fk}"
= "{reverse_lookup_fk_table_name}"."{reverse_lookup_pk}"
""" |
||||
|
||||
if engine_type == "postgres": | ||||
if self.as_list: | ||||
column_name = self.columns[0]._meta.db_column_name | ||||
return f""" | ||||
ARRAY( | ||||
SELECT | ||||
"{reverse_lookup_table_name}"."{column_name}" | ||||
FROM {reverse_select} | ||||
) AS "{reverse_lookup_table_name}s" | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should use piccolo/piccolo/columns/m2m.py Line 93 in 14012e5
|
||||
""" | ||||
elif not self.serialisation_safe: | ||||
column_name = reverse_lookup_pk | ||||
return f""" | ||||
ARRAY( | ||||
SELECT | ||||
"{reverse_lookup_table_name}"."{column_name}" | ||||
FROM {reverse_select} | ||||
) AS "{reverse_lookup_table_name}s" | ||||
""" | ||||
else: | ||||
if len(self.columns) > 0: | ||||
column_names = ", ".join( | ||||
f'"{reverse_lookup_table_name}"."{column._meta.db_column_name}"' # noqa: E501 | ||||
for column in self.columns | ||||
) | ||||
else: | ||||
column_names = ", ".join( | ||||
f'"{reverse_lookup_table_name}"."{column._meta.db_column_name}"' # noqa: E501 | ||||
for column in reverse_lookup_table._meta.columns | ||||
) | ||||
return f""" | ||||
( | ||||
SELECT JSON_AGG("{reverse_lookup_table_name}s") | ||||
FROM ( | ||||
SELECT {column_names} FROM {reverse_select} | ||||
) AS "{reverse_lookup_table_name}s" | ||||
) AS "{reverse_lookup_table_name}s" | ||||
""" | ||||
elif engine_type == "sqlite": | ||||
if len(self.columns) > 1 or not self.serialisation_safe: | ||||
column_name = reverse_lookup_pk | ||||
else: | ||||
try: | ||||
column_name = self.columns[0]._meta.db_column_name | ||||
except IndexError: | ||||
column_name = reverse_lookup_pk | ||||
|
||||
return f""" | ||||
( | ||||
SELECT group_concat( | ||||
"{reverse_lookup_table_name}"."{column_name}" | ||||
) | ||||
FROM {reverse_select} | ||||
) | ||||
AS "{reverse_lookup_table_name}s [M2M]" | ||||
""" | ||||
else: | ||||
raise ValueError(f"{engine_type} is an unrecognised engine type") | ||||
|
||||
|
||||
@dataclass | ||||
class ReverseLookupMeta: | ||||
reverse_joining_table: t.Union[t.Type[Table], LazyTableReference] | ||||
|
||||
@property | ||||
def resolved_reverse_joining_table(self) -> t.Type[Table]: | ||||
""" | ||||
Evaluates the ``reverse_joining_table`` attribute if it's a | ||||
``LazyTableReference``, raising a ``ValueError`` if it fails, | ||||
otherwise returns a ``Table`` subclass. | ||||
""" | ||||
from piccolo.table import Table | ||||
|
||||
if isinstance(self.reverse_joining_table, LazyTableReference): | ||||
return self.reverse_joining_table.resolve() | ||||
elif inspect.isclass(self.reverse_joining_table) and issubclass( | ||||
self.reverse_joining_table, Table | ||||
): | ||||
return self.reverse_joining_table | ||||
else: | ||||
raise ValueError( | ||||
"The reverse_joining_table attribute is neither a Table" | ||||
" subclass or a LazyTableReference instance." | ||||
) | ||||
|
||||
|
||||
class ReverseLookup: | ||||
def __init__( | ||||
self, | ||||
reverse_joining_table: t.Union[t.Type[Table], LazyTableReference], | ||||
): | ||||
""" | ||||
:param reverse_joining_table: | ||||
A ``Table`` for reverse lookup. | ||||
""" | ||||
self._meta = ReverseLookupMeta( | ||||
reverse_joining_table=reverse_joining_table, | ||||
) | ||||
|
||||
def __call__( | ||||
self, | ||||
*columns: Column, | ||||
as_list: bool = False, | ||||
load_json: bool = False, | ||||
table: t.Type[Table], | ||||
) -> ReverseLookupSelect: | ||||
""" | ||||
:param columns: | ||||
Which columns to include from the related table. If none are | ||||
specified, then all of the columns are returned. | ||||
:param as_list: | ||||
If a single column is provided, and ``as_list`` is ``True`` a | ||||
flattened list will be returned, rather than a list of objects. | ||||
:param load_json: | ||||
If ``True``, any JSON strings are loaded as Python objects. | ||||
:param table: | ||||
Parent table for reverse lookup. | ||||
|
||||
""" | ||||
|
||||
if as_list and len(columns) != 1: | ||||
raise ValueError( | ||||
"`as_list` is only valid with a single column argument" | ||||
) | ||||
|
||||
return ReverseLookupSelect( | ||||
*columns, | ||||
reverse_lookup=self, | ||||
as_list=as_list, | ||||
load_json=load_json, | ||||
table=table, | ||||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like we compare column names with table names here. They don't have to match. I think instead of
it would be better to be able to specify:
Since the column that is used for the reverse lookup doesn't change(?), I think it would be best if we could also specify it in the
ReverseLookup
constructor.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@powellnorma Thanks for your interest and review. It's been a while since I wrote this code, but if I remeber it has to be a table because we refer to the foreign key of parent table later in the code. We need this function to find the foreign key column index if there are multiple foreign keys per single table.