Skip to content

Commit

Permalink
chore(tests): fix transaction isolation level tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathanblade committed Jan 31, 2024
1 parent e2f70cd commit e3f59f6
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 241 deletions.
170 changes: 53 additions & 117 deletions databases/sync_tests/test_transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from prisma import Prisma
from prisma.models import User, Profile

from ..utils import CURRENT_DATABASE
from ..utils import CURRENT_DATABASE, RawQueries


def test_model_query(client: Prisma) -> None:
Expand Down Expand Up @@ -203,122 +203,58 @@ def test_transaction_already_closed(client: Prisma) -> None:
assert exc.match('Transaction already closed')


@pytest.mark.skipif(CURRENT_DATABASE in ['cockroachdb', 'sqlite'], reason='Not available')
def test_read_uncommited_isolation_level(client: Prisma) -> None:
"""A transaction isolation level is set to `READ_UNCOMMITED`"""
client2 = Prisma()
client2.connect()

user = client.user.create(data={'name': 'Robert'})

with client.tx(isolation_level=prisma.TransactionIsolationLevel.READ_UNCOMMITED) as tx1:
tx1_user = tx1.user.find_first_or_raise(where={'id': user.id})
tx1_count = tx1.user.count()

with client2.tx() as tx2:
tx2.user.update(data={'name': 'Tegan'}, where={'id': user.id})
tx2.user.create(data={'name': 'Bobby'})

dirty_user = tx1.user.find_first_or_raise(where={'id': user.id})

non_repeatable_user = tx1.user.find_first_or_raise(where={'id': user.id})
phantom_count = tx1.user.count()

# Have dirty read
assert tx1_user.name != dirty_user.name
# Have non-repeatable read
assert tx1_user.name != non_repeatable_user.name
# Have phantom read
assert tx1_count != phantom_count


@pytest.mark.skipif(CURRENT_DATABASE in ['cockroachdb', 'sqlite'], reason='Not available')
def test_read_commited_isolation_level(client: Prisma) -> None:
"""A transaction isolation level is set to `READ_COMMITED`"""
client2 = Prisma()
client2.connect()

user = client.user.create(data={'name': 'Robert'})

with client.tx(isolation_level=prisma.TransactionIsolationLevel.READ_COMMITED) as tx1:
tx1_user = tx1.user.find_first_or_raise(where={'id': user.id})
tx1_count = tx1.user.count()

with client2.tx() as tx2:
tx2.user.update(data={'name': 'Tegan'}, where={'id': user.id})
tx2.user.create(data={'name': 'Bobby'})

dirty_user = tx1.user.find_first_or_raise(where={'id': user.id})

non_repeatable_user = tx1.user.find_first_or_raise(where={'id': user.id})
phantom_count = tx1.user.count()

# No dirty read
assert tx1_user.name == dirty_user.name
# Have non-repeatable read
assert tx1_user.name != non_repeatable_user.name
# Have phantom read
assert tx1_count != phantom_count


@pytest.mark.skipif(CURRENT_DATABASE in ['cockroachdb', 'sqlite'], reason='Not available')
def test_repeatable_read_isolation_level(client: Prisma) -> None:
"""A transaction isolation level is set to `REPEATABLE_READ`"""
client2 = Prisma()
client2.connect()

user = client.user.create(data={'name': 'Robert'})

with client.tx(isolation_level=prisma.TransactionIsolationLevel.REPEATABLE_READ) as tx1:
tx1_user = tx1.user.find_first_or_raise(where={'id': user.id})
tx1_count = tx1.user.count()

with client2.tx() as tx2:
tx2.user.update(data={'name': 'Tegan'}, where={'id': user.id})
tx2.user.create(data={'name': 'Bobby'})

dirty_user = tx1.user.find_first_or_raise(where={'id': user.id})

non_repeatable_user = tx1.user.find_first_or_raise(where={'id': user.id})
phantom_count = tx1.user.count()

# No dirty read
assert tx1_user.name == dirty_user.name
# No non-repeatable read
assert tx1_user.name == non_repeatable_user.name
# Have phantom read
assert tx1_count != phantom_count


@pytest.mark.skipif(True, reason='Available for SQL Server only')
def test_snapshot_isolation_level() -> None:
"""A transaction isolation level is set to `SNAPSHOT`"""
raise NotImplementedError


def test_serializable_isolation_level(client: Prisma) -> None:
"""A transaction isolation level is set to `SERIALIZABLE`"""
client2 = Prisma()
client2.connect()

user = client.user.create(data={'name': 'Robert'})

with client.tx(isolation_level=prisma.TransactionIsolationLevel.SERIALIZABLE) as tx1:
tx1_user = tx1.user.find_first_or_raise(where={'id': user.id})
tx1_count = tx1.user.count()

