# Workflow builder

In this part, you can find common implementation of various conductor tasks. It's pythonic representation of conductor operators and system tasks This implementation depends on frinx-python-sdk and uses Pydantic to serialize workflow definition to result in JSON format.

# DECISION TASK

self.tasks.append(DecisionTask(
    name="decision",
    task_reference_name="decision",
    decision_cases={
        "true": [
            HumanTask(
                name="human",
                task_reference_name="human"
            )
        ],
    },
    default_case=[
        TerminateTask(
            name="terminate",
            task_reference_name="terminate",
            input_parameters=TerminateTaskInputParameters(
                termination_status=WorkflowStatus.FAILED
            ))
    ],
    input_parameters=DecisionTaskInputParameters(
        status="${workflow.input.status}"
    ),
    case_expression="$.status === 'true' ? 'true' : 'false'"
))
self.tasks.append(DecisionCaseValueTask(
    name="decision",
    task_reference_name="decision",
    decision_cases={
        "true": [
            HumanTask(
                name="human",
                task_reference_name="human"
            )
        ],
    },
    default_case=[
        TerminateTask(
            name="terminate",
            task_reference_name="terminate",
            input_parameters=TerminateTaskInputParameters(
                termination_status=WorkflowStatus.FAILED
            ))
    ],
    input_parameters=DecisionCaseValueTaskInputParameters(
        case_value_param="${workflow.input.status}"
    ),
))

# DO_WHILE TASK

loop_tasks = WaitDurationTask(
    name="wait",
    task_reference_name="wait",
    input_parameters=WaitDurationTaskInputParameters(
        duration="1 seconds"
    )
)

self.tasks.append(DoWhileTask(
    name="do_while",
    task_reference_name="LoopTask",
    loop_condition="if ( $.LoopTask['iteration'] < $.value ) { true; } else { false; }",
    loop_over=[
        loop_tasks
    ],
    input_parameters={
        "value": workflow_inputs.value.wf_input
    }
))

# DYNAMIC_FORK TASK

task_inputs = InventoryWorkflows.InstallDeviceByName.WorkflowInput()

fork_inputs = [
    {
        task_inputs.device_name.name: "IOS01"
    },
    {
        task_inputs.device_name.name: "IOS02"
    },
    {
        task_inputs.device_name.name: "IOS02"
    }
]

self.tasks.append(DynamicForkTask(
    name="dyn_fork",
    task_reference_name="dyn_fork",
    input_parameters=DynamicForkArraysTaskFromDefInputParameters(
        fork_task_name=InventoryWorkflows.InstallDeviceByName,
        fork_task_inputs=fork_inputs
    ),
))

self.tasks.append(JoinTask(
    name="join",
    task_reference_name="join"
))
self.tasks.append(DynamicForkTask(
    name="dyn_fork",
    task_reference_name="dyn_fork",
    input_parameters=DynamicForkTaskFromDefInputParameters(
        dynamic_tasks=InventoryWorkflows.InstallDeviceByName,
        dynamic_tasks_input=workflow_inputs.device_name.wf_input
    ),
))

self.tasks.append(JoinTask(
    name="join",
    task_reference_name="join"
))
task_inputs = InventoryWorkflows.InstallDeviceByName.WorkflowInput()

fork_inputs = [
    {
        task_inputs.device_name.name: "IOS01"
    },
    {
        task_inputs.device_name.name: "IOS02"
    },
    {
        task_inputs.device_name.name: "IOS02"
    }
]

input_parameters = DynamicForkArraysTaskInputParameters(
    fork_task_name="Install_device_by_name",
    fork_task_inputs=fork_inputs
)

self.tasks.append(DynamicForkTask(
    name="dyn_fork",
    task_reference_name="dyn_fork",
    input_parameters=input_parameters
))
self.tasks.append(DynamicForkTask(
    name="dyn_fork",
    task_reference_name="dyn_fork",
    input_parameters=DynamicForkTaskInputParameters(
        dynamic_tasks_input="Install_device_by_name",
        dynamic_tasks=[
            {
                task_inputs.device_name.name: "IOS01"
            },
            {
                task_inputs.device_name.name: "IOS02"
            },
            {
                task_inputs.device_name.name: "IOS02"
            }
        ]
    )
))

# EVENT TASK

self.tasks.append(EventTask(
    name="Event",
    task_reference_name="event_a",
    sink="conductor:Wait_task",
    async_complete=False
))

# EXCLUSIVE_JOIN TASK

