Webhook connector
The Webhook connector sends query results over HTTP using POST or PUT requests, with configurable templates and headers.
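To make the roles of the method, template, and headers concrete, here is a minimal sketch in plain Python that renders one result row into an HTTP request without sending it. The endpoint URL, the header set, the `${...}` placeholder syntax, and the `build_webhook_request` helper are all assumptions made for illustration; they are not SSB's own template syntax or API.

```python
import json
from string import Template
from urllib.request import Request

# Hypothetical endpoint, headers, and body template; none of these come from SSB.
ENDPOINT = "https://example.com/hooks/transactions"
HEADERS = {"Content-Type": "application/json"}
BODY_TEMPLATE = Template('{"id": "${transactionId}", "amount": "${amount}"}')


def build_webhook_request(row: dict, method: str = "POST") -> Request:
    """Render one result row into an HTTP request object (nothing is sent)."""
    if method not in ("POST", "PUT"):
        raise ValueError("only POST and PUT are covered by this sketch")
    # Values are substituted as-is; a real template would need JSON escaping.
    body = BODY_TEMPLATE.substitute(row).encode("utf-8")
    return Request(ENDPOINT, data=body, headers=HEADERS, method=method)


request = build_webhook_request({"transactionId": "tx-1", "amount": "21"})
payload = json.loads(request.data)  # confirms the rendered body is valid JSON
```

The request is only constructed here, so the sketch can be run safely; passing it to `urllib.request.urlopen` would perform the actual call.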
To use the webhook connector with Python UDFs:
- Create your UDF using the SSB UI. Example:
    from pyflink.table.udf import udf
    from pyflink.table import DataTypes
    from pyflink.table import Row

    @udf(result_type=DataTypes.ROW([
        DataTypes.FIELD("transactionId", DataTypes.STRING()),
        DataTypes.FIELD("ts", DataTypes.STRING()),
        DataTypes.FIELD("accountId", DataTypes.STRING()),
        DataTypes.FIELD("amount", DataTypes.STRING())
    ]))
    def udf_function(transactionId, ts, accountId, amount):
        # Triple odd amounts. Even amounts pass through unchanged
        # (an assumption: without this branch processed_amount is undefined).
        if amount % 2 != 0:
            processed_amount = amount * 3
        else:
            processed_amount = amount
        # Every field is returned as a string to match the declared ROW type.
        return Row(str(transactionId), str(ts), str(accountId), str(processed_amount))
- Create a webhook table. Refer to Creating Webhook tables for more information.
- Write into the webhook table with a query that calls the UDF and filters its results in the WHERE clause. The example assumes the UDF was saved under the name transaction; destinationName and sourceName stand for the webhook table and the source table. Example:

    INSERT INTO destinationName
    SELECT transactionId, ts, accountId, amount
    FROM (
      SELECT transaction(transactionId, ts, accountId, amount)
      FROM sourceName
    ) AS udf_results
    WHERE transactionId IS NOT NULL
      AND ts IS NOT NULL
      AND accountId IS NOT NULL
      AND amount IS NOT NULL;
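
The row logic of the UDF and the IS NOT NULL checks of the query can be tried locally without a Flink runtime. The following is a minimal sketch in plain Python, assuming that even amounts pass through unchanged; `process_transaction` and `is_complete` are hypothetical helpers written for this illustration and are not part of SSB or PyFlink.

```python
from typing import Optional, Sequence, Tuple


def process_transaction(transaction_id, ts, account_id, amount: int) -> Tuple[str, str, str, str]:
    """Mirror the UDF body: triple odd amounts, stringify every field."""
    # Assumption: even amounts are returned unchanged.
    processed_amount = amount * 3 if amount % 2 != 0 else amount
    return (str(transaction_id), str(ts), str(account_id), str(processed_amount))


def is_complete(row: Sequence[Optional[str]]) -> bool:
    """Mirror the WHERE clause: keep a row only if no field is NULL (None)."""
    return all(field is not None for field in row)


odd_row = process_transaction("tx-1", "2024-01-01T00:00:00", "acc-9", 7)
even_row = process_transaction("tx-2", "2024-01-01T00:00:01", "acc-9", 4)
kept_rows = [row for row in (odd_row, even_row) if is_complete(row)]
```

Because `str()` turns `None` into the string `"None"`, a row built this way never contains a real null; a NULL input would have to be handled inside the function if it should reach the filter as NULL.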