
support for views #275,#175,#103
toluaina committed May 12, 2022
1 parent d356ad3 commit 2c85d15
Showing 15 changed files with 706 additions and 212 deletions.
4 changes: 2 additions & 2 deletions .env.sample
@@ -15,9 +15,9 @@
# BLOCK_SIZE=2048*10
# QUERY_LITERAL_BINDS=False
# number of threads to spawn for poll db
-# NTHREADS_POLLDB = 1
+# NTHREADS_POLLDB=1
# batch size for LOGICAL_SLOT_CHANGES for minimizing tmp file disk usage
-# LOGICAL_SLOT_CHUNK_SIZE = 5000
+# LOGICAL_SLOT_CHUNK_SIZE=5000

# Elasticsearch
# ELASTICSEARCH_SCHEME=http
160 changes: 160 additions & 0 deletions examples/book_view/benchmark.py
@@ -0,0 +1,160 @@
import json
from random import choice
from typing import Set

import click
import sqlalchemy as sa
from faker import Faker
from schema import Book
from sqlalchemy.orm import sessionmaker

from pgsync.base import pg_engine
from pgsync.constants import DELETE, INSERT, TG_OP, TRUNCATE, UPDATE
from pgsync.utils import get_config, show_settings, Timer

FIELDS = {
"isbn": "isbn13",
"title": "sentence",
"description": "text",
"copyright": "word",
}


def insert_op(session: sessionmaker, model, nsize: int) -> None:
faker: Faker = Faker()
rows: Set = set([])
for _ in range(nsize):
kwargs = {}
for column in model.__table__.columns:
if column.foreign_keys:
foreign_key = list(column.foreign_keys)[0]
pk = [
column.name
for column in foreign_key.column.table.columns
if column.primary_key
][0]
fkey = (
session.query(foreign_key.column.table)
.order_by(sa.func.random())
.limit(1)
)
value = getattr(fkey[0], pk)
kwargs[column.name] = value
elif column.primary_key:
continue
else:
field = FIELDS.get(column.name)
if not field:
# continue
raise RuntimeError(f"field {column.name} not in mapping")
value = getattr(faker, field)()
kwargs[column.name] = value
print(f"Inserting {model.__table__} VALUES {kwargs}")
row = model(**kwargs)
rows.add(row)

with Timer(f"Created {nsize} {model.__table__} in"):
try:
session.add_all(rows)
session.commit()
except Exception as e:
print(f"Exception {e}")
session.rollback()


def update_op(session: sessionmaker, model, nsize: int) -> None:
column: str = choice(list(FIELDS.keys()))
if column not in [column.name for column in model.__table__.columns]:
raise RuntimeError()
faker: Faker = Faker()
with Timer(f"Updated {nsize} {model.__table__}"):
for _ in range(nsize):
field = FIELDS.get(column)
value = getattr(faker, field)()
row = (
session.query(model)
.filter(getattr(model, column) != value)
.order_by(sa.func.random())
.limit(1)
)
if row:
print(f'Updating {model.__table__} SET {column} = "{value}"')
try:
setattr(row[0], column, value)
session.commit()
except Exception as e:
session.rollback()


def delete_op(session: sessionmaker, model, nsize: int) -> None:
with Timer(f"Deleted {nsize} {model.__table__}"):
for _ in range(nsize):
row = session.query(model).order_by(sa.func.random()).limit(1)
pk = [
column.name
for column in filter(
lambda x: x.primary_key, model.__table__.columns
)
][0]
if row:
try:
value = getattr(row[0], pk)
print(f"Deleting {model.__table__} WHERE {pk} = {value}")
session.query(model).filter(
getattr(model, pk) == value
).delete()
session.commit()
except Exception as e:
session.rollback()


@click.command()
@click.option(
"--config",
"-c",
help="Schema config",
type=click.Path(exists=True),
)
@click.option("--daemon", "-d", is_flag=True, help="Run as a daemon")
@click.option("--nsize", "-n", default=5000, help="Number of samples")
@click.option(
"--tg_op",
"-t",
help="TG_OP",
type=click.Choice(
TG_OP,
case_sensitive=False,
),
)
def main(config, nsize, daemon, tg_op):

show_settings()

config: str = get_config(config)
documents: dict = json.load(open(config))
engine = pg_engine(
database=documents[0].get("database", documents[0]["index"])
)
Session = sessionmaker(bind=engine, autoflush=False, autocommit=False)
session = Session()

model = Book
func = {
INSERT: insert_op,
UPDATE: update_op,
DELETE: delete_op,
}
# lets do only the book model for now
while True:

if tg_op:
func[tg_op](session, model, nsize)
else:
func[choice(TG_OP)](session, model, nsize)

if not daemon:
break


if __name__ == "__main__":
main()
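
Note: both benchmark.py and data.py import Book and Publisher from an examples/book_view/schema.py module that is among the 15 changed files but not shown in this excerpt. Below is a minimal sketch of what that module plausibly declares, inferred from the columns exercised above (FIELDS, the foreign-key lookup) and from the bootstrap data in data.py; the exact column types and options are assumptions, not the committed code.

import sqlalchemy as sa
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()


class Publisher(Base):
    # assumed layout; the committed examples/book_view/schema.py is not shown here
    __tablename__ = "publisher"
    id = sa.Column(sa.Integer, primary_key=True)
    name = sa.Column(sa.String, nullable=False)
    is_active = sa.Column(sa.Boolean, default=False)


class Book(Base):
    # column names taken from FIELDS in benchmark.py and the bootstrap rows in data.py
    __tablename__ = "book"
    id = sa.Column(sa.Integer, primary_key=True)
    isbn = sa.Column(sa.String, nullable=False)
    title = sa.Column(sa.String, nullable=False)
    description = sa.Column(sa.String)
    copyright = sa.Column(sa.String)
    publisher_id = sa.Column(sa.Integer, sa.ForeignKey("publisher.id"))
    publisher = relationship(Publisher, backref="books")
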
113 changes: 113 additions & 0 deletions examples/book_view/data.py
@@ -0,0 +1,113 @@
import json

import click
import sqlalchemy as sa
from schema import Book, Publisher
from sqlalchemy.orm import sessionmaker

from pgsync.base import Base, pg_engine, subtransactions
from pgsync.constants import DEFAULT_SCHEMA
from pgsync.helper import teardown
from pgsync.utils import get_config


@click.command()
@click.option(
"--config",
"-c",
help="Schema config",
type=click.Path(exists=True),
)
def main(config):

config: str = get_config(config)
teardown(drop_db=False, config=config)

for document in json.load(open(config)):

database = document.get("database", document["index"])
engine = pg_engine(database=database)
schema: str = document.get("schema", DEFAULT_SCHEMA)
connection = engine.connect().execution_options(
schema_translate_map={None: schema}
)
Session = sessionmaker(bind=connection, autoflush=True)
session = Session()

# Bootstrap
publishers = {
"Oxford Press": Publisher(name="Oxford Press", is_active=True),
"Penguin Books": Publisher(name="Penguin Books", is_active=False),
"Pearson Press": Publisher(name="Pearson Press", is_active=True),
"Reutgers Press": Publisher(
name="Reutgers Press", is_active=False
),
}
with subtransactions(session):
session.add_all(publishers.values())

books = {
"001": Book(
isbn="001",
title="It",
description="Stephens Kings It",
publisher=publishers["Oxford Press"],
),
"002": Book(
isbn="002",
title="The Body",
description="Lodsdcsdrem ipsum dodscdslor sit amet",
publisher=publishers["Oxford Press"],
),
"003": Book(
isbn="003",
title="Harry Potter and the Sorcerer's Stone",
description="Harry Potter has never been",
publisher=publishers["Penguin Books"],
),
"004": Book(
isbn="004",
title="Harry Potter and the Chamber of Secrets",
description="The Dursleys were so mean and hideous that summer "
"that all Harry Potter wanted was to get back to the "
"Hogwarts School for Witchcraft and Wizardry",
publisher=publishers["Penguin Books"],
),
"005": Book(
isbn="005",
title="The 17th Suspect",
description="A series of shootings exposes San Francisco to a "
"methodical yet unpredictable killer, and a reluctant "
"woman decides to put her trust in Sergeant Lindsay "
"Boxer",
publisher=publishers["Pearson Press"],
),
"006": Book(
isbn="006",
title="The President Is Missing",
description="The publishing event of 2018: Bill Clinton and James "
"Patterson's The President Is Missing is a "
"superlative thriller",
publisher=publishers["Pearson Press"],
),
"007": Book(
isbn="007",
title="Say You're Sorry",
description="deserunt mollit anim id est laborum",
publisher=publishers["Reutgers Press"],
),
"008": Book(
isbn="008",
title="Bones Don't Lie",
description="Lorem ipsum",
publisher=publishers["Reutgers Press"],
),
}
with subtransactions(session):
session.add_all(books.values())

Base(database).refresh_views()


if __name__ == "__main__":
main()
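
Note: the bootstrap script finishes with Base(database).refresh_views(). Assuming book_view and publisher_view (named in schema.json below) are materialized views over book and publisher, the rows inserted above only become visible through them after a refresh, so the call is presumably roughly equivalent to issuing the refresh by hand. A sketch of that manual step, with the view names taken from schema.json and the materialized form an assumption:

import sqlalchemy as sa

from pgsync.base import pg_engine

# illustrative only: refresh the example's views directly, assuming they are
# materialized views named as in examples/book_view/schema.json
with pg_engine(database="book_view").begin() as conn:
    conn.execute(sa.text("REFRESH MATERIALIZED VIEW book_view"))
    conn.execute(sa.text("REFRESH MATERIALIZED VIEW publisher_view"))
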
38 changes: 38 additions & 0 deletions examples/book_view/schema.json
@@ -0,0 +1,38 @@
[
{
"database": "book_view",
"index": "book_view",
"nodes": {
"table": "book_view",
"base_tables": ["book"],
"columns": [
"id",
"isbn",
"title",
"description"
],
"primary_key": ["id"],
"children": [
{
"table": "publisher_view",
"base_tables": ["publisher"],
"columns": [
"name",
"id",
"is_active"
],
"primary_key": ["id"],
"label": "publisher_label",
"relationship": {
"variant": "object",
"type": "one_to_one",
"foreign_key": {
"child": ["id"],
"parent": ["publisher_id"]
}
}
}
]
}
}
]
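
Note: the node-level "table" entries above name views (book_view, publisher_view) rather than tables, and the new "base_tables" key declares which physical tables feed each view so changes captured on book and publisher can be attributed to the view nodes; the explicit "primary_key" lists are presumably required because a view carries no primary-key constraint of its own. The view definitions live elsewhere in this commit (examples/book_view/schema.py, not shown here); below is a rough sketch of DDL that would satisfy this config, with column lists taken from the nodes above, publisher_id added because the child relationship's parent key references it, and the materialized form assumed.

import sqlalchemy as sa

from pgsync.base import pg_engine

# assumed view definitions matching schema.json; not the commit's actual DDL
VIEWS = [
    """
    CREATE MATERIALIZED VIEW IF NOT EXISTS book_view AS
    SELECT id, isbn, title, description, publisher_id FROM book
    """,
    """
    CREATE MATERIALIZED VIEW IF NOT EXISTS publisher_view AS
    SELECT id, name, is_active FROM publisher
    """,
]

with pg_engine(database="book_view").begin() as conn:
    for ddl in VIEWS:
        conn.execute(sa.text(ddl))
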