self.tasks.append(ExclusiveJoinTask(
    name="exclusive_join",
    task_reference_name="exclusive_join",
))

A list of task reference names that this JOIN task will wait for completion

self.tasks.append(ExclusiveJoinTask(
    name="exclusive_join",
    task_reference_name="exclusive_join",
    join_on=["task1", "task2"]
))

# FORK_JOIN TASK

fork_tasks_a = []
fork_tasks_b = []

fork_tasks_a.append(SimpleTask(
    name=Inventory.InventoryAddDevice,
    task_reference_name="add_device_cli",
    input_parameters=SimpleTaskInputParameters(
        root=dict(
            device_name="IOS01",
            zone="uniconfig",
            service_state="IN_SERVICE",
            mount_body="body"
        )
    )
))

fork_tasks_a.append(SimpleTask(
    name=Inventory.InventoryInstallDeviceByName,
    task_reference_name="install_device_cli",
    input_parameters=SimpleTaskInputParameters(
        root=dict(
            device_name="IOS01"
        )
    )
))

fork_tasks_b.append(SimpleTask(
    name=Inventory.InventoryAddDevice,
    task_reference_name="add_device_netconf",
    input_parameters=SimpleTaskInputParameters(
        root=dict(
            device_name="NTF01",
            zone="uniconfig",
            service_state="IN_SERVICE",
            mount_body="body"
        )
    )
))

fork_tasks_b.append(SimpleTask(
    name=Inventory.InventoryInstallDeviceByName,
    task_reference_name="install_device_netconf",
    input_parameters=SimpleTaskInputParameters(
        root=dict(
            device_name="NTF01"
        )
    )
))

self.tasks.append(ForkTask(
    name="fork",
    task_reference_name="fork",
    fork_tasks=[
        fork_tasks_a,
        fork_tasks_b
    ]
))

# HUMAN TASK

self.tasks.append(HumanTask(
    name="human",
    task_reference_name="human"
))

# INLINE TASK

self.tasks.append(InlineTask(
    name="inline",
    task_reference_name="inline",
    input_parameters=InlineTaskInputParameters(
        expression='if ($.value){return {"result": true}} else { return {"result": false}}',
        value="${workflow.variables.test}"
    )))

INFO: expression wrapped into javascript function:

expression = "function e() { if ($.value){return {\"result\": true}} else { return {\"result\": false}} } e();"

# JOIN TASK

Read more on conductor-oss docs

self.tasks.append(JoinTask(
    name="join",
    task_reference_name="join"
))

A list of task reference names that this JOIN task will wait for completion

self.tasks.append(JoinTask(
    name="join",
    task_reference_name="join",
    join_on=["task1", "task2"]
))

# JSON_JQ_TRANSFORM TASK

json_jq = JsonJqTask(
    name="json_jq",
    task_reference_name="json_jq",
    input_parameters=JsonJqTaskInputParameters(
        query_expression="{ key3: (.key1.value1 + .key2.value2) }",
        key_1={
            "value1": [
                "a",
                "b"
            ]
        },
        key2={
            "value2": [
                "c",
                "d"
            ]
        }
    )
)
self.tasks.append(json_jq)

# SET_VARIABLE TASK

self.tasks.append(SetVariableTask(
    name="var",
    task_reference_name="var",
    input_parameters=SetVariableTaskInputParameters(
        root=dict(
            env="frinx"
        )
    )
))

# SIMPLE TASK

self.tasks.append(
    SimpleTask(
        name=Inventory.InventoryAddDevice,
        task_reference_name="test",
        input_parameters=SimpleTaskInputParameters(
            root=dict(
                device_name="IOS01",
                zone="uniconfig",
                service_state="aha",
                mount_body="body"
            )
        )
    )
)

# START_WORKFLOW TASK

Start Workflow is an operator task used to start another workflow from an existing workflow. Unlike a sub-workflow task, a start workflow task doesn’t create a relationship between the current workflow and the newly started workflow. That means it doesn’t wait for the started workflow to get completed.

# INPUT PARAMETERS

start_workflow:

  • StartWorkflowTaskInputParameters : StartWorkflowTaskPlainInputParameters|StartWorkflowTaskFromDefInputParameters
  • StartWorkflowTaskPlainInputParameters
  • StartWorkflowTaskFromDefInputParameters
workflow_input_parameters = {
    InventoryWorkflows.InstallDeviceByName.WorkflowInput().device_name.name: "IOS01"
}

