Using different connectors in SSB
You can use the supported connectors to create end-to-end streaming analytics processes that match your requirements.
Using Filesystem source
You can use the Filesystem connector in the Community Edition to process files that are stored on the filesystem.
To use the Filesystem connector, you need to either copy the files to the persistent Docker volume provided with the Community Edition, or map bind mounts from the host filesystem.
- How to copy files to the Docker volume?
To copy existing files to the cluster for processing, you can use the docker command to copy them directly to the persistent volume.
First, determine the name of the Job Manager container. You can do this by running the following command:
$ docker ps -f "label=com.docker.compose.service=flink-jobmanager" --format="{{.Names}}"
csp-ce_flink-jobmanager-1
In the examples below, csp-ce_flink-jobmanager-1 is the name of the container.
Next, copy your file to the shared volume in the container. For example, if you have a file in your local directory called example.json that you would like to process, you can copy it to the cluster with the following docker command:
docker cp example.json csp-ce_flink-jobmanager-1:/persistent
Make sure to replace the container name with the one you found in the previous step.
To check that the file was copied successfully, first connect to the container with the following command:
docker exec -it csp-ce_flink-jobmanager-1 /bin/bash
When you access the container, go to the persistent directory and list the files there:
cd /persistent
ls
The example.json file should be listed in the persistent directory.
When you create a table using the Filesystem connector as shown in Creating a Filesystem backed table, you need to reference the path of the copied file in the container:
file:///persistent/example.json
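The copy steps above can be combined into a small shell function. This is only a sketch: the function name copy_to_persistent is hypothetical, and it assumes the docker CLI is on your PATH and that the Community Edition containers are running.

```shell
#!/bin/sh
# Hypothetical helper (not part of the Community Edition): copies a local file
# to /persistent in the Job Manager container and prints the path to use in
# the table definition.
copy_to_persistent() {
    file="$1"
    if [ ! -f "$file" ]; then
        echo "no such file: $file" >&2
        return 1
    fi
    # Look up the Job Manager container by its Compose service label.
    container=$(docker ps -f "label=com.docker.compose.service=flink-jobmanager" \
        --format "{{.Names}}" | head -n 1)
    if [ -z "$container" ]; then
        echo "flink-jobmanager container not found" >&2
        return 1
    fi
    # Copy the file into the shared persistent volume.
    docker cp "$file" "$container:/persistent" || return 1
    # Print the value for the 'path' option of the Filesystem connector.
    echo "file:///persistent/$(basename "$file")"
}
```

After sourcing the function, calling copy_to_persistent example.json copies the file and prints the file:// path to reference when you create the table.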
- How to make files accessible using bind mount in Docker?
If you have many or large files, it is easier to reference a directory on the host machine using a bind mount, because you do not need to copy the files to the volume. To make the directories available, open the docker-compose.yml file in an editor, locate the entry for the Flink Job Manager, and add the references to your files:
flink-jobmanager:
  …
  volumes:
    - flink-volume:/persistent
    - /your/local/volume:/mnt/local0
    - /your/other/volume:/mnt/local1
Once you have added the volumes to the flink-jobmanager entry, you need to add them to the flink-taskmanager entry as well.
When you create a table using the Filesystem connector as shown in Creating a Filesystem backed table, you need to reference the path of the mounted file in the volume:
file:///mnt/local0/example.json
Creating a Filesystem backed table in Streaming SQL Console
DROP TABLE IF EXISTS my_file_source;
CREATE TABLE `ssb`.`ssb_default`.`my_file_source` (
`foo` VARCHAR(2147483647)
) WITH (
'format' = 'json',
'path' = 'file:///persistent/example.json',
'connector' = 'filesystem'
);
SELECT * FROM my_file_source;
This example parses JSON-serialized input with rows delimited by a newline. When using JSON, the columns you define are matched by name to the keys in the JSON document.
The Filesystem connector also supports a number of other formats, such as CSV or raw. The raw format is a good option for arbitrary format log files with no specific serialization. In this case, each line of data (delimited by newline) is returned in a single column, and can be split or parsed using User Defined Functions, SQL string functions, regular expressions, and so on.
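To try the JSON table definition above, the input file needs one JSON object per line, with a foo key to match the foo column. A minimal sketch that writes such a file to the current directory (the row values are made up for illustration):

```shell
#!/bin/sh
# Write two newline-delimited JSON rows. The key "foo" matches the `foo`
# column of the my_file_source table; the values are illustrative only.
cat > example.json <<'EOF'
{"foo": "first row"}
{"foo": "second row"}
EOF

# Show the file that would then be copied to /persistent.
cat example.json
```

Once the file is copied to the persistent volume, each line should be returned as one row of the table.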
Using PostgreSQL CDC connector
The Community Edition comes pre-configured with a PostgreSQL database that can be used to try Change Data Capture (CDC). After setting up a table in the PostgreSQL database, you can use the connector for your SQL queries.
A database named ssb_cdc has been created with the necessary configuration. An ssb_cdc role (password: cloudera) has also been added that can be used to access the changelog streams.
Before using the PostgreSQL connector, you need to set up a CDC table in the PostgreSQL database.
Start the psql client, connect to the ssb_cdc database, and create a table:
docker exec -it <psql_container_name_or_id> /bin/bash
psql
\c ssb_cdc
CREATE TABLE cdc_table (a varchar, b int);
Set the permissions for the created table:
ALTER TABLE cdc_table REPLICA IDENTITY FULL;
After setting up the database, you can use it in Streaming SQL Console.
CREATE TABLE `ssb`.`ssb_default`.`cdc_postgres_table` (
`a` VARCHAR(2147483647),
`b` INT,
`ts` AS PROCTIME()
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'postgresql',
'database-name' = 'ssb_cdc',
'schema-name' = 'public',
'table-name' = 'cdc_table',
'username' = 'ssb_cdc',
'password' = 'cloudera',
'decoding.plugin.name' = 'pgoutput',
'debezium.slot.name' = 'flink'
);
SELECT * from cdc_postgres_table;
Insert or update a row in the PostgreSQL table using your database tool of choice.
Start the psql client and manually insert values into the cdc_table:
psql
\c ssb_cdc
insert into cdc_table values ('Hello', 123);
A row representing the change will show up in the Results tab.
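The manual insert can also be issued without an interactive session. The sketch below wraps docker exec and psql in a hypothetical function named run_cdc_sql. It assumes that psql inside the PostgreSQL container can connect with its default user, as the interactive steps above imply, and it uses psql's standard -d (database) and -c (command) options.

```shell
#!/bin/sh
# Hypothetical helper: run a single SQL statement against the ssb_cdc
# database inside the PostgreSQL container.
run_cdc_sql() {
    container="$1"   # name or ID of the PostgreSQL container
    statement="$2"   # SQL statement to execute
    if [ -z "$container" ] || [ -z "$statement" ]; then
        echo "usage: run_cdc_sql <psql_container_name_or_id> <sql>" >&2
        return 2
    fi
    # Assumption: the container's default psql user can access ssb_cdc.
    docker exec "$container" psql -d ssb_cdc -c "$statement"
}
```

For example, run_cdc_sql <psql_container_name_or_id> "insert into cdc_table values ('Hello', 123);" should produce the same change event in the Results tab as the interactive insert.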