Skip to content

Commit

Permalink
refactor: clean up stream exception logic
Browse files Browse the repository at this point in the history
  • Loading branch information
RogerHYang committed Jan 19, 2024
1 parent 305cca6 commit 5d4f372
Showing 1 changed file with 16 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,25 +66,22 @@ def __next__(self) -> Any:
# pass through mistaken calls
if not hasattr(self.__wrapped__, "__next__"):
self.__wrapped__.__next__()
iteration_is_finished = False
status_code: Optional[trace_api.StatusCode] = None
try:
chunk: Any = self.__wrapped__.__next__()
except Exception as exception:
iteration_is_finished = True
if isinstance(exception, StopIteration):
status_code = trace_api.StatusCode.OK
else:
status_code = trace_api.StatusCode.ERROR
self._self_with_span.record_exception(exception)
if not self._self_is_finished:
if isinstance(exception, StopIteration):
status_code = trace_api.StatusCode.OK
else:
status_code = trace_api.StatusCode.ERROR
self._self_with_span.record_exception(exception)
if status_code is trace_api.StatusCode.ERROR:
self._self_with_span.record_exception(exception)
self._finish_tracing(status_code=status_code)
raise
else:
self._process_chunk(chunk)
status_code = trace_api.StatusCode.OK
return chunk
finally:
if iteration_is_finished and not self._self_is_finished:
self._finish_tracing(status_code=status_code)

def __aiter__(self) -> AsyncIterator[Any]:
return self
Expand All @@ -93,25 +90,20 @@ async def __anext__(self) -> Any:
# pass through mistaken calls
if not hasattr(self.__wrapped__, "__anext__"):
self.__wrapped__.__anext__()
iteration_is_finished = False
status_code: Optional[trace_api.StatusCode] = None
try:
chunk: Any = await self.__wrapped__.__anext__()
except Exception as exception:
iteration_is_finished = True
if isinstance(exception, StopAsyncIteration):
status_code = trace_api.StatusCode.OK
else:
status_code = trace_api.StatusCode.ERROR
self._self_with_span.record_exception(exception)
if not self._self_is_finished:
if isinstance(exception, StopAsyncIteration):
status_code = trace_api.StatusCode.OK
else:
status_code = trace_api.StatusCode.ERROR
self._self_with_span.record_exception(exception)
self._finish_tracing(status_code=status_code)
raise
else:
self._process_chunk(chunk)
status_code = trace_api.StatusCode.OK
return chunk
finally:
if iteration_is_finished and not self._self_is_finished:
self._finish_tracing(status_code=status_code)

def _process_chunk(self, chunk: Any) -> None:
if not self._self_iteration_count:
Expand Down

0 comments on commit 5d4f372

Please sign in to comment.