#Workflow builder

In this part, you can find common implementation of various conductor tasks. It's pythonic representation of conductor operators and system tasks This implementation depends on frinx-python-sdk and uses Pydantic to serialize workflow definition to result in JSON format.

#DECISION TASK

self.tasks.append(DecisionTask( name="decision", task_reference_name="decision", decision_cases={ "true": [ HumanTask( name="human", task_reference_name="human" ) ], }, default_case=[ TerminateTask( name="terminate", task_reference_name="terminate", input_parameters=TerminateTaskInputParameters( termination_status=WorkflowStatus.FAILED )) ], input_parameters=DecisionTaskInputParameters( status="${workflow.input.status}" ), case_expression="$.status === 'true' ? 'true' : 'false'" ))

#DO_WHILE TASK

loop_tasks = WaitDurationTask( name="wait", task_reference_name="wait", input_parameters=WaitDurationTaskInputParameters( duration="1 seconds" ) ) self.tasks.append(DoWhileTask( name="do_while", task_reference_name="LoopTask", loop_condition="if ( $.LoopTask['iteration'] < $.value ) { true; } else { false; }", loop_over=[ loop_tasks ], input_parameters={ "value": workflow_inputs.value.wf_input } ))

#DYNAMIC_FORK TASK

task_inputs = InventoryWorkflows.InstallDeviceByName.WorkflowInput() fork_inputs = [ { task_inputs.device_name.name: "IOS01" }, { task_inputs.device_name.name: "IOS02" }, { task_inputs.device_name.name: "IOS02" } ] self.tasks.append(DynamicForkTask( name="dyn_fork", task_reference_name="dyn_fork", input_parameters=DynamicForkArraysTaskFromDefInputParameters( fork_task_name=InventoryWorkflows.InstallDeviceByName, fork_task_inputs=fork_inputs ), )) self.tasks.append(JoinTask( name="join", task_reference_name="join" ))

#EVENT TASK

self.tasks.append(EventTask( name="Event", task_reference_name="event_a", sink="conductor:Wait_task", async_complete=False ))

#EXCLUSIVE_JOIN TASK

self.tasks.append(ExclusiveJoinTask( name="exclusive_join", task_reference_name="exclusive_join", ))

#FORK_JOIN TASK

fork_tasks_a = [] fork_tasks_b = [] fork_tasks_a.append(SimpleTask( name=Inventory.InventoryAddDevice, task_reference_name="add_device_cli", input_parameters=SimpleTaskInputParameters( root=dict( device_name="IOS01", zone="uniconfig", service_state="IN_SERVICE", mount_body="body" ) ) )) fork_tasks_a.append(SimpleTask( name=Inventory.InventoryInstallDeviceByName, task_reference_name="install_device_cli", input_parameters=SimpleTaskInputParameters( root=dict( device_name="IOS01" ) ) )) fork_tasks_b.append(SimpleTask( name=Inventory.InventoryAddDevice, task_reference_name="add_device_netconf", input_parameters=SimpleTaskInputParameters( root=dict( device_name="NTF01", zone="uniconfig", service_state="IN_SERVICE", mount_body="body" ) ) )) fork_tasks_b.append(SimpleTask( name=Inventory.InventoryInstallDeviceByName, task_reference_name="install_device_netconf", input_parameters=SimpleTaskInputParameters( root=dict( device_name="NTF01" ) ) )) self.tasks.append(ForkTask( name="fork", task_reference_name="fork", fork_tasks=[ fork_tasks_a, fork_tasks_b ] ))

#HUMAN TASK

self.tasks.append(HumanTask( name="human", task_reference_name="human" ))

#INLINE TASK

self.tasks.append(InlineTask( name="inline", task_reference_name="inline", input_parameters=InlineTaskInputParameters( expression='if ($.value){return {"result": true}} else { return {"result": false}}', value="${workflow.variables.test}" )))

INFO: expression wrapped into javascript function:

expression = "function e() { if ($.value){return {\"result\": true}} else { return {\"result\": false}} } e();"

#JOIN TASK

Read more on conductor-oss docs

self.tasks.append(JoinTask( name="join", task_reference_name="join" ))

#JSON_JQ_TRANSFORM TASK

