[hip] Switched the completion list to use a list instead of a stack. (iree-org#18048)

This was sometimes causing completion events to come in out of
order.

This was previously merged into the shared/sdxl_quantized branch.
Working on a PR to move all of the updates made in hip over to cuda.

---------

Signed-off-by: Andrew Woloszyn <andrew.woloszyn@gmail.com>
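The out-of-order behavior described in the message comes from the old container being a LIFO: completions pushed in order A, B, C onto an atomic singly linked stack are popped C, B, A. The commit replaces it with a mutex-guarded singly linked list that appends at the tail and pops from the head, i.e. a FIFO, so entries drain in submission order. The following is a minimal sketch of that pattern only, not the IREE API: work_node_t, work_queue_t, and the push/pop helpers are illustrative names, and a plain pthread mutex stands in for the iree_slim_mutex_t used in the real code.

  // Minimal FIFO work queue guarded by a mutex (illustrative sketch).
  #include <pthread.h>
  #include <stdio.h>

  typedef struct work_node_t {
    int id;  // Stand-in for the per-entry payload.
    struct work_node_t* next;
  } work_node_t;

  typedef struct work_queue_t {
    pthread_mutex_t mutex;
    work_node_t* head;
    work_node_t* tail;
  } work_queue_t;

  static void work_queue_push(work_queue_t* queue, work_node_t* node) {
    pthread_mutex_lock(&queue->mutex);
    node->next = NULL;
    if (queue->tail) {
      queue->tail->next = node;  // Append after the current tail.
      queue->tail = node;
    } else {
      queue->head = node;  // First entry becomes both head and tail.
      queue->tail = node;
    }
    pthread_mutex_unlock(&queue->mutex);
  }

  static work_node_t* work_queue_pop(work_queue_t* queue) {
    pthread_mutex_lock(&queue->mutex);
    work_node_t* node = queue->head;
    if (node) {
      queue->head = node->next;
      if (queue->tail == node) queue->tail = NULL;
    }
    pthread_mutex_unlock(&queue->mutex);
    return node;  // NULL when the queue is empty.
  }

  int main(void) {
    work_queue_t queue = {PTHREAD_MUTEX_INITIALIZER, NULL, NULL};
    work_node_t nodes[3] = {{.id = 0}, {.id = 1}, {.id = 2}};
    for (int i = 0; i < 3; ++i) work_queue_push(&queue, &nodes[i]);
    // Prints "0 1 2": FIFO order. A LIFO slist would pop 2, 1, 0 instead,
    // which is how completion events could be observed out of order.
    for (work_node_t* n = work_queue_pop(&queue); n; n = work_queue_pop(&queue)) {
      printf("%d ", n->id);
    }
    printf("\n");
    return 0;
  }

Because push and pop take the same mutex that guards head and tail, the FIFO invariant holds even when several threads enqueue concurrently, which is the property the pending-queue worker relies on.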
AWoloszyn authored Aug 1, 2024
1 parent 887c276 commit c282ea5
Showing 1 changed file with 146 additions and 66 deletions.
212 changes: 146 additions & 66 deletions runtime/src/iree/hal/drivers/hip/pending_queue_actions.c
@@ -184,19 +184,68 @@ static void iree_hal_hip_queue_action_list_destroy(
//===----------------------------------------------------------------------===//

// Ready action atomic slist entry struct.
-typedef struct iree_hal_hip_atomic_slist_entry_t {
+typedef struct iree_hal_hip_entry_list_node_t {
iree_hal_hip_queue_action_t* ready_list_head;
-iree_atomic_slist_intrusive_ptr_t slist_next;
-} iree_hal_hip_atomic_slist_entry_t;
+struct iree_hal_hip_entry_list_node_t* next;
+} iree_hal_hip_entry_list_node_t;

typedef struct iree_hal_hip_entry_list_t {
iree_slim_mutex_t guard_mutex;

iree_hal_hip_entry_list_node_t* head IREE_GUARDED_BY(guard_mutex);
iree_hal_hip_entry_list_node_t* tail IREE_GUARDED_BY(guard_mutex);
} iree_hal_hip_entry_list_t;

static iree_hal_hip_entry_list_node_t* iree_hal_hip_entry_list_pop(
iree_hal_hip_entry_list_t* list) {
iree_hal_hip_entry_list_node_t* out = NULL;
iree_slim_mutex_lock(&list->guard_mutex);
if (list->head) {
out = list->head;
list->head = list->head->next;
if (out == list->tail) {
list->tail = NULL;
}
}
iree_slim_mutex_unlock(&list->guard_mutex);
return out;
}

void iree_hal_hip_entry_list_push(iree_hal_hip_entry_list_t* list,
iree_hal_hip_entry_list_node_t* next) {
iree_slim_mutex_lock(&list->guard_mutex);
next->next = NULL;
if (list->tail) {
list->tail->next = next;
list->tail = next;
} else {
list->head = next;
list->tail = next;
}
iree_slim_mutex_unlock(&list->guard_mutex);
}

static void iree_hal_hip_ready_action_list_deinitialize(
iree_hal_hip_entry_list_t* list, iree_allocator_t host_allocator) {
iree_hal_hip_entry_list_node_t* head = list->head;
while (head) {
iree_hal_hip_queue_action_list_destroy(head->ready_list_head);
list->head = head->next;
iree_allocator_free(host_allocator, head);
head = list->head;  // Advance to the next node before the loop re-tests.
}
iree_slim_mutex_deinitialize(&list->guard_mutex);
}

-// Ready action atomic slist.
-IREE_TYPED_ATOMIC_SLIST_WRAPPER(iree_hal_hip_ready_action,
-iree_hal_hip_atomic_slist_entry_t,
-offsetof(iree_hal_hip_atomic_slist_entry_t,
-slist_next));
+static void iree_hal_hip_ready_action_list_initialize(
+iree_hal_hip_entry_list_t* list) {
+list->head = NULL;
+list->tail = NULL;
+iree_slim_mutex_initialize(&list->guard_mutex);
+}

// Ready action atomic slist entry struct.
-typedef struct iree_hal_hip_atomic_slist_completion_t {
+typedef struct iree_hal_hip_completion_list_node_t {
// The callback and user data for that callback. To be called
// when the associated event has completed.
iree_status_t (*callback)(void* user_data);
@@ -206,45 +255,69 @@ typedef struct iree_hal_hip_atomic_slist_completion_t {
// If this event was created just for the completion thread, and therefore
// needs to be cleaned up.
bool created_event;
-iree_atomic_slist_intrusive_ptr_t slist_next;
-} iree_hal_hip_atomic_slist_completion_t;

-// Ready action atomic slist.
-IREE_TYPED_ATOMIC_SLIST_WRAPPER(iree_hal_hip_completion,
-iree_hal_hip_atomic_slist_completion_t,
-offsetof(iree_hal_hip_atomic_slist_completion_t,
-slist_next));
+struct iree_hal_hip_completion_list_node_t* next;
+} iree_hal_hip_completion_list_node_t;

typedef struct iree_hal_hip_completion_list_t {
iree_slim_mutex_t guard_mutex;
iree_hal_hip_completion_list_node_t* head IREE_GUARDED_BY(guard_mutex);
iree_hal_hip_completion_list_node_t* tail IREE_GUARDED_BY(guard_mutex);
} iree_hal_hip_completion_list_t;

static iree_hal_hip_completion_list_node_t* iree_hal_hip_completion_list_pop(
iree_hal_hip_completion_list_t* list) {
iree_hal_hip_completion_list_node_t* out = NULL;
iree_slim_mutex_lock(&list->guard_mutex);
if (list->head) {
out = list->head;
list->head = list->head->next;
if (out == list->tail) {
list->tail = NULL;
}
}
iree_slim_mutex_unlock(&list->guard_mutex);
return out;
}

-static void iree_hal_hip_ready_action_slist_destroy(
-iree_hal_hip_ready_action_slist_t* list, iree_allocator_t host_allocator) {
-while (true) {
-iree_hal_hip_atomic_slist_entry_t* entry =
-iree_hal_hip_ready_action_slist_pop(list);
-if (!entry) break;
-iree_hal_hip_queue_action_list_destroy(entry->ready_list_head);
-iree_allocator_free(host_allocator, entry);
+void iree_hal_hip_completion_list_push(
+iree_hal_hip_completion_list_t* list,
+iree_hal_hip_completion_list_node_t* next) {
+iree_slim_mutex_lock(&list->guard_mutex);
+next->next = NULL;
+if (list->tail) {
+list->tail->next = next;
+list->tail = next;
+} else {
+list->head = next;
+list->tail = next;
}
-iree_hal_hip_ready_action_slist_deinitialize(list);
+iree_slim_mutex_unlock(&list->guard_mutex);
}

static void iree_hal_hip_completion_list_initialize(
iree_hal_hip_completion_list_t* list) {
list->head = NULL;
list->tail = NULL;
iree_slim_mutex_initialize(&list->guard_mutex);
}

-static void iree_hal_hip_completion_slist_destroy(
-iree_hal_hip_completion_slist_t* list,
+static void iree_hal_hip_completion_list_deinitialize(
+iree_hal_hip_completion_list_t* list,
const iree_hal_hip_dynamic_symbols_t* symbols,
iree_allocator_t host_allocator) {
-while (true) {
-iree_hal_hip_atomic_slist_completion_t* entry =
-iree_hal_hip_completion_slist_pop(list);
-if (!entry) break;
-if (entry->created_event) {
-IREE_HIP_IGNORE_ERROR(symbols, hipEventDestroy(entry->event));
+iree_hal_hip_completion_list_node_t* head = list->head;
+while (head) {
+if (head->created_event) {
+IREE_HIP_IGNORE_ERROR(symbols, hipEventDestroy(head->event));
}
-iree_allocator_free(host_allocator, entry);
+list->head = list->head->next;
+iree_allocator_free(host_allocator, head);
+head = list->head;
}
-iree_hal_hip_completion_slist_deinitialize(list);
+iree_slim_mutex_deinitialize(&list->guard_mutex);
}

static iree_hal_hip_queue_action_t* iree_hal_hip_atomic_slist_entry_pop_front(
-iree_hal_hip_atomic_slist_entry_t* list) {
+iree_hal_hip_entry_list_node_t* list) {
IREE_ASSERT(list->ready_list_head);

iree_hal_hip_queue_action_t* action = list->ready_list_head;
@@ -289,8 +362,8 @@ typedef struct iree_hal_hip_working_area_t {
// Notification to the parent thread to indicate the worker committed exiting.
// TODO: maybe remove this. We can just wait on the worker thread to exit.
iree_notification_t exit_notification;
-iree_hal_hip_ready_action_slist_t ready_worklist; // atomic
-iree_atomic_int32_t worker_state; // atomic
+iree_hal_hip_entry_list_t ready_worklist;
+iree_atomic_int32_t worker_state; // atomic
// TODO: use status to provide more context for the error.
iree_atomic_intptr_t error_code; // atomic

@@ -323,8 +396,8 @@ typedef struct iree_hal_hip_completion_area_t {
iree_notification_t state_notification;
// Notification to the parent thread to indicate the worker committed exiting.
iree_notification_t exit_notification;
-iree_hal_hip_completion_slist_t completion_list; // atomic
-iree_atomic_int32_t worker_state; // atomic
+iree_hal_hip_completion_list_t completion_list;
+iree_atomic_int32_t worker_state; // atomic

iree_atomic_intptr_t error_code; // atomic

@@ -348,7 +421,7 @@ static void iree_hal_hip_working_area_initialize(
iree_hal_hip_working_area_t* working_area) {
iree_notification_initialize(&working_area->state_notification);
iree_notification_initialize(&working_area->exit_notification);
-iree_hal_hip_ready_action_slist_initialize(&working_area->ready_worklist);
+iree_hal_hip_ready_action_list_initialize(&working_area->ready_worklist);
iree_atomic_store_int32(&working_area->worker_state,
IREE_HAL_HIP_WORKER_STATE_IDLE_WAITING,
iree_memory_order_release);
@@ -365,8 +438,8 @@ static void iree_hal_hip_working_area_initialize(

static void iree_hal_hip_working_area_deinitialize(
iree_hal_hip_working_area_t* working_area) {
-iree_hal_hip_ready_action_slist_destroy(&working_area->ready_worklist,
-working_area->host_allocator);
+iree_hal_hip_ready_action_list_deinitialize(&working_area->ready_worklist,
+working_area->host_allocator);
iree_notification_deinitialize(&working_area->exit_notification);
iree_notification_deinitialize(&working_area->state_notification);
iree_slim_mutex_deinitialize(&working_area->pending_work_items_count_mutex);
@@ -380,7 +453,7 @@ static void iree_hal_hip_completion_area_initialize(
iree_hal_hip_completion_area_t* completion_area) {
iree_notification_initialize(&completion_area->state_notification);
iree_notification_initialize(&completion_area->exit_notification);
-iree_hal_hip_completion_slist_initialize(&completion_area->completion_list);
+iree_hal_hip_completion_list_initialize(&completion_area->completion_list);
iree_atomic_store_int32(&completion_area->worker_state,
IREE_HAL_HIP_WORKER_STATE_IDLE_WAITING,
iree_memory_order_release);
@@ -397,9 +470,9 @@ static void iree_hal_hip_completion_area_initialize(

static void iree_hal_hip_completion_area_deinitialize(
iree_hal_hip_completion_area_t* completion_area) {
-iree_hal_hip_completion_slist_destroy(&completion_area->completion_list,
-completion_area->symbols,
-completion_area->host_allocator);
+iree_hal_hip_completion_list_deinitialize(&completion_area->completion_list,
+completion_area->symbols,
+completion_area->host_allocator);
iree_notification_deinitialize(&completion_area->exit_notification);
iree_notification_deinitialize(&completion_area->state_notification);
iree_slim_mutex_deinitialize(
@@ -525,6 +598,8 @@ iree_hal_hip_pending_queue_actions_cast(iree_hal_resource_t* base_value) {

static bool iree_hal_hip_worker_committed_exiting(
iree_hal_hip_working_area_t* working_area);
static bool iree_hal_hip_completion_committed_exiting(
iree_hal_hip_completion_area_t* working_area);

void iree_hal_hip_pending_queue_actions_destroy(
iree_hal_resource_t* base_actions) {
@@ -567,7 +642,7 @@ void iree_hal_hip_pending_queue_actions_destroy(
// Wait until the worker acknowledged exiting.
iree_notification_await(
&completion_area->exit_notification,
-(iree_condition_fn_t)iree_hal_hip_worker_committed_exiting,
+(iree_condition_fn_t)iree_hal_hip_completion_committed_exiting,
completion_area, iree_infinite_timeout());
}

@@ -997,7 +1072,7 @@ static iree_status_t iree_hal_hip_pending_queue_actions_issue_execution(
iree_slim_mutex_unlock(
&action->owning_actions->working_area.pending_work_items_count_mutex);

-iree_hal_hip_atomic_slist_completion_t* entry = NULL;
+iree_hal_hip_completion_list_node_t* entry = NULL;
// TODO: avoid host allocator malloc; use some pool for the allocation.
iree_status_t status = iree_allocator_malloc(
action->owning_actions->host_allocator, sizeof(*entry), (void**)&entry);
@@ -1013,7 +1088,8 @@ static iree_status_t iree_hal_hip_pending_queue_actions_issue_execution(
entry->created_event = created_event;
entry->callback = iree_hal_hip_execution_device_signal_host_callback;
entry->user_data = action;
-iree_hal_hip_completion_slist_push(

+iree_hal_hip_completion_list_push(
&action->owning_actions->completion_area.completion_list, entry);

iree_slim_mutex_lock(
@@ -1167,7 +1243,7 @@ iree_status_t iree_hal_hip_pending_queue_actions_issue(
return status;
}

-iree_hal_hip_atomic_slist_entry_t* entry = NULL;
+iree_hal_hip_entry_list_node_t* entry = NULL;
// TODO: avoid host allocator malloc; use some pool for the allocation.
if (iree_status_is_ok(status)) {
status = iree_allocator_malloc(actions->host_allocator, sizeof(*entry),
@@ -1185,8 +1261,7 @@ iree_status_t iree_hal_hip_pending_queue_actions_issue(
// Now push the ready list to the worker and have it to issue the actions to
// the GPU.
entry->ready_list_head = ready_list.head;
-iree_hal_hip_ready_action_slist_push(&actions->working_area.ready_worklist,
-entry);
+iree_hal_hip_entry_list_push(&actions->working_area.ready_worklist, entry);

// We can only overwrite the worker state if the previous state is idle
// waiting; we cannot overwrite exit related states. so we need to perform
@@ -1241,23 +1316,28 @@ static bool iree_hal_hip_worker_committed_exiting(
IREE_HAL_HIP_WORKER_STATE_EXIT_COMMITTED;
}

static bool iree_hal_hip_completion_committed_exiting(
iree_hal_hip_completion_area_t* working_area) {
return iree_atomic_load_int32(&working_area->worker_state,
iree_memory_order_acquire) ==
IREE_HAL_HIP_WORKER_STATE_EXIT_COMMITTED;
}

// Processes all ready actions in the given |worklist|.
static iree_status_t iree_hal_hip_worker_process_ready_list(
-iree_allocator_t host_allocator,
-iree_hal_hip_ready_action_slist_t* worklist) {
+iree_allocator_t host_allocator, iree_hal_hip_entry_list_t* worklist) {
IREE_TRACE_ZONE_BEGIN(z0);

iree_status_t status = iree_ok_status();
while (true) {
-iree_hal_hip_atomic_slist_entry_t* entry =
-iree_hal_hip_ready_action_slist_pop(worklist);
+iree_hal_hip_entry_list_node_t* entry =
+iree_hal_hip_entry_list_pop(worklist);
if (!entry) break;

// Process the current batch of ready actions.
while (entry->ready_list_head) {
iree_hal_hip_queue_action_t* action =
iree_hal_hip_atomic_slist_entry_pop_front(entry);

switch (action->state) {
case IREE_HAL_HIP_QUEUE_ACTION_STATE_ALIVE:
status = iree_hal_hip_pending_queue_actions_issue_execution(action);
@@ -1276,7 +1356,7 @@ static iree_status_t iree_hal_hip_worker_process_ready_list(
// Let common destruction path take care of destroying the worklist.
// When we know all host stream callbacks are done and not touching
// anything.
-iree_hal_hip_ready_action_slist_push(worklist, entry);
+iree_hal_hip_entry_list_push(worklist, entry);
break;
}

@@ -1331,12 +1411,12 @@ static void iree_hal_hip_completion_wait_pending_completion_items(
}

static iree_status_t iree_hal_hip_worker_process_completion(
-iree_hal_hip_completion_slist_t* worklist,
+iree_hal_hip_completion_list_t* worklist,
iree_hal_hip_completion_area_t* completion_area) {
iree_status_t status = iree_ok_status();
while (true) {
-iree_hal_hip_atomic_slist_completion_t* entry =
-iree_hal_hip_completion_slist_pop(worklist);
+iree_hal_hip_completion_list_node_t* entry =
+iree_hal_hip_completion_list_pop(worklist);
if (!entry) break;

IREE_TRACE_ZONE_BEGIN_NAMED(z1, "hipEventSynchronize");
@@ -1347,7 +1427,7 @@ static iree_status_t iree_hal_hip_worker_process_completion(
// Let common destruction path take care of destroying the worklist.
// When we know all host stream callbacks are done and not touching
// anything.
-iree_hal_hip_completion_slist_push(worklist, entry);
+iree_hal_hip_completion_list_push(worklist, entry);
status =
iree_make_status(IREE_STATUS_ABORTED, "could not wait on hip event");
break;
@@ -1373,7 +1453,7 @@ static iree_status_t iree_hal_hip_worker_process_completion(
// The main function for the completion worker thread.
static int iree_hal_hip_completion_execute(
iree_hal_hip_completion_area_t* completion_area) {
-iree_hal_hip_completion_slist_t* worklist = &completion_area->completion_list;
+iree_hal_hip_completion_list_t* worklist = &completion_area->completion_list;

iree_status_t status = IREE_HIP_RESULT_TO_STATUS(
completion_area->symbols, hipSetDevice(completion_area->device),
@@ -1459,7 +1539,7 @@ static int iree_hal_hip_completion_execute(
// The main function for the ready-list processing worker thread.
static int iree_hal_hip_worker_execute(
iree_hal_hip_working_area_t* working_area) {
-iree_hal_hip_ready_action_slist_t* worklist = &working_area->ready_worklist;
+iree_hal_hip_entry_list_t* worklist = &working_area->ready_worklist;

// Hip stores thread-local data based on the device. Some hip commands pull
// the device from there, and it defaults to device 0 (e.g. hipEventCreate),

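The deinitialize helpers added in this diff drain whatever is still queued at teardown by walking the list and freeing each node, after destroying its pending actions or HIP event. A minimal sketch of that drain pattern under the same caveat as above, with node_t and list_drain as illustrative names rather than IREE APIs:

  #include <stdlib.h>

  typedef struct node_t {
    struct node_t* next;
    // Payload would live here (e.g. a ready-action list head or a HIP event).
  } node_t;

  // Frees every node reachable from *head and leaves the list empty.
  static void list_drain(node_t** head) {
    node_t* n = *head;
    while (n) {
      node_t* next = n->next;  // Capture the link before freeing the node.
      // Per-node cleanup (destroying actions or events) would go here.
      free(n);
      n = next;
    }
    *head = NULL;
  }

The detail that matters, as in iree_hal_hip_ready_action_list_deinitialize above, is reading the next link before the current node is released so the walk never dereferences freed memory.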