Data Movement and Integration

Chapter 8. Replicating Data with Falcon

Falcon can replicate data across multiple clusters using DistCp. A replication feed allows you to set a retention policy and to replicate data at the frequency you specify in the feed entity. Falcon uses a pull-based replication mechanism: on every target cluster, for a given source cluster, a coordinator is scheduled that pulls the data from the source cluster using DistCp.

[Note]Note

Due to changes in Hive, Falcon supports URIs with only one metastore for the Oozie HCAT URI (which is used for Hive table feeds). This applies even if you have multiple metastores configured.

Prerequisites

  • Your environment must meet the HDP versioning requirements described in "Replication Between HDP Versions" in Creating Falcon Entity Definitions.

  • You must have the following components installed on your cluster:

    • HDP: Installed on your cluster (using Ambari or a manual installation)

    • Falcon: Installed on your cluster and the Falcon service is running

    • Oozie client and server: Installed and the Oozie service is running on your cluster (a quick way to check both services follows this list)
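
A quick way to confirm that the Falcon and Oozie services respond is to query them from the command line, as shown in the following optional sketch. The Oozie endpoint shown is the example URL used later in this chapter and is an assumption for your environment:

    # Print the Falcon server version (confirms the Falcon service is reachable)
    falcon admin -version

    # Print the Oozie system status (confirms the Oozie service is reachable)
    oozie admin -oozie http://os:11000/oozie -status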

Replicating Data Using the CLI

Define the Data Source: Set Up a Source Cluster Entity

Define where data and processes are stored in the cluster entity.

  1. Create an XML file for the Cluster entity. This file contains all properties for the cluster. Include the XML version:

    <?xml version="1.0"?>
  2. Define the colo and name attributes for the cluster.

    <?xml version="1.0"?>
    <cluster colo="<MyDataCenter>" description="description" 
                name="<MyDataCenterFilename>"> 
    </cluster>

    colo specifies the data center to which this cluster belongs.

    name is the name of the cluster, which must be unique.

  3. Define the interfaces for the cluster. For each interface, specify the interface type, the endpoint, and the version of the Apache component.

    For example:

    <cluster colo="<MyDataCenter>" description="description" 
              name="<MyDataCenterFilename>">
         <interfaces>
              
              <!-- Required for distcp for replications. -->
              <interface type="readonly" endpoint="hftp://nn:50070" version="2.4.2" />
        
              <!-- Needed for writing to HDFS-->
              <interface type="write" endpoint="hdfs://nn:8020" version="2.4.2" />
        
              <!-- Required. An execute interface specifies the interface for job tracker.-->
              <interface type="execute" endpoint="rm:8050" version="2.4.2" />
        
              <!-- Required. A workflow interface specifies the interface for workflow engines, such as Oozie.-->
              <interface type="workflow" endpoint="http://os:11000/oozie/" version="4.0.0" />
        
              <!--A registry interface specifies the interface for the metadata catalog, such as Hive Metastore or HCatalog.-->
              <interface type="registry" endpoint="thrift://hms:9083" version="0.13.0" /> 
        
              <!--Messaging interface specifies the interface for sending alerts.-->
              <interface type="messaging" endpoint="tcp://mq:61616?daemon=true" version="5.1.6" /> 
         </interfaces>
    </cluster>
  4. Provide the locations for the HDFS paths to files.

    For example:

    <cluster colo="<MyDataCenter>" description="description" 
              name="<MyDataCenter>">
         <interfaces>
         
              <!-- Required for distcp for replications. -->   
              <interface type="readonly" endpoint="hftp://nn:50070" version="2.4.2" /> 
              
              <!-- Needed for writing to HDFS-->
              <interface type="write" endpoint="hdfs://nn:8020" version="2.4.2" />
              
              <!-- Needed to execute jobs, such as MapReduce-->
              <interface type="execute" endpoint="rm:8050" version="2.4.2" /> 
              
              <!-- Required. Submits Oozie jobs.-->
              <interface type="workflow" endpoint="http://os:11000/oozie/" version="4.0.0" /> 
              
              <!--Register/deregister partitions in the Hive Metastore and get events on partition availability-->
              <interface type="registry" endpoint="thrift://hms:9083" version="0.13.0" />
              
              <!--Needed for alerts-->
              <interface type="messaging" endpoint="tcp://mq:61616?daemon=true" version="5.1.6" /> 
         </interfaces>
         
         <locations>
         
              <!--HDFS directories used by the Falcon server-->
              <location name="staging" path="/apps/falcon/prod-cluster/staging" /> 
              <location name="temp" path="/tmp" />
              <location name="working" path="/apps/falcon/prod-cluster/working" />
         </locations>
    </cluster>

    The cluster entity is complete if you are using a non-secure environment. If your environment is secured with Kerberos, continue with the next step.

  5. For secure clusters, define the following properties in all your cluster entities as shown below:

    <cluster colo="<MyDataCenter>" description="description" 
              name="<MyDataCenter>">
              
         <interfaces>
         
              <!-- Required for distcp for replications. -->     
              <interface type="readonly" endpoint="hftp://nn:50070" version="2.4.2" /> 
              
              <!-- Needed for writing to HDFS-->
              <interface type="write" endpoint="hdfs://nn:8020" version="2.4.2" />
              
              <!-- Needed to execute jobs, such as MapReduce-->
              <interface type="execute" endpoint="rm:8050" version="2.4.2" />
              
              <!-- Required. Submits Oozie jobs.-->
              <interface type="workflow" endpoint="http://os:11000/oozie/" version="4.0.0" /> 
              
              <!--Register/deregister partitions in the Hive Metastore and get events on partition availability-->
              <interface type="registry" endpoint="thrift://hms:9083" version="0.13.0" />
              
              <!--Needed for alerts-->
              <interface type="messaging" endpoint="tcp://mq:61616?daemon=true" version="5.1.6" /> 
         </interfaces>
         
         <locations>
         
              <!--HDFS directories used by the Falcon server-->
              <location name="staging" path="/apps/falcon/prod-cluster/staging" /> 
              <location name="temp" path="/tmp" />
              <location name="working" path="/apps/falcon/prod-cluster/working" />
         </locations> 
         
         <properties>
              <property name="dfs.namenode.kerberos.principal" value="nn/$my.internal@EXAMPLE.COM"/>
              <property name="hive.metastore.kerberos.principal" value="hive/$my.internal@EXAMPLE.COM"/>
              <property name="hive.metastore.uris" value="thrift://$my.internal:9083"/>
              <property name="hive.metastore.sasl.enabled" value="true"/>
         </properties>
    </cluster>

    Replace $my.internal@EXAMPLE.COM and $my.internal with your own values.

    [Important]Important

    Make sure hadoop.security.auth_to_local in core-site.xml is consistent across all clusters. Inconsistencies in rules for hadoop.security.auth_to_local can lead to issues with delegation token renewals.
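
    For reference, this property is set in core-site.xml on each cluster. The following is only an illustrative sketch (the realm and the specific mapping rules are assumptions); what matters is that the same set of rules is present on every cluster that participates in replication:

    <property>
         <name>hadoop.security.auth_to_local</name>
         <value>
              RULE:[2:$1@$0](nn@EXAMPLE.COM)s/.*/hdfs/
              RULE:[2:$1@$0](rm@EXAMPLE.COM)s/.*/yarn/
              DEFAULT
         </value>
    </property>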

Create the Replication Target: Define a Cluster Entity

Replication targets must also be defined as cluster entities. These entities include:

  • colo and name attributes for the cluster.

  • Interfaces for the cluster.

  • Locations for the HDFS paths to files.

  • (For secure clusters only) security properties.
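
The definition has the same shape as the source cluster entity shown above. The following is a minimal sketch of a non-secure target cluster entity; the colo, name, host names, and Falcon staging paths are assumptions that you must replace with values from your target environment, and the name must match the target cluster name referenced in your feed entity (MyDataCenter-secondary in the examples that follow). For a Kerberos-secured target, add the same security properties shown in the previous section:

    <?xml version="1.0"?>
    <cluster colo="<MyDataCenter-secondary>" description="Replication target"
              name="<MyDataCenter-secondary>">
         <interfaces>
              <interface type="readonly" endpoint="hftp://nn2:50070" version="2.4.2" />
              <interface type="write" endpoint="hdfs://nn2:8020" version="2.4.2" />
              <interface type="execute" endpoint="rm2:8050" version="2.4.2" />
              <interface type="workflow" endpoint="http://os2:11000/oozie/" version="4.0.0" />
              <interface type="registry" endpoint="thrift://hms2:9083" version="0.13.0" />
              <interface type="messaging" endpoint="tcp://mq2:61616?daemon=true" version="5.1.6" />
         </interfaces>
         <locations>
              <location name="staging" path="/apps/falcon/backup-cluster/staging" />
              <location name="temp" path="/tmp" />
              <location name="working" path="/apps/falcon/backup-cluster/working" />
         </locations>
    </cluster>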

Create the Feed Entity

The feed entity defines the data set that Falcon replicates. Reference your cluster entities to determine which clusters the feed uses.

  1. Create an XML file for the Feed entity.

    <?xml version="1.0"?>
  2. Describe the feed.

    <?xml version="1.0"?> 
    <feed description="$rawInputFeed" name="testFeed" xmlns="uri:falcon:feed:0.1">
    </feed>
  3. Specify the frequency of the feed.

    <?xml version="1.0"?>
    <feed description="$rawInputFeed" name="testFeed" xmlns="uri:falcon:feed:0.1">
    
    <!--Feed run frequency-->
    <frequency>hours(1)</frequency>
    
    </feed>
  4. Set how frequently the feed should run.

    For example:

    <?xml version="1.0"?>
    <feed description="$rawInputFeed" name="testFeed" xmlns="uri:falcon:feed:0.1">
    
    <!--Feed run frequency--> 
    <frequency>hours(1)</frequency>
    
    </feed>
  5. (Optional) Set a late-arrival cut-off policy. The supported policies for late data handling are backoff, exp-backoff (default), and final.

    For example, to set a late-arrival cutoff of 6 hours:

    <?xml version="1.0"?>
    <feed description="$rawInputFeed" name="testFeed" xmlns="uri:falcon:feed:0.1">
    
    <!--Feed run frequency-->
    <frequency>hours(1)</frequency>
    
    <!-- Late arrival cut-off -->
    <late-arrival cut-off="hours(6)"/>
    
    </feed>
  6. Define your source and target clusters for the feed and set a retention policy for the data.

    For example, for two clusters, MyDataCenter and MyDataCenter-secondary:

    <?xml version="1.0"?>
    <feed description="$rawInputFeed" name="testFeed" xmlns="uri:falcon:feed:0.1">
    
    <!--Feed run frequency-->
    <frequency>hours(1)</frequency> 
    
    <!-- Late arrival cut-off -->
    <late-arrival cut-off="hours(6)"/>
    
    <!-- Target clusters for retention and replication. -->
    <clusters> 
         <cluster name="<MyDataCenter>" type="source">
              <validity start="$date" end="$date"/>
              
              <!--Currently delete is the only action available -->
              <retention limit="days($n)" action="delete"/>
         </cluster>
        
         <cluster name="$MyDataCenter-secondary" type="target">
              <validity start="2012-01-01T00:00Z" end="2099-12-31T00:00Z"/>
              <location type="data" path="/churn/weblogs/${YEAR}-${MONTH}-${DAY}-${HOUR}"/>
              <retention limit="days(7)" action="delete"/>
         </cluster>
    </clusters>
    </feed>
  7. Specify the HDFS weblogs path locations or Hive table locations. For example, to specify the HDFS weblogs location:

    <?xml version="1.0"?>
                        
    <feed description="$rawInputFeed" name="testFeed" xmlns="uri:falcon:feed:0.1">
    
    <!--Feed run frequency-->
    <frequency>hours(1)</frequency> 
    
    <!-- Late arrival cut-off -->
    <late-arrival cut-off="hours(6)"/>
    
    <!-- Target clusters for retention and replication. -->
    <clusters> 
         <cluster name="<MyDataCenter>" type="source">
              <validity start="$date" end="$date"/>
         
              <!--Currently delete is the only action available -->
              <retention limit="days($n)" action="delete"/>
         </cluster>
         
         <cluster name="$MyDataCenter-secondary" type="target">
              <validity start="2012-01-01T00:00Z" end="2099-12-31T00:00Z"/>
              <location type="data" path="/churn/weblogs/${YEAR}-${MONTH}-${DAY}-${HOUR}"/>
              <retention limit="days(7)" action="delete"/>
         </cluster>
    </clusters>
    
    <!-- Global location across clusters - HDFS paths or Hive tables -->
    <locations>
         <location type="data" path="/weblogs/${YEAR}-${MONTH}-${DAY}-${HOUR}"/>
    </locations>
    </feed>
  8. Specify HDFS ACLs. Set the owner, group, and level of permissions for HDFS. For example:

    <?xml version="1.0"?>
    <feed description="$rawInputFeed" name="testFeed" xmlns="uri:falcon:feed:0.1">
    
    <!--Feed run frequency-->
    <frequency>hours(1)</frequency>
    
    <!-- Late arrival cut-off -->
    <late-arrival cut-off="hours(6)"/>
    
    <!-- Target clusters for retention and replication. -->
    <clusters> 
         <cluster name="<MyDataCenter>" type="source">
         <validity start="$date" end="$date"/>
         
         <!--Currently delete is the only action available -->
         <retention limit="days($n)" action="delete"/>
         </cluster>
         
         <cluster name="$MyDataCenter-secondary" type="target">
         <validity start="2012-01-01T00:00Z" end="2099-12-31T00:00Z"/>
         <location type="data" path="/churn/weblogs/${YEAR}-${MONTH}-${DAY}-${HOUR}"/>
         <retention limit="days(7)" action="delete"/>
         </cluster>
    </clusters>
    
    <!-- Global location across clusters - HDFS paths or Hive tables -->
    <locations> 
         <location type="data" path="/weblogs/${YEAR}-${MONTH}-${DAY}-${HOUR}"/>
    </locations>  
    
    <!-- Required for HDFS. -->
    <ACL owner="hdfs" group="users" permission="0755"/>  
    
    </feed>
  9. Specify the location of the schema file for the feed, as well as the schema provider, such as protobuf or thrift. For example:

    <?xml version="1.0"?>
    <feed description="$rawInputFeed" name="testFeed" xmlns="uri:falcon:feed:0.1">
    
    <!--Feed run frequency-->
    <frequency>hours(1)</frequency>
    
    <!-- Late arrival cut-off -->
    <late-arrival cut-off="hours(6)"/>
    
    <!-- Target clusters for retention and replication. -->
    <clusters> 
         <cluster name="<MyDataCenter>" type="source">
         <validity start="$date" end="$date"/>
         
         <!--Currently delete is the only action available -->
         <retention limit="days($n)" action="delete"/>
         </cluster>
         
         <cluster name="$MyDataCenter-secondary" type="target">
         <validity start="2012-01-01T00:00Z" end="2099-12-31T00:00Z"/>
         <location type="data" path="/churn/weblogs/${YEAR}-${MONTH}-${DAY}-${HOUR}"/>
         <retention limit="days(7)" action="delete"/>
         </cluster>
    </clusters>
    
    <!-- Global location across clusters - HDFS paths or Hive tables -->
    <locations> 
         <location type="data" path="/weblogs/${YEAR}-${MONTH}-${DAY}-${HOUR}"/>
    </locations>
    
    <!-- Required for HDFS. -->
    <ACL owner="hdfs" group="users" permission="0755"/>  
    <schema location="/schema" provider="protobuf"/>  
    </feed>

Submit and Schedule the Entities

You must submit and schedule all of your entities. The entity definitions are validated as part of the submission process.

  1. Submit all of your entities using the following syntax:

    falcon entity -type <EntityType> -submit -file <YourEntity.xml>
  2. Schedule your feed and process entities using the following syntax (a combined example of both steps appears after this list):

    falcon entity -type <EntityType> -name <YourEntity> -schedule
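
For example, assuming the source cluster, target cluster, and feed entities defined above were saved as MyDataCenter.xml, MyDataCenter-secondary.xml, and testFeed.xml (the file names are assumptions), the commands would look similar to the following. Cluster entities are only submitted; the feed entity is submitted and then scheduled:

    falcon entity -type cluster -submit -file MyDataCenter.xml
    falcon entity -type cluster -submit -file MyDataCenter-secondary.xml
    falcon entity -type feed -submit -file testFeed.xml
    falcon entity -type feed -name testFeed -schedule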

See "Submitting and Scheduling an Entity Using the CLI" in Creating Falcon Entity Definitions for more specific instructions.

Confirm Results

To confirm your results, check your target cluster and review your Oozie jobs.
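
For example, because replication is pull-based, the replication coordinator runs in Oozie on the target cluster. The following sketch lists the instances of the feed and the coordinators known to the target cluster's Oozie server; the feed name and time window are assumptions based on the examples in this chapter, and <target-cluster-Oozie-endpoint> is a placeholder for your target cluster's workflow endpoint:

    falcon instance -type feed -name testFeed -status -start 2012-01-01T00:00Z -end 2012-01-02T00:00Z
    oozie jobs -oozie <target-cluster-Oozie-endpoint> -jobtype coordinator

Successful replication instances report a SUCCEEDED status, and the replicated data appears under the feed's target location path on the target cluster.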