Source code for seamless.highlevel.Transformer

# Author: Sjoerd de Vries
# Copyright (c) 2016-2022 INSERM, 2022-2023 CNRS

# The MIT License (MIT)

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""Transformer class for transforming input values to a result value,
and its helper functions."""

# pylint: disable=too-many-lines

from __future__ import annotations
import asyncio
import sys
from typing import *

import weakref
import functools
import json
from copy import deepcopy

from silk.mixed.get_form import get_form
from .Base import Base
from .Cell import Cell, FolderCell
from .Module import Module
from .Resource import Resource
from .SelfWrapper import SelfWrapper
from .proxy import Proxy, CodeProxy, HeaderProxy
from .pin import PinsWrapper
from ..mime import language_to_mime
from ..core.context import Context as CoreContext
from . import parse_function_code
from .SchemaWrapper import SchemaWrapper
from .compiled import CompiledObjectDict
from .Environment import Environment
from .HelpMixin import HelpMixin

default_pin = {
    "celltype": "default",
}


def new_transformer(
    # pylint: disable=dangerous-default-value
    ctx,
    path,
    pins,
    hash_pattern={"*": "#"},
) -> dict[str, Any]:
    """Return a workflow graph node for a new transformer cell"""
    if pins is None:
        pins = []
    if isinstance(pins, (list, tuple)):
        pins = {pin: default_pin.copy() for pin in pins}
    else:
        pins = deepcopy(pins)
    for pin in pins:
        if pin == "inp":
            print(
                # pylint: disable=line-too-long
                "WARNING: pin 'inp' for a transformer is NOT recommended (shadows the .inp attribute)"
            )
    transformer = {
        "path": path,
        "type": "transformer",
        "compiled": False,
        "language": "python",
        "pins": pins,
        "hash_pattern": hash_pattern,
        "RESULT": "result",
        "INPUT": "inp",
        # the result schema can be exposed as an input pin to the transformer under this name
        "SCHEMA": None,
        "UNTRANSLATED": True,
    }
    ### json.dumps(transformer)
    if ctx is not None:
        assert path is not None
        ctx._graph[0][path] = transformer
    return transformer


[docs] class Transformer(Base, HelpMixin): """Transforms input values to a result value See http://sjdv1982.github.io/seamless/sphinx/html/transformer.html for documentation""" _node = None def __init__( self, code=None, *, pins=None, hash_pattern={"*": "#"} ): # pylint: disable=dangerous-default-value from ..metalevel.debugmode import DebugMode super().__init__(parent=None, path=None) self._node = new_transformer(ctx=None, path=None, pins=pins, hash_pattern=hash_pattern) if code is not None: self.code = code # modifies self._node self._debug = DebugMode(self) self._environment = Environment(self) def _get_parent2(self): parent = self._parent() if parent is None: raise AttributeError("Transformer must have been bound to a workflow context.") return parent def _init( self, parent, path, set_node ): assert parent is not None and path is not None assert self._parent() is None node = self._node temp_connections = {} dependencies = {} if set_node: node2 = deepcopy(node) node2["path"] = path parent._graph[0][path] = node2 temp = node.get("TEMP", {}).copy().get("input_auth", {}) has_temp = len(temp) for pinname, value in temp.items(): if isinstance(value, Base): assert isinstance(value, (Cell, Module, DeepCellBase)) temp_connections[pinname] = value continue for pinname, value in temp_connections.items(): temp.pop(pinname) vparent = value._parent() if vparent is None: try: value2 = value.value except Exception: msg = "WARNING: Cannot bind pin '{}'" print(msg.format(pinname), file=sys.stderr) temp[pinname] = value2 elif vparent is not parent: msg = "Pin '{}' is bound to a different context" raise RuntimeError(msg.format(pinname)) else: dependencies[pinname] = value if has_temp: if len(temp): node2["TEMP"]["input_auth"] = temp else: node2["TEMP"].pop("input_auth") if not len(node2["TEMP"]): node2.pop("TEMP") else: hash_pattern = node["hash_pattern"].copy() pins = node["pins"] default_node = new_transformer(ctx=None, path=None, pins=pins, hash_pattern=hash_pattern) is_default_node = (node == default_node) assert is_default_node, path parent_node = parent._get_node(path) assert parent_node is not None if "environment" in parent_node: self._environment._load(parent_node["environment"]) Base._init2(self, parent, path) parent._set_child(path, self) self._get_htf() self._node = None for pinname, value in dependencies.items(): setattr(self, pinname, value) parent._translate()
[docs] @classmethod def from_canonical_interface(cls, tool, command=None): """TODO: document""" from ..cmd.canonical import build_transformer_dict tf_dict, result_celltype, buffers, bashcode = build_transformer_dict(tool, command=command) self = cls() self.language = "bash" for pin, celltype in tf_dict.items(): setattr(self, pin, None) getattr(self.pins, pin).celltype = celltype self.result.celltype = result_celltype for pin, buffer in buffers.items(): setattr(self, pin, None) getattr(self.pins, pin).celltype = "bytes" setattr(self, pin, buffer) self.code = bashcode return self
def _set_temp(self, attr, value): htf = self._get_htf() htf["UNTRANSLATED"] = True if "TEMP" not in htf or htf["TEMP"] is None: htf["TEMP"] = {} htf["TEMP"][attr] = value if "checksum" in htf: htf["checksum"].pop(attr, None) def _set_temp_checksum(self, attr, checksum): from . import Checksum checksum = Checksum(checksum).hex() htf = self._get_htf() htf["UNTRANSLATED"] = True if "checksum" not in htf or htf["checksum"] is None: htf["checksum"] = {} htf["checksum"][attr] = checksum if "TEMP" in htf: htf["checksum"].pop(attr, None) @property def environment(self) -> Optional[Environment]: """Computing environment to execute transformations in""" return self._environment @property def debug(self) -> bool: """If debug mode is enabled.""" return self._debug @property def meta(self) -> Optional[dict[str, Any]]: """Dictionary of meta-parameters. These don't affect the computation result, but may affect job managers Example of meta-parameters: expected computation time, service name You can set this dictionary directly, or you may assign .meta to a cell """ return deepcopy(self._get_htf().get("meta")) @meta.setter def meta(self, value: dict[str, Any] | Cell): from .assign import assign_connection htf = self._get_htf() target_path = self._path + ("meta",) realtime_meta = False if isinstance(value, Cell): parent = self._get_parent2() assert value._parent() is parent assign_connection(parent, value._path, target_path, False) htf.pop("meta", None) elif isinstance(value, Proxy): raise TypeError(".meta can only be assigned to a dict or to a whole Cell") else: if not isinstance(value, dict): raise TypeError(value) json.dumps(value) parent = self._get_parent() if parent is not None: parent.remove_connections(target_path, endpoint="target") meta = htf.get("meta", {}).copy() meta.update(value) for k in list(meta.keys()): if meta[k] is None: meta.pop(k) htf["meta"] = meta try: self.cancel() except Exception: pass try: tf = self._get_tf(force=True) htf = self._get_htf() if htf["compiled"]: tf2 = tf.tf.executor else: tf2 = tf.tf tf2.meta = deepcopy(value) realtime_meta = True except Exception: pass if not realtime_meta: self._get_htf()["UNTRANSLATED"] = True parent._translate() @property def local(self) -> bool | None: """Local execution. If True, transformations are executed in the local Seamless instance. If False, they are delegated to an assistant. If None (default), an assistant is tried first and local execution is a fallback.""" return self.meta.get("local") @local.setter def local(self, value:bool): self.meta["local"] = value @property def RESULT(self) -> str: """The name of the result variable. Default is "result". This is also the attribute under which the result object is available (i.e. Transformer.result by default). The result object is similar to a (structured) Cell. NOTE: changing this attribute is currently not implemented """ htf = self._get_htf() return htf["RESULT"] @RESULT.setter def RESULT(self, value): raise NotImplementedError htf = self._get_htf() result_path = self._path + (htf["RESULT"],) new_result_path = self._path + (value,) parent = self._parent() htf["RESULT"] = value @property def INPUT(self) -> str: """The name of the input attribute. Default is "inp". This is the attribute under which the input object is available (i.e. Transformer.inp by default). The input object is similar to a (structured) Cell. NOTE: changing this attribute is currently not implemented """ htf = self._get_htf() return htf["INPUT"] @INPUT.setter def INPUT(self, value): raise NotImplementedError @property def fingertip_no_remote(self) -> bool: """If True, remote calls are disabled for fingertipping. Remote calls can be for a database or a buffer server. """ htf = self._get_htf() return htf.get("fingertip_no_remote", False) @fingertip_no_remote.setter def fingertip_no_remote(self, value: bool): if value not in (True, False): raise TypeError(value) htf = self._get_htf() if value: htf["fingertip_no_remote"] = True else: htf.pop("fingertip_no_remote", None) @property def fingertip_no_recompute(self) -> bool: """If True, recomputation is disabled for fingertipping. This means recomputation via transformation, which can be intensive. Recomputation via conversion or subcell expression (which are quick) is always enabled. """ htf = self._get_htf() return htf.get("fingertip_no_recompute", False) @fingertip_no_recompute.setter def fingertip_no_recompute(self, value: bool): if value not in (True, False): raise TypeError(value) htf = self._get_htf() if value: htf["fingertip_no_recompute"] = True else: htf.pop("fingertip_no_recompute", None) @property def scratch(self) -> bool: """Is this transformer's result attribute a scratch cell. Scratch cells are fully dependent cells that are big and/or easy to recompute. Scratch cell buffers are: - Not added to saved zip archives and vaults. - TODO: Annotated as "scratch" in databases - TODO: cleared automatically from databases a short while after computation """ htf = self._get_htf() return "scratch" in htf @scratch.setter def scratch(self, value): if value not in (True, False): raise TypeError(value) htf = self._get_htf() if value: htf["scratch"] = True else: htf.pop("scratch", None) htf["UNTRANSLATED"] = True if self._parent() is not None: self._parent()._translate()
[docs] def clear_exception(self) -> None: """Clear any exception associated with this transformer. Re-execute of the associated transformation. Both local and remote (via assistant) execution are affected. If this transformer has no transformation (missing or pending inputs), this will set a flag, causing clear_exception to take effect as soon as a transformation is present. Re-translation will clear this flag. """ tf = self._get_tf(force=True) htf = self._get_htf() if htf["compiled"]: tf.tf.executor.clear_exception() else: tf.tf.clear_exception()
@property def language(self) -> str: """Defines the programming language of the transformer's source code. Allowed values are: python, ipython, bash, or any compiled language. See seamless.compiler.languages and seamless.compile.compilers for a list """ return self._get_htf()["language"] @language.setter def language(self, value): if value == "docker": import warnings warnings.warn( # pylint: disable=line-too-long 'Transformer.language="docker" is deprecated. Use language="bash" and set docker_image.', FutureWarning, ) value = "bash" htf = self._get_htf() parent = self._parent() if parent is None: if value not in ("python", "ipython", "bash"): raise ValueError("Unbound transformers can only have 'python', 'ipython' or 'bash' as their language") htf["language"] = value return lang, language, extension = parent.environment.find_language(value) compiled = language["mode"] == "compiled" old_language = htf.get("language") htf["language"] = lang old_compiled = htf.get("compiled", False) if old_compiled != compiled: htf["UNTRANSLATED"] = True elif (old_language == "bash") != (lang == "bash"): htf["UNTRANSLATED"] = True htf["compiled"] = compiled htf["file_extension"] = extension has_translated = False if not has_translated: self._parent()._translate() @property def docker_image(self) -> str: """Defines the Docker image in which a transformer should run Getting this property is syntactic sugar for: `Transformer.environment.get_docker()["name"]` Setting this property is more-or-less syntactic sugar for: `Transformer.environment.set_docker({"name": ...})` """ im = self.environment.get_docker() if im is None: return None return im["name"] @docker_image.setter def docker_image(self, docker): if docker is None: self.environment.set_docker(None) return im = self.environment.get_docker() if im is None: im = {} im["name"] = docker self.environment.set_docker(im) @property def header(self) -> Optional[str]: """For a compiled transformer, the generated C header""" htf = self._get_htf() if not htf.get("compiled"): raise AttributeError("Only compiled transformers have a header") dirs = ["value", "mount", "mimetype", "celltype"] return HeaderProxy( self, ("header",), "w", getter=self._header_getter, dirs=dirs ) def _header_getter(self, attr): htf = self._get_htf() assert htf["compiled"] if attr == "celltype": return "code" elif attr == "value": tf = self._get_tf(force=True) return tf.header.value elif attr == "mount": return self._sub_mount_header elif attr == "mimetype": language = "c" mimetype = language_to_mime(language) return mimetype else: raise AttributeError(attr) @property def schema(self): """The schema of the transformer input object See Cell.schema for more details""" htf = self._get_htf() inp = htf["INPUT"] # TODO: self.self return getattr(self, inp).schema @property def example(self): """The example handle of the transformer input object. See Cell.example for more details """ self._get_parent2() tf = self._get_tf(force=True) htf = self._get_htf() inputcell = getattr(tf, htf["INPUT"]) inp_ctx = inputcell._data._context() example = inp_ctx.example.handle return example @example.setter def example(self, value): return self.example.set(value) def _result_example(self): htf = self._get_htf() tf = self._get_tf(force=True) resultcell = getattr(tf, htf["RESULT"]) result_ctx = resultcell._data._context() example = result_ctx.example.handle return example
[docs] def add_validator(self, validator, name): """Adds a validator to the input, analogous to Cell.add_validator""" htf = self._get_htf() inp = htf["INPUT"] # TODO: self.self return getattr(self, inp).add_validator(validator, name=name)
def add_special_pin(self, pinname, celltype): from .Cell import celltypes if not pinname.startswith("SPECIAL__"): raise ValueError("Special pinname must start with SPECIAL__") if celltype not in celltypes and celltype not in ("deepcell", "folder"): raise TypeError(celltype) htf = self._get_htf() if pinname in htf["pins"]: raise TypeError("Pin already exists") pin = default_pin.copy() pin["celltype"] = celltype htf["pins"][pinname] = pin htf["UNTRANSLATED"] = True parent = self._parent() if parent is not None: parent._translate() def _assign_to(self, hctx, path): from .assign import assign_connection htf = self._get_htf() parent = self._get_parent2() result_path = self._path + (htf["RESULT"],) assign_connection(parent, result_path, path, True) hctx._translate() def __setattr__(self, attr, value): if attr.startswith("_") or hasattr(type(self), attr): return object.__setattr__(self, attr, value) else: return self._setattr(attr, value, from_setitem=False) def __setitem__(self, item, value): if not isinstance(item, str): raise TypeError("item must be 'str', not '{}'".format(type(item))) return self._setattr(item, value, from_setitem=True) def _setattr(self, attr, value, *, from_setitem): from .assign import assign_connection translate = False parent = self._parent() htf = self._get_htf() new_pin = False if isinstance(value, Resource): assert attr == "code" self._sub_mount(attr, value.filename, "r", "file", True) if attr == "main_module" and htf["compiled"] and attr not in htf["pins"]: raise TypeError( "Cannot assign directly all module objects; assign individual elements" ) assign_to_temp = False if self._parent() is None and attr != htf["RESULT"]: assign_to_temp = True elif ( not self._has_tf() and not isinstance(value, (Cell, Module, DeepCellBase)) and attr != htf["RESULT"] ): assign_to_temp = True if assign_to_temp: if attr.startswith("SPECIAL__"): if attr not in htf["pins"]: raise AttributeError("Cannot define new special pins before translation") if isinstance(value, Resource): value = value.data if "TEMP" not in htf or htf["TEMP"] is None: htf["TEMP"] = {} if attr == "code": code = value if callable(value): code, _, _ = parse_function_code(value) htf["TEMP"]["code"] = code else: if not isinstance(value, (Cell, Module, DeepCellBase)): get_form(value) if "input_auth" not in htf["TEMP"]: htf["TEMP"]["input_auth"] = {} htf["TEMP"]["input_auth"][attr] = value if attr not in htf["pins"]: htf["pins"][attr] = default_pin.copy() htf["UNTRANSLATED"] = True parent = self._parent() if parent is not None: parent.remove_connections(self._path + (attr,), endpoint="target") parent._translate() return if attr == "code": if isinstance(value, Resource): value = value.data if isinstance(value, Cell): if parent is not None: target_path = self._path + (attr,) assert value._parent() is parent assign_connection(parent, value._path, target_path, False) translate = True else: self._set_temp(attr, value) elif isinstance(value, Proxy): raise AttributeError("".join(value._path)) else: tf = self._get_tf(force=True) if callable(value): value, _, _ = parse_function_code(value) removed = False if parent is not None: check_libinstance_subcontext_binding(parent, self._path) removed = parent.remove_connections( self._path + (attr,), endpoint="target" ) if removed or (parent is None): htf = self._get_htf() htf["UNTRANSLATED"] = True if "TEMP" not in htf or htf["TEMP"] is None: htf["TEMP"] = {} htf["TEMP"]["code"] = value if "checksum" in htf: htf["checksum"].pop("code", None) translate = True else: tf.code.set(value) elif attr == htf["INPUT"]: target_path = self._path if isinstance(value, (Cell, DeepCellBase)): assert value._parent() is parent exempt = self._exempt() assign_connection( parent, value._path, target_path, False, exempt=exempt ) translate = True else: tf = self._get_tf(force=True) inp = getattr(tf, htf["INPUT"]) removed = parent.remove_connections( self._path + (attr,), endpoint="target" ) if removed: translate = True inp.handle_no_inference.set(value) elif attr == htf["RESULT"]: tf = self._get_tf(force=True) result = getattr(tf, htf["RESULT"]) # Example-based programming to set the schema # TODO: suppress inchannel warning result.handle_no_inference.set(value) else: if attr.startswith("SPECIAL__"): if attr not in htf["pins"]: raise AttributeError("Cannot define new special pins like this, use add_special_pin") if not from_setitem: raise AttributeError("Special pins must be set using bracket syntax") pin0 = {} if isinstance(value, DeepCell) or ( isinstance(value, Cell) and value.hash_pattern == {"*": "#"} ): pin0 = { "celltype": "deepcell", } elif isinstance(value, DeepFolderCell): pin0 = { "celltype": "deepfolder", } elif isinstance(value, FolderCell): pin0 = { "celltype": "folder", } elif isinstance(value, Module): pin0 = { "celltype": "module", } if attr not in htf["pins"]: pin = default_pin.copy() new_pin = True pin.update(pin0) htf["pins"][attr] = pin translate = True else: pin = htf["pins"][attr] old_pin = deepcopy(pin) pin.update(pin0) if old_pin != pin: translate = True if isinstance(value, (Cell, Module, DeepCellBase)): if new_pin and isinstance(value, Cell) and not isinstance(value, SubCell) and value.celltype == "checksum": pin["celltype"] = "checksum" target_path = self._path + (attr,) assert value._parent() is parent assign_connection(parent, value._path, target_path, False) translate = True else: if self.debug.enabled and self.debug.mode == "sandbox": mount = self._get_debugmount() mount_ctx = mount.mount_ctx if htf["compiled"]: if attr not in mount.kwargs_cells: raise AttributeError(attr) attr2 = "KWARGS_" + attr return getattr(mount_ctx, attr2).set(value) else: return getattr(mount_ctx, attr).set(value) tf = self._get_tf(force=True) inp = getattr(tf, htf["INPUT"]) removed = parent.remove_connections( self._path + (attr,), endpoint="target" ) if removed: translate = True setattr(inp.handle_no_inference, attr, value) if translate: parent._translate() def _exempt(self): # assign_connection will break previous connections into self # but we must exempt self.code from this, and perhaps main_module exempt = [] htf = self._get_htf() if "code" not in htf["pins"]: exempt.append((self._path + ("code",))) if htf["compiled"] and "main_module" not in htf["pins"]: exempt.append((self._path + ("main_module",))) return exempt def _has_tf(self): htf = self._get_htf() if htf.get("UNTRANSLATED"): return False parent = self._parent() try: p = parent._gen_context for subpath in self._path: p2 = getattr(p, subpath) if isinstance(p2, SynthContext) and p2._context is not None: p2 = p2._context() p = p2 if not isinstance(p, CoreContext): raise AttributeError return True except AttributeError: return False def _get_tf(self, force=False): parent = self._get_parent2() if not self._has_tf(): if force: raise Exception("Transformer has not yet been translated") return None p = parent._gen_context for subpath in self._path: p2 = getattr(p, subpath) if isinstance(p2, SynthContext) and p2._context is not None: p2 = p2._context() p = p2 assert isinstance(p, CoreContext) return p def _get_htf(self): parent = self._parent() if parent is not None: return parent._get_node(self._path) else: assert self._node is not None return self._node def _get_debugmount(self): if not self.debug.enabled: return None if self.debug.mode != "sandbox": return None node = self._get_htf() tf = self._get_tf() if node["language"] in ("python", "ipython", "bash"): tf2 = tf.tf else: tf2 = tf.tf.executor from ..metalevel.debugmode import debugmountmanager return debugmountmanager._mounts[tf2] def _get_value(self, attr): htf = self._get_htf() if self.debug.enabled and self.debug.mode == "sandbox": mount = self._get_debugmount() mount_ctx = mount.mount_ctx if htf["compiled"]: if attr not in mount.kwargs_cells: raise AttributeError(attr) attr2 = "KWARGS_" + attr return getattr(mount_ctx, attr2).value else: return getattr(mount_ctx, attr).value tf = self._get_tf(force=True) if attr == "code": p = tf.code return p.data elif attr == htf["RESULT"]: return getattr(tf, attr).value else: inp = getattr(tf, htf["INPUT"]) p = inp.value[attr] return p
[docs] def observe(self, attr, callback, polling_interval, observe_none=False): """Observes attributes of the result, analogous to Cell.observe""" if isinstance(attr, str): attr = (attr,) path = self._path + attr return self._get_top_parent().observe( path, callback, polling_interval, observe_none=observe_none )
[docs] def unobserve(self, attr): """Analogous to Cell.unobserve""" if isinstance(attr, str): attr = (attr,) path = self._path + attr return self._get_top_parent().unobserve(path)
@property def exception(self): """Returns the exception associated with the transformer. The exception may be raised during one of three stages: 1. The construction of the input object (Transformer.inp). The input object is cell-like, see Cell.exception for more details. 2. The construction of the individual input values that are inserted into the transformer namespace before execution. 3. The execution of the transformer. For Python/IPython cells, this is the exception directly raised in code. For Bash/Docker cells, exceptions are raised upon non-zero exit codes. For compiled transformers, this stage is subdivided into generating the C header, compiling the code module, and executing the compiled code. 4. The construction of the result object (Transformer.result). The result object is cell-like, see Cell.exception for more details. """ class PlaceHolder: """Placeholder class""" htf = self._get_htf() if htf.get("UNTRANSLATED"): # pylint: disable=line-too-long return "This transformer is untranslated; run 'ctx.translate()' or 'await ctx.translation()'" tf0 = self._get_tf(force=True) tf = tf0.tf if htf["compiled"]: attrs = ( htf["INPUT"], "code", PlaceHolder, "gen_header", "integrator", "executor", htf["RESULT"], ) else: attrs = ( htf["INPUT"], "code", PlaceHolder, "ipy_template_code", "apply_ipy_template", "ipy_code", "tf", htf["RESULT"], ) exc = "" for k in attrs: if k == "code": curr_exc = tf0.exception if curr_exc is None: curr_exc = tf0.code.exception elif k is PlaceHolder: k = "input pins" exc2 = {} for childname in tf0.children: if childname.endswith("_PIN"): c = getattr(tf0, childname) if c._status_reason == StatusReasonEnum.INVALID: exc2[childname[:-4]] = c.exception if len(exc2): curr_exc = exc2 elif k == "tf": if len(exc): return exc curr_exc = tf.exception if curr_exc is not None: return curr_exc else: continue elif k in (htf["INPUT"], htf["RESULT"]): if len(exc): return exc curr_exc = getattr(self, k).exception elif k in ("ipy_template_code", "apply_ipy_template", "ipy_code"): tf2 = self._get_tf(force=True) try: item = getattr(tf2, k) except AttributeError: continue curr_exc = item.exception else: curr_exc = getattr(tf, k).exception if curr_exc is not None: if k == "executor": if isinstance(curr_exc, dict) and list(curr_exc.keys()) == [ "module" ]: curr_exc = curr_exc["module"] if isinstance(curr_exc, dict): curr_exc2 = "" for kk in curr_exc: curr_exc2 += " +++ " + kk + " +++\n" curr_exc2 += str(curr_exc[kk]).strip("\n") + "\n" curr_exc2 += " +++ /" + kk + " +++\n" curr_exc = curr_exc2 exc += "*** " + k + " ***\n" exc += str(curr_exc).strip("\n") + "\n" exc += "*** /" + k + " ***\n" curr_exc = None if not len(exc): return None return exc @property def logs(self): """Returns the stdout/stderr logs of the transformer, if any""" htf = self._get_htf() if htf.get("UNTRANSLATED"): return None tf = self._get_tf(force=True).tf if htf["compiled"]: logs = "" for k in "gen_header", "integrator", "executor": subtf = getattr(tf, k) sublogs = subtf.logs if sublogs is not None and len(sublogs.strip()): logs += "*** " + k + " ***\n" logs += sublogs.strip() + "\n" logs += "*** /" + k + " ***\n" if not len(logs): return None return logs else: return tf.logs @property def status(self): """The status of the transformer, analogous to Cell.status. See Transformer.exception about the different stages. The first stage with a non-OK status is reported.""" htf = self._get_htf() if htf.get("UNTRANSLATED"): return "Status: error (ctx needs translation)" tf = self._get_tf(force=True).tf if htf["compiled"]: attrs = ( htf["INPUT"], "code", "gen_header", "integrator", "executor", htf["RESULT"], ) else: attrs = ( htf["INPUT"], "code", "ipy_template_code", "apply_ipy_template", "ipy_code", "tf", htf["RESULT"], ) for k in attrs: pending = False upstream = False if k in (htf["INPUT"], htf["RESULT"]): if k == htf["INPUT"] and not len(htf["pins"]): continue if ( k == htf["RESULT"] and self.debug.enabled and self.debug.mode == "sandbox" ): continue cell = getattr(self, k) status = cell.status elif k == "code": status = self._get_tf(force=True).code.status elif k == "tf": status = self._get_tf(force=True).tf.status elif k in ("ipy_template_code", "apply_ipy_template", "ipy_code"): tf2 = self._get_tf(force=True) try: item = getattr(tf2, k) except AttributeError: continue status = item.status else: tf = self._get_tf(force=True).tf status = getattr(tf, k).status if not status.endswith("OK"): if status.endswith(" pending"): pending = True elif status.endswith(" upstream"): upstream = True else: return "*" + k + "*: " + status if upstream: return "Status: upstream" elif pending: return "Status: pending" return "Status: OK"
[docs] def get_transformation_checksum(self) -> Optional[str]: """Return the checksum of the transformation dict. The transformation dict contains the checksums of all input pins, including the code, as well as the following special keys: - __output__: the name (usually "result") and (sub)celltype of the output pin If it has a hash pattern, this is appended as the fourth element. - __as__ (optional): a dictionary of pin-to-variable renames (pins.pinname.as_ attribute) - __format__ (optional): a dictionary that contains deepcell and filesystem attributes The transformation checksum is the checksum of this dict. Note that in addition, a transformation dict may contain extra information that is not reflected in this checksum: - __env__: the checksum of the environment description - __meta__: meta information (Transformer.meta). - __compilers__: context-wide compiler definitions. - __languages__: context-wide language definition. Because of the double underscores, this extra information is called "dunder. `ctx.resolve(checksum, "plain")` will return the transformation dict, minus the dunder information. The checksum is treated like any other buffer, i.e. including database, assistant etc. With Transformer.get_transformation_dict(), you will obtain the full transformation dict, including the dunder. """ _ = self._get_parent2() htf = self._get_htf() tf = self._get_tf() if htf["compiled"]: return tf.tf.executor.get_transformation_checksum() else: return tf.tf.get_transformation_checksum()
async def _get_transformation_checksum_async(self) -> Optional[str]: """Async version of .get_transformation_checksum""" _ = self._get_parent2() htf = self._get_htf() tf = self._get_tf() if htf["compiled"]: coro = tf.tf.executor._get_transformation_checksum_async() else: coro = tf.tf._get_transformation_checksum_async() return await coro
[docs] def get_transformation_dict(self): """Return the full transformation dict. The transformation dict contains the checksums of all input pins, including the code. In addition, it may contain the following special keys: - __output__: the name (usually "result") and (sub)celltype of the output pin If it has a hash pattern, this is appended as the fourth element. - __env__: the checksum of the environment description - __as__: a dictionary of pin-to-variable renames (pins.pinname.as_ attribute) - __format__: a dictionary that contains deepcell and filesystem attributes Finally, it may contain additional information that is not reflected in its checksum: - __meta__: meta information (Transformer.meta). - __compilers__: context-wide compiler definitions. - __languages__: context-wide language definition.""" from seamless.core.cache.transformation_cache import transformation_cache checksum = self.get_transformation_checksum() if checksum is None: return None return transformation_cache.get_transformation_dict(checksum)
async def _dummy_run_async(self, _): parent = self._parent() await parent.translation() while self.status == "Status: pending": await parent.computation(0.05) result = self.result.checksum if result is None: # glitch await parent.computation(0.1) result = self.result.checksum return result def _dummy_run_sync(self, _): parent = self._parent() parent.translate() while self.status == "Status: pending": parent.compute(0.05) result = self.result.checksum if result is None: # glitch parent.compute(0.1) result = self.result.checksum return result def get_transformation(self) -> "Transformation": from .direct import Transformation, transformation_from_dict from ..core.direct.run import _get_node_transformation_dependencies, _node_to_transformation_dict result_celltype = self._get_htf().get("result_celltype", "mixed") if self._parent() is not None: resolver_sync = lambda _: self.get_transformation_checksum() resolver_async = lambda _: self._get_transformation_checksum_async evaluator_sync = self._dummy_run_sync evaluator_async = self._dummy_run_async upstream_dependencies = {} return Transformation( result_celltype, resolver_sync, resolver_async, evaluator_sync, evaluator_async, upstream_dependencies ) else: if self.code is None: raise RuntimeError("This transformer has no code") node = self._node.copy() transformation_dict = _node_to_transformation_dict(node) upstream_dependencies = _get_node_transformation_dependencies(node) return transformation_from_dict(transformation_dict, result_celltype, upstream_dependencies)
[docs] def cancel(self) -> None: """Hard-cancels the transformer. This will send a cancel signal that will kill the transformation if it is running. The transformation is killed with a HardCancelError exception. Clearing the exception using Transformer.clear_exception will restart the transformation. This affects both local and remote execution. """ tf = self._get_tf(force=True) htf = self._get_htf() if htf["compiled"]: for attr in "gen_header", "integrator", "executor": tf2 = getattr(tf, attr) tf2.hard_cancel() else: tf.tf.hard_cancel()
[docs] def undo(self) -> str | None: """Attempt to undo a finished transformer. This may be useful in the case of non-reproducible transformers. While the correct solution is to make them deterministic, this method will allow repeated execution under various conditions, in order to investigate the issue. If the transformer has no associated transformation (e.g. undefined inputs) or the transformation result is not known, an exception is raised. Otherwise, the database is contacted in order to contest the result. If the database returns an error message, that is returned as string. """ from seamless.core.cache.transformation_cache import transformation_cache if self._parent() is not None: tcache = self._parent()._manager.cachemanager.transformation_cache else: tcache = transformation_cache tf_checksum = self.get_transformation_checksum() if tf_checksum is None: raise RuntimeError("Transformer has no defined transformation") result = tcache.undo(tf_checksum) if not isinstance(result, bytes): return result
@property def self(self): """Returns a wrapper where the pins are not directly accessible. By default, a pin called "compute" will cause "transformer.status" to return the pin, and not the actual transformer status. To be sure to get the transformer status, you can invoke transformer.self.status. NOTE: experimental, requires more testing """ attributelist = [k for k in type(self).__dict__ if not k.startswith("_")] return SelfWrapper(self, attributelist) @property def link_options(self) -> list[str]: """Linker options for compiled modules They are a list of strings, for example: ["-lm", "-lgfortran", "-lcudart"] """ htf = self._get_htf() if not htf["compiled"]: raise AttributeError("Only for compiled transformers") return deepcopy(htf.get("link_options", [])) @link_options.setter def link_options(self, link_options): htf = self._get_htf() if not htf["compiled"]: raise AttributeError("Only for compiled transformers") ok = True if not isinstance(link_options, list): ok = False else: for opt in link_options: if not isinstance(opt, str): ok = False if not ok: raise TypeError("link_options must be a list of strings") htf["link_options"] = deepcopy(link_options) self._parent()._translate()
[docs] def copy(self): """If not bound to a context, return a copy of the Transformer. If bound to a workflow, return a copy wrapper. This wrapper can be assigned to a new Context attribute, creating a copy of the current Transformer, where input parameters and connections to input pins are all copied.""" if self._parent() is None: node = self._get_htf() node = deepcopy(node) tf = Transformer() tf._node = node return tf else: return TransformerCopy(self)
@property def execution_metadata(self): tf = self._get_tf(force=True) htf = self._get_htf() if htf["compiled"]: tf2 = tf.tf.executor else: tf2 = tf.tf return tf2.execution_metadata def __getattribute__(self, attr): if attr.startswith("_"): return super().__getattribute__(attr) if hasattr(type(self), attr) or attr in self.__dict__ or attr == "path": return super().__getattribute__(attr) return self._getattr(attr) def __getitem__(self, item): if not isinstance(item, str): raise TypeError("item must be 'str', not '{}'".format(type(item))) return self._getattr(item) def _getattr(self, attr): htf = self._get_htf() dirs = None deleter = None setter = None pull_source = functools.partial(self._pull_source, attr) if attr in htf["pins"] and attr != "pins": getter = functools.partial(self._valuegetter, attr) pin = getattr(self.pins, attr) setter = functools.partial(setattr, pin) dirs = ["value", "celltype", "subcelltype", "as_"] proxycls = Proxy elif attr == "pins": return PinsWrapper(self) elif attr == "code": getter = self._codegetter deleter = self._code_deleter dirs = ["value", "mount", "mimetype", "celltype"] proxycls = CodeProxy elif attr == htf["INPUT"]: getter = self._inputgetter dirs = [ "value", "buffered", "data", "checksum", "schema", "example", "status", "exception", "add_validator", "handle", ] + list(htf["pins"].keys()) setter = self._inputsetter return Proxy( self, (attr,), "w", pull_source=None, getter=getter, dirs=dirs, setter=setter, deleter=deleter, ) elif attr == htf["RESULT"]: getter = self._resultgetter dirs = [ "value", "buffered", "data", "checksum", "schema", "example", "exception", "add_validator", "celltype", "hash_pattern", ] setter = self._resultsetter pull_source = None proxycls = Proxy elif attr == "main_module": if not htf["compiled"]: raise AttributeError(attr) if htf["compiled"]: return CompiledObjectDict(self) else: raise AttributeError(attr) mode = "w" if setter is not None else "r" return proxycls( self, (attr,), mode, pull_source=pull_source, getter=getter, setter=setter, dirs=dirs, ) def _sub_mount_header(self, path=None, mode="w", authority="cell", persistent=True): assert mode == "w" assert authority == "cell" return self._sub_mount( "header", path=path, mode=mode, authority=authority, persistent=persistent ) def _sub_mount(self, attr, path=None, mode="rw", authority="file", persistent=True): htf = self._get_htf() if attr == "header": assert htf["compiled"] if path is None: if "mount" in htf: mount = htf["mount"] if attr in mount: mount.pop(attr) if not len(mount): htf.pop("mount") self._get_htf()["UNTRANSLATED"] = True self._parent()._translate() return mount = { "path": path, "mode": mode, "authority": authority, "persistent": persistent, } if not "mount" in htf: htf["mount"] = {} htf["mount"][attr] = mount self._get_htf()["UNTRANSLATED"] = True self._parent()._translate() def _codegetter(self, attr): if attr == "celltype": return "code" elif attr == "value": if self.debug.enabled and self.debug.mode == "sandbox": mount = self._get_debugmount() mount_ctx = mount.mount_ctx attr = "code" if self.self.language == "bash": attr = "bashcode" return getattr(mount_ctx, attr).value tf = self._get_tf(force=True) return tf.code.value elif attr == "mount": return functools.partial(self._sub_mount, "code") elif attr == "mimetype": htf = self._get_htf() language = htf["language"] mimetype = language_to_mime(language) return mimetype elif attr == "checksum": tf = self._get_tf(force=True) return tf.code.checksum else: raise AttributeError(attr) def _code_deleter(self, attr): if attr == "mount": htf = self._get_htf() if "mount" in htf: mount = htf["mount"] if "code" in mount: mount.pop("code") if not len(mount): htf.pop("mount") self._get_htf()["UNTRANSLATED"] = True self._parent()._translate() else: raise AttributeError(attr) def _inputgetter(self, attr): htf = self._get_htf() if attr in htf["pins"]: return getattr(self, attr) if self.debug.enabled and self.debug.mode == "sandbox": mount = self._get_debugmount() if attr != "value": if attr in ("status", "exception"): return getattr(mount.tf, attr) raise AttributeError(attr) mount_ctx = mount.mount_ctx result = {} if htf["compiled"]: result = {} for k in mount.kwargs_cells: kk = "KWARGS_" + k c = getattr(mount_ctx, kk) result[k] = c.value else: for k in mount.tf._pins: c = getattr(mount_ctx, k) result[k] = c.value return result if attr not in ("value", "status", "exception"): tf = self._get_tf(force=True) inputcell = getattr(tf, htf["INPUT"]) if attr == "value": if htf.get("UNTRANSLATED"): return None tf = self._get_tf(force=True) inputcell = getattr(tf, htf["INPUT"]) return inputcell.value elif attr == "data": return inputcell.data elif attr == "buffered": return inputcell.buffer.value elif attr == "checksum": return inputcell.checksum elif attr == "handle": return inputcell.handle_no_inference elif attr == "schema": schema = inputcell.handle.schema return SchemaWrapper(self, schema, "SCHEMA") elif attr == "example": return self.example elif attr == "status": if htf.get("UNTRANSLATED"): # pylint: disable=line-too-long return "This transformer is untranslated; run 'ctx.translate()' or 'await ctx.translation()'" tf = self._get_tf(force=True) inputcell = getattr(tf, htf["INPUT"]) return ( inputcell._data.status ) # TODO; take into account validation, inchannel status elif attr == "exception": if htf.get("UNTRANSLATED"): return "Status: error (ctx needs translation)" tf = self._get_tf(force=True) inputcell = getattr(tf, htf["INPUT"]) return inputcell.exception elif attr == "add_validator": handle = inputcell.handle_no_inference return handle.add_validator raise AttributeError(attr) def _inputsetter(self, attr, value): if attr in ( "value", "data", "buffered", "checksum", "handle", "schema", "example", "status", "exception", "add_validator", ): raise AttributeError(attr) if isinstance(value, Cell): raise TypeError(value) htf = self._get_htf() if self.debug.enabled: if attr in htf["pins"]: return setattr(self, attr, value) raise AttributeError(attr) if not self._has_tf(): if isinstance(value, Resource): value = value.data if "TEMP" not in htf or htf["TEMP"] is None: htf["TEMP"] = {} if "input_auth" not in htf["TEMP"]: htf["TEMP"]["input_auth"] = {} get_form(value) htf["TEMP"]["input_auth"][attr] = value self._parent()._translate() return tf = self._get_tf(force=True) inputcell = getattr(tf, htf["INPUT"]) handle = inputcell.handle_no_inference setattr(handle, attr, value) def _resultgetter(self, attr): htf = self._get_htf() if attr == "celltype": return htf.get("result_celltype", "structured") if htf.get("UNTRANSLATED"): if attr in ("schema", "example", "exception"): raise Exception("Transformer has not yet been translated") return None if attr == "value" and self.debug.enabled and self.debug.mode == "sandbox": mount = self._get_debugmount() mount_ctx = mount.mount_ctx return getattr(mount_ctx, htf["RESULT"]).value tf = self._get_tf(force=True) resultcell = getattr(tf, htf["RESULT"]) if attr == "mount": raise Exception("Result cells cannot be mounted") if attr == "value": return resultcell.value elif attr == "data": return resultcell.data elif attr == "buffered": if self.result.celltype != "structured": raise AttributeError(attr) return resultcell.buffer.value elif attr == "checksum": return resultcell.checksum elif attr == "schema": if self.result.celltype != "structured": raise AttributeError(attr) schema = self._result_example().schema return SchemaWrapper(self, schema, "RESULTSCHEMA") elif attr == "example": if self.result.celltype != "structured": raise AttributeError(attr) return self._result_example() elif attr == "exception": return resultcell.exception elif attr == "add_validator": if self.result.celltype != "structured": raise AttributeError(attr) result_ctx = resultcell._data._context() handle = result_ctx.example.handle return handle.add_validator return getattr(resultcell, attr) def _resultsetter(self, attr, value): from .Cell import celltypes if attr != "celltype": raise AttributeError(attr) htf = self._get_htf() if htf["compiled"] and attr != "structured": raise Exception("Compiled transformers must have structured result cells") if value == "deepfolder": raise TypeError('"deepfolder" not allowed. Use "folder" instead') if value not in celltypes and value not in ("deepcell", "folder"): raise TypeError(value) htf["result_celltype"] = value parent = self._parent() if parent is not None: parent._translate() def _valuegetter(self, attr, attr2): if attr2 == "celltype": return getattr(self.pins, attr).celltype if attr2 == "subcelltype": return getattr(self.pins, attr).subcelltype if attr2 == "as_": return getattr(self.pins, attr).as_ if attr2 != "value": raise AttributeError(attr2) return self._get_value(attr) def _pull_source(self, attr, path): from .assign import assign_connection tf = self._get_tf() htf = self._get_htf() parent = self._parent() def set_mount(node): if "mount" not in htf: return if htf["mount"].get("code") is None: return node["mount"] = htf["mount"].pop("code") language = htf["language"] value = None if attr == "code": if tf is not None: p = tf.code value = p.data elif "TEMP" in htf and "code" in htf["TEMP"]: value = htf["TEMP"]["code"] cell = { "path": path, "type": "cell", "celltype": "code", "language": language, "transformer": True, "UNTRANSLATED": True, } if value is not None: assert isinstance(value, str), type(value) cell["TEMP"] = value if "checksum" in htf: htf["checksum"].pop("code", None) set_mount(cell) else: raise NotImplementedError if tf is not None: inp = getattr(tf, htf["INPUT"]) p = getattr(inp.value, attr) value = p.value cell = { "path": path, "type": "cell", "celltype": "structured", "datatype": "mixed", } child = Cell(parent=parent, path=path) # inserts itself as child parent._graph[0][path] = cell if "file_extension" in htf: child.mimetype = htf["file_extension"] else: mimetype = language_to_mime(language) child.mimetype = mimetype target_path = self._path + (attr,) assign_connection(parent, path, target_path, False) htf["UNTRANSLATED"] = True parent._translate() def _observe_input(self, checksum): if self._parent() is None: return try: htf = self._get_htf() except Exception: return if htf.get("checksum") is None: htf["checksum"] = {} htf["checksum"].pop("input_temp", None) htf["checksum"].pop("input", None) htf["checksum"].pop("input_buffer", None) if checksum is not None: htf["checksum"]["input"] = checksum def _observe_input_auth(self, checksum): if self._parent() is None: return try: htf = self._get_htf() except Exception: return if htf.get("checksum") is None: htf["checksum"] = {} htf["checksum"].pop("input_temp", None) htf["checksum"].pop("input_auth", None) if checksum is not None: htf["checksum"]["input_auth"] = checksum def _observe_input_buffer(self, checksum): if self._parent() is None: return try: htf = self._get_htf() except Exception: return if htf.get("checksum") is None: htf["checksum"] = {} htf["checksum"].pop("input_temp", None) if checksum is None: htf["checksum"].pop("input", None) htf["checksum"].pop("input_buffer", None) else: if "input" not in htf["checksum"]: htf["checksum"]["input_buffer"] = checksum def _observe_code(self, checksum): if self._parent() is None: return try: htf = self._get_htf() except Exception: return if htf.get("checksum") is None: htf["checksum"] = {} htf["checksum"].pop("code", None) if checksum is not None: htf["checksum"]["code"] = checksum def _observe_result(self, checksum): if self._parent() is None: return try: htf = self._get_htf() except Exception: return if htf.get("checksum") is None: htf["checksum"] = {} htf["checksum"]["result"] = checksum def _observe_schema(self, checksum): if self._parent() is None: return try: htf = self._get_htf() except Exception: return if htf.get("checksum") is None: htf["checksum"] = {} htf["checksum"]["schema"] = checksum def _observe_result_schema(self, checksum): if self._parent() is None: return try: htf = self._get_htf() except Exception: return if htf.get("checksum") is None: htf["checksum"] = {} htf["checksum"]["result_schema"] = checksum def _observe_main_module(self, checksum): if self._parent() is None: return try: htf = self._get_htf() except Exception: return if htf.get("checksum") is None: htf["checksum"] = {} htf["checksum"]["main_module"] = checksum def _set_observers(self): try: htf = self._get_htf() except KeyError: # fail silently, as we are probably part of a macro return """ # Disable the code below. # for now, assume that the internal structure of foreign transformers # is not too different... if htf["compiled"] or htf["language"] not in ("python", "ipython", "bash", "docker"): if htf["compiled"]: pass else: raise NotImplementedError # NOTE: observers depend on the implementation of # translate_XXX_transformer (midlevel) """ tf = self._get_tf(force=True) tf.code._set_observer(self._observe_code) inp = htf["INPUT"] inpcell = getattr(tf, inp) inpcell.auth._set_observer(self._observe_input_auth) inpcell.buffer._set_observer(self._observe_input_buffer) inpcell._data._set_observer(self._observe_input) schemacell = inpcell.schema schemacell._set_observer(self._observe_schema) result = htf["RESULT"] resultcell = getattr(tf, result) if self.result.celltype == "structured": resultcell._data._set_observer(self._observe_result) schemacell = resultcell.schema schemacell._set_observer(self._observe_result_schema) else: resultcell._set_observer(self._observe_result) if htf["compiled"]: tf.main_module.auth._set_observer(self._observe_main_module) def __delattr__(self, attr): if attr.startswith("_"): return super().__delattr__(attr) return delattr(self.pins, attr) # TODO: self.self.pins... def __dir__(self): htf = self._get_htf() d = super().__dir__() std = ["code", "pins", htf["RESULT"], htf["INPUT"], "exception", "status"] if htf["compiled"]: std += list(("main_module", "header")) pins = list(htf["pins"].keys()) return sorted(d + pins + std) def __str__(self): return "Seamless Transformer: " + self.path def __repr__(self): return str(self)
class TransformerCopy: """Wrapper around a Transformer. To facilitate copying.""" def __init__(self, transformer): self.transformer = weakref.ref(transformer) from .synth_context import SynthContext from .assign import check_libinstance_subcontext_binding from ..core.status import StatusReasonEnum from .DeepCell import DeepCellBase, DeepCell, DeepFolderCell from .SubCell import SubCell