BDR Automation Examples

You can use the Cloudera Manager API to automate BDR tasks, such as creating a replication schedule. This page describes an automated solution for creating, running, and managing HDFS replication schedules in order to minimize Recovery Point Objectives (RPOs) for late-arriving data or to automate recovery after a disaster.

For more information about the Cloudera Manager API and how to install the API client, see the Cloudera Manager API documentation.

Automating HDFS Replication Schedules

Automating HDFS replication with the API is a multi-step process that involves the following tasks:

Step 1. Create a Peer

Before you can create or run a replication schedule, you need a peer Cloudera Manager instance. This peer acts as the source Cloudera Manager instance where data is pulled from. See Designating a Replication Source for more information.

The following code sample shows you how to create a peer:

import cm_client
from cm_client.rest import ApiException
from pprint import pprint

# Configure HTTP basic authorization: basic
cm_client.configuration.username = "[***username***]"
cm_client.configuration.password = "[***password***]"
cm_client.configuration.verify_ssl = False

TARGET_CM_HOST = "[***destination_cluster***]"
SOURCE_CM_URL = "[***source_cluster***]:7180"

# Create an instance of the API class
api_host = TARGET_CM_HOST
port = '7183'
api_version = 'v30'
# Construct base URL for API, for example https://cmhost:7183/api/v30
api_url = api_host + ':' + port + '/api/' + api_version
api_client = cm_client.ApiClient(api_url)
api_instance = cm_client.CmPeersResourceApi(api_client)

# Create the peer on the target Cloudera Manager instance.
# The url argument points to the source Cloudera Manager instance.
body = cm_client.ApiCmPeer(cloudera_manager_created_user=True,
  name="peer_name_from_api",
  password="admin",
  type="REPLICATION",
  url=SOURCE_CM_URL, 
  username="admin")

api_response = api_instance.create_peer(body=body)
The sample above builds an API client against the target Cloudera Manager instance, obtains a CmPeersResourceApi handle from it, and then creates the peer. To implement a similar solution, keep the following guidelines in mind:
  • Replace [***destination_cluster***] with the domain name of the destination, for example target.cm.cloudera.com.
  • Replace [***source_cluster***] with the domain name of the source, for example src.cm.cloudera.com. (The :7180 port is appended in the code.)
  • The user you specify must possess a role that is capable of creating a peer, such as the Cluster Administrator role.
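
After the peer is created, you can optionally confirm that it exists and that the source is reachable before you build a schedule. The following is a minimal sketch that assumes the list_peers and test_peer methods of CmPeersResourceApi (the GET /cm/peers and POST /cm/peers/{peerName}/commands/test endpoints):

try:
    # List the configured peers and confirm the new peer appears.
    peers = api_instance.list_peers()
    pprint(peers)

    # Test connectivity to the peer (assumed method name from the Swagger spec).
    test_cmd = api_instance.test_peer("peer_name_from_api")
    pprint(test_cmd)
except ApiException as e:
    print("Exception when verifying the peer: %s\n" % e)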

Step 2. Create the HDFS Replication Schedule

After you have added a peer Cloudera Manager instance that functions as the source, you can create a replication schedule:

import cm_client
from cm_client.rest import ApiException
from pprint import pprint

# Configure HTTP basic authorization: basic
cm_client.configuration.username = "[***username***]"
cm_client.configuration.password = "[***password***]"

cm_client.configuration.verify_ssl = False

TARGET_CM_HOST = "[***destination_cluster***]"


# Create an instance of the API class
api_host = TARGET_CM_HOST
port = '7183'
api_version = 'v30'

# Construct base URL for API, for example https://cmhost:7183/api/v30
api_url = api_host + ':' + port + '/api/' + api_version
api_client = cm_client.ApiClient(api_url)
api_instance = cm_client.ReplicationsResourceApi(api_client)
cluster_name = 'Cluster 1' # str | 
service_name = 'HDFS-1' # str | The service name.

body = cm_client.ApiReplicationScheduleList([{
  "displayName" : "ScheduleNameFromAPI",
  "startTime" : "2021-03-11T18:28:18.684Z",
  "interval" : 0,
  "intervalUnit" : "MINUTE",
  "alertOnStart" : False,
  "alertOnSuccess" : False,
  "alertOnFail" : False,
  "alertOnAbort" : False,
  "hdfsArguments" : {
    "sourceService" : {
      "peerName" : "peer_name_from_api",
      "clusterName" : "Cluster 1",
      "serviceName" : "HDFS-1"
    },
    "sourcePath" : "/tmp",
    "destinationPath" : "/tmp",
    "mapreduceServiceName" : "YARN-1",
    "userName" : "d",
    "numMaps" : 20,
    "dryRun" : False,
    "bandwidthPerMap" : 100,
    "abortOnError" : False,
    "removeMissingFiles" : False,
    "preserveReplicationCount" : True,
    "preserveBlockSize" : True,
    "preservePermissions" : False,
    "skipChecksumChecks" : False,
    "skipListingChecksumChecks" : False,
    "skipTrash" : False,
    "replicationStrategy" : "DYNAMIC",
    "preserveXAttrs" : False,
    "exclusionFilters" : [ ],
    "raiseSnapshotDiffFailures" : False
  }}]) 

api_response = api_instance.create_schedules(cluster_name, service_name, body=body)

The example populates ApiHdfsReplicationArguments (the hdfsArguments block) with attributes such as the source path, destination path, MapReduce service to use, and others. For the source service, you need to provide the HDFS service name and cluster name on the source Cloudera Manager instance. See the API documentation for the complete list of attributes for ApiHdfsReplicationArguments.

At the end of the example, the schedule list is passed to create_schedules to create the HDFS replication schedule.
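
The response from create_schedules contains the new schedule, including its ID, which the remaining steps use. If you need to look the ID up later, the following sketch lists existing schedules; it assumes the read_schedules method of ReplicationsResourceApi (GET /clusters/{clusterName}/services/{serviceName}/replications):

try:
    # List replication schedules for the service and print their IDs.
    schedules = api_instance.read_schedules(cluster_name, service_name)
    for schedule in schedules.items:
        print(schedule.id, schedule.display_name)
except ApiException as e:
    print("Exception when calling ReplicationsResourceApi->read_schedules: %s\n" % e)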

Step 3. Run the Replication Schedule

The replication schedule created in step 2 runs automatically according to its configured start time and interval. You can also run the schedule manually:

import cm_client
from cm_client.rest import ApiException
from pprint import pprint

# Configure HTTP basic authorization: basic
cm_client.configuration.username = "[***username***]"
cm_client.configuration.password = "[***password***]"

cm_client.configuration.verify_ssl = False

TARGET_CM_HOST = "[***destination_cluster***]"


# Create an instance of the API class
api_host = TARGET_CM_HOST
port = '7183'
api_version = 'v30'

# Construct base URL for API, for example https://cmhost:7183/api/v30
api_url = api_host + ':' + port + '/api/' + api_version
api_client = cm_client.ApiClient(api_url)
api_instance = cm_client.ReplicationsResourceApi(api_client) 
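
The block above only builds the API client; the call that actually triggers the schedule is sketched below. It assumes the run_schedule method of ReplicationsResourceApi (POST /clusters/{clusterName}/services/{serviceName}/replications/{scheduleId}/run) and a hypothetical schedule ID of 9 taken from the create_schedules response:

cluster_name = 'Cluster 1' # str | The cluster name.
schedule_id = 9 # int | Id of an existing replication schedule (hypothetical example value).
service_name = 'HDFS-1' # str | The service name.
dry_run = False # bool | Whether to perform a dry run.

try:
    # Run the existing schedule immediately.
    api_response = api_instance.run_schedule(cluster_name, schedule_id, service_name, dry_run=dry_run)
    pprint(api_response)
except ApiException as e:
    print("Exception when calling ReplicationsResourceApi->run_schedule: %s\n" % e)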

Step 4. Monitor the Schedule

After the schedule runs, you can monitor the commands it triggered and retrieve their results by reading the schedule's history:

import cm_client
from cm_client.rest import ApiException
from pprint import pprint

# Configure HTTP basic authorization: basic
cm_client.configuration.username = "[***username***]"
cm_client.configuration.password = "[***password***]"

cm_client.configuration.verify_ssl = False

TARGET_CM_HOST = "[***destination_cluster***]"

# Create an instance of the API class
api_host = TARGET_CM_HOST
port = '7183'
api_version = 'v30'

# Construct base URL for API, for example https://cmhost:7183/api/v30
api_url = api_host + ':' + port + '/api/' + api_version
api_client = cm_client.ApiClient(api_url)
api_instance = cm_client.ReplicationsResourceApi(api_client)
cluster_name = 'Cluster 1' # str | The cluster name.
schedule_id = 9 # int | Id of an existing replication schedule.
service_name = 'HDFS-1' # str | The service name.
limit = 20 # int | Maximum number of commands to retrieve. (optional) (default to 20)
offset = 0 # int | Index of first command to retrieve. (optional) (default to 0)
view = 'summary' # str | The view to materialize. (optional) (default to summary)

try:
    # Returns a list of commands triggered by a schedule.
    api_response = api_instance.read_history(cluster_name, schedule_id, service_name, limit=limit, offset=offset, view=view)
    pprint(api_response)
except ApiException as e:
    print("Exception when calling ReplicationsResourceApi->read_history: %s\n" % e)

Configuring Replication to/from Cloud Providers

Step 1. Add a Cloud Account

Instead of adding a peer Cloudera Manager instance, as you do for cluster-to-cluster replication, replicating to or from a cloud provider requires an account for that provider.

The following example shows how to add an S3 account:

# Construct base URL for API, for example https://cmhost:7183/api/v30
# (the imports, credentials, api_host, port, and api_version are set up as in the previous examples)
api_url = api_host + ':' + port + '/api/' + api_version
api_client = cm_client.ApiClient(api_url)

# create an instance of the API class
api_instance = cm_client.ExternalAccountsResourceApi(api_client)

ACCESS_KEY = "[*** .... ***]"
SECRET_KEY = "[*** .... ***]"
TYPE_NAME = 'AWS_ACCESS_KEY_AUTH'


account_configs = {"items" : [{"name" : "aws_access_key", "value" : ACCESS_KEY},
                              {"name" : "aws_secret_key", "value" : SECRET_KEY}]}
body = cm_client.ApiExternalAccount(name="cloudAccount1", display_name="cloudAccount1", type_name=TYPE_NAME, account_configs=account_configs)


try:
    # Create a new external account.
    api_response = api_instance.create_account(body=body)
    pprint(api_response)
except ApiException as e:
    print("Exception when calling ExternalAccountsResourceApi->create_account: %s\n" % e)

Step 2. Create the Replication Schedule

This example creates the schedule through the older Python API client's api_root handle rather than cm_client; it assumes that api_root has already been initialized against the target Cloudera Manager instance and that the relevant API classes and the datetime module have been imported.

CLUSTER_NAME='Cluster-tgt-1'
HDFS_NAME='HDFS-tgt-1'
CLOUD_ACCOUNT='cloudAccount1'
YARN_SERVICE='YARN-1'

hdfs = api_root.get_cluster(CLUSTER_NAME).get_service(HDFS_NAME)

hdfs_cloud_args = ApiHdfsCloudReplicationArguments(None)
hdfs_cloud_args.sourceService = ApiServiceRef(None,
                                peerName=None,
                                clusterName=CLUSTER_NAME,
                                serviceName=HDFS_NAME)
hdfs_cloud_args.sourcePath = '/src/path'
hdfs_cloud_args.destinationPath = 's3a://bucket/target/path/'
hdfs_cloud_args.destinationAccount = CLOUD_ACCOUNT
hdfs_cloud_args.mapreduceServiceName = YARN_SERVICE

# creating a schedule with daily frequency
start = datetime.datetime.now() # The time at which the scheduled activity is triggered for the first time.
end = start + datetime.timedelta(days=365) # The time after which the scheduled activity will no longer be triggered.

schedule = hdfs.create_replication_schedule(start, end, "DAY", 1, True, hdfs_cloud_args)

The example creates ApiHdfsCloudReplicationArguments, populates it, and creates an HDFS to S3 backup schedule. In addition to specifying attributes such as the source path and destination path, the example provides destinationAccount as CLOUD_ACCOUNT and peerName as None in sourceService. The peerName is None since there is no peer for cloud replication schedules.

hdfs_cloud_args is then used to create an HDFS-to-S3 replication schedule with a frequency of one day.

Step 3. Run the Replication Schedule

The replication schedule created in step 2 has a frequency of 1 DAY, so the schedule will run at the initial start time every day. You can also manually run the schedule using the following:

cmd = hdfs.trigger_replication_schedule(schedule.id)

Step 4. Monitor the Schedule

Once you get a command (cmd), you can wait for the command to finish and then get the results:
cmd = cmd.wait()
result = hdfs.get_replication_schedule(schedule.id).history[0].hdfsResult

Maintaining Replication Schedules

The following actions can be performed on replication schedules that are cluster-to-cluster or cluster to/from a cloud provider:

  • Get all replication schedules for a given service:

    schs = hdfs.get_replication_schedules()

  • Get a given replication schedule by schedule ID for a given service:

    sch = hdfs.get_replication_schedule(schedule_id)

  • Delete a given replication schedule by schedule ID for a given service:

    sch = hdfs.delete_replication_schedule(schedule_id)

  • Update a given replication schedule by schedule ID for a given service:

    sch.hdfsArguments.removeMissingFiles = True
    sch = hdfs.update_replication_schedule(sch.id, sch)
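
The calls above use the older api_root/service handle. If you are working with the cm_client Swagger client instead, ReplicationsResourceApi exposes equivalent operations; the following is a sketch assuming the read_schedules, read_schedule, delete_schedule, and update_schedule method names and the client's snake_case attribute mapping:

api_instance = cm_client.ReplicationsResourceApi(api_client)

# List all replication schedules for a service.
schedules = api_instance.read_schedules(cluster_name, service_name)

# Read or delete a schedule by ID.
schedule = api_instance.read_schedule(cluster_name, schedule_id, service_name)
api_instance.delete_schedule(cluster_name, schedule_id, service_name)

# Update a schedule (attribute names assume the snake_case mapping of hdfsArguments.removeMissingFiles).
schedule.hdfs_arguments.remove_missing_files = True
updated = api_instance.update_schedule(cluster_name, schedule_id, service_name, body=schedule)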

Debugging Failures During Replication

If a replication job fails, you can download replication diagnostic data for the replication command to troubleshoot and diagnose any issues.

The diagnostic data includes all the logs generated, including the MapReduce logs. You can also upload the logs to a support case for further analysis. Collecting a replication diagnostic bundle is available for API v11+ and Cloudera Manager version 5.5+.

import os
import tempfile

args = {}
resp = hdfs.collect_replication_diagnostic_data(schedule_id=schedule.id, args=args)

# Download replication diagnostic bundle to a temp directory
tmpdir = tempfile.mkdtemp(prefix="support-bundle-replication")
support_bundle_path = os.path.join(tmpdir, "support-bundle.zip")
cm.download_from_url(resp.resultDataUrl, support_bundle_path)