Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…tank into mixtral
  • Loading branch information
archana-ramalingam committed Aug 19, 2024
2 parents a8e714a + 4f86051 commit 823d95c
Show file tree
Hide file tree
Showing 36 changed files with 1,163 additions and 162 deletions.
14 changes: 14 additions & 0 deletions libshortfin/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,20 @@ set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_SCAN_FOR_MODULES 0)

option(SHORTFIN_BUILD_PYTHON_BINDINGS "Builds Python Bindings" OFF)
option(SHORTFIN_BUILD_TESTS "Builds C++ tests" ON)

if(SHORTFIN_BUILD_TESTS)
include(FetchContent)
FetchContent_Declare(
googletest
URL https://github.com/google/googletest/archive/03597a01ee50ed33e9dfd640b249b4be3799d395.zip
)
# For Windows: Prevent overriding the parent project's compiler/linker settings
set(gtest_force_shared_crt ON CACHE BOOL "" FORCE)
FetchContent_MakeAvailable(googletest)
include(GoogleTest)
enable_testing()
endif()

# Includes.
list(APPEND CMAKE_MODULE_PATH
Expand Down
84 changes: 80 additions & 4 deletions libshortfin/bindings/python/lib_ext.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "./lib_ext.h"

#include "./utils.h"
#include "shortfin/local/async.h"
#include "shortfin/local/process.h"
#include "shortfin/local/scope.h"
#include "shortfin/local/system.h"
Expand All @@ -25,6 +26,8 @@ class Refs {
py::module_::import_("asyncio").attr("create_task");
py::object asyncio_set_event_loop =
py::module_::import_("asyncio").attr("set_event_loop");
py::object asyncio_get_running_loop =
py::module_::import_("asyncio.events").attr("get_running_loop");
py::object asyncio_set_running_loop =
py::module_::import_("asyncio.events").attr("_set_running_loop");
py::object threading_Thread =
Expand All @@ -46,13 +49,25 @@ class Refs {
py::object lazy_PyWorkerEventLoop_;
};

class PyWorker;
thread_local PyWorker *current_thread_worker = nullptr;

// Custom worker which hosts an asyncio event loop.
class PyWorker : public local::Worker {
public:
PyWorker(PyInterpreterState *interp, std::shared_ptr<Refs> refs,
Options options)
: Worker(std::move(options)), interp_(interp), refs_(std::move(refs)) {}

static PyWorker &GetCurrent() {
PyWorker *local_worker = current_thread_worker;
if (!local_worker) {
throw std::logic_error(
"There is no shortfin worker associated with this thread.");
}
return *local_worker;
}

void WaitForShutdown() override {
// Need to release the GIL if blocking.
py::gil_scoped_release g;
Expand All @@ -64,6 +79,7 @@ class PyWorker : public local::Worker {
if (options().owned_thread) {
PyThreadState_New(interp_);
}
current_thread_worker = this;

py::gil_scoped_acquire g;
// Aside from set_event_loop being old and _set_running_loop being new
Expand All @@ -74,6 +90,8 @@ class PyWorker : public local::Worker {

void OnThreadStop() override {
{
current_thread_worker = nullptr;

// Do Python level thread cleanup.
py::gil_scoped_acquire g;
loop_.reset();
Expand Down Expand Up @@ -359,6 +377,11 @@ void BindLocal(py::module_ &m) {
.def_prop_ro("raw_device", &local::ScopedDevice::raw_device,
py::rv_policy::reference_internal)
.def(py::self == py::self)
.def("__await__",
[](local::ScopedDevice &self) {
py::object future = py::cast(self.OnSync(), py::rv_policy::move);
return future.attr("__await__")();
})
.def("__repr__", &local::ScopedDevice::to_s);

py::class_<DevicesSet>(m, "_ScopeDevicesSet")
Expand Down Expand Up @@ -442,13 +465,66 @@ void BindLocal(py::module_ &m) {
.def("__init__", [](py::args, py::kwargs) {})
.def_static(
"__new__",
[refs](py::handle py_type, std::shared_ptr<local::Scope> scope,
py::args, py::kwargs) {
[refs](py::handle py_type, py::args,
std::shared_ptr<local::Scope> scope, py::kwargs) {
return custom_new<PyProcess>(py_type, std::move(scope), refs);
})
},
py::arg("type"), py::arg("args"), py::arg("scope"), py::arg("kwargs"))
.def_prop_ro("pid", &PyProcess::pid)
.def_prop_ro("scope", &PyProcess::scope)
.def("launch", &PyProcess::Launch)
.def("launch",
[](py::object self_obj) {
PyProcess &self = py::cast<PyProcess &>(self_obj);
self.Launch();
return self_obj;
})
.def("__await__",
[](PyProcess &self) {
py::object future =
py::cast(local::CompletionEvent(self.OnTermination()),
py::rv_policy::move);
return future.attr("__await__")();
})
.def("__repr__", &PyProcess::to_s);

py::class_<local::CompletionEvent>(m, "CompletionEvent")
.def(py::init<>())
.def("__await__", [](py::handle self_obj) {
auto &worker = PyWorker::GetCurrent();
auto &self = py::cast<local::CompletionEvent &>(self_obj);
py::object future = worker.loop_.attr("create_future")();
// Stashing self as an attribute on the future, keeps us alive,
// which transitively means that the wait source we get from self
// stays alive. This can be done better later with a custom
// Future.
future.attr("_sf_event") = self_obj;
py::object iter_ret = future.attr("__iter__")();

// Pass the future object as void* user data to the C callback
// interface. This works because we release the object pointer going
// in and then steal it back to a py::object once the GIL has been
// acquired (since dec_ref'ing it must happen within the GIL).
// Because the self object was stashed on an attribute above, the
// wait_source is valid for the entire sequence.
SHORTFIN_THROW_IF_ERROR(worker.WaitOneLowLevel(
/*wait_source=*/
self, iree_infinite_timeout(),
+[](void *future_vp, iree_loop_t loop,
iree_status_t status) noexcept -> iree_status_t {
py::gil_scoped_acquire g;
py::object future = py::steal(static_cast<PyObject *>(future_vp));
try {
SHORTFIN_THROW_IF_ERROR(status);
future.attr("set_result")(py::none());
} catch (std::exception &e) {
auto RuntimeError = py::handle(PyExc_RuntimeError);
future.attr("set_exception")(RuntimeError(e.what()));
}
return iree_ok_status();
},
static_cast<void *>(future.release().ptr())));
return iter_ret;
});
}

void BindHostSystem(py::module_ &global_m) {
Expand Down
2 changes: 2 additions & 0 deletions libshortfin/bindings/python/shortfin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
Process = _sfl.local.Process
Scope = _sfl.local.Scope
ScopedDevice = _sfl.local.ScopedDevice
CompletionEvent = _sfl.local.CompletionEvent
System = _sfl.local.System
SystemBuilder = _sfl.local.SystemBuilder
Worker = _sfl.local.Worker
Expand All @@ -29,6 +30,7 @@
"Node",
"Scope",
"ScopedDevice",
"CompletionEvent",
"System",
"SystemBuilder",
"Worker",
Expand Down
23 changes: 23 additions & 0 deletions libshortfin/build_tools/cmake/shortfin_library.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,26 @@ function(shortfin_components_to_dynamic_libs out_dynamic_libs)
list(TRANSFORM _LIBS APPEND ".dylib.objects")
set(${out_dynamic_libs} "${_LIBS}" PARENT_SCOPE)
endfunction()

function(shortfin_gtest_test)
cmake_parse_arguments(
_RULE
""
"NAME"
"SRCS;DEPS"
${ARGN}
)

if(NOT SHORTFIN_BUILD_TESTS)
return()
endif()

add_executable(${_RULE_NAME} ${_RULE_SRCS})
target_link_libraries(${_RULE_NAME} PRIVATE
${_RULE_DEPS}
shortfin
GTest::gmock
GTest::gtest_main
)
gtest_discover_tests(${_RULE_NAME})
endfunction()
41 changes: 41 additions & 0 deletions libshortfin/examples/python/device_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#!/usr/bin/env python
# Copyright 2024 Advanced Micro Devices, Inc
#
# Licensed under the Apache License v2.0 with LLVM Exceptions.
# See https://llvm.org/LICENSE.txt for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

import array
import asyncio

import shortfin as sf
import shortfin.array as snp


class MyProcess(sf.Process):
async def run(self):
device = self.scope.device(0)
ary1 = snp.device_array(device, [32, 1, 4], snp.int32)
ary1.storage.fill(array.array("i", [0]))
print(f"[pid:{self.pid}] ARY1:", ary1)
await device
print(f"[pid:{self.pid}] Device sync fill0")
ary1.storage.fill(array.array("i", [1]))
await device
print(f"[pid:{self.pid}] Device sync fill1")


async def main():
worker = lsys.create_worker("main")
scope = lsys.create_scope(worker)
print("+++ Launching process")
await asyncio.gather(
MyProcess(scope=scope).launch(),
MyProcess(scope=scope).launch(),
)
print("--- Process terminated")


lsys = sf.host.CPUSystemBuilder().create_system()
# lsys = sf.amdgpu.SystemBuilder().create_system()
lsys.run(main())
32 changes: 21 additions & 11 deletions libshortfin/examples/python/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,44 @@
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

import asyncio
import threading

import shortfin as sf

lsys = sf.host.CPUSystemBuilder().create_system()


class MyProcess(sf.Process):
def __init__(self, scope, arg):
super().__init__(scope)
def __init__(self, arg, **kwargs):
super().__init__(**kwargs)
self.arg = arg

async def run(self):
print("Hello async:", self.arg, self)
print(f"[pid:{self.pid}] Hello async:", self.arg, self)
processes = []
if self.arg < 10:
await asyncio.sleep(0.3)
MyProcess(self.scope, self.arg + 1).launch()
processes.append(MyProcess(self.arg + 1, scope=self.scope).launch())
await asyncio.gather(*processes)
print(f"[pid:{self.pid}] Goodbye async:", self.arg, self)


async def main():
worker = lsys.create_worker("main")
scope = lsys.create_scope(worker)
def create_worker(i):
worker = lsys.create_worker(f"main-{i}")
return lsys.create_scope(worker)

workers = [create_worker(i) for i in range(3)]
processes = []
for i in range(10):
MyProcess(scope, i).launch()
await asyncio.sleep(0.1)
MyProcess(scope, i * 100).launch()
processes.append(MyProcess(i, scope=workers[i % len(workers)]).launch())
processes.append(MyProcess(i * 100, scope=workers[i % len(workers)]).launch())
processes.append(MyProcess(i * 1000, scope=workers[i % len(workers)]).launch())
await asyncio.sleep(0.1)
MyProcess(scope, i * 1000).launch()
await asyncio.sleep(2.5)

print("<<MAIN WAITING>>")
await asyncio.gather(*processes)
print("** MAIN DONE **")
return i


Expand Down
5 changes: 2 additions & 3 deletions libshortfin/src/shortfin/array/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ storage storage::AllocateDevice(ScopedDevice &device,
throw std::invalid_argument("Cannot allocate with a null device affinity");
}
auto allocator = iree_hal_device_allocator(device.raw_device()->hal_device());
iree_hal_buffer_ptr buffer;
iree::hal_buffer_ptr buffer;
iree_hal_buffer_params_t params = {
.usage = IREE_HAL_BUFFER_USAGE_DEFAULT,
.access = IREE_HAL_MEMORY_ACCESS_ALL,
Expand All @@ -47,7 +47,7 @@ storage storage::AllocateHost(ScopedDevice &device,
throw std::invalid_argument("Cannot allocate with a null device affinity");
}
auto allocator = iree_hal_device_allocator(device.raw_device()->hal_device());
iree_hal_buffer_ptr buffer;
iree::hal_buffer_ptr buffer;
iree_hal_buffer_params_t params = {
.usage = IREE_HAL_BUFFER_USAGE_MAPPING,
.access = IREE_HAL_MEMORY_ACCESS_ALL,
Expand All @@ -74,7 +74,6 @@ storage storage::Subspan(iree_device_size_t byte_offset,
void storage::Fill(const void *pattern, iree_host_size_t pattern_length) {
device_.scope().scheduler().AppendCommandBuffer(
device_, TransactionType::TRANSFER, [&](Account &account) {
logging::info("AppendCommandBuffer() CALLBACK");
// Must depend on all of this buffer's use dependencies to avoid
// write-after-read hazard.
account.active_deps_extend(timeline_resource_->use_barrier());
Expand Down
4 changes: 2 additions & 2 deletions libshortfin/src/shortfin/array/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ class SHORTFIN_API storage {
std::string to_s() const;

private:
storage(local::ScopedDevice device, iree_hal_buffer_ptr buffer,
storage(local::ScopedDevice device, iree::hal_buffer_ptr buffer,
local::detail::TimelineResource::Ref timeline_resource)
: buffer_(std::move(buffer)),
device_(device),
timeline_resource_(std::move(timeline_resource)) {}
iree_hal_buffer_ptr buffer_;
iree::hal_buffer_ptr buffer_;
local::ScopedDevice device_;
local::detail::TimelineResource::Ref timeline_resource_;
};
Expand Down
2 changes: 2 additions & 0 deletions libshortfin/src/shortfin/local/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ shortfin_cc_component(
NAME
shortfin_local
HDRS
async.h
device.h
process.h
worker.h
scheduler.h
scope.h
system.h
SRCS
async.cc
device.cc
process.cc
worker.cc
Expand Down
Loading

0 comments on commit 823d95c

Please sign in to comment.