Using different connectors in SSB

You can use the supported connectors to create end-to-end streaming analytics processes based on your requirements.

Using Filesystem source

You can use the Filesystem connector in the Community Edition to process files which are stored on the filesystem.

To use the Filesystem connector, you need to either copy the files to the persistent Docker volume provided with the Community Edition, or bind mount directories from the host filesystem.

How to copy files to the Docker volume?

To copy existing files to the cluster for processing, you can use the docker cp command to place them directly on the persistent volume.

First, determine the name of the Job Manager container. You can do this by running the following command:
$ docker ps -f "label=com.docker.compose.service=flink-jobmanager" --format="{{.Names}}"
csp-ce_flink-jobmanager_1

In the examples below, csp-ce_flink-jobmanager_1 is the name of the container.

Next, copy your file to the shared volume in the container. For example, if you have a file in your local directory called example.json that you would like to process, you can copy it to the cluster with the following docker command:
docker cp example.json csp-ce_flink-jobmanager_1:/persistent

Make sure to replace the container name with the one you located in the previous step.

You can check whether the file was copied successfully to the persistent directory. First, list the running containers with the docker ps command. Find the flink-jobmanager entry and copy the container ID of the Job Manager. The ID should look like this: a321a34f7032

After this, access the container using the following command, replacing <container_id> with the ID you copied:
docker exec -it <container_id> /bin/bash
When you are inside the container, go to the persistent directory and list the files there:
cd /persistent
ls

The example.json should be listed in the persistent directory.
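
The copy and verification steps above can also be combined into a small helper. The following is a minimal sketch, not part of the product: the function names jobmanager_name and copy_to_persistent are hypothetical, and it assumes a single running Job Manager container that carries the compose service label used earlier.

```shell
#!/bin/sh
# Hypothetical helpers; the function names are illustrative only.

# Look up the Job Manager container name through its compose service label.
jobmanager_name() {
  docker ps -f "label=com.docker.compose.service=flink-jobmanager" \
    --format '{{.Names}}' | head -n 1
}

# Copy a local file to /persistent, then list it to confirm the copy.
copy_to_persistent() {
  file="$1"
  container="$(jobmanager_name)"
  if [ -z "$container" ]; then
    echo "no running flink-jobmanager container found" >&2
    return 1
  fi
  docker cp "$file" "$container:/persistent" || return 1
  docker exec "$container" ls -l "/persistent/$(basename "$file")"
}

# Usage:
#   copy_to_persistent example.json
```

If the final listing prints the file, it can be referenced as file:///persistent/example.json.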

When you create a table using the Filesystem connector as shown in Creating a Filesystem backed table, you need to reference the path of the copied file in the container: file:///persistent/example.json

How to make files accessible using bind mount in Docker?

If you have many files or large files, it is easier to reference a directory on the host machine using a bind mount, because you do not need to copy the files to the volume. To make the directories available, open the docker-compose.yml file in an editor, locate the entry for the Flink Job Manager, and add the references to your directories under volumes:
flink-jobmanager:
    …
    volumes:
      - flink-volume:/persistent
      - /your/local/volume:/mnt/local0
      - /your/other/volume:/mnt/local1

Once you have added the volumes to the flink-jobmanager entry, you need to add them to the flink-taskmanager entry as well.
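
Changes to docker-compose.yml only take effect once the containers are recreated (for example, with docker-compose up -d). After that, a quick check along the following lines can confirm that a mounted path is visible in both services. This is a sketch under assumptions: check_mount is a hypothetical helper name, and the containers are assumed to carry the compose service labels flink-jobmanager and flink-taskmanager.

```shell
#!/bin/sh
# Hypothetical helper: report whether a path exists in every running
# container that belongs to the given compose service.
check_mount() {
  service="$1"
  path="$2"
  for c in $(docker ps -f "label=com.docker.compose.service=$service" \
      --format '{{.Names}}'); do
    if docker exec "$c" test -e "$path"; then
      echo "$c: $path is visible"
    else
      echo "$c: $path is MISSING"
    fi
  done
}

# Usage:
#   check_mount flink-jobmanager /mnt/local0
#   check_mount flink-taskmanager /mnt/local0
```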

When you create a table using the Filesystem connector as shown in Creating a Filesystem backed table, you need to reference the path of the mounted file in the volume:

file:///mnt/local0/example.json

Creating a Filesystem backed table in Streaming SQL Console

After setting up the local file, open the Streaming SQL Console, and execute the following example:
DROP TABLE IF EXISTS my_file_source;
CREATE TABLE `ssb`.`ssb_default`.`my_file_source` (
  `foo` VARCHAR(2147483647)
) WITH (
  'format' = 'json',
  'path' = 'file:///persistent/example.json',
  'connector' = 'filesystem'
);

SELECT * FROM my_file_source;

This example parses JSON serialized input with rows delimited by a newline. When using JSON, the columns defined match the keys in the JSON document.
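
To illustrate the expected input, the following sketch writes a small newline-delimited JSON file whose only key, foo, matches the column in the table definition. The row values and the output location are made up for illustration.

```shell
#!/bin/sh
# Write three newline-delimited JSON records, one object per line.
# The values are illustrative; only the key "foo" comes from the table above.
sample_file="${TMPDIR:-/tmp}/example.json"
cat > "$sample_file" <<'EOF'
{"foo": "first row"}
{"foo": "second row"}
{"foo": "third row"}
EOF

# Each line becomes one row in my_file_source once the file is copied
# to /persistent (for example, with docker cp).
wc -l < "$sample_file"
```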

The Filesystem connector also supports a number of other formats, such as 'csv' or 'raw'. The 'raw' format is a good option for arbitrarily formatted log files with no specific serialization. In this case, each line of data (delimited by a newline) is returned in a single column, and can be split or parsed using User Defined Functions, SQL string functions, regular expressions, and so on.

Using PostgreSQL CDC connector

The Community Edition comes pre-configured with a PostgreSQL database that can be used to try Change Data Capture (CDC). After setting up a table in the PostgreSQL database, you can use the connector for your SQL queries.

A database named ssb_cdc has been created with the necessary configuration. An ssb_cdc role (password: cloudera) has also been added that can be used to access the changelog streams.

Before using the PostgreSQL CDC connector in SSB, you need to set up a CDC table in the PostgreSQL database.

As an example, create the following table in PostgreSQL:
CREATE TABLE cdc_table (a varchar, b int);
Set the replica identity of the created table to FULL, so that the changelog includes the complete previous row for updates and deletes:
ALTER TABLE cdc_table REPLICA IDENTITY FULL;

After setting up the database, you can use it in Streaming SQL Console.

To subscribe to the changelog stream in SSB, create a table and check the data in the database using the following example:
CREATE TABLE `ssb`.`ssb_default`.`cdc_postgres_table` (
  `a` VARCHAR(2147483647),
  `b` INT,
  `ts` AS PROCTIME()
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'postgresql',
  'database-name' = 'ssb_cdc',
  'schema-name' = 'public',
  'table-name' = 'cdc_table',
  'username' = 'ssb_cdc',
  'password' = 'cloudera',
  'decoding.plugin.name' = 'pgoutput',
  'debezium.slot.name' = 'flink'
);

SELECT * FROM cdc_postgres_table;

Insert or update a row in the PostgreSQL table using your database tool of choice. A row representing the change will show up in the Results tab.
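
If you do not have a database tool at hand, changes can also be produced from the command line by running psql inside the PostgreSQL container. The sketch below rests on several assumptions that are not confirmed by this document: that the compose service is named postgresql (inferred from the hostname in the table definition), that psql is available in that container, and that the role in PG_USER may write to cdc_table. The helper name run_cdc_sql is hypothetical.

```shell
#!/bin/sh
# Assumptions: adjust the service name and database role to your setup.
PG_SERVICE="${PG_SERVICE:-postgresql}"
PG_USER="${PG_USER:-postgres}"

# Hypothetical helper: run one SQL statement against the ssb_cdc database.
run_cdc_sql() {
  container="$(docker ps -f "label=com.docker.compose.service=$PG_SERVICE" \
    --format '{{.Names}}' | head -n 1)"
  if [ -z "$container" ]; then
    echo "no running $PG_SERVICE container found" >&2
    return 1
  fi
  docker exec -i "$container" psql -U "$PG_USER" -d ssb_cdc -c "$1"
}

# Usage:
#   run_cdc_sql "INSERT INTO cdc_table VALUES ('hello', 1);"
#   run_cdc_sql "UPDATE cdc_table SET b = 2 WHERE a = 'hello';"
```

Each statement should produce a corresponding change row in the Results tab while the SELECT query is running.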