Data querying with SQL Client

Learn more about querying data with Flink SQL Client using Kafka catalogs.

You can configure the SQL Client using the following methods:
  • SQL Client default settings - default SQL configurations that are specific to catalogs and connectors, stored in the /etc/flink/conf/sql-client-defaults.yaml file.
  • Custom environment settings - passing the -e sql-env.yaml parameter when starting the SQL Client can override the regular Flink settings and the default SQL settings.

Because every Flink SQL query is an independent Flink job, you can run each query either as a standalone (per-job) YARN application or on a Flink session cluster.

For more information about the needed configurations, see the SQL Client documentation.

  1. Start the SQL Client.
    flink-sql-client embedded \
    -D security.kerberos.login.principal=<username> \
    -D security.kerberos.login.keytab=<keytab_name>
  2. Create a source table based on your streaming application parameters.
    The following table can be created when using the Stateful Flink Application tutorial:
    CREATE TABLE ItemTransactions (
        transactionId BIGINT,
        ts            BIGINT,
        itemId        STRING,
        quantity      INT,
        event_time AS CAST(from_unixtime(floor(ts/1000)) AS TIMESTAMP(3)),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector.type'                                  = 'kafka',
        'connector.version'                               = 'universal',
        'connector.topic'                                 = 'transaction.log.1',
        'connector.startup-mode'                          = 'earliest-offset',
        'connector.properties.bootstrap.servers'          = '<broker_address>',
        'connector.properties.security.protocol'          = 'SASL_SSL',
        'connector.properties.sasl.kerberos.service.name' = 'kafka',
        'connector.properties.ssl.truststore.location'    = '<absolute_path_to_jks>',
        'format.type'                                     = 'json'
    );
  3. Verify that the table was created by running the SHOW TABLES command.
  4. Run a SELECT query to check the generated input data.
    The following SELECT query can be used when using the Stateful Flink Application tutorial:
    > SELECT * FROM ItemTransactions;
  5. Press Q to exit the SELECT query and to stop the Flink job.
  6. Use other SQL statements to run further queries against the data of your streaming application.
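
As one possible follow-up query, the sketch below sums the quantity per item in one-minute tumbling windows over the event_time column of the ItemTransactions table defined in step 2. The one-minute window size and the window_start and total_quantity aliases are illustrative assumptions and are not taken from the tutorial.

```sql
-- Sketch only: total quantity per item in 1-minute event-time windows.
-- Assumes the ItemTransactions table from step 2 already exists in the session.
SELECT
    itemId,
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,  -- alias is illustrative
    SUM(quantity) AS total_quantity                                 -- alias is illustrative
FROM ItemTransactions
GROUP BY
    itemId,
    TUMBLE(event_time, INTERVAL '1' MINUTE);
```

Because the window is defined on event_time, results for a window should only appear once the watermark (event_time minus 5 seconds in the table definition) passes the end of that window. As with the SELECT query in step 4, pressing Q exits the result view and stops the Flink job.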