with client2.tx() as tx2:
tx2.user.update(data={'name': 'Tegan'}, where={'id': user.id})
tx2.user.create(data={'name': 'Bobby'})
@pytest.mark.parametrize(
('input_level', 'expected_level'),
[
pytest.param(
prisma.TransactionIsolationLevel.READ_UNCOMMITTED,
'READ_UNCOMMITTED',
id='read uncommitted',
marks=pytest.mark.skipif(CURRENT_DATABASE in ['cockroachdb', 'sqlite'], reason='Not available'),
),
pytest.param(
prisma.TransactionIsolationLevel.READ_COMMITTED,
'READ_COMMITTED',
id='read committed',
marks=pytest.mark.skipif(CURRENT_DATABASE in ['cockroachdb', 'sqlite'], reason='Not available'),
),
pytest.param(
prisma.TransactionIsolationLevel.REPEATABLE_READ,
'REPEATABLE_READ',
id='repeatable read',
marks=pytest.mark.skipif(CURRENT_DATABASE in ['cockroachdb', 'sqlite'], reason='Not available'),
),
pytest.param(
prisma.TransactionIsolationLevel.SNAPSHOT,
'SNAPSHOT',
id='snapshot',
marks=pytest.mark.skipif(True, reason='Available for SQL Server only'),
),
pytest.param(
prisma.TransactionIsolationLevel.SERIALIZABLE,
'SERIALIZABLE',
id='serializable',
marks=pytest.mark.skipif(
CURRENT_DATABASE == 'sqlite', reason='PRAGMA has only effect in shared-cache mode'
),
),
],
)
# TODO: remove after issue will be resolved
@pytest.mark.skipif(CURRENT_DATABASE in ['mysql', 'mariadb'], reason='https://github.com/prisma/prisma/issues/22890')
def test_isolation_level(
client: Prisma, raw_queries: RawQueries, input_level: prisma.TransactionIsolationLevel, expected_level: str
) -> None:
"""A transaction isolation level is set correctly"""
with client.tx(isolation_level=input_level) as tx:
results = tx.query_raw(raw_queries.select_tx_isolation)

dirty_user = tx1.user.find_first_or_raise(where={'id': user.id})
assert len(results) == 1

non_repeatable_user = tx1.user.find_first_or_raise(where={'id': user.id})
phantom_count = tx1.user.count()
row = results[0]
assert any(row)

# No dirty read
assert tx1_user.name == dirty_user.name
# No non-repeatable read
assert tx1_user.name == non_repeatable_user.name
# No phantom read
assert tx1_count == phantom_count
level = next(iter(row.values()))
# The result can depends on the database, so we do upper() and replace()
level = str(level).upper().replace(' ', '_').replace('-', '_')
assert level == expected_level
174 changes: 53 additions & 121 deletions databases/tests/test_transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from prisma import Prisma
from prisma.models import User, Profile

from ..utils import CURRENT_DATABASE
from ..utils import CURRENT_DATABASE, RawQueries


@pytest.mark.asyncio
Expand Down Expand Up @@ -215,126 +215,58 @@ async def test_transaction_already_closed(client: Prisma) -> None:


@pytest.mark.asyncio
@pytest.mark.skipif(CURRENT_DATABASE in ['cockroachdb', 'sqlite'], reason='Not available')
async def test_read_uncommited_isolation_level(client: Prisma) -> None:
"""A transaction isolation level is set to `READ_UNCOMMITED`"""
client2 = Prisma()
await client2.connect()

user = await client.user.create(data={'name': 'Robert'})

async with client.tx(isolation_level=prisma.TransactionIsolationLevel.READ_UNCOMMITED) as tx1:
tx1_user = await tx1.user.find_first_or_raise(where={'id': user.id})
tx1_count = await tx1.user.count()

async with client2.tx() as tx2:
await tx2.user.update(data={'name': 'Tegan'}, where={'id': user.id})
await tx2.user.create(data={'name': 'Bobby'})

dirty_user = await tx1.user.find_first_or_raise(where={'id': user.id})

non_repeatable_user = await tx1.user.find_first_or_raise(where={'id': user.id})
phantom_count = await tx1.user.count()

# Have dirty read
assert tx1_user.name != dirty_user.name
# Have non-repeatable read
assert tx1_user.name != non_repeatable_user.name
# Have phantom read
assert tx1_count != phantom_count


@pytest.mark.asyncio
@pytest.mark.skipif(CURRENT_DATABASE in ['cockroachdb', 'sqlite'], reason='Not available')
async def test_read_commited_isolation_level(client: Prisma) -> None:
"""A transaction isolation level is set to `READ_COMMITED`"""
client2 = Prisma()
await client2.connect()

user = await client.user.create(data={'name': 'Robert'})

async with client.tx(isolation_level=prisma.TransactionIsolationLevel.READ_COMMITED) as tx1:
tx1_user = await tx1.user.find_first_or_raise(where={'id': user.id})
tx1_count = await tx1.user.count()

async with client2.tx() as tx2:
await tx2.user.update(data={'name': 'Tegan'}, where={'id': user.id})
await tx2.user.create(data={'name': 'Bobby'})

dirty_user = await tx1.user.find_first_or_raise(where={'id': user.id})

non_repeatable_user = await tx1.user.find_first_or_raise(where={'id': user.id})
phantom_count = await tx1.user.count()

