Skip to content

Commit

Permalink
streaming: Don't accidentally serialize TLS
Browse files Browse the repository at this point in the history
  • Loading branch information
jpsamaroo committed Jan 5, 2025
1 parent 5c5c475 commit 97cedb1
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions src/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ function migrate_stream!(stream::Stream, w::Integer=myid())

# TODO: Wire up listener to ferry cancel_token notifications to remote
# worker once migrations occur during runtime
tls = get_tls()
tls = poolset(get_tls())
@assert w == myid() "Only pull-based migration is currently supported"
#remote_cancel_token = clone_cancel_token_remote(get_tls().cancel_token, worker_id)

Expand All @@ -322,8 +322,11 @@ function migrate_stream!(stream::Stream, w::Integer=myid())
# Initialize the StreamStore on the destination with the unsent inputs/outputs.
STREAM_THUNK_ID[] = thunk_id
@assert !in_task()
set_tls!(tls)
#get_tls().cancel_token = MemPool.access_ref(identity, remote_cancel_token; local_only=true)

# N.B. It's not very valid to share TLS across tasks, but we do this
# so that input/output tasks have access to TID and cancel token
set_tls!(copy(MemPool.access_ref(identity, tls; local_only=true)))

unsent_inputs, unsent_outputs = unsent
for (input_uid, inputs) in unsent_inputs
input_stream = store.input_streams[input_uid]
Expand Down

0 comments on commit 97cedb1

Please sign in to comment.