From 2c85d153e652aa646a1dad03a4dee7f385021af3 Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Thu, 12 May 2022 21:43:47 +0100 Subject: [PATCH] support for views #275,#175,#103 --- .env.sample | 4 +- examples/book_view/benchmark.py | 160 ++++++++++++++++++++++++ examples/book_view/data.py | 113 +++++++++++++++++ examples/book_view/schema.json | 38 ++++++ examples/book_view/schema.py | 92 ++++++++++++++ pgsync/base.py | 198 ++++++------------------------ pgsync/constants.py | 1 + pgsync/node.py | 13 +- pgsync/sync.py | 55 ++++++--- pgsync/view.py | 209 ++++++++++++++++++++++++++++++-- requirements/dev.txt | 10 +- requirements/prod.txt | 10 +- requirements/test.txt | 10 +- setup.cfg | 1 + tests/fixtures/schema.json | 4 +- 15 files changed, 706 insertions(+), 212 deletions(-) create mode 100644 examples/book_view/benchmark.py create mode 100644 examples/book_view/data.py create mode 100644 examples/book_view/schema.json create mode 100644 examples/book_view/schema.py diff --git a/.env.sample b/.env.sample index b141f8c7..04e57cde 100644 --- a/.env.sample +++ b/.env.sample @@ -15,9 +15,9 @@ # BLOCK_SIZE=2048*10 # QUERY_LITERAL_BINDS=False # number of threads to spawn for poll db -# NTHREADS_POLLDB = 1 +# NTHREADS_POLLDB=1 # batch size for LOGICAL_SLOT_CHANGES for minimizing tmp file disk usage -# LOGICAL_SLOT_CHUNK_SIZE = 5000 +# LOGICAL_SLOT_CHUNK_SIZE=5000 # Elasticsearch # ELASTICSEARCH_SCHEME=http diff --git a/examples/book_view/benchmark.py b/examples/book_view/benchmark.py new file mode 100644 index 00000000..fb9c84f6 --- /dev/null +++ b/examples/book_view/benchmark.py @@ -0,0 +1,160 @@ +import json +from random import choice +from typing import Set + +import click +import sqlalchemy as sa +from faker import Faker +from schema import Book +from sqlalchemy.orm import sessionmaker + +from pgsync.base import pg_engine +from pgsync.constants import DELETE, INSERT, TG_OP, TRUNCATE, UPDATE +from pgsync.utils import get_config, show_settings, Timer + +FIELDS = { + "isbn": "isbn13", + "title": "sentence", + "description": "text", + "copyright": "word", +} + + +def insert_op(session: sessionmaker, model, nsize: int) -> None: + faker: Faker = Faker() + rows: Set = set([]) + for _ in range(nsize): + kwargs = {} + for column in model.__table__.columns: + if column.foreign_keys: + foreign_key = list(column.foreign_keys)[0] + pk = [ + column.name + for column in foreign_key.column.table.columns + if column.primary_key + ][0] + fkey = ( + session.query(foreign_key.column.table) + .order_by(sa.func.random()) + .limit(1) + ) + value = getattr(fkey[0], pk) + kwargs[column.name] = value + elif column.primary_key: + continue + else: + field = FIELDS.get(column.name) + if not field: + # continue + raise RuntimeError(f"field {column.name} not in mapping") + value = getattr(faker, field)() + kwargs[column.name] = value + print(f"Inserting {model.__table__} VALUES {kwargs}") + row = model(**kwargs) + rows.add(row) + + with Timer(f"Created {nsize} {model.__table__} in"): + try: + session.add_all(rows) + session.commit() + except Exception as e: + print(f"Exception {e}") + session.rollback() + + +def update_op(session: sessionmaker, model, nsize: int) -> None: + column: str = choice(list(FIELDS.keys())) + if column not in [column.name for column in model.__table__.columns]: + raise RuntimeError() + faker: Faker = Faker() + with Timer(f"Updated {nsize} {model.__table__}"): + for _ in range(nsize): + field = FIELDS.get(column) + value = getattr(faker, field)() + row = ( + 
session.query(model) + .filter(getattr(model, column) != value) + .order_by(sa.func.random()) + .limit(1) + ) + if row: + print(f'Updating {model.__table__} SET {column} = "{value}"') + try: + setattr(row[0], column, value) + session.commit() + except Exception as e: + session.rollback() + + +def delete_op(session: sessionmaker, model, nsize: int) -> None: + with Timer(f"Deleted {nsize} {model.__table__}"): + for _ in range(nsize): + row = session.query(model).order_by(sa.func.random()).limit(1) + pk = [ + column.name + for column in filter( + lambda x: x.primary_key, model.__table__.columns + ) + ][0] + if row: + try: + value = getattr(row[0], pk) + print(f"Deleting {model.__table__} WHERE {pk} = {value}") + session.query(model).filter( + getattr(model, pk) == value + ).delete() + session.commit() + except Exception as e: + session.rollback() + + +@click.command() +@click.option( + "--config", + "-c", + help="Schema config", + type=click.Path(exists=True), +) +@click.option("--daemon", "-d", is_flag=True, help="Run as a daemon") +@click.option("--nsize", "-n", default=5000, help="Number of samples") +@click.option( + "--tg_op", + "-t", + help="TG_OP", + type=click.Choice( + TG_OP, + case_sensitive=False, + ), +) +def main(config, nsize, daemon, tg_op): + + show_settings() + + config: str = get_config(config) + documents: dict = json.load(open(config)) + engine = pg_engine( + database=documents[0].get("database", documents[0]["index"]) + ) + Session = sessionmaker(bind=engine, autoflush=False, autocommit=False) + session = Session() + + model = Book + func = { + INSERT: insert_op, + UPDATE: update_op, + DELETE: delete_op, + } + # lets do only the book model for now + while True: + + if tg_op: + func[tg_op](session, model, nsize) + else: + func[choice(TG_OP)](session, model, nsize) + + if not daemon: + break + + +if __name__ == "__main__": + main() diff --git a/examples/book_view/data.py b/examples/book_view/data.py new file mode 100644 index 00000000..4c5d7626 --- /dev/null +++ b/examples/book_view/data.py @@ -0,0 +1,113 @@ +import json + +import click +import sqlalchemy as sa +from schema import Book, Publisher +from sqlalchemy.orm import sessionmaker + +from pgsync.base import Base, pg_engine, subtransactions +from pgsync.constants import DEFAULT_SCHEMA +from pgsync.helper import teardown +from pgsync.utils import get_config + + +@click.command() +@click.option( + "--config", + "-c", + help="Schema config", + type=click.Path(exists=True), +) +def main(config): + + config: str = get_config(config) + teardown(drop_db=False, config=config) + + for document in json.load(open(config)): + + database = document.get("database", document["index"]) + engine = pg_engine(database=database) + schema: str = document.get("schema", DEFAULT_SCHEMA) + connection = engine.connect().execution_options( + schema_translate_map={None: schema} + ) + Session = sessionmaker(bind=connection, autoflush=True) + session = Session() + + # Bootstrap + publishers = { + "Oxford Press": Publisher(name="Oxford Press", is_active=True), + "Penguin Books": Publisher(name="Penguin Books", is_active=False), + "Pearson Press": Publisher(name="Pearson Press", is_active=True), + "Reutgers Press": Publisher( + name="Reutgers Press", is_active=False + ), + } + with subtransactions(session): + session.add_all(publishers.values()) + + books = { + "001": Book( + isbn="001", + title="It", + description="Stephens Kings It", + publisher=publishers["Oxford Press"], + ), + "002": Book( + isbn="002", + title="The Body", + 
description="Lodsdcsdrem ipsum dodscdslor sit amet", + publisher=publishers["Oxford Press"], + ), + "003": Book( + isbn="003", + title="Harry Potter and the Sorcerer's Stone", + description="Harry Potter has never been", + publisher=publishers["Penguin Books"], + ), + "004": Book( + isbn="004", + title="Harry Potter and the Chamber of Secrets", + description="The Dursleys were so mean and hideous that summer " + "that all Harry Potter wanted was to get back to the " + "Hogwarts School for Witchcraft and Wizardry", + publisher=publishers["Penguin Books"], + ), + "005": Book( + isbn="005", + title="The 17th Suspect", + description="A series of shootings exposes San Francisco to a " + "methodical yet unpredictable killer, and a reluctant " + "woman decides to put her trust in Sergeant Lindsay " + "Boxer", + publisher=publishers["Pearson Press"], + ), + "006": Book( + isbn="006", + title="The President Is Missing", + description="The publishing event of 2018: Bill Clinton and James " + "Patterson's The President Is Missing is a " + "superlative thriller", + publisher=publishers["Pearson Press"], + ), + "007": Book( + isbn="007", + title="Say You're Sorry", + description="deserunt mollit anim id est laborum", + publisher=publishers["Reutgers Press"], + ), + "008": Book( + isbn="008", + title="Bones Don't Lie", + description="Lorem ipsum", + publisher=publishers["Reutgers Press"], + ), + } + with subtransactions(session): + session.add_all(books.values()) + + Base(database).refresh_views() + + +if __name__ == "__main__": + main() diff --git a/examples/book_view/schema.json b/examples/book_view/schema.json new file mode 100644 index 00000000..4db4df8c --- /dev/null +++ b/examples/book_view/schema.json @@ -0,0 +1,38 @@ +[ + { + "database": "book_view", + "index": "book_view", + "nodes": { + "table": "book_view", + "base_tables": ["book"], + "columns": [ + "id", + "isbn", + "title", + "description" + ], + "primary_key": ["id"], + "children": [ + { + "table": "publisher_view", + "base_tables": ["publisher"], + "columns": [ + "name", + "id", + "is_active" + ], + "primary_key": ["id"], + "label": "publisher_label", + "relationship": { + "variant": "object", + "type": "one_to_one", + "foreign_key": { + "child": ["id"], + "parent": ["publisher_id"] + } + } + } + ] + } + } +] diff --git a/examples/book_view/schema.py b/examples/book_view/schema.py new file mode 100644 index 00000000..65ff797d --- /dev/null +++ b/examples/book_view/schema.py @@ -0,0 +1,92 @@ +import json + +import click +import sqlalchemy as sa +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.schema import UniqueConstraint + +from pgsync.base import create_database, create_schema, pg_engine +from pgsync.constants import DEFAULT_SCHEMA +from pgsync.helper import teardown +from pgsync.utils import get_config +from pgsync.view import CreateView + +Base = declarative_base() + + +class Publisher(Base): + __tablename__ = "publisher" + __table_args__ = (UniqueConstraint("name"),) + id = sa.Column(sa.Integer, primary_key=True, autoincrement=True) + name = sa.Column(sa.String, nullable=False) + is_active = sa.Column(sa.Boolean, default=False) + + +class Book(Base): + __tablename__ = "book" + __table_args__ = (UniqueConstraint("isbn"),) + id = sa.Column(sa.Integer, primary_key=True, autoincrement=True) + isbn = sa.Column(sa.String, nullable=False) + title = sa.Column(sa.String, nullable=False) + description = sa.Column(sa.String, nullable=True) + copyright = sa.Column(sa.String, nullable=True) + publisher_id = sa.Column( + 
sa.Integer, sa.ForeignKey(Publisher.id, ondelete="CASCADE") + ) + publisher = sa.orm.relationship( + Publisher, + backref=sa.orm.backref("publishers"), + ) + + +def setup(config=None): + for document in json.load(open(config)): + database: str = document.get("database", document["index"]) + schema: str = document.get("schema", DEFAULT_SCHEMA) + create_database(database) + engine = pg_engine(database=database) + create_schema(engine, schema) + engine = engine.connect().execution_options( + schema_translate_map={None: schema} + ) + Base.metadata.drop_all(engine) + Base.metadata.create_all(engine) + + metadata = sa.MetaData(schema=schema) + metadata.reflect(engine, views=True) + + book_model = metadata.tables[f"{schema}.book"] + engine.execute( + CreateView( + schema, + "book_view", + book_model.select(), + ) + ) + + publisher_model = metadata.tables[f"{schema}.publisher"] + engine.execute( + CreateView( + schema, + "publisher_view", + publisher_model.select(), + ) + ) + + +@click.command() +@click.option( + "--config", + "-c", + help="Schema config", + type=click.Path(exists=True), +) +def main(config): + + config: str = get_config(config) + teardown(config=config) + setup(config) + + +if __name__ == "__main__": + main() diff --git a/pgsync/base.py b/pgsync/base.py index 61bf40dc..54ed8437 100644 --- a/pgsync/base.py +++ b/pgsync/base.py @@ -2,7 +2,6 @@ import logging import os import sys -import warnings from collections import defaultdict from typing import Dict, List, Optional, Tuple @@ -37,7 +36,7 @@ ) from .trigger import CREATE_TRIGGER_TEMPLATE from .urls import get_postgres_url -from .view import create_view, drop_view +from .view import create_view, DropView, RefreshView try: import citext # noqa @@ -81,6 +80,7 @@ def __init__(self, database: str, verbose: bool = False, *args, **kwargs): self.models: Dict[str] = {} self.__metadata: dict = {} self.__indices: dict = {} + self.__views: dict = {} self.verbose: bool = verbose def connect(self) -> None: @@ -210,6 +210,14 @@ def schemas(self) -> list: self.__schemas.remove(schema) return self.__schemas + def views(self, schema: str) -> list: + """Get all materialized and non-materialized views.""" + if schema not in self.__views: + self.__views[schema] = sa.inspect(self.engine).get_view_names( + schema + ) + return self.__views[schema] + def indices(self, table: str) -> list: """Get the database table indexes.""" if table not in self.__indices: @@ -457,160 +465,29 @@ def logical_slot_count_changes( ).scalar() # Views... 
- def _primary_keys( - self, schema: str, tables: List[str] - ) -> sa.sql.selectable.Select: - with warnings.catch_warnings(): - warnings.simplefilter("ignore", category=sa.exc.SAWarning) - pg_class = self.model("pg_class", "pg_catalog") - pg_index = self.model("pg_index", "pg_catalog") - pg_attribute = self.model("pg_attribute", "pg_catalog") - pg_namespace = self.model("pg_namespace", "pg_catalog") - - alias = pg_class.alias("x") - inclause: list = [] - for table in tables: - pairs = table.split(".") - if len(pairs) == 1: - inclause.append( - self.__engine.dialect.identifier_preparer.quote(pairs[0]) - ) - elif len(pairs) == 2: - inclause.append( - f"{pairs[0]}.{self.__engine.dialect.identifier_preparer.quote(pairs[-1])}" - ) - else: - raise Exception( - f"cannot determine schema and table from {table}" - ) - - return ( - sa.select( - [ - sa.func.REPLACE( - sa.func.REVERSE( - sa.func.SPLIT_PART( - sa.func.REVERSE( - sa.cast( - sa.cast( - pg_index.c.indrelid, - sa.dialects.postgresql.REGCLASS, - ), - sa.Text, - ) - ), - ".", - 1, - ) - ), - '"', - "", - ).label("table_name"), - sa.func.ARRAY_AGG(pg_attribute.c.attname).label( - "primary_keys" - ), - ] - ) - .join( - pg_attribute, - pg_attribute.c.attrelid == pg_index.c.indrelid, - ) - .join( - pg_class, - pg_class.c.oid == pg_index.c.indexrelid, - ) - .join( - alias, - alias.c.oid == pg_index.c.indrelid, - ) - .join( - pg_namespace, - pg_namespace.c.oid == pg_class.c.relnamespace, - ) - .where( - *[ - pg_namespace.c.nspname.notin_(["pg_catalog", "pg_toast"]), - pg_index.c.indisprimary, - sa.cast( - sa.cast( - pg_index.c.indrelid, - sa.dialects.postgresql.REGCLASS, - ), - sa.Text, - ).in_(inclause), - pg_attribute.c.attnum == sa.any_(pg_index.c.indkey), - ] - ) - .group_by(pg_index.c.indrelid) - ) - - def _foreign_keys( - self, schema: str, tables: List[str] - ) -> sa.sql.selectable.Select: - with warnings.catch_warnings(): - warnings.simplefilter("ignore", category=sa.exc.SAWarning) - table_constraints = self.model( - "table_constraints", - "information_schema", - ) - key_column_usage = self.model( - "key_column_usage", - "information_schema", - ) - constraint_column_usage = self.model( - "constraint_column_usage", - "information_schema", - ) - - return ( - sa.select( - [ - table_constraints.c.table_name, - sa.func.ARRAY_AGG( - sa.cast( - key_column_usage.c.column_name, - sa.TEXT, - ) - ).label("foreign_keys"), - ] - ) - .join( - key_column_usage, - sa.and_( - key_column_usage.c.constraint_name - == table_constraints.c.constraint_name, - key_column_usage.c.table_schema - == table_constraints.c.table_schema, - key_column_usage.c.table_schema == schema, - ), - ) - .join( - constraint_column_usage, - sa.and_( - constraint_column_usage.c.constraint_name - == table_constraints.c.constraint_name, - constraint_column_usage.c.table_schema - == table_constraints.c.table_schema, - ), - ) - .where( - *[ - table_constraints.c.table_name.in_(tables), - table_constraints.c.constraint_type == "FOREIGN KEY", - ] - ) - .group_by(table_constraints.c.table_name) - ) - def create_view( self, schema: str, tables: list, user_defined_fkey_tables: dict - ): + ) -> None: create_view( - self.engine, schema, tables, user_defined_fkey_tables, self + self.engine, + self.model, + self.fetchall, + schema, + tables, + user_defined_fkey_tables, ) - def drop_view(self, schema: str): - drop_view(self.engine, schema) + def drop_view(self, schema: str) -> None: + """Drop a view.""" + logger.debug(f"Dropping view: {schema}.{MATERIALIZED_VIEW}") + 
self.engine.execute(DropView(schema, MATERIALIZED_VIEW)) + logger.debug(f"Dropped view: {schema}.{MATERIALIZED_VIEW}") + + def refresh_view(self, name: str, schema: str) -> None: + """Refresh a materialized view.""" + logger.debug(f"Refreshing view: {schema}.{MATERIALIZED_VIEW}") + self.engine.execute(RefreshView(schema, name, concurrently=False)) + logger.debug(f"Refreshed view: {schema}.{MATERIALIZED_VIEW}") # Triggers... def create_triggers( @@ -622,14 +499,16 @@ def create_triggers( MATERIALIZED_VIEW, f"{schema}.{MATERIALIZED_VIEW}" ) ) - views = sa.inspect(self.engine).get_view_names(schema) - queries = [] + queries: list = [] for table in self.tables(schema): schema, table = self._get_schema(schema, table) - if (tables and table not in tables) or (table in views): + if (tables and table not in tables) or ( + table in self.views(schema) + ): continue logger.debug(f"Creating trigger on table: {schema}.{table}") - for name, for_each, tg_op in [ + + for name, level, tg_op in [ ("notify", "ROW", ["INSERT", "UPDATE", "DELETE"]), ("truncate", "STATEMENT", ["TRUNCATE"]), ]: @@ -638,7 +517,7 @@ def create_triggers( sa.DDL( f'CREATE TRIGGER "{table}_{name}" ' f'AFTER {" OR ".join(tg_op)} ON "{schema}"."{table}" ' - f"FOR EACH {for_each} EXECUTE PROCEDURE " + f"FOR EACH {level} EXECUTE PROCEDURE " f"{TRIGGER_FUNC}()", ) ) @@ -687,7 +566,7 @@ def enable_triggers(self, schema: str) -> None: self.execute(query) @property - def txid_current(self): + def txid_current(self) -> int: """ Get last committed transaction id from the database. @@ -773,7 +652,9 @@ def _parse_logical_slot(data): data = f"{data[match.span()[1]:]} " yield key, value - payload = dict(schema=None, tg_op=None, table=None, old={}, new={}) + payload: dict = dict( + schema=None, tg_op=None, table=None, old={}, new={} + ) match = LOGICAL_SLOT_PREFIX.search(row) if not match: @@ -815,7 +696,6 @@ def _parse_logical_slot(data): return payload # Querying... 
- def execute(self, statement, values=None, options=None): """Execute a query statement.""" conn = self.__engine.connect() diff --git a/pgsync/constants.py b/pgsync/constants.py index 0e3b1c92..5580aad7 100644 --- a/pgsync/constants.py +++ b/pgsync/constants.py @@ -21,6 +21,7 @@ # Node attributes NODE_ATTRIBUTES = [ + "base_tables", "children", "columns", "label", diff --git a/pgsync/node.py b/pgsync/node.py index 510ffeca..e6181cee 100644 --- a/pgsync/node.py +++ b/pgsync/node.py @@ -29,6 +29,7 @@ RelationshipVariantError, TableNotInNodeError, ) +from .view import is_materialized_view @dataclass @@ -107,12 +108,14 @@ class Node(object): model: sa.sql.selectable.Alias table: str schema: str + materialized: bool = False primary_key: Optional[list] = None label: Optional[str] = None transform: Optional[dict] = None columns: Optional[list] = None relationship: Optional[dict] = None parent: Optional[Node] = None + base_tables: Optional[list] = None def __post_init__(self): self.columns = self.columns or [] @@ -190,15 +193,11 @@ def is_root(self) -> bool: @property def name(self) -> str: - """ - returns a fully qualified node name - """ + """Returns a fully qualified node name.""" return f"{self.schema}.{self.table}" def add_child(self, node: Node) -> None: - """ - all nodes except the root node must have a relationship defined - """ + """All nodes except the root node must have a relationship defined.""" node.parent: Node = self if not node.is_root and ( not node.relationship.type or not node.relationship.variant @@ -263,6 +262,8 @@ def build(self, root: dict) -> Node: transform=root.get("transform", {}), columns=root.get("columns", []), relationship=root.get("relationship", {}), + base_tables=root.get("base_tables", []), + materialized=is_materialized_view(self.base.engine, schema, table), ) self.nodes.add(node.table) diff --git a/pgsync/sync.py b/pgsync/sync.py index c270820a..c6579c08 100644 --- a/pgsync/sync.py +++ b/pgsync/sync.py @@ -91,6 +91,7 @@ def __init__( ) self.redis: RedisQueue = RedisQueue(self.__name) self.tree: Tree = Tree(self) + self.__root: Node = None if validate: self.validate(repl_slots=repl_slots) self.create_setting() @@ -99,6 +100,12 @@ def __init__( ) self.count: dict = dict(xlog=0, db=0, redis=0) + @property + def root(self) -> str: + if self.__root is None: + self.__root = self.tree.build(self.nodes) + return self.__root + def validate(self, repl_slots: Optional[bool] = True) -> None: """Perform all validation right away.""" @@ -173,14 +180,12 @@ def validate(self, repl_slots: Optional[bool] = True) -> None: f"read/writable" ) - root: Node = self.tree.build(self.nodes) - root.display() - for node in root.traverse_breadth_first(): + self.root.display() + for node in self.root.traverse_breadth_first(): pass def analyze(self) -> None: - root: Node = self.tree.build(self.nodes) - for node in root.traverse_breadth_first(): + for node in self.root.traverse_breadth_first(): if node.is_root: continue @@ -235,10 +240,9 @@ def analyze(self) -> None: def create_setting(self) -> None: """Create Elasticsearch setting and mapping if required.""" - root: Node = self.tree.build(self.nodes) self.es._create_setting( self.index, - root, + self.root, setting=self.setting, mapping=self.mapping, routing=self.routing, @@ -253,12 +257,14 @@ def setup(self) -> None: # tables with user defined foreign keys user_defined_fkey_tables: dict = {} - root: Node = self.tree.build(self.nodes) - for node in root.traverse_breadth_first(): + for node in self.root.traverse_breadth_first(): if 
node.schema != schema: continue tables |= set(node.relationship.through_tables) tables |= set([node.table]) + # we also need to bootstrap the base tables + tables |= set(node.base_tables) + # we want to get both the parent and the child keys here # even though only one of them is the foreign_key. # this is because we define both in the schema but @@ -287,20 +293,22 @@ def teardown(self, drop_view: bool = True) -> None: for schema in self.schemas: tables: Set = set([]) - root: Node = self.tree.build(self.nodes) - for node in root.traverse_breadth_first(): + for node in self.root.traverse_breadth_first(): tables |= set(node.relationship.through_tables) tables |= set([node.table]) + # we also need to teardown the base tables + tables |= set(node.base_tables) + self.drop_triggers(schema=schema, tables=tables) if drop_view: self.drop_view(schema=schema) self.drop_replication_slot(self.__name) - def get_doc_id(self, primary_keys: List[str]) -> str: + def get_doc_id(self, primary_keys: List[str], table: str) -> str: """Get the Elasticsearch document id from the primary keys.""" if not primary_keys: raise PrimaryKeyNotFoundError( - "No primary key found on target table" + f"No primary key found on table: {table}" ) return f"{PRIMARY_KEY_DELIMITER}".join(map(str, primary_keys)) @@ -522,7 +530,7 @@ def _update_op( and old_values != new_values ): doc: dict = { - "_id": self.get_doc_id(old_values), + "_id": self.get_doc_id(old_values, root.table), "_index": self.index, "_op_type": "delete", } @@ -624,7 +632,7 @@ def _delete_op( payload_data[key] for key in root.model.primary_keys ] doc: dict = { - "_id": self.get_doc_id(root_primary_values), + "_id": self.get_doc_id(root_primary_values, root.table), "_index": self.index, "_op_type": "delete", } @@ -951,7 +959,7 @@ def sync( print("-" * 10) doc: dict = { - "_id": self.get_doc_id(primary_keys), + "_id": self.get_doc_id(primary_keys, node.table), "_index": self.index, "_source": row, } @@ -997,7 +1005,9 @@ def poll_redis(self) -> None: if payloads: logger.debug(f"poll_redis: {payloads}") self.count["redis"] += len(payloads) + self.refresh_views() self.on_publish(payloads) + time.sleep(REDIS_POLL_INTERVAL) @threaded @@ -1042,6 +1052,12 @@ def poll_db(self) -> None: logger.debug(f"on_notify: {payload}") self.count["db"] += 1 + def refresh_views(self): + for node in self.root.traverse_breadth_first(): + if node.table in self.views(node.schema): + if node.materialized: + self.refresh_view(node.table, node.schema) + def on_publish(self, payloads: list) -> None: """ Redis publish event handler. @@ -1050,6 +1066,13 @@ def on_publish(self, payloads: list) -> None: It is called when an event is received from Redis. Deserialize the payload from Redis and sync to Elasticsearch. """ + # this is used for the views. 
+ # we substitute the views for the base table here + for i, payload in enumerate(payloads): + for node in self.root.traverse_breadth_first(): + if payload["table"] in node.base_tables: + payloads[i]["table"] = node.table + logger.debug(f"on_publish len {len(payloads)}") # Safe inserts are insert operations that can be performed in any order # Optimize the safe INSERTS diff --git a/pgsync/view.py b/pgsync/view.py index 15e9ab8c..6042ed88 100644 --- a/pgsync/view.py +++ b/pgsync/view.py @@ -1,5 +1,7 @@ """PGSync views.""" import logging +import warnings +from typing import Callable, List import sqlalchemy as sa from sqlalchemy.dialects.postgresql import array @@ -32,7 +34,6 @@ def __init__( def compile_create_view( element: CreateView, compiler: PGDDLCompiler, **kwargs ) -> str: - statement: str = compiler.sql_compiler.process( element.selectable, literal_binds=True, @@ -70,6 +71,29 @@ def compile_drop_view( ) +class RefreshView(DDLElement): + def __init__( + self, + schema: str, + name: str, + concurrently: bool = False, + ): + self.schema: str = schema + self.name: str = name + self.concurrently: bool = concurrently + + +@compiler.compiles(RefreshView) +def compile_refresh_view( + element: RefreshView, compiler: PGDDLCompiler, **kwargs +) -> str: + concurrently: str = "CONCURRENTLY" if element.concurrently else "" + return ( + f"REFRESH MATERIALIZED VIEW {concurrently} " + f'"{element.schema}"."{element.name}"' + ) + + class CreateIndex(DDLElement): def __init__(self, name: str, schema: str, view: str, columns: list): self.schema: str = schema @@ -100,12 +124,156 @@ def compile_drop_index( return f"DROP INDEX IF EXISTS {element.name}" +def _primary_keys( + engine, model: Callable, schema: str, tables: List[str] +) -> sa.sql.selectable.Select: + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=sa.exc.SAWarning) + pg_class = model("pg_class", "pg_catalog") + pg_index = model("pg_index", "pg_catalog") + pg_attribute = model("pg_attribute", "pg_catalog") + pg_namespace = model("pg_namespace", "pg_catalog") + + alias = pg_class.alias("x") + inclause: list = [] + for table in tables: + pairs = table.split(".") + if len(pairs) == 1: + inclause.append(engine.dialect.identifier_preparer.quote(pairs[0])) + elif len(pairs) == 2: + inclause.append( + f"{pairs[0]}.{engine.dialect.identifier_preparer.quote(pairs[-1])}" + ) + else: + raise Exception(f"cannot determine schema and table from {table}") + + return ( + sa.select( + [ + sa.func.REPLACE( + sa.func.REVERSE( + sa.func.SPLIT_PART( + sa.func.REVERSE( + sa.cast( + sa.cast( + pg_index.c.indrelid, + sa.dialects.postgresql.REGCLASS, + ), + sa.Text, + ) + ), + ".", + 1, + ) + ), + '"', + "", + ).label("table_name"), + sa.func.ARRAY_AGG(pg_attribute.c.attname).label( + "primary_keys" + ), + ] + ) + .join( + pg_attribute, + pg_attribute.c.attrelid == pg_index.c.indrelid, + ) + .join( + pg_class, + pg_class.c.oid == pg_index.c.indexrelid, + ) + .join( + alias, + alias.c.oid == pg_index.c.indrelid, + ) + .join( + pg_namespace, + pg_namespace.c.oid == pg_class.c.relnamespace, + ) + .where( + *[ + pg_namespace.c.nspname.notin_(["pg_catalog", "pg_toast"]), + pg_index.c.indisprimary, + sa.cast( + sa.cast( + pg_index.c.indrelid, + sa.dialects.postgresql.REGCLASS, + ), + sa.Text, + ).in_(inclause), + pg_attribute.c.attnum == sa.any_(pg_index.c.indkey), + ] + ) + .group_by(pg_index.c.indrelid) + ) + + +def _foreign_keys( + model, schema: str, tables: List[str] +) -> sa.sql.selectable.Select: + with warnings.catch_warnings(): + 
warnings.simplefilter("ignore", category=sa.exc.SAWarning) + table_constraints = model( + "table_constraints", + "information_schema", + ) + key_column_usage = model( + "key_column_usage", + "information_schema", + ) + constraint_column_usage = model( + "constraint_column_usage", + "information_schema", + ) + + return ( + sa.select( + [ + table_constraints.c.table_name, + sa.func.ARRAY_AGG( + sa.cast( + key_column_usage.c.column_name, + sa.TEXT, + ) + ).label("foreign_keys"), + ] + ) + .join( + key_column_usage, + sa.and_( + key_column_usage.c.constraint_name + == table_constraints.c.constraint_name, + key_column_usage.c.table_schema + == table_constraints.c.table_schema, + key_column_usage.c.table_schema == schema, + ), + ) + .join( + constraint_column_usage, + sa.and_( + constraint_column_usage.c.constraint_name + == table_constraints.c.constraint_name, + constraint_column_usage.c.table_schema + == table_constraints.c.table_schema, + ), + ) + .where( + *[ + table_constraints.c.table_name.in_(tables), + table_constraints.c.constraint_type == "FOREIGN KEY", + ] + ) + .group_by(table_constraints.c.table_name) + ) + + def create_view( engine, + model: Callable, + fetchall: Callable, schema: str, tables: list, user_defined_fkey_tables: dict, - base: "Base", # noqa F821 ) -> None: """ View describing primary_keys and foreign_keys for each table @@ -132,7 +300,7 @@ def create_view( rows: dict = {} if MATERIALIZED_VIEW in views: - for table_name, primary_keys, foreign_keys in base.fetchall( + for table_name, primary_keys, foreign_keys in fetchall( sa.select(["*"]).select_from( sa.text(f"{schema}.{MATERIALIZED_VIEW}") ) @@ -152,8 +320,8 @@ def create_view( for table in set(tables): tables.add(f"{schema}.{table}") - for table_name, columns in base.fetchall( - base._primary_keys(schema, tables) + for table_name, columns in fetchall( + _primary_keys(engine, model, schema, tables) ): rows.setdefault( table_name, @@ -162,9 +330,7 @@ def create_view( if columns: rows[table_name]["primary_keys"] |= set(columns) - for table_name, columns in base.fetchall( - base._foreign_keys(schema, tables) - ): + for table_name, columns in fetchall(_foreign_keys(model, schema, tables)): rows.setdefault( table_name, {"primary_keys": set([]), "foreign_keys": set([])}, @@ -218,7 +384,26 @@ def create_view( logger.debug(f"Created view: {schema}.{MATERIALIZED_VIEW}") -def drop_view(engine, schema: str) -> None: - logger.debug(f"Dropping view: {schema}.{MATERIALIZED_VIEW}") - engine.execute(DropView(schema, MATERIALIZED_VIEW)) - logger.debug(f"Dropped view: {schema}.{MATERIALIZED_VIEW}") +def is_materialized_view( + engine, + schema: str, + table: str, +) -> bool: + with engine.connect() as conn: + return ( + conn.execute( + sa.select([sa.column("matviewname")]) + .select_from(sa.text("pg_matviews")) + .where( + sa.and_( + *[ + sa.column("matviewname") == table, + sa.column("schemaname") == schema, + ] + ) + ) + .with_only_columns([sa.func.COUNT()]) + .order_by(None) + ).scalar() + > 0 + ) diff --git a/requirements/dev.txt b/requirements/dev.txt index d9c7c283..900cf0a7 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -10,9 +10,9 @@ attrs==21.4.0 # via pytest black==22.3.0 # via -r requirements/base.in -boto3==1.22.8 +boto3==1.22.12 # via -r requirements/base.in -botocore==1.25.8 +botocore==1.25.12 # via # boto3 # s3transfer @@ -49,7 +49,7 @@ elasticsearch-dsl==7.4.0 # via -r requirements/base.in environs==9.5.0 # via -r requirements/base.in -faker==13.7.0 +faker==13.11.0 # via -r requirements/base.in 
filelock==3.6.0 # via virtualenv @@ -129,7 +129,7 @@ pydocstyle==6.1.1 # via flake8-docstrings pyflakes==2.4.0 # via flake8 -pyparsing==3.0.8 +pyparsing==3.0.9 # via packaging pytest==6.2.5 # via @@ -154,7 +154,7 @@ python-dotenv==0.20.0 # via environs pyyaml==6.0 # via pre-commit -redis==4.2.2 +redis==4.3.1 # via -r requirements/base.in requests==2.27.1 # via requests-aws4auth diff --git a/requirements/prod.txt b/requirements/prod.txt index 334a68ad..b22845b7 100644 --- a/requirements/prod.txt +++ b/requirements/prod.txt @@ -8,9 +8,9 @@ async-timeout==4.0.2 # via redis black==22.3.0 # via -r requirements/base.in -boto3==1.22.8 +boto3==1.22.12 # via -r requirements/base.in -botocore==1.25.8 +botocore==1.25.12 # via # boto3 # s3transfer @@ -38,7 +38,7 @@ elasticsearch-dsl==7.4.0 # via -r requirements/base.in environs==9.5.0 # via -r requirements/base.in -faker==13.7.0 +faker==13.11.0 # via -r requirements/base.in greenlet==1.1.2 # via sqlalchemy @@ -66,7 +66,7 @@ platformdirs==2.5.2 # via black psycopg2-binary==2.9.3 # via -r requirements/base.in -pyparsing==3.0.8 +pyparsing==3.0.9 # via packaging python-dateutil==2.8.2 # via @@ -75,7 +75,7 @@ python-dateutil==2.8.2 # faker python-dotenv==0.20.0 # via environs -redis==4.2.2 +redis==4.3.1 # via -r requirements/base.in requests==2.27.1 # via requests-aws4auth diff --git a/requirements/test.txt b/requirements/test.txt index 20646c8a..18b7bcc5 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -10,9 +10,9 @@ attrs==21.4.0 # via pytest black==22.3.0 # via -r requirements/base.in -boto3==1.22.8 +boto3==1.22.12 # via -r requirements/base.in -botocore==1.25.8 +botocore==1.25.12 # via # boto3 # s3transfer @@ -42,7 +42,7 @@ elasticsearch-dsl==7.4.0 # via -r requirements/base.in environs==9.5.0 # via -r requirements/base.in -faker==13.7.0 +faker==13.11.0 # via -r requirements/base.in flake8==4.0.1 # via @@ -108,7 +108,7 @@ pydocstyle==6.1.1 # via flake8-docstrings pyflakes==2.4.0 # via flake8 -pyparsing==3.0.8 +pyparsing==3.0.9 # via packaging pytest==6.2.5 # via @@ -131,7 +131,7 @@ python-dateutil==2.8.2 # faker python-dotenv==0.20.0 # via environs -redis==4.2.2 +redis==4.3.1 # via -r requirements/base.in requests==2.27.1 # via requests-aws4auth diff --git a/setup.cfg b/setup.cfg index 0a2c58ef..95728f6d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -21,6 +21,7 @@ max-complexity = 12 doctests = True statistics = True benchmark = True +ignore = D101,D102,D103,D105,D107,D401,T000,C901,T201,W503,E203,T203 [isort] line_length = 79 diff --git a/tests/fixtures/schema.json b/tests/fixtures/schema.json index 5a0974ff..099a876d 100644 --- a/tests/fixtures/schema.json +++ b/tests/fixtures/schema.json @@ -1,7 +1,7 @@ [ { - "database": "index", - "index": "index", + "database": "fakedb", + "index": "fake_index", "nodes": { "table": "book", "columns": [
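Usage sketch (not part of the patch): a minimal, standalone example of driving the view DDL helpers this change introduces in pgsync/view.py (CreateView, DropView, RefreshView, is_materialized_view). The database name "example" and the "public"."book" table are illustrative placeholders, and it assumes, as the examples/book_view/schema.py above relies on, that CreateView produces a materialized view by default.

import sqlalchemy as sa

from pgsync.base import pg_engine
from pgsync.view import (
    CreateView,
    DropView,
    RefreshView,
    is_materialized_view,
)

# Placeholder database; pg_engine() is used the same way in the examples above.
engine = pg_engine(database="example")

# Reflect an existing base table to use as the view's SELECT source.
metadata = sa.MetaData(schema="public")
metadata.reflect(engine)
book = metadata.tables["public.book"]

# Create a view over the base table, as examples/book_view/schema.py does.
engine.execute(CreateView("public", "book_view", book.select()))

# is_materialized_view() checks pg_matviews for the (schema, table) pair.
print(is_materialized_view(engine, "public", "book_view"))

# Per compile_refresh_view, this emits:
#   REFRESH MATERIALIZED VIEW "public"."book_view"
engine.execute(RefreshView("public", "book_view", concurrently=False))

# Drop the view again when finished.
engine.execute(DropView("public", "book_view"))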