Running a Flink job

After developing your application, you can submit your Flink job in YARN per-job, session, or application mode. To submit the job, run the Flink client from the command line with the run command, including the security parameters and other configurations.

Submitting a job means uploading the job’s JAR and related dependencies to the Flink cluster and initiating the job execution.

The Flink jobs you submit to the cluster run on YARN. You can run your Flink jobs in the following deployment modes:
  • Per-job mode

    Per-job mode means that you run the Flink job in a dedicated YARN application. In this case, each submitted Flink job has its own Flink cluster in YARN, with its own Job Manager and Task Managers. Because every job submission creates a new cluster, starting a job can take longer than in session mode.

  • Session mode

    Session mode means that you run multiple Flink jobs in the same YARN session. In this case, all Flink jobs share the cluster, the allocated resources, the Job Manager, and the Task Managers. The session cluster is long-lived, and every submitted job runs in that one cluster. Because the cluster already exists, jobs start sooner than in per-job mode. However, a cluster failure affects every Flink job in the session, and restoring the jobs from savepoints can take time.

  • Application mode

    Regarding resource management, application mode is much like per-job mode: every deployed application has its own dedicated Flink cluster. The key difference is that the main() method of the deployed application is executed by the Job Manager rather than by the client.

For more information about the deployment modes of Flink, see the Apache Flink documentation.
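As a rough orientation, each mode corresponds to a different command and target value at submission time. The following shell sketch only prints the base command for a mode, using the commands that appear in the submission steps of this section. The function name flink_base_command and the short mode names are illustrative assumptions, not part of the Flink CLI.

```shell
#!/bin/sh
# Hypothetical helper: print the base submission command for a deployment mode.
# It does not run Flink; it only echoes the command prefix.
flink_base_command() {
  case "$1" in
    per-job)     echo "flink run -t yarn-per-job" ;;
    session)     echo "flink run -t yarn-session" ;;
    application) echo "flink run-application -t yarn-application" ;;
    *)           echo "unknown mode: $1" >&2; return 1 ;;
  esac
}

# Print the base command for application mode.
flink_base_command application
```

Session mode additionally needs the ID of a running session cluster, which is passed with -yid.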

You can select the deployment mode with the execution.target setting in the Flink configuration file. By default, execution.target is set to yarn-per-job, but you can change it to yarn-application or yarn-session. Alternatively, you can add the corresponding arguments to the flink run command when submitting the Flink job.
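To check which mode a configuration file currently selects, you can read the execution.target key from it. The sketch below assumes that the file uses flat "key: value" lines. The function name and the file path in the usage comment are assumptions, so adjust them to your environment.

```shell
#!/bin/sh
# Hypothetical helper: print the value of execution.target from a Flink
# configuration file that uses flat "key: value" lines.
# If the key appears more than once, the last occurrence is printed.
get_execution_target() {
  sed -n 's/^execution\.target:[[:space:]]*//p' "$1" | tail -n 1
}

# Usage (the path is an assumption; use your cluster's configuration file):
#   get_execution_target /etc/flink/conf/flink-conf.yaml
```

If the function prints nothing, the key is not set in that file.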

  1. Use ssh to connect to the cluster where you want to run the Flink application.
    ssh [***YOUR WORKLOAD USERNAME***]@[***YOUR MASTER NODE FQDN***]
  2. Submit the Flink job using the flink run command.
    Per-job mode
    flink run \
      -d \
      -t yarn-per-job \
      -ynm <job_name_in_yarn> \
      -p <job_parallelism> \
      -ys <slots_per_task_manager> \
      -ytm <memory_per_container_in_mb> \
      <job_jar_file> <job_parameters>
    For a PyFlink job, use -py with the Python script instead of the JAR file:
    flink run \
      -d \
      -t yarn-per-job \
      -ynm <job_name_in_yarn> \
      -p <job_parallelism> \
      -ys <slots_per_task_manager> \
      -ytm <memory_per_container_in_mb> \
      -py <python_script> <job_parameters>
    To ship a Python virtual environment with the job:
    # Assuming the content of '/tmp/shipfiles': venv.zip, word_count.py
    flink run -t yarn-per-job \
    -Dyarn.ship-files=/tmp/shipfiles \
    -pyarch /tmp/shipfiles/venv.zip \
    -pyclientexec venv.zip/bin/python3 \
    -pyexec venv.zip/bin/python3 \
    -pyfs /tmp/shipfiles \
    -pym word_count
    Session mode
    1. Start a Flink session cluster.
      flink-yarn-session \
        -d \
        -nm <job_name_in_yarn> \
        -s <slots_per_task_manager> \
        -tm <memory_per_container_in_mb>
      The flink-yarn-session command outputs the ID of the corresponding YARN application, for example:
      YARN ApplicationID: application_1616633166424_0024
      You need to add this YARN application ID to the flink run command.
    2. Submit the Flink job.
      flink run \
        -d \
        -t yarn-session \
        -yid <application_id> \
        -p <parallelism> \
        <job_jar_file> <job_parameters>
      For a PyFlink job, use -py with the Python script instead of the JAR file:
      flink run \
        -d \
        -t yarn-session \
        -yid <application_id> \
        -p <parallelism> \
        -py <python_script> <job_parameters>
    Application mode
    flink run-application \
      -d \
      -t yarn-application \
      -p <job_parallelism> \
      -ys <slots_per_task_manager> \
      -ytm <memory_per_container_in_mb> \
      <job_jar_file> <job_parameters>
    For a PyFlink job, use -py with the Python script instead of the JAR file:
    flink run-application \
      -d \
      -t yarn-application \
      -p <job_parallelism> \
      -ys <slots_per_task_manager> \
      -ytm <memory_per_container_in_mb> \
      -py <python_script> <job_parameters>
    For more examples and PyFlink options, see the Submitting PyFlink Jobs section of the Apache Flink documentation.
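In session mode, the application ID printed by flink-yarn-session has to be passed to flink run with -yid. The following sketch extracts the ID from captured output. It assumes that the output contains a line of the form shown in the session mode step (YARN ApplicationID: application_...), and the function name extract_yarn_app_id is hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: read flink-yarn-session output on stdin and print the
# first YARN application ID found (pattern: application_<digits>_<digits>).
extract_yarn_app_id() {
  grep -Eo 'application_[0-9]+_[0-9]+' | head -n 1
}

# Example with the sample output line from the session mode step.
# The command is only printed here, not executed.
app_id=$(echo "YARN ApplicationID: application_1616633166424_0024" | extract_yarn_app_id)
echo "flink run -d -t yarn-session -yid $app_id"
```

In practice, you would pipe the output of flink-yarn-session, or a saved copy of it, into the function instead of the echo line, and then append the parallelism, JAR file, and job parameters to the printed command.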