# No dirty read
assert tx1_user.name == dirty_user.name
# Have non-repeatable read
assert tx1_user.name != non_repeatable_user.name
# Have phantom read
assert tx1_count != phantom_count


@pytest.mark.asyncio
@pytest.mark.skipif(CURRENT_DATABASE in ['cockroachdb', 'sqlite'], reason='Not available')
async def test_repeatable_read_isolation_level(client: Prisma) -> None:
"""A transaction isolation level is set to `REPEATABLE_READ`"""
client2 = Prisma()
await client2.connect()

user = await client.user.create(data={'name': 'Robert'})

async with client.tx(isolation_level=prisma.TransactionIsolationLevel.REPEATABLE_READ) as tx1:
tx1_user = await tx1.user.find_first_or_raise(where={'id': user.id})
tx1_count = await tx1.user.count()

async with client2.tx() as tx2:
await tx2.user.update(data={'name': 'Tegan'}, where={'id': user.id})
await tx2.user.create(data={'name': 'Bobby'})

dirty_user = await tx1.user.find_first_or_raise(where={'id': user.id})

non_repeatable_user = await tx1.user.find_first_or_raise(where={'id': user.id})
phantom_count = await tx1.user.count()

# No dirty read
assert tx1_user.name == dirty_user.name
# No non-repeatable read
assert tx1_user.name == non_repeatable_user.name
# Have phantom read
assert tx1_count != phantom_count


@pytest.mark.asyncio
@pytest.mark.skipif(True, reason='Available for SQL Server only')
async def test_snapshot_isolation_level() -> None:
"""A transaction isolation level is set to `SNAPSHOT`"""
raise NotImplementedError


@pytest.mark.asyncio
async def test_serializable_isolation_level(client: Prisma) -> None:
"""A transaction isolation level is set to `SERIALIZABLE`"""
client2 = Prisma()
await client2.connect()

user = await client.user.create(data={'name': 'Robert'})

async with client.tx(isolation_level=prisma.TransactionIsolationLevel.SERIALIZABLE) as tx1:
tx1_user = await tx1.user.find_first_or_raise(where={'id': user.id})
tx1_count = await tx1.user.count()

async with client2.tx() as tx2:
await tx2.user.update(data={'name': 'Tegan'}, where={'id': user.id})
await tx2.user.create(data={'name': 'Bobby'})
@pytest.mark.parametrize(
('input_level', 'expected_level'),
[
pytest.param(
prisma.TransactionIsolationLevel.READ_UNCOMMITTED,
'READ_UNCOMMITTED',
id='read uncommitted',
marks=pytest.mark.skipif(CURRENT_DATABASE in ['cockroachdb', 'sqlite'], reason='Not available'),
),
pytest.param(
prisma.TransactionIsolationLevel.READ_COMMITTED,
'READ_COMMITTED',
id='read committed',
marks=pytest.mark.skipif(CURRENT_DATABASE in ['cockroachdb', 'sqlite'], reason='Not available'),
),
pytest.param(
prisma.TransactionIsolationLevel.REPEATABLE_READ,
'REPEATABLE_READ',
id='repeatable read',
marks=pytest.mark.skipif(CURRENT_DATABASE in ['cockroachdb', 'sqlite'], reason='Not available'),
),
pytest.param(
prisma.TransactionIsolationLevel.SNAPSHOT,
'SNAPSHOT',
id='snapshot',
marks=pytest.mark.skipif(True, reason='Available for SQL Server only'),
),
pytest.param(
prisma.TransactionIsolationLevel.SERIALIZABLE,
'SERIALIZABLE',
id='serializable',
marks=pytest.mark.skipif(
CURRENT_DATABASE == 'sqlite', reason='PRAGMA has only effect in shared-cache mode'
),
),
],
)
# TODO: remove after issue will be resolved
@pytest.mark.skipif(CURRENT_DATABASE in ['mysql', 'mariadb'], reason='https://github.com/prisma/prisma/issues/22890')
async def test_isolation_level(
client: Prisma, raw_queries: RawQueries, input_level: prisma.TransactionIsolationLevel, expected_level: str
) -> None:
"""A transaction isolation level is set correctly"""
async with client.tx(isolation_level=input_level) as tx:
results = await tx.query_raw(raw_queries.select_tx_isolation)

dirty_user = await tx1.user.find_first_or_raise(where={'id': user.id})
assert len(results) == 1

non_repeatable_user = await tx1.user.find_first_or_raise(where={'id': user.id})
phantom_count = await tx1.user.count()
row = results[0]
assert any(row)

# No dirty read
assert tx1_user.name == dirty_user.name
# No non-repeatable read
assert tx1_user.name == non_repeatable_user.name
# No phantom read
assert tx1_count == phantom_count
level = next(iter(row.values()))
# The result can depends on the database, so we do upper() and replace()
level = str(level).upper().replace(' ', '_').replace('-', '_')
assert level == expected_level
Loading

0 comments on commit e3f59f6

Please sign in to comment.