task_inputs = StartWorkflowTaskInputParameters(
    start_workflow=StartWorkflowTaskFromDefInputParameters(
        workflow=InventoryWorkflows.InstallDeviceByName,
        input=workflow_input_parameters
    )
)

self.tasks.append(StartWorkflowTask(
    name="Install_device_by_name",
    task_reference_name="start",
    input_parameters=task_inputs
))

# SUBWORKFLOW TASK

sub_workflow_param = SubWorkflowParam(
    name=InventoryWorkflows.AddDeviceToInventory.__name__,
    version=1
)

workflows_inputs = InventoryWorkflows.AddDeviceToInventory.WorkflowInput()

sub_workflow_input = {}
sub_workflow_input.setdefault(workflows_inputs.device_name.name, "IOS01")
sub_workflow_input.setdefault(workflows_inputs.zone.name, "uniconfig")

self.tasks.append(SubWorkflowTask(
    name="subworkflow",
    task_reference_name="subworkflow",
    sub_workflow_param=sub_workflow_param,
    input_parameters=SubWorkflowInputParameters(
        root=sub_workflow_input
    )
))

SubWorkflowFromDefParam validate subworkflow and workflow inputs

sub_workflow_param = SubWorkflowFromDefParam(
    name=InventoryWorkflows.AddDeviceToInventory
)

workflows_inputs = InventoryWorkflows.AddDeviceToInventory.WorkflowInput()

sub_workflow_input = {}
sub_workflow_input.setdefault(workflows_inputs.device_name.name, "IOS01")
sub_workflow_input.setdefault(workflows_inputs.zone.name, "uniconfig")

self.tasks.append(SubWorkflowTask(
    name="subworkflow",
    task_reference_name="subworkflow",
    sub_workflow_param=sub_workflow_param,
    input_parameters=SubWorkflowInputParameters(
        root=sub_workflow_input
    )
))

# SWITCH TASK

  • SwitchTaskValueParamInputParameters -> VALUE-PARAM
  • SwitchTaskInputParameters -> JAVASCRIPT

VALUE-PARAM evaluator type

switch = SwitchTask(
    name="switch",
    task_reference_name="switch",
    decision_cases={
        "true": [
            WaitDurationTask(
                name="wait",
                task_reference_name="wait1",
                input_parameters=WaitDurationTaskInputParameters(
                    duration="10 seconds"
                )
            )
        ]},
    default_case=[
        WaitDurationTask(
            name="wait",
            task_reference_name="wait2",
            input_parameters=WaitDurationTaskInputParameters(
                duration="10 seconds"
            )
        )
    ],
    expression="switch_case_value",
    evaluator_type=SwitchEvaluatorType.VALUE_PARAM,
    input_parameters=SwitchTaskValueParamInputParameters(
        switch_case_value="${workflow.input.value}"
    )
)
self.tasks.append(switch)
switch = SwitchTask(
    name="switch",
    task_reference_name="switch",
    decision_cases={
        "true": [
            WaitDurationTask(
                name="wait",
                task_reference_name="wait1",
                input_parameters=WaitDurationTaskInputParameters(
                    duration="10 seconds"
                )
            )
        ]},
    default_case=[
        WaitDurationTask(
            name="wait",
            task_reference_name="wait2",
            input_parameters=WaitDurationTaskInputParameters(
                duration="10 seconds"
            )
        )
    ],
    expression="$.inputValue == 'true' ? 'true' : 'false'",
    evaluator_type=SwitchEvaluatorType.JAVASCRIPT,
    input_parameters=SwitchTaskInputParameters(
        input_value="${workflow.input.value}"
    )
)

self.tasks.append(switch)

# TERMINATE TASK

TerminateTask(
    name="terminate",
    task_reference_name="terminate",
    input_parameters=TerminateTaskInputParameters(
        termination_status=WorkflowStatus.COMPLETED,
        workflow_output={"output": "COMPLETED"}
    )
)

# WAIT_DURATION TASK

self.tasks.append(WaitDurationTask(
    name="WAIT",
    task_reference_name="WAIT",
    input_parameters=WaitDurationTaskInputParameters(
        duration="10 seconds"
    )
))

# WAIT_UNTIL TASK

self.tasks.append(WaitUntilTask(
    name="WAIT_UNTIL",
    task_reference_name="WAIT_UNTIL",
    input_parameters=WaitUntilTaskInputParameters(
        until='2022-12-25 09:00 PST'
    )
))