Skip to content

Commit

Permalink
Merge pull request #288 from dmgav/understand-devices
Browse files Browse the repository at this point in the history
Support for in-header annotations of devices and callables
  • Loading branch information
dmgav authored Aug 16, 2023
2 parents 9974c63 + fddbfe1 commit af6015b
Show file tree
Hide file tree
Showing 4 changed files with 628 additions and 104 deletions.
210 changes: 171 additions & 39 deletions bluesky_queueserver/manager/profile_ops.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import ast
import collections
import copy
import enum
import glob
Expand Down Expand Up @@ -918,24 +919,42 @@ def prepare_plan(plan, *, plans_in_nspace, devices_in_nspace, allowed_plans, all
if not success:
raise RuntimeError(f"Validation of plan parameters failed: {errmsg}")

def ref_from_name(v, objects_in_nspace, sel_object_names, selected_object_tree, nspace):
def ref_from_name(v, objects_in_nspace, sel_object_names, selected_object_tree, nspace, eval_expressions):
if isinstance(v, str):
if (v in sel_object_names) or _is_object_name_in_list(v, allowed_objects=selected_object_tree):
v = _get_nspace_object(v, objects_in_nspace=objects_in_nspace, nspace=nspace)
elif eval_expressions and re.search(r"^[_A-Za-z][_A-Za-z0-9]*(\.[_A-Za-z][_A-Za-z0-9]*)*$", v):
try:
v = eval(v, nspace, nspace)
except Exception:
pass
return v

def process_argument(v, objects_in_nspace, sel_object_names, selected_object_tree, nspace):
def process_argument(v, objects_in_nspace, sel_object_names, selected_object_tree, nspace, eval_expressions):
# Recursively process lists (iterables) and dictionaries
if isinstance(v, str):
v = ref_from_name(v, objects_in_nspace, sel_object_names, selected_object_tree, nspace)
v = ref_from_name(
v, objects_in_nspace, sel_object_names, selected_object_tree, nspace, eval_expressions
)
elif isinstance(v, dict):
for key, value in v.copy().items():
v[key] = process_argument(value, objects_in_nspace, sel_object_names, selected_object_tree, nspace)
v[key] = process_argument(
value, objects_in_nspace, sel_object_names, selected_object_tree, nspace, eval_expressions
)
elif isinstance(v, Iterable):
v_original = v
v = list()
for item in v_original:
v.append(process_argument(item, objects_in_nspace, sel_object_names, selected_object_tree, nspace))
v.append(
process_argument(
item,
objects_in_nspace,
sel_object_names,
selected_object_tree,
nspace,
eval_expressions,
)
)
return v

def process_parameter_value(value, pp, objects_in_nspace, group_plans, group_devices, nspace):
Expand All @@ -947,6 +966,7 @@ def process_parameter_value(value, pp, objects_in_nspace, group_plans, group_dev
# Include/exclude all devices/plans if required
convert_all_plan_names = pp.get("convert_plan_names", None) # None - not defined
convert_all_device_names = pp.get("convert_device_names", None)
eval_all_expressions = pp.get("eval_expressions", False)

# A set of the selected device names (device names are listed explicitly, not as a tree)
sel_object_names, pname_list, dname_list = set(), set(), set()
Expand Down Expand Up @@ -980,8 +1000,10 @@ def process_parameter_value(value, pp, objects_in_nspace, group_plans, group_dev

sel_object_names = pname_list.union(dname_list)

if sel_object_names or selected_object_tree:
value = process_argument(value, objects_in_nspace, sel_object_names, selected_object_tree, nspace)
if sel_object_names or selected_object_tree or eval_all_expressions:
value = process_argument(
value, objects_in_nspace, sel_object_names, selected_object_tree, nspace, eval_all_expressions
)

return value

Expand Down Expand Up @@ -1014,7 +1036,7 @@ def process_parameter_value(value, pp, objects_in_nspace, group_plans, group_dev
items_in_nspace = devices_in_nspace.copy()
items_in_nspace.update(plans_in_nspace)

plan_func = process_argument(plan_name, plans_in_nspace, set(), group_plans, nspace)
plan_func = process_argument(plan_name, plans_in_nspace, set(), group_plans, nspace, False)
if isinstance(plan_func, str):
success = False
err_msg = f"Plan '{plan_name}' is not allowed or does not exist"
Expand Down Expand Up @@ -2017,11 +2039,13 @@ def process_argument(v, v_min, v_max):
def _find_and_replace_built_in_types(annotation_type_str, *, plans=None, devices=None, enums=None):
"""
Find and replace built-in types in parameter annotation type string. Built-in types are
``__PLAN__``, ``__DEVICE__``, ``__PLAN_OR_DEVICE__``. Since plan and device names are passed
as strings and then converted to respective object, the built-in plan names are replaced by ``str``
type, which is then used in parameter validation. The function also determines whether plans, devices
or both plans and devices need to be converted. If the name of the built-in type is redefined
in ``plans``, ``devices`` or ``enums`` section, then the keyword is ignored.
``__PLAN__``, ``__DEVICE__``, ``__PLAN_OR_DEVICE__``, ``__READABLE__``, ``__MOVABLE__``,
``__FLYABLE__``, ``__CALLABLE__``.
Since plan and device names are passed as strings and then converted to respective object,
the built-in plan names are replaced by ``str`` type, which is then used in parameter validation.
The function also determines whether plans, devices or both plans and devices need to be converted.
If the name of the built-in type is redefined in ``plans``, ``devices`` or ``enums`` section,
then the keyword is ignored.
Parameters
----------
Expand All @@ -2038,10 +2062,9 @@ def _find_and_replace_built_in_types(annotation_type_str, *, plans=None, devices
-------
annotation_type_str: str
Modified parameter type
convert_plan_names: boolean
Indicates whether the plan names need to be converted based on the parameter type.
convert_device_names: boolean
Indicates whether the devices names need to be converted based on the parameter type.
convert_values: dict
Dictionary that contains information on how the parameter values should be processed.
Keys: ``convert_plan_names``, ``convert_device_names``, ``eval_expressions``.
"""
# Built-in types that may be contained in 'annotation_type_str' and
# should be converted to another known type
Expand All @@ -2050,34 +2073,45 @@ def _find_and_replace_built_in_types(annotation_type_str, *, plans=None, devices
enums = enums or {}

built_in_types = {
"__PLAN__": ("str", True, False),
"__DEVICE__": ("str", False, True),
"__PLAN_OR_DEVICE__": ("str", True, True),
"__PLAN__": ("str", True, False, False),
"__DEVICE__": ("str", False, True, False),
"__PLAN_OR_DEVICE__": ("str", True, True, False),
"__READABLE__": ("str", False, True, False),
"__MOVABLE__": ("str", False, True, False),
"__FLYABLE__": ("str", False, True, False),
"__CALLABLE__": ("str", False, False, True),
}
convert_plan_names = False
convert_device_names = False
eval_expressions = False

for btype, (btype_replace, pl, dev) in built_in_types.items():
for btype, (btype_replace, pl, dev, ev_expr) in built_in_types.items():
# If 'plans', 'devices' or 'enums' contains the type name, then leave it as is
if (btype in plans) or (btype in devices) or (btype in enums):
continue
if re.search(btype, annotation_type_str):
annotation_type_str = re.sub(btype, btype_replace, annotation_type_str)
convert_plan_names = convert_plan_names or pl
convert_device_names = convert_device_names or dev
eval_expressions = eval_expressions or ev_expr

return annotation_type_str, convert_plan_names, convert_device_names
convert_values = dict(
convert_plan_names=convert_plan_names,
convert_device_names=convert_device_names,
eval_expressions=eval_expressions,
)
return annotation_type_str, convert_values


def _process_annotation(encoded_annotation, *, ns=None):
"""
Processed annotation is encoded the same way as in the descriptions of existing plans.
Returns reference to the annotation (type object) and the list of temporary types
that needs to be deleted once the processing (parameter validation) is complete.
The built-in type names (__DEVICE__, __PLAN__, __PLAN_OR_DEVICE__) are replaced with ``str``
unless types with the same name are defined in ``plans``, ``devices`` or
``enums`` sections of the annotation. Explicitly defined the types with the same names
as built-in types are treated as regular types.
The built-in type names (__DEVICE__, __PLAN__, __PLAN_OR_DEVICE__, ``__READABLE__``,
``__MOVABLE__``, ``__FLYABLE__``, ``__CALLABLE__``) are replaced with ``str`` unless types with
the same name are defined in ``plans``, ``devices`` or ``enums`` sections of the annotation.
Explicitly defined the types with the same names as built-in types are treated as regular types.
The function validates annotation and raises exceptions in the following cases:
the types defined in 'plans', 'devices' or 'enums' sections are not lists or tuples;
Expand All @@ -2102,12 +2136,9 @@ def _process_annotation(encoded_annotation, *, ns=None):
-------
annotation_type: type
Type reconstructed from annotation.
convert_plan_names: bool
Indicates if __PLAN__ or __PLAN_OR_DEVICE__ built-in type was detected and matching
strings should be converted to plan objects.
convert_device_names: bool
Indicates if __DEVICE__ or __PLAN_OR_DEVICE__ built-in type was detected and matching
strings should be converted to devuce objects.
convert_values: dict
Dictionary that contains information on how the parameter values should be processed.
Keys: ``convert_plan_names``, ``convert_device_names``, ``eval_expressions``.
ns: dict
Namespace dictionary with created types.
"""
Expand Down Expand Up @@ -2146,7 +2177,7 @@ def _process_annotation(encoded_annotation, *, ns=None):
items.update(plans)
items.update(enums)

annotation_type_str, convert_plan_names, convert_device_names = _find_and_replace_built_in_types(
annotation_type_str, convert_values = _find_and_replace_built_in_types(
annotation_type_str, plans=plans, devices=devices, enums=enums
)

Expand Down Expand Up @@ -2189,7 +2220,7 @@ def _process_annotation(encoded_annotation, *, ns=None):
except Exception as ex:
raise TypeError(f"Failed to process annotation '{annotation_type_str}': {ex}'")

return annotation_type, convert_plan_names, convert_device_names, ns
return annotation_type, convert_values, ns


def _process_default_value(encoded_default_value):
Expand Down Expand Up @@ -2244,7 +2275,7 @@ def _decode_parameter_types_and_defaults(param_list):
raise KeyError(f"No 'name' key in the parameter description {p}")

if "annotation" in p:
p_type, _, _, _ = _process_annotation(p["annotation"])
p_type, _, _ = _process_annotation(p["annotation"])
else:
p_type = typing.Any

Expand Down Expand Up @@ -2806,11 +2837,79 @@ def _process_plan(plan, *, existing_devices, existing_plans):
Error occurred while creating plan description.
"""

def _get_full_type_name(patterns, s):
"""
Returns the full name of the type with parameters. The function tries to find the name
of the type in the string ``s`` using the list of regular expressions ``patterns``.
For example, if the list of patterns is ``["typing.Callable", "collections.abc.Callable"]``
the function will find both isolated ``typing.Callable`` and ``collections.abc.Callable``
and the types with parameters such as ``typing.Callable[[int, str], float]``, whichever
comes first in the string ``s``.
Parameters
----------
patterns : list
List of regular expressions to find the type name in the string ``s``.
s : str
String to search for the type name.
Returns
-------
str or None
Full name of the type with parameters or ``None`` if the type name was not found.
"""
if not s:
return None

# Try all patterns. Use the first pattern that matches.
matches = [re.search(p, s) for p in patterns]
match = None
for m in matches:
if m:
if not match or m.start() < match.start():
match = m

# No patterns were found.
if not match:
return None

name = match[0]
start_ind, end_name_ind = match.span()

# The type has no parameters
if end_name_ind == len(s) or s[end_name_ind] != "[":
return name

# Find full name with parameters (find the closing bracket)
end_ind, n_brackets = -1, 0
for n in range(end_name_ind, len(s)):
if s[n] == "[":
n_brackets += 1
elif s[n] == "]":
n_brackets -= 1
if n_brackets == 0:
end_ind = n + 1
break

if end_ind < 0:
return name

return s[start_ind:end_ind]

def convert_annotation_to_string(annotation):
"""
Ignore the type if it can not be properly reconstructed using 'eval'
"""
ns = {"typing": typing, "NoneType": type(None)}
from bluesky.protocols import Flyable, Movable, Readable

# Patterns for callables
callables_patterns = (r"typing.Callable", r"collections.abc.Callable")
n_callables = 0 # Number of detected callables

protocols_mapping = {"__READABLE__": Readable, "__MOVABLE__": Movable, "__FLYABLE__": Flyable}
protocols_inv = {v: k for k, v in protocols_mapping.items()}

ns = {"typing": typing, "collections": collections, "NoneType": type(None), **protocols_mapping}

# This will work for generic types like 'typing.List[int]'
a_str = f"{annotation!r}"
Expand All @@ -2821,12 +2920,42 @@ def convert_annotation_to_string(annotation):
# Note, that starting with Python 3.10 parameter annotation always have
# '__name__', which is meaningless for types other than base types.
a_str = annotation.__name__
mapping = {k.__name__: v for k, v in protocols_inv.items()}
if a_str in mapping:
a_str = mapping[a_str]
else:
# Replace each expression with a unique string in the form of '__CALLABLE<n>__'
while True:
pattern = _get_full_type_name(callables_patterns, a_str)
if not pattern:
break
try:
p_type = eval(pattern, ns, ns)
except Exception:
p_type = None
p_str = f"__CALLABLE{n_callables + 1}__"
a_str = re.sub(re.escape(pattern), p_str, a_str)
if p_type:
# Evaluation of the annotation will fail later and the parameter
# will have no type annotation
ns[p_str] = p_type
n_callables += 1

mapping = {f"bluesky.protocols.{k.__name__}": v for k, v in protocols_inv.items()}
for pattern, replacement in mapping.items():
a_str = re.sub(pattern, replacement, a_str)

# Verify if the type could be recreated by evaluating the string during validation.
try:
an = eval(a_str, ns, ns)
if an != annotation:
raise Exception()

# Replace all callables ('__CALLABLE1__', '__CALLABLE2__', etc.) with '__CALLABLE__' string
for n in range(n_callables):
p_str = f"__CALLABLE{n + 1}__"
a_str = re.sub(p_str, "__CALLABLE__", a_str)

except Exception:
# Ignore the type if it can not be recreated.
a_str = None
Expand Down Expand Up @@ -2985,14 +3114,17 @@ def assemble_custom_annotation(parameter, *, existing_plans, existing_devices):

if annotation:
# Verify that the encoded type could be decoded (raises an exception if fails)
_, convert_plan_names, convert_device_names, _ = _process_annotation(annotation)
_, convert_values, _ = _process_annotation(annotation)
# _, convert_plan_names, convert_device_names, _ = _process_annotation(annotation)
working_dict["annotation"] = annotation

# Set the following parameters True only if they do not already exist (ignore if False)
if convert_plan_names and ("convert_plan_names" not in working_dict):
if convert_values["convert_plan_names"] and ("convert_plan_names" not in working_dict):
working_dict["convert_plan_names"] = True
if convert_device_names and ("convert_device_names" not in working_dict):
if convert_values["convert_device_names"] and ("convert_device_names" not in working_dict):
working_dict["convert_device_names"] = True
if convert_values["eval_expressions"] and ("eval_expressions" not in working_dict):
working_dict["eval_expressions"] = True

if default:
# Verify that the encoded representation of the default can be decoded.
Expand Down
Loading

0 comments on commit af6015b

Please sign in to comment.