#
Workflow builder
In this part, you can find common implementation of various conductor tasks. It's pythonic representation of conductor operators and system tasks This implementation depends on frinx-python-sdk and uses Pydantic to serialize workflow definition to result in JSON format.
#
DECISION TASK
self.tasks.append(DecisionTask(
name="decision",
task_reference_name="decision",
decision_cases={
"true": [
HumanTask(
name="human",
task_reference_name="human"
)
],
},
default_case=[
TerminateTask(
name="terminate",
task_reference_name="terminate",
input_parameters=TerminateTaskInputParameters(
termination_status=WorkflowStatus.FAILED
))
],
input_parameters=DecisionTaskInputParameters(
status="${workflow.input.status}"
),
case_expression="$.status === 'true' ? 'true' : 'false'"
))
self.tasks.append(DecisionCaseValueTask(
name="decision",
task_reference_name="decision",
decision_cases={
"true": [
HumanTask(
name="human",
task_reference_name="human"
)
],
},
default_case=[
TerminateTask(
name="terminate",
task_reference_name="terminate",
input_parameters=TerminateTaskInputParameters(
termination_status=WorkflowStatus.FAILED
))
],
input_parameters=DecisionCaseValueTaskInputParameters(
case_value_param="${workflow.input.status}"
),
))
#
DO_WHILE TASK
loop_tasks = WaitDurationTask(
name="wait",
task_reference_name="wait",
input_parameters=WaitDurationTaskInputParameters(
duration="1 seconds"
)
)
self.tasks.append(DoWhileTask(
name="do_while",
task_reference_name="LoopTask",
loop_condition="if ( $.LoopTask['iteration'] < $.value ) { true; } else { false; }",
loop_over=[
loop_tasks
],
input_parameters={
"value": workflow_inputs.value.wf_input
}
))
#
DYNAMIC_FORK TASK
task_inputs = InventoryWorkflows.InstallDeviceByName.WorkflowInput()
fork_inputs = [
{
task_inputs.device_name.name: "IOS01"
},
{
task_inputs.device_name.name: "IOS02"
},
{
task_inputs.device_name.name: "IOS02"
}
]
self.tasks.append(DynamicForkTask(
name="dyn_fork",
task_reference_name="dyn_fork",
input_parameters=DynamicForkArraysTaskFromDefInputParameters(
fork_task_name=InventoryWorkflows.InstallDeviceByName,
fork_task_inputs=fork_inputs
),
))
self.tasks.append(JoinTask(
name="join",
task_reference_name="join"
))
self.tasks.append(DynamicForkTask(
name="dyn_fork",
task_reference_name="dyn_fork",
input_parameters=DynamicForkTaskFromDefInputParameters(
dynamic_tasks=InventoryWorkflows.InstallDeviceByName,
dynamic_tasks_input=workflow_inputs.device_name.wf_input
),
))
self.tasks.append(JoinTask(
name="join",
task_reference_name="join"
))
task_inputs = InventoryWorkflows.InstallDeviceByName.WorkflowInput()
fork_inputs = [
{
task_inputs.device_name.name: "IOS01"
},
{
task_inputs.device_name.name: "IOS02"
},
{
task_inputs.device_name.name: "IOS02"
}
]
input_parameters = DynamicForkArraysTaskInputParameters(
fork_task_name="Install_device_by_name",
fork_task_inputs=fork_inputs
)
self.tasks.append(DynamicForkTask(
name="dyn_fork",
task_reference_name="dyn_fork",
input_parameters=input_parameters
))
self.tasks.append(DynamicForkTask(
name="dyn_fork",
task_reference_name="dyn_fork",
input_parameters=DynamicForkTaskInputParameters(
dynamic_tasks_input="Install_device_by_name",
dynamic_tasks=[
{
task_inputs.device_name.name: "IOS01"
},
{
task_inputs.device_name.name: "IOS02"
},
{
task_inputs.device_name.name: "IOS02"
}
]
)
))
#
EVENT TASK
self.tasks.append(EventTask(
name="Event",
task_reference_name="event_a",
sink="conductor:Wait_task",
async_complete=False
))
#
EXCLUSIVE_JOIN TASK
self.tasks.append(ExclusiveJoinTask(
name="exclusive_join",
task_reference_name="exclusive_join",
))
A list of task reference names that this JOIN task will wait for completion
self.tasks.append(ExclusiveJoinTask(
name="exclusive_join",
task_reference_name="exclusive_join",
join_on=["task1", "task2"]
))
#
FORK_JOIN TASK
fork_tasks_a = []
fork_tasks_b = []
fork_tasks_a.append(SimpleTask(
name=Inventory.InventoryAddDevice,
task_reference_name="add_device_cli",
input_parameters=SimpleTaskInputParameters(
root=dict(
device_name="IOS01",
zone="uniconfig",
service_state="IN_SERVICE",
mount_body="body"
)
)
))
fork_tasks_a.append(SimpleTask(
name=Inventory.InventoryInstallDeviceByName,
task_reference_name="install_device_cli",
input_parameters=SimpleTaskInputParameters(
root=dict(
device_name="IOS01"
)
)
))
fork_tasks_b.append(SimpleTask(
name=Inventory.InventoryAddDevice,
task_reference_name="add_device_netconf",
input_parameters=SimpleTaskInputParameters(
root=dict(
device_name="NTF01",
zone="uniconfig",
service_state="IN_SERVICE",
mount_body="body"
)
)
))
fork_tasks_b.append(SimpleTask(
name=Inventory.InventoryInstallDeviceByName,
task_reference_name="install_device_netconf",
input_parameters=SimpleTaskInputParameters(
root=dict(
device_name="NTF01"
)
)
))
self.tasks.append(ForkTask(
name="fork",
task_reference_name="fork",
fork_tasks=[
fork_tasks_a,
fork_tasks_b
]
))
#
HUMAN TASK
self.tasks.append(HumanTask(
name="human",
task_reference_name="human"
))
#
INLINE TASK
self.tasks.append(InlineTask(
name="inline",
task_reference_name="inline",
input_parameters=InlineTaskInputParameters(
expression='if ($.value){return {"result": true}} else { return {"result": false}}',
value="${workflow.variables.test}"
)))
INFO: expression wrapped into javascript function:
expression = "function e() { if ($.value){return {\"result\": true}} else { return {\"result\": false}} } e();"
#
JOIN TASK
Read more on conductor-oss docs
self.tasks.append(JoinTask(
name="join",
task_reference_name="join"
))
A list of task reference names that this JOIN task will wait for completion
self.tasks.append(JoinTask(
name="join",
task_reference_name="join",
join_on=["task1", "task2"]
))
#
JSON_JQ_TRANSFORM TASK
json_jq = JsonJqTask(
name="json_jq",
task_reference_name="json_jq",
input_parameters=JsonJqTaskInputParameters(
query_expression="{ key3: (.key1.value1 + .key2.value2) }",
key_1={
"value1": [
"a",
"b"
]
},
key2={
"value2": [
"c",
"d"
]
}
)
)
self.tasks.append(json_jq)
#
SET_VARIABLE TASK
self.tasks.append(SetVariableTask(
name="var",
task_reference_name="var",
input_parameters=SetVariableTaskInputParameters(
root=dict(
env="frinx"
)
)
))
#
SIMPLE TASK
self.tasks.append(
SimpleTask(
name=Inventory.InventoryAddDevice,
task_reference_name="test",
input_parameters=SimpleTaskInputParameters(
root=dict(
device_name="IOS01",
zone="uniconfig",
service_state="aha",
mount_body="body"
)
)
)
)
#
START_WORKFLOW TASK
Start Workflow is an operator task used to start another workflow from an existing workflow. Unlike a sub-workflow task, a start workflow task doesn’t create a relationship between the current workflow and the newly started workflow. That means it doesn’t wait for the started workflow to get completed.
#
INPUT PARAMETERS
start_workflow:
- StartWorkflowTaskInputParameters : StartWorkflowTaskPlainInputParameters|StartWorkflowTaskFromDefInputParameters
- StartWorkflowTaskPlainInputParameters
- StartWorkflowTaskFromDefInputParameters
workflow_input_parameters = {
InventoryWorkflows.InstallDeviceByName.WorkflowInput().device_name.name: "IOS01"
}
task_inputs = StartWorkflowTaskInputParameters(
start_workflow=StartWorkflowTaskFromDefInputParameters(
workflow=InventoryWorkflows.InstallDeviceByName,
input=workflow_input_parameters
)
)
self.tasks.append(StartWorkflowTask(
name="Install_device_by_name",
task_reference_name="start",
input_parameters=task_inputs
))
#
SUBWORKFLOW TASK
sub_workflow_param = SubWorkflowParam(
name=InventoryWorkflows.AddDeviceToInventory.__name__,
version=1
)
workflows_inputs = InventoryWorkflows.AddDeviceToInventory.WorkflowInput()
sub_workflow_input = {}
sub_workflow_input.setdefault(workflows_inputs.device_name.name, "IOS01")
sub_workflow_input.setdefault(workflows_inputs.zone.name, "uniconfig")
self.tasks.append(SubWorkflowTask(
name="subworkflow",
task_reference_name="subworkflow",
sub_workflow_param=sub_workflow_param,
input_parameters=SubWorkflowInputParameters(
root=sub_workflow_input
)
))
SubWorkflowFromDefParam validate subworkflow and workflow inputs
sub_workflow_param = SubWorkflowFromDefParam(
name=InventoryWorkflows.AddDeviceToInventory
)
workflows_inputs = InventoryWorkflows.AddDeviceToInventory.WorkflowInput()
sub_workflow_input = {}
sub_workflow_input.setdefault(workflows_inputs.device_name.name, "IOS01")
sub_workflow_input.setdefault(workflows_inputs.zone.name, "uniconfig")
self.tasks.append(SubWorkflowTask(
name="subworkflow",
task_reference_name="subworkflow",
sub_workflow_param=sub_workflow_param,
input_parameters=SubWorkflowInputParameters(
root=sub_workflow_input
)
))
#
SWITCH TASK
- SwitchTaskValueParamInputParameters -> VALUE-PARAM
- SwitchTaskInputParameters -> JAVASCRIPT
VALUE-PARAM evaluator type
switch = SwitchTask(
name="switch",
task_reference_name="switch",
decision_cases={
"true": [
WaitDurationTask(
name="wait",
task_reference_name="wait1",
input_parameters=WaitDurationTaskInputParameters(
duration="10 seconds"
)
)
]},
default_case=[
WaitDurationTask(
name="wait",
task_reference_name="wait2",
input_parameters=WaitDurationTaskInputParameters(
duration="10 seconds"
)
)
],
expression="switch_case_value",
evaluator_type=SwitchEvaluatorType.VALUE_PARAM,
input_parameters=SwitchTaskValueParamInputParameters(
switch_case_value="${workflow.input.value}"
)
)
self.tasks.append(switch)
switch = SwitchTask(
name="switch",
task_reference_name="switch",
decision_cases={
"true": [
WaitDurationTask(
name="wait",
task_reference_name="wait1",
input_parameters=WaitDurationTaskInputParameters(
duration="10 seconds"
)
)
]},
default_case=[
WaitDurationTask(
name="wait",
task_reference_name="wait2",
input_parameters=WaitDurationTaskInputParameters(
duration="10 seconds"
)
)
],
expression="$.inputValue == 'true' ? 'true' : 'false'",
evaluator_type=SwitchEvaluatorType.JAVASCRIPT,
input_parameters=SwitchTaskInputParameters(
input_value="${workflow.input.value}"
)
)
self.tasks.append(switch)
#
TERMINATE TASK
TerminateTask(
name="terminate",
task_reference_name="terminate",
input_parameters=TerminateTaskInputParameters(
termination_status=WorkflowStatus.COMPLETED,
workflow_output={"output": "COMPLETED"}
)
)
#
WAIT_DURATION TASK
self.tasks.append(WaitDurationTask(
name="WAIT",
task_reference_name="WAIT",
input_parameters=WaitDurationTaskInputParameters(
duration="10 seconds"
)
))
#
WAIT_UNTIL TASK
self.tasks.append(WaitUntilTask(
name="WAIT_UNTIL",
task_reference_name="WAIT_UNTIL",
input_parameters=WaitUntilTaskInputParameters(
until='2022-12-25 09:00 PST'
)
))