json_jq = JsonJqTask( name="json_jq", task_reference_name="json_jq", input_parameters=JsonJqTaskInputParameters( query_expression="{ key3: (.key1.value1 + .key2.value2) }", key_1={ "value1": [ "a", "b" ] }, key2={ "value2": [ "c", "d" ] } ) ) self.tasks.append(json_jq)

#SET_VARIABLE TASK

self.tasks.append(SetVariableTask( name="var", task_reference_name="var", input_parameters=SetVariableTaskInputParameters( root=dict( env="frinx" ) ) ))

#SIMPLE TASK

self.tasks.append( SimpleTask( name=Inventory.InventoryAddDevice, task_reference_name="test", input_parameters=SimpleTaskInputParameters( root=dict( device_name="IOS01", zone="uniconfig", service_state="aha", mount_body="body" ) ) ) )

#START_WORKFLOW TASK

Start Workflow is an operator task used to start another workflow from an existing workflow. Unlike a sub-workflow task, a start workflow task doesn’t create a relationship between the current workflow and the newly started workflow. That means it doesn’t wait for the started workflow to get completed.

#INPUT PARAMETERS

start_workflow:

  • StartWorkflowTaskInputParameters : StartWorkflowTaskPlainInputParameters|StartWorkflowTaskFromDefInputParameters
  • StartWorkflowTaskPlainInputParameters
  • StartWorkflowTaskFromDefInputParameters
workflow_input_parameters = { InventoryWorkflows.InstallDeviceByName.WorkflowInput().device_name.name: "IOS01" } task_inputs = StartWorkflowTaskInputParameters( start_workflow=StartWorkflowTaskFromDefInputParameters( workflow=InventoryWorkflows.InstallDeviceByName, input=workflow_input_parameters ) ) self.tasks.append(StartWorkflowTask( name="Install_device_by_name", task_reference_name="start", input_parameters=task_inputs ))

#SUBWORKFLOW TASK

sub_workflow_param = SubWorkflowParam( name=InventoryWorkflows.AddDeviceToInventory.__name__, version=1 ) workflows_inputs = InventoryWorkflows.AddDeviceToInventory.WorkflowInput() sub_workflow_input = {} sub_workflow_input.setdefault(workflows_inputs.device_name.name, "IOS01") sub_workflow_input.setdefault(workflows_inputs.zone.name, "uniconfig") self.tasks.append(SubWorkflowTask( name="subworkflow", task_reference_name="subworkflow", sub_workflow_param=sub_workflow_param, input_parameters=SubWorkflowInputParameters( root=sub_workflow_input ) ))

#SWITCH TASK

  • SwitchTaskValueParamInputParameters -> VALUE-PARAM
  • SwitchTaskInputParameters -> JAVASCRIPT

VALUE-PARAM evaluator type

switch = SwitchTask( name="switch", task_reference_name="switch", decision_cases={ "true": [ WaitDurationTask( name="wait", task_reference_name="wait1", input_parameters=WaitDurationTaskInputParameters( duration="10 seconds" ) ) ]}, default_case=[ WaitDurationTask( name="wait", task_reference_name="wait2", input_parameters=WaitDurationTaskInputParameters( duration="10 seconds" ) ) ], expression="switch_case_value", evaluator_type=SwitchEvaluatorType.VALUE_PARAM, input_parameters=SwitchTaskValueParamInputParameters( switch_case_value="${workflow.input.value}" ) ) self.tasks.append(switch)

#TERMINATE TASK

TerminateTask( name="terminate", task_reference_name="terminate", input_parameters=TerminateTaskInputParameters( termination_status=WorkflowStatus.COMPLETED, workflow_output={"output": "COMPLETED"} ) )

#WAIT_DURATION TASK

self.tasks.append(WaitDurationTask( name="WAIT", task_reference_name="WAIT", input_parameters=WaitDurationTaskInputParameters( duration="10 seconds" ) ))

#WAIT_UNTIL TASK

self.tasks.append(WaitUntilTask( name="WAIT_UNTIL", task_reference_name="WAIT_UNTIL", input_parameters=WaitUntilTaskInputParameters( until='2022-12-25 09:00 PST' ) ))