Server Side Pagination for Artifacts and Executions (#117)
* Initial changes for performance fix for pagination

* Code for pagination

* adding pagination code

* code for pagination

* pagination code

* ellipsis in pagination

* resolved the automatic mlmd creation issue

* Server side filtering and sorting for both executions and artifacts

* fixing errors found while testing

* small changes to fix some issues found in testing

* Fixed errors found in testing

* Fixing an error found in testing

* Adding changes to add execution_uuid in the artifacts page

* adding execution_type_name to ui artifacts and executions

* removed some redundant code

* comments and some final changes

* removing _transform_to_artifacts_dataframe function

---------

Co-authored-by: Abhinav Chobey <chobey@abhinav-cmf-hpe.labs.hpecorp.net>
varkha-d-sharma and Abhinav Chobey authored Oct 27, 2023
1 parent 4876fa9 commit 036495e
Showing 16 changed files with 875 additions and 453 deletions.
35 changes: 24 additions & 11 deletions cmflib/cmf.py
@@ -1293,17 +1293,30 @@ def commit_metrics(self, metrics_name: str):

def commit_existing_metrics(self, metrics_name: str, uri: str, custom_properties: t.Optional[t.Dict] = None):
custom_props = {} if custom_properties is None else custom_properties
-        metrics = create_new_artifact_event_and_attribution(
-            store=self.store,
-            execution_id=self.execution.id,
-            context_id=self.child_context.id,
-            uri=uri,
-            name=metrics_name,
-            type_name="Step_Metrics",
-            event_type=mlpb.Event.Type.OUTPUT,
-            custom_properties=custom_props,
-            milliseconds_since_epoch=int(time.time() * 1000),
-        )
+        c_hash = uri.strip()
+        existing_artifact = []
+        existing_artifact.extend(self.store.get_artifacts_by_uri(c_hash))
+        if existing_artifact:
+            metrics = link_execution_to_artifact(
+                store=self.store,
+                execution_id=self.execution.id,
+                uri=c_hash,
+                input_name=metrics_name,
+                event_type=mlpb.Event.Type.OUTPUT,
+            )
+        else:
+            metrics = create_new_artifact_event_and_attribution(
+                store=self.store,
+                execution_id=self.execution.id,
+                context_id=self.child_context.id,
+                uri=uri,
+                name=metrics_name,
+                type_name="Step_Metrics",
+                event_type=mlpb.Event.Type.OUTPUT,
+                custom_properties=custom_props,
+                milliseconds_since_epoch=int(time.time() * 1000),
+            )
if self.graph:
self.driver.create_metrics_node(
metrics_name,
68 changes: 67 additions & 1 deletion cmflib/cmfquery.py
@@ -389,9 +389,63 @@ def get_all_exe_in_stage(self, stage_name: str) -> t.List[mlpb.Execution]:
return self.store.get_executions_by_context(stage.id)
return []

def get_all_executions_by_ids_list(self, exe_ids: t.List[int]) -> pd.DataFrame:
"""Return executions for given execution ids list as a pandas data frame.
Args:
exe_ids: List of execution identifiers.
Returns:
Data frame with all executions for the list of given execution identifiers.
"""

df = pd.DataFrame()
executions = self.store.get_executions_by_id(exe_ids)
for exe in executions:
d1 = self._transform_to_dataframe(exe)
df = pd.concat([df, d1], sort=True, ignore_index=True)
return df
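A minimal usage sketch of this new query helper (not part of the commit; the store path and ids are hypothetical):

    from cmflib import cmfquery

    query = cmfquery.CmfQuery("/path/to/mlmd")  # hypothetical mlmd store path
    executions_df = query.get_all_executions_by_ids_list([1, 2, 3])
    print(executions_df.shape)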

def get_all_artifacts_by_context(self, pipeline_name: str) -> pd.DataFrame:
"""Return artifacts for given pipeline name as a pandas data frame.
Args:
pipeline_name: Name of the pipeline.
Returns:
Data frame with all artifacts associated with given pipeline name.
"""
df = pd.DataFrame()
contexts = self.store.get_contexts_by_type("Parent_Context")
context_id = self.get_pipeline_id(pipeline_name)
for ctx in contexts:
if ctx.id == context_id:
child_contexts = self.store.get_children_contexts_by_context(ctx.id)
for cc in child_contexts:
artifacts = self.store.get_artifacts_by_context(cc.id)
for art in artifacts:
d1 = self.get_artifact_df(art)
df = pd.concat([df, d1], sort=True, ignore_index=True)
return df

def get_all_artifacts_by_ids_list(self, artifact_ids: t.List[int]) -> pd.DataFrame:
"""Return all artifacts for the given artifact ids list.
Args:
artifact_ids: List of artifact identifiers.
Returns:
Data frame with all artifacts for the given artifact ids list.
"""
df = pd.DataFrame()
artifacts = self.store.get_artifacts_by_id(artifact_ids)
for art in artifacts:
d1 = self.get_artifact_df(art)
df = pd.concat([df, d1], sort=True, ignore_index=True)
return df

def get_all_executions_in_stage(self, stage_name: str) -> pd.DataFrame:
"""Return executions of the given stage as pandas data frame.
Args:
stage_name: Stage name. See the docstring of the previous method.
Returns:
@@ -471,6 +525,16 @@ def get_all_artifacts_for_execution(self, execution_id: int) -> pd.DataFrame:
)
return df

def get_all_artifact_types(self) -> t.List[str]:
"""Return names of all artifact types.
Returns:
List of all artifact types.
"""
artifact_list = self.store.get_artifact_types()
types = [i.name for i in artifact_list]
return types
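For illustration (not part of the commit), listing the registered artifact types might look like this; the store path is hypothetical:

    query = cmfquery.CmfQuery("/path/to/mlmd")
    types = query.get_all_artifact_types()  # e.g. ["Dataset", "Model", "Metrics"]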

def get_all_executions_for_artifact(self, artifact_name: str) -> pd.DataFrame:
"""Return executions that consumed and produced given artifact.
@@ -491,6 +555,7 @@ def get_all_executions_for_artifact(self, artifact_name: str) -> pd.DataFrame:
"Type": "INPUT" if event.type == mlpb.Event.Type.INPUT else "OUTPUT",
"execution_id": event.execution_id,
"execution_name": self.store.get_executions_by_id([event.execution_id])[0].name,
"execution_type_name":self.store.get_executions_by_id([event.execution_id])[0].properties['Execution_type_name'],
"stage": stage_ctx.name,
"pipeline": self.store.get_parent_contexts_by_context(stage_ctx.id)[0].name,
}
@@ -598,6 +663,7 @@ def find_producer_execution(self, artifact_name: str) -> t.Optional[mlpb.Execution]:
executions_ids = set(
event.execution_id
for event in self.store.get_events_by_artifact_ids([artifact.id])
if event.type == mlpb.Event.OUTPUT
)
if not executions_ids:
2 changes: 2 additions & 0 deletions cmflib/commands/metadata/push.py
@@ -59,6 +59,8 @@ def run(self):
attr_dict = CmfConfig.read_config(config_file_path)
url = attr_dict.get("cmf-server-ip", "http://127.0.0.1:80")

print("metadata push started")
print("........................................")

if self.args.pipeline_name in query.get_pipeline_names(): # Checks if pipeline name exists
json_payload = query.dumptojson(
3 changes: 3 additions & 0 deletions cmflib/metadata_helper.py
@@ -305,6 +305,7 @@ def create_new_execution_in_existing_context(
EXECUTION_CONTEXT_NAME_PROPERTY_NAME = "Context_Type"
EXECUTION_CONTEXT_ID = "Context_ID"
EXECUTION_EXECUTION = "Execution"
EXECUTION_EXECUTION_TYPE_NAME = "Execution_type_name"
EXECUTION_REPO = "Git_Repo"
EXECUTION_START_COMMIT = "Git_Start_Commit"
EXECUTION_END_COMMIT = "Git_End_Commit"
@@ -402,6 +403,7 @@ def create_new_execution_in_existing_run_context(
EXECUTION_CONTEXT_NAME_PROPERTY_NAME: metadata_store_pb2.STRING,
EXECUTION_CONTEXT_ID: metadata_store_pb2.INT,
EXECUTION_EXECUTION: metadata_store_pb2.STRING,
EXECUTION_EXECUTION_TYPE_NAME: metadata_store_pb2.STRING,
EXECUTION_PIPELINE_TYPE: metadata_store_pb2.STRING,
EXECUTION_PIPELINE_ID: metadata_store_pb2.INT,
EXECUTION_REPO: metadata_store_pb2.STRING,
@@ -415,6 +417,7 @@
# Mistakenly used for grouping in the UX
EXECUTION_CONTEXT_ID: metadata_store_pb2.Value(int_value=context_id),
EXECUTION_EXECUTION: metadata_store_pb2.Value(string_value=execution),
EXECUTION_EXECUTION_TYPE_NAME: metadata_store_pb2.Value(string_value=execution_type_name),
EXECUTION_PIPELINE_TYPE: metadata_store_pb2.Value(string_value=pipeline_type),
EXECUTION_PIPELINE_ID: metadata_store_pb2.Value(int_value=pipeline_id),
EXECUTION_REPO: metadata_store_pb2.Value(string_value=git_repo),
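A quick sketch (not part of the commit) of reading the new property back, given an MLMD MetadataStore handle named store; the execution id is hypothetical:

    exe = store.get_executions_by_id([42])[0]
    print(exe.properties["Execution_type_name"].string_value)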
110 changes: 75 additions & 35 deletions server/app/get_data.py
@@ -5,49 +5,89 @@
from server.app.query_visualization import query_visualization
from fastapi.responses import FileResponse

-def get_executions(mlmdfilepath, pipeline_name):
+def get_executions_by_ids(mlmdfilepath, pipeline_name, exe_ids):
     query = cmfquery.CmfQuery(mlmdfilepath)
-    stages = query.get_pipeline_stages(pipeline_name)
     df = pd.DataFrame()
-    for stage in stages:
-        executions = query.get_all_executions_in_stage(stage)
-        if str(executions.Pipeline_Type[0]) == pipeline_name:
-            df = pd.concat([df, executions], sort=True, ignore_index=True)
+    executions = query.get_all_executions_by_ids_list(exe_ids)
+    df = pd.concat([df, executions], sort=True, ignore_index=True)
+    # df = df.drop('name', axis=1)
     return df

def get_all_exe_ids(mlmdfilepath):
query = cmfquery.CmfQuery(mlmdfilepath)
df = pd.DataFrame()
execution_ids = {}
names = query.get_pipeline_names()
for name in names:
stages = query.get_pipeline_stages(name)
for stage in stages:
executions = query.get_all_executions_in_stage(stage)
df = pd.concat([df, executions], sort=True, ignore_index=True)
if df.empty:
return
for name in names:
execution_ids[name] = df.loc[df['Pipeline_Type'] == name, ['id', 'Context_Type']]
return execution_ids
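A hedged sketch (not part of the commit) of how a server endpoint could page executions with these helpers; the pipeline name, page number, and page size are illustrative:

    exe_ids = get_all_exe_ids("/path/to/mlmd")   # {pipeline_name: DataFrame of 'id' and 'Context_Type'}
    ids_df = exe_ids["example_pipeline"]
    page, page_size = 1, 20
    page_ids = ids_df["id"].tolist()[(page - 1) * page_size : page * page_size]
    rows = get_executions_by_ids("/path/to/mlmd", "example_pipeline", page_ids)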

def get_all_artifact_ids(mlmdfilepath):
# The following builds a dictionary of dictionaries:
# the first-level key is a pipeline_name,
# the first-level value is a nested dictionary,
# the nested dictionary's key is an artifact type, i.e. Dataset, Model, etc.,
# and the nested dictionary's value is a DataFrame of artifact ids and names.
artifact_ids = {}
query = cmfquery.CmfQuery(mlmdfilepath)
names = query.get_pipeline_names()
for name in names:
df = pd.DataFrame()
artifacts = query.get_all_artifacts_by_context(name)
df = pd.concat([df, artifacts], sort=True, ignore_index=True)
if df.empty:
return
else:
artifact_ids[name] = {}
for art_type in df['type']:
filtered_values = df.loc[df['type'] == art_type, ['id', 'name']]
artifact_ids[name][art_type] = filtered_values
return artifact_ids
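Illustratively (not from the commit), the returned structure looks like this; the pipeline name and types are placeholders:

    # {
    #     "example_pipeline": {
    #         "Dataset": <DataFrame of 'id' and 'name'>,
    #         "Model":   <DataFrame of 'id' and 'name'>,
    #     }
    # }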

# This function fetches all the artifacts available in the given mlmd.
-def get_artifacts(mlmdfilepath, pipeline_name, data):  # return value (artifact types or artifact df) is determined by the data variable
+def get_artifacts(mlmdfilepath, pipeline_name, art_type, artifact_ids):
     query = cmfquery.CmfQuery(mlmdfilepath)
     names = query.get_pipeline_names()  # getting all pipeline names in mlmd
-    identifiers = []
-    for name in names:
-        if name == pipeline_name:
-            stages = query.get_pipeline_stages(name)
-            for stage in stages:
-                executions = query.get_all_exe_in_stage(stage)
-                for exe in executions:
-                    identifiers.append(exe.id)
-    name = []
-    url = []
     df = pd.DataFrame()
-    for identifier in identifiers:
-        get_artifacts = query.get_all_artifacts_for_execution(identifier)  # getting all artifacts
-        df = pd.concat([df, get_artifacts], sort=True, ignore_index=True)
-    df['event'] = df.groupby('id')['event'].transform(lambda x: ', '.join(x))
-    df['name'] = df['name'].str.split(':').str[0]
-    df = df.drop_duplicates()
-    if data == "artifact_type":
-        tempout = list(set(df["type"]))
-    else:
-        df = df.loc[df["type"] == data]
-        result = df.to_json(orient="records")
-        tempout = json.loads(result)
-    return tempout
+    for name in names:
+        if name == pipeline_name:
+            df = query.get_all_artifacts_by_ids_list(artifact_ids)
+    if len(df) == 0:
+        return
+    df = df.drop_duplicates()
+    art_names = df['name'].tolist()
+    name_dict = {}
+    name_list = []
+    exec_type_name_list = []
+    exe_type_name = pd.DataFrame()
+    for name in art_names:
+        executions = query.get_all_executions_for_artifact(name)
+        exe_type_name = pd.concat([exe_type_name, executions], ignore_index=True)
+        execution_type_name = exe_type_name["execution_type_name"].drop_duplicates().tolist()
+        execution_type_name = [str(element).split('"')[1] for element in execution_type_name]
+        execution_type_name_str = ',\n '.join(map(str, execution_type_name))
+        name_list.append(name)
+        exec_type_name_list.append(execution_type_name_str)
+    name_dict['name'] = name_list
+    name_dict['execution_type_name'] = exec_type_name_list
+    name_df = pd.DataFrame(name_dict)
+    merged_df = df.merge(name_df, on='name', how='left')
+    merged_df['name'] = merged_df['name'].apply(lambda x: x.split(':')[0] if ':' in x else x)
+    merged_df = merged_df.loc[merged_df["type"] == art_type]
+    result = merged_df.to_json(orient="records")
+    tempout = json.loads(result)
+    return tempout
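A hedged usage sketch (not part of the commit); the path, pipeline name, artifact type, and ids are placeholders:

    artifact_ids = [10, 11, 12]  # e.g. one page of ids taken from get_all_artifact_ids()
    records = get_artifacts("/path/to/mlmd", "example_pipeline", "Dataset", artifact_ids)
    # each record is a dict of artifact fields plus the derived 'execution_type_name'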

def get_artifact_types(mlmdfilepath):
query = cmfquery.CmfQuery(mlmdfilepath)
artifact_types = query.get_all_artifact_types()
return artifact_types

def create_unique_executions(server_store_path, req_info):
mlmd_data = json.loads(req_info["json_payload"])
@@ -68,7 +108,7 @@ def create_unique_executions(server_store_path, req_info):
executions_client = []
for i in mlmd_data['Pipeline'][0]["stages"]: # checks if given execution_id present in mlmd
for j in i["executions"]:
-                if j['name'] != "":#If executions have name , they are reusable executions
+                if j['name'] != "":  # If executions have a name, they are reusable executions
                     continue          # which need to be merged in regardless of whether they are already
                                       # present, so that new artifacts associated with them get in.
if 'Execution_uuid' in j['properties']:
Expand Down