feat: Configurable batch size and max wait limit for targets #1876

Open: wants to merge 97 commits into base: main
Changes from 13 commits
Commits
11c56e3
add _perftimer file to helpers folder
BuzzCutNorman Jul 24, 2023
550ecff
add sink_timer, batch_size_rows, and batch_wait_limit_seconds
BuzzCutNorman Jul 25, 2023
68c3484
calcuation fix
BuzzCutNorman Jul 25, 2023
66f750e
correct class hint for sink_timer
BuzzCutNorman Jul 25, 2023
62c2c32
mypy fixes round 1
BuzzCutNorman Jul 25, 2023
541a46d
mypy fixes round 2
BuzzCutNorman Jul 25, 2023
028e3d9
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Jul 25, 2023
4309c8e
Merge branch 'main' of https://github.com/meltano/sdk into 1626-confi…
BuzzCutNorman Aug 4, 2023
dc2a2d8
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Aug 8, 2023
01f4539
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Aug 8, 2023
8a591c1
simple tests for PerfTimer and BatchPerfTimer
BuzzCutNorman Aug 8, 2023
eb6c9e7
clear _stop_time after lap is calculated
BuzzCutNorman Aug 8, 2023
f6bbf0c
wider variation when testing perftimer lap_time
BuzzCutNorman Aug 8, 2023
74b1653
Apply suggestions from code review
BuzzCutNorman Aug 10, 2023
bb0c8fb
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Aug 14, 2023
c151f84
comment out batch timer additions
BuzzCutNorman Aug 14, 2023
49c014c
add BatchPerfTimer and supporting properties
BuzzCutNorman Aug 14, 2023
a425774
add BatchTimer start and finish to drain_one
BuzzCutNorman Aug 14, 2023
379f814
remove commented out code
BuzzCutNorman Aug 14, 2023
0303f25
change perf_diff_allowed_max to .25 better
BuzzCutNorman Aug 14, 2023
1fa9752
update test match perf_diff_allowed_max change
BuzzCutNorman Aug 14, 2023
d2db4fb
added tests for batch_size_rows and max_size
BuzzCutNorman Aug 14, 2023
99a9fb5
add batch_wait_limit_seconds tests
BuzzCutNorman Aug 15, 2023
059c392
added _lap_manager
BuzzCutNorman Aug 15, 2023
7ac5a6c
calling _lap_manager in drain_one
BuzzCutNorman Aug 15, 2023
0420db8
mypy fix: moved sink_timer check into _lap_manager
BuzzCutNorman Aug 15, 2023
4b5b3c6
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Aug 15, 2023
1aec608
Apply suggestions from code review
BuzzCutNorman Aug 15, 2023
b5e93a6
added batch_wait_limit_seconds value test
BuzzCutNorman Aug 15, 2023
57b7f37
add tests for new settings
BuzzCutNorman Aug 15, 2023
f5874c9
BATCH_SIZE_ROWS_CONFIG added
BuzzCutNorman Aug 15, 2023
3b73e64
BATCH_WAIT_LIMIT_SECONDS_CONFIG added
BuzzCutNorman Aug 15, 2023
b48f3d0
move logging and update formatting.
BuzzCutNorman Aug 16, 2023
6c465f7
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
edgarrmondragon Aug 16, 2023
3cee381
add pertimer on_the_clock tests
BuzzCutNorman Aug 17, 2023
7025280
add batchperftimer is_too_old tests
BuzzCutNorman Aug 17, 2023
8bbb346
add PerfTimer.on_the_clock()
BuzzCutNorman Aug 17, 2023
fd8f171
add BatchPerfTimer.is_too_old
BuzzCutNorman Aug 17, 2023
2cfa516
add Sink.is_too_old tests
BuzzCutNorman Aug 17, 2023
da2808a
added property Sink.is_too_old
BuzzCutNorman Aug 17, 2023
01962e0
remove is_too_old tests
BuzzCutNorman Aug 18, 2023
c89fd74
move is_too_old to Sink
BuzzCutNorman Aug 18, 2023
2f51fd4
Add is_too_old drain to _process_record_message
BuzzCutNorman Aug 18, 2023
bf582e0
fix AttributeError NoneType has no start_time
BuzzCutNorman Aug 18, 2023
01705c6
remove counter_based_max_size and logging
BuzzCutNorman Aug 18, 2023
29ae3de
clean up test_batch_wait_limit_seconds
BuzzCutNorman Aug 18, 2023
1b5cc6a
update test for batch_dynamic_managment
BuzzCutNorman Aug 18, 2023
3d19a03
BatchPerfTimer take in a max and internally sink_max_size internally
BuzzCutNorman Aug 18, 2023
50d00c2
add batch_dynamic_management to Sink
BuzzCutNorman Aug 18, 2023
fb5d6f2
updated perf_diff tests to utilize allowed min and max
BuzzCutNorman Aug 18, 2023
e574003
add call to counter_based_max_size and logging to _lap_manager
BuzzCutNorman Aug 18, 2023
7f48881
expand is_full to utilize sink_timer.sink_max_size
BuzzCutNorman Aug 18, 2023
7a54c69
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Aug 22, 2023
1261c04
mypy fixes
BuzzCutNorman Aug 22, 2023
0436a74
added defualt for when only batch management is set
BuzzCutNorman Aug 22, 2023
b92d8c1
add about_info test for batch_dynamic_management
BuzzCutNorman Aug 23, 2023
f78cc0a
added batch_dynamic_management to capabilities
BuzzCutNorman Aug 23, 2023
ec57105
add batch_dynamic_management to config if missing
BuzzCutNorman Aug 23, 2023
68a520c
add and or update tests for updated is_full utilizing _drain_function
BuzzCutNorman Aug 24, 2023
b36acf8
update is_full to utilize _drain_function and set_drain_function and …
BuzzCutNorman Aug 24, 2023
933fa21
remove is_too_old check from _process_record_message and update is_fu…
BuzzCutNorman Aug 24, 2023
c342e44
mypy fixes
BuzzCutNorman Aug 24, 2023
9ce87da
update docs implementation index to include target_batch_full
BuzzCutNorman Aug 24, 2023
af84ecd
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
edgarrmondragon Aug 25, 2023
31f6ef5
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
edgarrmondragon Aug 28, 2023
9dae576
Merge branch '1626-configurable-batch-size-and-max-wait-limit-for-tar…
BuzzCutNorman Aug 28, 2023
b2697ee
added documentation for updated is_full and associated new functions
BuzzCutNorman Aug 28, 2023
e874f73
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Aug 28, 2023
f36a79f
Merge branch '1626-configurable-batch-size-and-max-wait-limit-for-tar…
BuzzCutNorman Aug 29, 2023
2952513
edits to target batch full documentation
BuzzCutNorman Aug 29, 2023
c79cf64
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Aug 29, 2023
4f75636
Merge branch '1626-configurable-batch-size-and-max-wait-limit-for-tar…
BuzzCutNorman Aug 29, 2023
879995a
removed cached_property decorator from max_size
BuzzCutNorman Aug 29, 2023
1c81669
Edits and formatting changes.
BuzzCutNorman Aug 29, 2023
7a1df1d
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
edgarrmondragon Aug 31, 2023
c804b76
further edits and formatting
BuzzCutNorman Sep 5, 2023
648656b
Merge branch '1626-configurable-batch-size-and-max-wait-limit-for-tar…
BuzzCutNorman Sep 5, 2023
dc8481d
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Sep 5, 2023
ffabffa
Apply suggestions from code review
BuzzCutNorman Sep 5, 2023
50e82ae
Apply suggestion from code review
BuzzCutNorman Sep 6, 2023
f977ca2
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1626…
BuzzCutNorman Oct 6, 2023
b2181e7
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Oct 16, 2023
484c7cb
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Oct 24, 2023
2a870af
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1626…
BuzzCutNorman Dec 5, 2023
fd787f2
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Dec 5, 2023
e159ae6
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Dec 8, 2023
7c4756c
added return type annotations to start_time, stop_time, and lap_time
BuzzCutNorman Dec 8, 2023
e50f1a2
resolve mypy unsupported operand types error
BuzzCutNorman Dec 8, 2023
f4c2c38
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Jan 9, 2024
5c1b5ea
change private Sink._lap_manager() to public Sink.lap_manager()
BuzzCutNorman Jan 9, 2024
39dd6e7
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1626…
BuzzCutNorman Jan 11, 2024
ab2782d
added settings to default_settings in test_target_about_info
BuzzCutNorman Jan 12, 2024
d06481d
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Jan 22, 2024
295802f
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Jan 22, 2024
17f143c
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Jan 29, 2024
a0e34c9
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 1626…
BuzzCutNorman Feb 2, 2024
49a64e6
Merge branch 'main' into 1626-configurable-batch-size-and-max-wait-li…
BuzzCutNorman Feb 15, 2024
118 changes: 118 additions & 0 deletions singer_sdk/helpers/_perftimer.py
@@ -0,0 +1,118 @@
"""Performance timers which deal with dynamic timing events."""

from __future__ import annotations

import time


class PerfTimerError(Exception):
    """A custom exception used to report errors in use of the PerfTimer classes."""


class PerfTimer:
    """A basic performance timer class."""

    _start_time: float | None = None
    _stop_time: float | None = None
    _lap_time: float | None = None

    @property
    def start_time(self) -> float | None:
        """The perf_counter reading taken by start(), or None when not running."""
        return self._start_time

    @property
    def stop_time(self) -> float | None:
        """The perf_counter reading taken by stop(); cleared once the lap is stored."""
        return self._stop_time

    @property
    def lap_time(self) -> float | None:
        """Seconds elapsed during the last completed lap, or None before the first."""
        return self._lap_time

    def start(self) -> None:
        """Start the timer."""
        if self._start_time is not None:
            msg = "Timer is running. Use .stop() to stop it"
            raise PerfTimerError(msg)

        self._start_time = time.perf_counter()

    def stop(self) -> None:
        """Stop the timer, store the elapsed time, and reset."""
        if self._start_time is None:
            msg = "Timer is not running. Use .start() to start it"
            raise PerfTimerError(msg)

        self._stop_time = time.perf_counter()
        self._lap_time = self._stop_time - self._start_time
        self._start_time = None
        self._stop_time = None


class BatchPerfTimer(PerfTimer):
    """The performance timer for target bulk inserts."""

    def __init__(
        self,
        max_size: int,
        max_perf_counter: float,
    ) -> None:
        self._sink_max_size: int = max_size
        self._max_perf_counter: float = max_perf_counter

    SINK_MAX_SIZE_CELING: int = 100000
    """The max size a bulk insert can be."""

    @property
    def sink_max_size(self) -> int:
        """The current maximum batch size."""
        return self._sink_max_size

    @property
    def max_perf_counter(self) -> float:
        """How many seconds can pass before an insert."""
        return self._max_perf_counter

    @property
    def perf_diff_allowed_min(self) -> float:
        """The minimum negative variance allowed, 1/3 worse than wanted."""
        return -1.0 * (self.max_perf_counter * 0.33)

    @property
    def perf_diff_allowed_max(self) -> float:
        """The maximum positive variance allowed, 3/4 better than wanted."""
        return self.max_perf_counter * 0.75

    @property
    def perf_diff(self) -> float:
        """Difference between wanted elapsed time and actual elapsed time."""
        diff = 0
        if self._lap_time:
            diff = self.max_perf_counter - self._lap_time
        return float(diff)

    def counter_based_max_size(self) -> int:  # noqa: C901
        """Calculate performance based batch size."""
        correction = 0
        if self.perf_diff < self.perf_diff_allowed_min:
            if self.sink_max_size >= 15000:  # noqa: PLR2004
                correction = -5000
            elif self.sink_max_size >= 10000:  # noqa: PLR2004
                correction = -1000
            elif self.sink_max_size >= 1000:  # noqa: PLR2004
                correction = -100
            elif self.sink_max_size > 10:  # noqa: PLR2004
                correction = -10
        if (
            self.perf_diff >= self.perf_diff_allowed_max
            and self.sink_max_size < self.SINK_MAX_SIZE_CELING
        ):
            if self.sink_max_size >= 10000:  # noqa: PLR2004
                correction = 10000
            elif self.sink_max_size >= 1000:  # noqa: PLR2004
                correction = 1000
            elif self.sink_max_size >= 100:  # noqa: PLR2004
                correction = 100
            elif self.sink_max_size >= 10:  # noqa: PLR2004
                correction = 10
        self._sink_max_size += correction
        return self.sink_max_size
71 changes: 71 additions & 0 deletions singer_sdk/sinks/sql.py
@@ -15,6 +15,7 @@
from singer_sdk.connectors import SQLConnector
from singer_sdk.exceptions import ConformedNameClashException
from singer_sdk.helpers._conformers import replace_leading_digit
from singer_sdk.helpers._perftimer import BatchPerfTimer
from singer_sdk.sinks.batch import BatchSink

if t.TYPE_CHECKING:
@@ -49,6 +50,23 @@ def __init__(
        """
        self._connector: SQLConnector
        self._connector = connector or self.connector_class(dict(target.config))
        self._batch_size_rows: int = target.config.get(
            "batch_size_rows",
            super().MAX_SIZE_DEFAULT,
        )
        self._batch_wait_limit_seconds: int | None = target.config.get(
            "batch_wait_limit_seconds",
        )

        self._sink_timer: BatchPerfTimer | None = None

        if self._batch_wait_limit_seconds is not None:
            # Start from a small batch; the timer adjusts the size after each insert.
            self._batch_size_rows = 100
            self._sink_timer = BatchPerfTimer(
                self._batch_size_rows,
                self._batch_wait_limit_seconds,
            )

        super().__init__(target, stream_name, schema, key_properties)

    @property
Expand All @@ -69,6 +87,51 @@ def connection(self) -> sqlalchemy.engine.Connection:
"""
return self.connector.connection

@property
def batch_size_rows(self) -> int:
"""Get batch_size_rows object.

Returns:
A batch_size_rows object.
"""
return self._batch_size_rows

@batch_size_rows.setter
def batch_size_rows(self, new_value: int) -> None:
"""Set batch_size_rows object to the given value.

Args:
new_value: The value you want to set batch_size_rows too.
"""
self._batch_size_rows = new_value

@property
def batch_wait_limit_seconds(self) -> int | None:
"""Get batch_wait_limit_seconds object.

Returns:
A batch_wait_limit_seconds object.
"""
return self._batch_wait_limit_seconds

@property
def sink_timer(self) -> BatchPerfTimer | None:
"""Get sink_timer object.

Returns:
A sink_timer object.
"""
return self._sink_timer

@property
def max_size(self) -> int:
"""Get max batch size.

Returns:
Max number of records to batch before `is_full=True`
"""
return self.batch_size_rows

@property
def table_name(self) -> str:
"""Return the table name, with no schema or database part.
@@ -336,6 +399,14 @@ def bulk_insert_records(
        with self.connector._connect() as conn, conn.begin():
            result = conn.execute(insert_sql, new_records)

        # Finish line for max_size perf counter
        if self.sink_timer is not None:
            if self.sink_timer.start_time is not None:
                self.sink_timer.stop()
                self.batch_size_rows = self.sink_timer.counter_based_max_size()

            # Starting line for max_size perf counter
            self.sink_timer.start()
Collaborator:

Wdyt about moving this up the stack? Perhaps to

sink.process_batch(draining_status)

That way all targets, not just SQL, benefit from this.

Contributor Author:

I like it. The code has been moved up. Please let me know if this matches up with what you had in mind.
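
For readers following the thread, here is a minimal sketch of what timing at the generic sink level might look like, so that the adjustment no longer lives in the SQL-only `bulk_insert_records`. It is not the code that was moved in the PR. `LapTimer`, `TimedSink`, `adjusted_max_size`, and the halve/double rule are hypothetical stand-ins chosen to keep the example short; they are not the SDK's API and not the PR's step table. Only the name `lap_manager` is borrowed from the commit list.

```python
from __future__ import annotations

import time


class LapTimer:
    """Hypothetical stand-in for BatchPerfTimer, reduced to what the sketch needs."""

    def __init__(self, max_size: int, wait_limit_seconds: float) -> None:
        self.max_size = max_size
        self.wait_limit_seconds = wait_limit_seconds
        self.start_time: float | None = None
        self.lap_time: float | None = None

    def start(self) -> None:
        self.start_time = time.perf_counter()

    def stop(self) -> None:
        if self.start_time is None:
            raise RuntimeError("Timer is not running")
        self.lap_time = time.perf_counter() - self.start_time
        self.start_time = None

    def adjusted_max_size(self) -> int:
        # Simplified rule (an assumption, not the PR's step table): halve the
        # batch when the lap ran over the limit, double it when the lap took
        # less than a quarter of the limit.
        if self.lap_time is None:
            return self.max_size
        if self.lap_time > self.wait_limit_seconds:
            self.max_size = max(10, self.max_size // 2)
        elif self.lap_time < self.wait_limit_seconds * 0.25:
            self.max_size = min(100_000, self.max_size * 2)
        return self.max_size


class TimedSink:
    """Hypothetical generic sink; only the timing hook is shown."""

    def __init__(
        self,
        batch_size_rows: int,
        wait_limit_seconds: float | None,
    ) -> None:
        self.batch_size_rows = batch_size_rows
        self.sink_timer = (
            LapTimer(batch_size_rows, wait_limit_seconds)
            if wait_limit_seconds is not None
            else None
        )

    def process_batch(self, records: list[dict]) -> None:
        """Subclasses (SQL or otherwise) would write the records here."""

    def drain(self, records: list[dict]) -> None:
        # The timing wraps the generic drain step, so every sink subclass
        # gets the adjustment without touching its own write logic.
        self.process_batch(records)
        self.lap_manager()

    def lap_manager(self) -> None:
        if self.sink_timer is None:
            return
        if self.sink_timer.start_time is not None:
            # Finish line: close the lap and resize the next batch.
            self.sink_timer.stop()
            self.batch_size_rows = self.sink_timer.adjusted_max_size()
        # Starting line for the next lap.
        self.sink_timer.start()
```

As in the diff, the first drain only starts the timer; the batch size changes from the second drain onward, once a full lap has been measured.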

        return result.rowcount

    def merge_upsert_from_table(
98 changes: 98 additions & 0 deletions tests/core/test_perf_timer.py
@@ -0,0 +1,98 @@
"""Perf Timer tests."""

from __future__ import annotations

import time

import pytest

from singer_sdk.helpers._perftimer import BatchPerfTimer, PerfTimer, PerfTimerError


def test_perftimer_properties():
    timer: PerfTimer = PerfTimer()
    timer._start_time = 1.1
    timer._stop_time = 1.10
    timer._lap_time = 0.09
    assert timer.start_time is timer._start_time
    assert timer.stop_time is timer._stop_time
    assert timer._lap_time is timer.lap_time
    assert timer.start_time == 1.1
    assert timer.stop_time == 1.10
    assert timer.lap_time == 0.09


def test_perftimer_actions():
    timer: PerfTimer = PerfTimer()
    timer.start()
    assert timer.start_time is not None
    assert timer.stop_time is None
    assert timer.lap_time is None
    time.sleep(1.1)
    timer.stop()
    assert timer.lap_time >= 1
    assert timer.lap_time < 1.5
    assert timer.start_time is None
    assert timer.stop_time is None


def test_perftimer_errors():
    timer: PerfTimer = PerfTimer()
    with pytest.raises(
        PerfTimerError,
        match=r"Timer is not running. Use .start\(\) to start it",
    ):
        timer.stop()
    # starting a timer to test start() error
    timer.start()
    with pytest.raises(
        PerfTimerError,
        match=r"Timer is running. Use .stop\(\) to stop it",
    ):
        timer.start()
    # stopping the timer at the end of the test
    timer.stop()


def test_batchperftimer_properties():
    batchtimer: BatchPerfTimer = BatchPerfTimer(100, 1)
    batchtimer._lap_time = 0.10
    assert batchtimer._sink_max_size is batchtimer.sink_max_size
    assert batchtimer._max_perf_counter is batchtimer.max_perf_counter
    assert batchtimer.sink_max_size == 100
    assert batchtimer.max_perf_counter == 1
    assert batchtimer.perf_diff_allowed_max == 0.75
    assert batchtimer.perf_diff_allowed_min == -0.33
    assert batchtimer.perf_diff == 0.90


def test_batchperftimer_counter_based_max_size_additive():
    batchtimer: BatchPerfTimer = BatchPerfTimer(10, 1)
    batchtimer._lap_time = 0.24
    assert batchtimer.perf_diff > batchtimer.perf_diff_allowed_max
    assert batchtimer.counter_based_max_size() == 20
    batchtimer._sink_max_size = 100
    assert batchtimer.counter_based_max_size() == 200
    batchtimer._sink_max_size = 1000
    assert batchtimer.counter_based_max_size() == 2000
    batchtimer._sink_max_size = 10000
    assert batchtimer.counter_based_max_size() == 20000
    batchtimer._sink_max_size = 100000
    assert batchtimer.sink_max_size == batchtimer.SINK_MAX_SIZE_CELING
    assert batchtimer.counter_based_max_size() == 100000


def test_batchperftimer_counter_based_max_size_reductive():
    batchtimer: BatchPerfTimer = BatchPerfTimer(100000, 1)
    batchtimer._lap_time = 1.5
    assert batchtimer.perf_diff < batchtimer.perf_diff_allowed_min
    assert batchtimer.sink_max_size == batchtimer.SINK_MAX_SIZE_CELING
    assert batchtimer.counter_based_max_size() == 95000
    batchtimer._sink_max_size = 10000
    assert batchtimer.counter_based_max_size() == 9000
    batchtimer._sink_max_size = 1000
    assert batchtimer.counter_based_max_size() == 900
    batchtimer._sink_max_size = 100
    assert batchtimer.counter_based_max_size() == 90
    batchtimer._sink_max_size = 10
    assert batchtimer.counter_based_max_size() == 10