#Frinx Python SDK

The FRINX Python SDK is a flexible tool designed to simplify interaction with the FRINX network automation solutions. This SDK provides a set of Python libraries and utilities that enable developers to easily integrate with FRINX's platform, streamline their network automation workflows, and leverage FRINX's capabilities to the fullest.

#Project init

For the beggining, you will need to create new python project. Example of simple project can be found here We recommend to use poetry as a package management tool and use package from pypi.org

#pyproject.toml

# pyproject.toml [tool.poetry] name = "example_worker" version = "0.0.1" description = "Example of worker implementation" [tool.poetry.dependencies] python = "^3.10" frinx-python-sdk = "^2" [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api"

#Worker definition

In worker.py we want to create some piece of logic, which will be used in workflows to solve complex problems. Our worker has predefined interface, where you can define conductor task definitions, inputs and outpus. In method execute, you can implement your custom logic. For example call to our service, parse data from previous task or any other code you want.

# workers.test_worker.py # imports from frinx-python-sdk from frinx.common.conductor_enums import TaskResultStatus from frinx.common.type_aliases import ListAny from frinx.common.worker.service import ServiceWorkersImpl from frinx.common.worker.task_def import TaskDefinition from frinx.common.worker.task_def import TaskInput from frinx.common.worker.task_def import TaskOutput from frinx.common.worker.task_result import TaskResult from frinx.common.worker.worker import WorkerImpl # ServiceWorkersImpl: Group workers based on purpose to one class. Then all workers can be registered together class TestWorkers(ServiceWorkersImpl): # WorkerImpl: Specific worker implementation with predefined interfaces class Echo(WorkerImpl): # TaskDefinition: Define conductor task parameters # https://docs.conductor-oss.org/documentation/configuration/taskdef.html class WorkerDefinition(TaskDefinition): name: str = 'TEST_echo' description: str = 'testing purposes: returns input unchanged' labels: ListAny = ['TEST'] timeout_seconds: int = 60 response_timeout_seconds: int = 60 # TaskInput: Define conductor task input parameters # These parameters are validated via pydantic before execute method class WorkerInput(TaskInput): input: str # TaskOutput: Define conductor task output values class WorkerOutput(TaskOutput): output: str # Execute method contain task logic. # Method got input parameters and returning TaskResult object def execute(self, worker_input: WorkerInput) -> TaskResult[WorkerOutput]: print(worker_input.input) return TaskResult( status=TaskResultStatus.COMPLETED, logs=['Echo worker invoked successfully'], output=self.WorkerOutput(output=worker_input.input) )

#Workflow definition

Now let's use previous created worker in workflow.

# workers.test_workflow.py # imports from frinx-python-sdk from frinx.common.conductor_enums import TimeoutPolicy from frinx.common.type_aliases import ListStr from frinx.common.workflow.service import ServiceWorkflowsImpl from frinx.common.workflow.task import SimpleTask from frinx.common.workflow.task import SimpleTaskInputParameters from frinx.common.workflow.workflow import FrontendWFInputFieldType from frinx.common.workflow.workflow import WorkflowImpl from frinx.common.workflow.workflow import WorkflowInputField # ServiceWorkflowsImpl: Group workflows based on purpose to one class. Then all workflows can be registered together class TestWorkflows(ServiceWorkflowsImpl): # WorkflowImpl: Specific workflow implementation with predefined interfaces # https://docs.conductor-oss.org/documentation/configuration/workflowdef/index.html class TestWorkflow(WorkflowImpl): name: str = 'Test_workflow' version: int = 1 description: str = 'Test workflow built from test workers' labels: ListStr = ['TEST'] timeout_seconds: int = 60 * 5 timeout_policy: TimeoutPolicy = TimeoutPolicy.TIME_OUT_WORKFLOW # WorkflowImpl.WorkflowInput: Define workflow inputs # WorkflowInputField: Define, how input will be rendered in UI during execution class WorkflowInput(WorkflowImpl.WorkflowInput): text: WorkflowInputField = WorkflowInputField( name='text', frontend_default_value='hello world', description='How many seconds to sleep during the workflow', type=FrontendWFInputFieldType.STRING, ) # WorkflowImpl.WorkflowOutput: Define workflow outputs # Parameters in workflowOutput are still strings, because they keep only # reference to task output class WorkflowOutput(WorkflowImpl.WorkflowOutput): text: str # workflow_builder method helps you build workflow JSON representation via code # you can make any logic here to handle various situation # for more info, how to create workflows see workflow-builder docs # https://docs.conductor-oss.org/devguide/labs/first-workflow.html def workflow_builder(self, workflow_inputs: WorkflowInput) -> None: echo_task = SimpleTask( name=TestWorkers.Echo, task_reference_name='echo', input_parameters=SimpleTaskInputParameters( root=dict( input=workflow_inputs.text.wf_input() ) ) ) self.tasks.append(echo_task) self.output_parameters = self.WorkflowOutput( text=echo_task.output_ref('output') )

#Start worker

Now implement conductor client, which registers our workflow and executes custom worker logic.

# main.py import logging import os # Import conductor client from frinx.client.v2.frinx_conductor_wrapper import FrinxConductorWrapper from frinx.common.logging import logging_common from frinx.common.logging.logging_common import LoggerConfig from frinx.common.logging.logging_common import Root # Register your tasks def register_tasks(conductor_client: FrinxConductorWrapper) -> None: logging.info('Register tasks') from workers.test_worker import TestWorkers TestWorkers().register(conductor_client=conductor_client) # Register your workflows def register_workflows() -> None: logging.info('Register workflows') from workers.test_workflow import TestWorkflows TestWorkflows().register(overwrite=True) def main() -> None: # Enable logging logging_common.configure_logging( LoggerConfig( root=Root( level=os.environ.get('LOG_LEVEL', 'INFO').upper(), handlers=['console'] ) ) ) # Enable prometheus metrics from frinx.common.telemetry.metrics import Metrics from frinx.common.telemetry.metrics import MetricsSettings Metrics(settings=MetricsSettings(metrics_enabled=True)) # Register conductor client from frinx.common.frinx_rest import CONDUCTOR_HEADERS from frinx.common.frinx_rest import CONDUCTOR_URL_BASE conductor_client = FrinxConductorWrapper( server_url=CONDUCTOR_URL_BASE, # Define polling interval for conductor client. # How often client will ask conductor for task to execute polling_interval=float(os.environ.get('CONDUCTOR_POLL_INTERVAL', 0.1)), # Number of parallel threads, which can execute tasks max_thread_count=int(os.environ.get('CONDUCTOR_THREAD_COUNT', 50)), headers=dict(CONDUCTOR_HEADERS), ) register_tasks(conductor_client) register_workflows() conductor_client.start_workers() if __name__ == '__main__': main()