Webhook connector

The webhook connector can be used in SQL or through Python UDFs.

Example using the webhook connector in SQL

  1. Create a webhook table. Refer to Creating Webhook tables for more information.
  2. Write into the webhook table using an INSERT statement in your query. Example:

    INSERT INTO [***WEBHOOK TABLE***]
    SELECT transactionId, ts, accountId, amount * 3
    FROM [***SOURCE TABLE***]
    WHERE amount % 2 != 0
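
To make the effect of the query concrete, the following is a minimal plain-Python sketch of the same filter and projection. It does not call the webhook connector. The sample rows and the name `select_for_webhook` are hypothetical, and integer amounts are assumed.

```python
# Hypothetical sample rows standing in for [***SOURCE TABLE***].
rows = [
    {"transactionId": 1, "ts": "2024-01-01 00:00:00", "accountId": "A1", "amount": 5},
    {"transactionId": 2, "ts": "2024-01-01 00:00:01", "accountId": "A2", "amount": 8},
    {"transactionId": 3, "ts": "2024-01-01 00:00:02", "accountId": "A3", "amount": 7},
]


def select_for_webhook(source_rows):
    """Mirror the query: keep rows with an odd amount and triple the amount."""
    return [
        (r["transactionId"], r["ts"], r["accountId"], r["amount"] * 3)
        for r in source_rows
        if r["amount"] % 2 != 0  # same condition as the WHERE clause
    ]


result = select_for_webhook(rows)
for row in result:
    print(row)
```

Only the rows with amounts 5 and 7 pass the filter, so only those two would be written to the webhook table, with their amounts tripled.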

Example using the webhook connector with Python UDFs

  1. Create your UDF using the Cloudera SQL Stream Builder UI. Example:

    from pyflink.table.udf import udf
    from pyflink.table import DataTypes
    from pyflink.table import Row

    # The UDF returns a row of four string fields.
    @udf(result_type=DataTypes.ROW([
        DataTypes.FIELD("transactionId", DataTypes.STRING()),
        DataTypes.FIELD("ts", DataTypes.STRING()),
        DataTypes.FIELD("accountId", DataTypes.STRING()),
        DataTypes.FIELD("amount", DataTypes.STRING())
    ]))
    def udf_function(transactionId, ts, accountId, amount):
        # Only odd amounts are processed: the amount is tripled.
        if amount % 2 != 0:
            processed_amount = amount * 3
            return Row(str(transactionId), str(ts), str(accountId), str(processed_amount))
        # Even amounts produce no row (None, which is NULL in SQL).
        return None

  2. Create a webhook table. Refer to Creating Webhook tables for more information.
  3. Write into the webhook table by calling the UDF in your query and filtering out NULL results in the WHERE clause. Example:

    INSERT INTO [***WEBHOOK TABLE***]
    SELECT transactionId, ts, accountId, amount
    FROM (
        SELECT transaction(transactionId, ts, accountId, amount)
        FROM [***SOURCE TABLE***]
    ) AS udf_results
    WHERE transactionId IS NOT NULL
      AND ts IS NOT NULL
      AND accountId IS NOT NULL
      AND amount IS NOT NULL
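
The IS NOT NULL conditions in the query are presumably there because the UDF returns nothing for even amounts. The following is a minimal sketch of that behavior in plain Python, without PyFlink, so it can be run anywhere. The name `process_transaction` and the sample values are hypothetical, and a plain tuple stands in for the PyFlink `Row`.

```python
def process_transaction(transaction_id, ts, account_id, amount):
    """Mirror the UDF body: triple odd amounts, return None for even ones."""
    if amount % 2 != 0:
        processed_amount = amount * 3
        # A tuple of strings stands in for the Row the real UDF returns.
        return (str(transaction_id), str(ts), str(account_id), str(processed_amount))
    return None


# Hypothetical input records: one odd amount, one even amount.
samples = [
    (1, "2024-01-01 00:00:00", "A1", 5),
    (2, "2024-01-01 00:00:01", "A2", 8),
]

results = [process_transaction(*s) for s in samples]

# Equivalent of the IS NOT NULL filtering: drop records with no result.
kept = [r for r in results if r is not None]

for row in kept:
    print(row)
```

The even-amount record yields `None` and is dropped, so only the odd-amount record, with its fields converted to strings, would reach the webhook table.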