Using different connectors in SSB

You can use the supported connectors to create end-to-end streaming analytics processes that match your requirements.

Using Filesystem source

You can use the Filesystem connector in the Community Edition to process files which are stored on the filesystem.

To use the Filesystem connector, you need to either copy the files to the persistent docker volume provided with the Community Edition, or map bind mounts from the host filesystem.

How to copy files to the Docker volume?

To copy existing files to the cluster for processing, you can use the docker cp command to place them directly on the persistent volume.

First, determine the name of the Job Manager container. You can do this by running the following command:
$ docker ps -f "label=com.docker.compose.service=flink-jobmanager" --format="{{.Names}}"
csp-ce_flink-jobmanager-1

In the examples below, csp-ce_flink-jobmanager-1 is the name of the container.

Next, copy your file to the shared volume in the container. For example, if you have a file in your local directory called example.json that you would like to process, you can copy it to the cluster with the following docker command:
docker cp example.json csp-ce_flink-jobmanager-1:/persistent

Make sure to replace the container name with the one you located in the previous step.

To check that the file is successfully copied, first connect to the container with the following command:
docker exec -it csp-ce_flink-jobmanager-1 /bin/bash
Once you are inside the container, go to the persistent directory and list the files there:
cd /persistent
ls

The example.json should be listed in the persistent directory.
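
The copy and verification steps above can also be run without opening an interactive shell. The following is a minimal sketch, assuming the docker CLI is available and the Community Edition containers are running. The function name copy_to_persistent is hypothetical and only wraps the commands already shown.

```shell
# Hypothetical helper: copy a local file to /persistent in the Job Manager
# container, then list the directory to confirm the copy.
copy_to_persistent() {
  file="$1"
  # Look up the Job Manager container name by its compose service label.
  container="$(docker ps -f "label=com.docker.compose.service=flink-jobmanager" --format '{{.Names}}' | head -n 1)"
  if [ -z "$container" ]; then
    echo "Job Manager container not found" >&2
    return 1
  fi
  docker cp "$file" "${container}:/persistent" || return 1
  # List the directory contents non-interactively.
  docker exec "$container" ls /persistent
}

# Usage: copy_to_persistent example.json
```

If the copy succeeds, the listing printed at the end should include the copied file.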

When you create a table using the Filesystem connector as shown in Creating a Filesystem backed table, you need to reference the path of the copied file in the container: file:///persistent/example.json

How to make files accessible using bind mount in Docker?

If you have many files or large files, it is easier to reference a directory on the host machine using a bind mount, because you do not need to copy the files to the volume. To make the directories available, open the docker-compose.yml file in an editor, locate the entry for the Flink Job Manager, and add the references to your directories under volumes:
flink-jobmanager:
         …
    volumes:
      - flink-volume:/persistent
      - /your/local/volume:/mnt/local0
      - /your/other/volume:/mnt/local1

Once you have added the volumes to the flink-jobmanager entry, you need to add the same volumes to the flink-taskmanager entry as well, so that the Task Managers can read the files under the same paths.

When you create a table using the Filesystem connector as shown in Creating a Filesystem backed table, you need to reference the path of the mounted file in the volume:

file:///mnt/local0/example.json

Creating a Filesystem backed table in Streaming SQL Console

After setting up the local file, open the Streaming SQL Console, and execute the following example:
DROP TABLE IF EXISTS my_file_source;
CREATE TABLE `ssb`.`ssb_default`.`my_file_source` (
  `foo` VARCHAR(2147483647)
) WITH (
  'format' = 'json',
  'path' = 'file:///persistent/example.json',
  'connector' = 'filesystem'
);

SELECT * FROM my_file_source;

This example parses JSON serialized input with rows delimited by a newline. When using JSON, the columns defined match the keys in the JSON document.
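
To illustrate the expected input, the following sketch writes a two-row sample file that matches the my_file_source table above, with one JSON object per line and foo as the only key. The row values are made up for illustration.

```shell
# Write two newline-delimited JSON rows; the "foo" key maps to the foo column.
printf '%s\n' '{"foo": "first row"}' '{"foo": "second row"}' > example.json

# Each line is one row, so the line count equals the number of rows.
wc -l < example.json
```

After this file is copied to /persistent, the SELECT query above should return one row per line of the file.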

The Filesystem connector also supports a number of other formats, such as CSV or raw. The raw format is a good option for arbitrary format log files with no specific serialization. In this case, each line of data (delimited by newline) is returned in a single column, and can be split or parsed using User Defined Functions, SQL string functions, regular expressions, and so on.

Using PostgreSQL CDC connector

The Community Edition comes pre-configured with a PostgreSQL database that can be used to try Change Data Capture (CDC). After setting up a table in the PostgreSQL database, you can use the connector for your SQL queries.

A database named ssb_cdc has been created with the necessary configuration. An ssb_cdc role (password: cloudera) has also been added that can be used to access the changelog streams.

Before using the PostgreSQL connector, you need to set up a CDC table in the PostgreSQL database.

After accessing the PostgreSQL instance, start the psql client and connect to the ssb_cdc database:
docker exec -it <psql_container_name_or_id> /bin/bash
psql
\c ssb_cdc
As an example, create the following table in PostgreSQL:
CREATE TABLE cdc_table (a varchar, b int);
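Set the replica identity of the created table to FULL, so that update and delete events in the changelog include the previous values of all columns: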
ALTER TABLE cdc_table REPLICA IDENTITY FULL;

After setting up the database, you can use it in Streaming SQL Console.

To subscribe to the changelog stream in SSB, create a table and check the data in the database using the following example:
CREATE TABLE `ssb`.`ssb_default`.`cdc_postgres_table` (
  `a` VARCHAR(2147483647),
  `b` INT,
  `ts` AS PROCTIME()
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'postgresql',
  'database-name' = 'ssb_cdc',
  'schema-name' = 'public',
  'table-name' = 'cdc_table',
  'username' = 'ssb_cdc',
  'password' = 'cloudera',
  'decoding.plugin.name' = 'pgoutput',
  'debezium.slot.name' = 'flink'
);

SELECT * FROM cdc_postgres_table;

Insert or update a row in the PostgreSQL table using your database tool of choice.

For example, you can use the psql client and manually insert values to the cdc_table:
psql
\c ssb_cdc
insert into cdc_table values ('Hello', 123);

A row representing the change will show up in the Results tab.
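
Updates and deletes are also captured in the changelog stream. As a sketch, statements can be sent without an interactive session by passing them to psql with the -c option. The helper name cdc_sql and the PG_CONTAINER variable are hypothetical, and the sketch assumes that psql inside the container connects with the same default user as in the interactive session above.

```shell
# Hypothetical helper: run one SQL statement against the ssb_cdc database.
# PG_CONTAINER must hold the name or ID of your PostgreSQL container.
cdc_sql() {
  docker exec "$PG_CONTAINER" psql -d ssb_cdc -c "$1"
}

# Usage (replace the placeholder with your container name or ID):
# PG_CONTAINER=<psql_container_name_or_id>
# cdc_sql "UPDATE cdc_table SET b = 456 WHERE a = 'Hello';"
# cdc_sql "DELETE FROM cdc_table WHERE a = 'Hello';"
```

While the SELECT query on cdc_postgres_table is running, each statement should produce a corresponding change in the Results tab.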