Creating Python User-defined Functions

With SQL Stream Builder, you can create powerful custom functions in Python to enhance the functionality of SQL.

User functions can be simple translation functions such as Celsius to Fahrenheit, more complex business logic, or even lookups of data from external sources. User functions are written in Python, and you can collect them into a library of reusable functions.
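As an illustration of the simplest case mentioned above, the following sketch shows what a Celsius-to-Fahrenheit function might look like. The names celsius_to_fahrenheit and c_to_f are hypothetical, and the pyflink import is guarded only so that the conversion logic can be tried in a plain Python interpreter where apache-flink is not installed. It also assumes that a SQL NULL reaches the Python function as None.

```python
def celsius_to_fahrenheit(celsius):
    """Convert a temperature from degrees Celsius to degrees Fahrenheit."""
    if celsius is None:  # assumption: SQL NULL arrives as None
        return None
    return celsius * 9.0 / 5.0 + 32.0


try:
    # Wrap the plain function as a Flink scalar UDF when pyflink is available.
    from pyflink.table import DataTypes
    from pyflink.table.udf import udf

    c_to_f = udf(celsius_to_fahrenheit, result_type=DataTypes.DOUBLE())
except ImportError:
    # pyflink is not installed here; only the plain function can be used.
    c_to_f = None

print(celsius_to_fahrenheit(100.0))  # 212.0
```

Keeping the conversion in a plain function makes it possible to check the logic locally before pasting the UDF into SSB.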

Python UDFs are disabled by default. To enable them:

  1. Python UDF execution requires the Apache Flink Python package (apache-flink) and its dependencies, installed for a supported Python version on all nodes.
    1. Access your SSB node's command line. (Execute the following commands as root.)
    2. Install the apache-flink package and its dependencies:
      /usr/local/bin/python[***VERSION***] -m pip install apache-flink==1.19.1
  2. Go to your cluster in Cloudera Manager.
  3. Select SQL Stream Builder from the list of services.
  4. Go to the Configuration tab and set the following configuration properties:
    1. Python Client Executable (ssb.python.client.executable): the path of the Python interpreter used to launch the Python process when submitting Python jobs via flink run or when compiling jobs that contain Python UDFs. For example, /usr/bin/python3.
    2. Python Executable (ssb.python.executable): the path of the Python interpreter used to execute the Python UDF worker. For example, /usr/bin/python3.
    3. Python UDF Reaper Period (ssb.python.udf.reaper.period.seconds): the interval (in seconds) between two consecutive runs of the Python UDF Reaper, which deletes the Python files of terminated jobs from the artifact storage.
    4. Python UDF Time To Live (ssb.python.udf.ttl): the minimum lifespan (in milliseconds, seconds, minutes, or hours) of a Python UDF in the artifact storage. After this period, the Python UDF Reaper can delete the Python files from the artifact storage.
    5. Enable Python UDFs in SSB: select the checkbox of this property.
  5. Enter a reason for change and click Save Changes.
  6. Navigate to Instances.
  7. Select the Streaming SQL Engine from the list. (Click the checkbox on the left.)
  8. Click Restart from the Actions for selected dropdown menu.
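
Because the interpreters set in ssb.python.client.executable and ssb.python.executable must be able to load the apache-flink package, it can help to check this on each node before submitting a job. The following is a minimal sketch of such a check; the helper name can_import is hypothetical and is not part of SSB or Flink.

```python
import subprocess
import sys


def can_import(interpreter, module):
    """Return True if the given interpreter can import the given module."""
    try:
        result = subprocess.run(
            [interpreter, "-c", f"import {module}"],
            capture_output=True,
            timeout=120,
        )
    except (OSError, subprocess.TimeoutExpired):
        # The interpreter path does not exist, is not executable, or hung.
        return False
    return result.returncode == 0


# Check the interpreter running this script; on a cluster node, pass the
# configured path instead (for example /usr/bin/python3).
print("pyflink importable:", can_import(sys.executable, "pyflink"))
```

A False result suggests that the pip install step has to be repeated for that interpreter.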

To use Python UDFs:

  1. Navigate to the Streaming SQL Console.
    1. Navigate to Management Console > Environments, and select the environment where you have created your cluster.
    2. Select the Streaming Analytics cluster from the list of Data Hub clusters.
    3. Select Streaming SQL Console from the list of services.
      The Streaming SQL Console opens in a new window.
  2. Open a project from the Projects page of Streaming SQL Console.
    1. Select an existing project from the list by clicking the Open button or Switch button.
    2. Create a new project by clicking the New Project button.
    3. Import a project by clicking the Import button.
    You are redirected to the Explorer view of the project.
  3. Click next to Functions.
  4. Click New Function.
  5. From the dropdown menu select PYTHON.
  6. Add a Name to the UDF.
    For example, name the UDF ADD_FUNCTION.
  7. Add a Description to the UDF. (Optional.)
  8. Paste the Python code into the editor.
    For example, a simple addition:
    from pyflink.table.udf import udf
    from pyflink.table import DataTypes
    
    @udf(result_type=DataTypes.BIGINT())
    def udf_function(i, j):
        return i + j
    
  9. Click Create.
  10. After the function is created, you can use it in a SQL statement or as a computed column when creating a table.
    For example:
    -- as a SQL statement
    SELECT ADD_FUNCTION(number_a, number_b) FROM myTable;
    
    -- as a computed column
    CREATE TABLE myTable (
      `number_a` BIGINT,
      `number_b` BIGINT,
      `sum` AS ADD_FUNCTION(number_a, number_b)  -- evaluate expression and supply the result to queries
    ) WITH (
      'connector' = 'kafka',
      ...
    );
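
If the columns passed to the UDF can contain NULL values, the simple addition above would fail on them, because adding None to a number raises a TypeError in Python. The following sketch shows one possible null-safe variant. It assumes that SQL NULL values reach the Python function as None; the name add_nullable is hypothetical, and the pyflink import is guarded only so that the logic can be tried without apache-flink installed.

```python
def add_nullable(i, j):
    """Add two values, returning None (SQL NULL) if either input is None."""
    if i is None or j is None:
        return None
    return i + j


try:
    # Wrap the plain function as a Flink scalar UDF when pyflink is available.
    from pyflink.table import DataTypes
    from pyflink.table.udf import udf

    udf_function = udf(add_nullable, result_type=DataTypes.BIGINT())
except ImportError:
    # pyflink is not installed here; only the plain function can be used.
    udf_function = None

print(add_nullable(2, 3))     # 5
print(add_nullable(2, None))  # None
```

Returning None rather than a default such as 0 keeps missing inputs distinguishable from real sums in downstream queries.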
    

When a Flink job starts, SSB uploads the Python file to the artifact storage, where Flink can access it to execute the UDF when it is called. The ssb.python.udf.reaper.period.seconds and ssb.python.udf.ttl configuration properties (set in Cloudera Manager) control when SSB removes the Python files associated with terminated jobs.
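
The interaction of the two properties can be pictured with a small model. The sketch below is only an illustration of the rule described on this page, not SSB's implementation: it assumes that the time to live is measured from the moment the file was uploaded, and the function name eligible_for_cleanup is hypothetical.

```python
from datetime import datetime, timedelta


def eligible_for_cleanup(uploaded_at, now, ttl, job_terminated):
    """Illustrative rule: a file may be deleted only if its job has
    terminated and the file is at least as old as the time to live."""
    return job_terminated and (now - uploaded_at) >= ttl


uploaded = datetime(2024, 1, 1, 12, 0, 0)
ttl = timedelta(hours=1)

# Job terminated, but the file is younger than the TTL: kept.
print(eligible_for_cleanup(uploaded, uploaded + timedelta(minutes=30), ttl, True))  # False
# Job terminated and the TTL has passed: the next reaper run may delete it.
print(eligible_for_cleanup(uploaded, uploaded + timedelta(hours=2), ttl, True))     # True
# Job still running: kept regardless of age.
print(eligible_for_cleanup(uploaded, uploaded + timedelta(hours=2), ttl, False))    # False
```

In this model, a file that becomes eligible is removed at the next reaper run, so the reaper period bounds how long it can remain after the time to live has passed.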

For more information on using Python UDFs, refer to the Apache Flink documentation.