# Frinx Python SDK

The FRINX Python SDK is a flexible tool designed to simplify interaction with the FRINX network automation solutions. This SDK provides a set of Python libraries and utilities that enable developers to easily integrate with FRINX's platform, streamline their network automation workflows, and leverage FRINX's capabilities to the fullest.

# Project init

For the beggining, you will need to create new python project. Example of simple project can be found here We recommend to use poetry as a package management tool and use package from pypi.org

# pyproject.toml

# pyproject.toml
[tool.poetry]
name = "example_worker"
version = "0.0.1"
description = "Example of worker implementation"

[tool.poetry.dependencies]
python = "^3.10"
frinx-python-sdk = "^2"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

# Worker definition

In worker.py we want to create some piece of logic, which will be used in workflows to solve complex problems. Our worker has predefined interface, where you can define conductor task definitions, inputs and outpus. In method execute, you can implement your custom logic. For example call to our service, parse data from previous task or any other code you want.

# workers.test_worker.py
# imports from frinx-python-sdk
from frinx.common.conductor_enums import TaskResultStatus
from frinx.common.type_aliases import ListAny
from frinx.common.worker.service import ServiceWorkersImpl  
from frinx.common.worker.task_def import TaskDefinition
from frinx.common.worker.task_def import TaskInput
from frinx.common.worker.task_def import TaskOutput
from frinx.common.worker.task_result import TaskResult
from frinx.common.worker.worker import WorkerImpl

# ServiceWorkersImpl: Group workers based on purpose to one class. Then all workers can be registered together
class TestWorkers(ServiceWorkersImpl):

    # WorkerImpl: Specific worker implementation with predefined interfaces
    class Echo(WorkerImpl):

        # TaskDefinition: Define conductor task parameters 
        # https://docs.conductor-oss.org/documentation/configuration/taskdef.html
        class WorkerDefinition(TaskDefinition):
            name: str = 'TEST_echo'
            description: str = 'testing purposes: returns input unchanged'
            labels: ListAny = ['TEST']
            timeout_seconds: int = 60
            response_timeout_seconds: int = 60

        # TaskInput: Define conductor task input parameters
        # These parameters are validated via pydantic before execute method
        class WorkerInput(TaskInput):
            input: str

        # TaskOutput: Define conductor task output values
        class WorkerOutput(TaskOutput):
            output: str

        # Execute method contain task logic. 
        # Method got input parameters and returning TaskResult object
        def execute(self, worker_input: WorkerInput) -> TaskResult[WorkerOutput]:
            print(worker_input.input)
            return TaskResult(
                status=TaskResultStatus.COMPLETED,
                logs=['Echo worker invoked successfully'],
                output=self.WorkerOutput(output=worker_input.input)
            )
            

# Workflow definition

Now let's use previous created worker in workflow.

# workers.test_workflow.py
# imports from frinx-python-sdk
from frinx.common.conductor_enums import TimeoutPolicy
from frinx.common.type_aliases import ListStr
from frinx.common.workflow.service import ServiceWorkflowsImpl
from frinx.common.workflow.task import SimpleTask
from frinx.common.workflow.task import SimpleTaskInputParameters
from frinx.common.workflow.workflow import FrontendWFInputFieldType
from frinx.common.workflow.workflow import WorkflowImpl
from frinx.common.workflow.workflow import WorkflowInputField


# ServiceWorkflowsImpl: Group workflows based on purpose to one class. Then all workflows can be registered together
class TestWorkflows(ServiceWorkflowsImpl):

    # WorkflowImpl: Specific workflow implementation with predefined interfaces
    # https://docs.conductor-oss.org/documentation/configuration/workflowdef/index.html
    class TestWorkflow(WorkflowImpl):
        name: str = 'Test_workflow'
        version: int = 1
        description: str = 'Test workflow built from test workers'
        labels: ListStr = ['TEST']
        timeout_seconds: int = 60 * 5
        timeout_policy: TimeoutPolicy = TimeoutPolicy.TIME_OUT_WORKFLOW

        # WorkflowImpl.WorkflowInput: Define workflow inputs
        # WorkflowInputField: Define, how input will be rendered in UI during execution 
        class WorkflowInput(WorkflowImpl.WorkflowInput):
            text: WorkflowInputField = WorkflowInputField(
                name='text',
                frontend_default_value='hello world',
                description='How many seconds to sleep during the workflow',
                type=FrontendWFInputFieldType.STRING,
            )

        # WorkflowImpl.WorkflowOutput: Define workflow outputs
        # Parameters in workflowOutput are still strings, because they keep only
        # reference to task output
        class WorkflowOutput(WorkflowImpl.WorkflowOutput):
            text: str

        # workflow_builder method helps you build workflow JSON representation via code
        # you can make any logic here to handle various situation
        # for more info, how to create workflows see workflow-builder docs
        # https://docs.conductor-oss.org/devguide/labs/first-workflow.html
        def workflow_builder(self, workflow_inputs: WorkflowInput) -> None:

            echo_task = SimpleTask(
                name=TestWorkers.Echo,
                task_reference_name='echo',
                input_parameters=SimpleTaskInputParameters(
                    root=dict(
                        input=workflow_inputs.text.wf_input()
                    )
                )
            )
            self.tasks.append(echo_task)

            self.output_parameters = self.WorkflowOutput(
                text=echo_task.output_ref('output')
            )

# Start worker

Now implement conductor client, which registers our workflow and executes custom worker logic.

# main.py
import logging
import os

# Import conductor client
from frinx.client.v2.frinx_conductor_wrapper import FrinxConductorWrapper
from frinx.common.logging import logging_common
from frinx.common.logging.logging_common import LoggerConfig
from frinx.common.logging.logging_common import Root


# Register your tasks
def register_tasks(conductor_client: FrinxConductorWrapper) -> None:
    logging.info('Register tasks')
    from workers.test_worker import TestWorkers
    TestWorkers().register(conductor_client=conductor_client)

# Register your workflows
def register_workflows() -> None:
    logging.info('Register workflows')
    from workers.test_workflow import TestWorkflows
    TestWorkflows().register(overwrite=True)


def main() -> None:

    # Enable logging
    logging_common.configure_logging(
        LoggerConfig(
            root=Root(
                level=os.environ.get('LOG_LEVEL', 'INFO').upper(),
                handlers=['console']
            )
        )
    )

    # Enable prometheus metrics
    from frinx.common.telemetry.metrics import Metrics
    from frinx.common.telemetry.metrics import MetricsSettings

    Metrics(settings=MetricsSettings(metrics_enabled=True))

    # Register conductor client
    from frinx.common.frinx_rest import CONDUCTOR_HEADERS
    from frinx.common.frinx_rest import CONDUCTOR_URL_BASE

    conductor_client = FrinxConductorWrapper(
        server_url=CONDUCTOR_URL_BASE,
        # Define polling interval for conductor client. 
        # How often client will ask conductor for task to execute
        polling_interval=float(os.environ.get('CONDUCTOR_POLL_INTERVAL', 0.1)),
        # Number of parallel threads, which can execute tasks
        max_thread_count=int(os.environ.get('CONDUCTOR_THREAD_COUNT', 50)),
        headers=dict(CONDUCTOR_HEADERS),
    )

    register_tasks(conductor_client)
    register_workflows()
    conductor_client.start_workers()


if __name__ == '__main__':
    main()