Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better error message, fix transaction error recording #181

Merged
merged 3 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions dbos/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
IsolationLevel,
)

from sqlalchemy.exc import DBAPIError
from sqlalchemy.exc import DBAPIError, InvalidRequestError

P = ParamSpec("P") # A generic type for workflow parameters
R = TypeVar("R", covariant=True) # A generic type for workflow return values
Expand Down Expand Up @@ -545,6 +545,7 @@ def invoke_tx(*args: Any, **kwargs: Any) -> Any:
max_retry_wait_seconds = 2.0
while True:
has_recorded_error = False
txn_error: Optional[Exception] = None
try:
with session.begin():
# This must be the first statement in the transaction!
Expand Down Expand Up @@ -608,15 +609,24 @@ def invoke_tx(*args: Any, **kwargs: Any) -> Any:
max_retry_wait_seconds,
)
continue
txn_error = dbapi_error
raise
except InvalidRequestError as invalid_request_error:
dbos.logger.error(
f"InvalidRequestError in transaction {func.__qualname__} \033[1m Hint: Do not call commit() or rollback() within a DBOS transaction.\033[0m"
)
txn_error = invalid_request_error
raise
except Exception as error:
txn_error = error
raise
finally:
# Don't record the error if it was already recorded
if not has_recorded_error:
if txn_error and not has_recorded_error:
txn_output["error"] = (
_serialization.serialize_exception(error)
_serialization.serialize_exception(txn_error)
)
dbos._app_db.record_transaction_error(txn_output)
raise
return output

if inspect.iscoroutinefunction(func):
Expand Down
6 changes: 5 additions & 1 deletion dbos/_sys_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from ._dbos_config import ConfigFile
from ._error import (
DBOSDeadLetterQueueError,
DBOSException,
DBOSNonExistentWorkflowError,
DBOSWorkflowConflictIDError,
)
Expand Down Expand Up @@ -405,7 +406,10 @@ def get_workflow_status_within_wf(
res["output"]
)
return resstat
return None
else:
raise DBOSException(
"Workflow status record not found. This should not happen! \033[1m Hint: Check if your workflow is deterministic.\033[0m"
)
stat = self.get_workflow_status(workflow_uuid)
self.record_operation_result(
{
Expand Down
Loading
Loading