BDR Automation Examples
You can use the Cloudera Manager API to automate BDR tasks, such as creating a schedule for a replication. This page describes an automated solution for creating, running, and managing HDFS replication schedules in order to minimize Recovery Point Objectives (RPOs) for late arriving data or to automate recovery after disaster recovery.
Automating HDFS Replication Schedules
Automating HDFS replication with the API is a multi-step process that involves the following tasks:
Step 1. Create a Peer
Before you can create or run a replication schedule, you need a peer Cloudera Manager instance. This peer acts as the source Cloudera Manager instance where data is pulled from. See Designating a Replication Source for more information.
The following code sample shows you how to create a peer:
import cm_client from cm_client.rest import ApiException from pprint import pprint # Configure HTTP basic authorization: basic cm_client.configuration.username = “[***username***]” cm_client.configuration.password = “[***password***]” cm_client.configuration.verify_ssl = False TARGET_CM_HOST = “[***destination_cluster***]” SOURCE_CM_URL = “[***source_cluster***]:7180” # Create an instance of the API class api_host = TARGET_CM_HOST port = '7183' api_version = 'v30' # Construct base URL for API # http://cmhost:7180/api/v30api_url = api_host + ':' + port + '/api/' + api_version api_client = cm_client.ApiClient(api_url) api_instance = cm_client.CmPeersResourceApi(api_client) # create peer on the target # url= is the source body = cm_client.ApiCmPeer(cloudera_manager_created_user=True, name="peer_name_from_api", password="admin", type="REPLICATION", url=SOURCE_CM_URL, username="admin") api_response = api_instance.create_peer(body=body)
- Replace <destination_cluster> with the domain name of the destination, for example target.cm.cloudera.com.
- Replace <source_cluster> with the domain name of the source, for example src.cm.cloudera.com:7180/.
- The user you specify must possess a role that is capable of creating a peer, such as the Cluster Administrator role.
Step 2. Create the HDFS Replication Schedule
After you have add a peer Cloudera Manager instance that functions as the source, you can create a replication schedule:
import cm_client from cm_client.rest import ApiException from pprint import pprint # Configure HTTP basic authorization: basic cm_client.configuration.username = “[***username***]” cm_client.configuration.password = “[***password***]” cm_client.configuration.verify_ssl = False TARGET_CM_HOST = “[***destination_cluster***]” # Create an instance of the API class api_host = TARGET_CM_HOST port = '7183' api_version = 'v30' # Construct base URL for API # http://cmhost:7180/api/v30api_url = api_host + ':' + port + '/api/' + api_version api_client = cm_client.ApiClient(api_url) api_instance = cm_client.ReplicationsResourceApi(api_client) cluster_name = 'Cluster 1' # str | service_name = 'HDFS-1' # str | The service name. body = cm_client.ApiReplicationScheduleList([{ "displayName" : "ScheduleNameFromAPI", "startTime" : "2021-03-11T18:28:18.684Z", "interval" : 0, "intervalUnit" : "MINUTE", "alertOnStart" : False, "alertOnSuccess" : False, "alertOnFail" : False, "alertOnAbort" : False, "hdfsArguments" : { "sourceService" : { "peerName" : "peer_name_from_api", "clusterName" : "Cluster 1", "serviceName" : "HDFS-1" }, "sourcePath" : "/tmp", "destinationPath" : "/tmp", "mapreduceServiceName" : "YARN-1", "userName" : "d", "numMaps" : 20, "dryRun" : False, "bandwidthPerMap" : 100, "abortOnError" : False, "removeMissingFiles" : False, "preserveReplicationCount" : True, "preserveBlockSize" : True, "preservePermissions" : False, "skipChecksumChecks" : False, "skipListingChecksumChecks" : False, "skipTrash" : False, "replicationStrategy" : "DYNAMIC", "preserveXAttrs" : False, "exclusionFilters" : [ ], "raiseSnapshotDiffFailures" : False }}]) api_response = api_instance.create_schedules(cluster_name, service_name, body=body)
The example creates ApiHdfsReplicationArguments and populate attributes such as source path, destination name, MapReduce service to use, and others. For the source service, you will need to provide the HDFS service name and cluster name on the source Cloudera Manager instance. See the API documentation for the complete list of attributes for ApiHdfsReplicationArguments.
At the end of the example, hdfs_args is used to create an HDFS replication schedule.
Step 3. Run the Replication Schedule
The replication schedule created in step 2 has a frequency of 1 DAY, so the schedule will run at the initial start time every day. You can also manually run the schedule using the following:
import cm_client from cm_client.rest import ApiException from pprint import pprint # Configure HTTP basic authorization: basic cm_client.configuration.username = “[***username***]” cm_client.configuration.password = “[***password***]” cm_client.configuration.verify_ssl = False TARGET_CM_HOST = “[***destination_cluster***]” # Create an instance of the API class api_host = TARGET_CM_HOST port = '7183' api_version = 'v30' # Construct base URL for API # http://cmhost:7180/api/v30api_url = api_host + ':' + port + '/api/' + api_version api_client = cm_client.ApiClient(api_url) api_instance = cm_client.ReplicationsResourceApi(api_client)
Step 4. Monitor the Schedule
import cm_client from cm_client.rest import ApiException from pprint import pprint # Configure HTTP basic authorization: basic cm_client.configuration.username = “[***username***]” cm_client.configuration.password = “[***password***]” cm_client.configuration.verify_ssl = False TARGET_CM_HOST = “[*** destination_cluster ***]” # Create an instance of the API class api_host = TARGET_CM_HOST port = '7183' api_version = 'v30' # Construct base URL for API # http://cmhost:7180/api/v30api_url = api_host + ':' + port + '/api/' + api_version api_client = cm_client.ApiClient(api_url) api_instance = cm_client.ReplicationsResourceApi(api_client) cluster_name = [*** Cluster 1 ***] # str | schedule_id = 9 # int | Id of an existing replication schedule. service_name = [*** HDFS-1 ***] # str | The service name. limit = 20 # int | Maximum number of commands to retrieve. (optional) (default to 20) offset = 0 # int | Index of first command to retrieve. (optional) (default to 0) view = [*** summary ***] # str | The view to materialize. (optional) (default to summary) try: # Returns a list of commands triggered by a schedule. api_response = api_instance.read_history([*** cluster_name ***], [*** schedule_id ***], [*** service_name ***], limit=limit, offset=offset, view=view) pprint(api_response) except ApiException as e: print("Exception when calling ReplicationsResourceApi->read_history: %s\n" % e)
Configuring Replication to/from Cloud Providers
Step 1. Add a Cloud Account
Instead of adding a peer Cloudera Manager instance like a cluster-to-cluster replication, replicating to or from a cloud provider requires an account for that provider.
The following example shows how to add an S3 account:
# Construct base URL for API # http://cmhost:7180/api/v30api_url = api_host + ':' + port + '/api/' + api_version api_client = cm_client.ApiClient(api_url) # create an instance of the API class api_instance = cm_client.ExternalAccountsResourceApi(cm_client.ApiClient(configuration)) ACCESS_KEY=”[*** .... ***]” SECRET_KEY=”[*** .... ***]” TYPE_NAME = 'AWS_ACCESS_KEY_AUTH' account_configs = {"items" : [{"name" : "aws_access_key","value" : [***ACCESS_KEY***]}, {"name" : "aws_secret_key","value" : [***SECRET_KEY***]}]} body = cm_client.ApiExternalAccount(name="cloudAccount1", display_name="cloudAccount1", type_name=TYPE_NAME, account_configs=account_configs) try: # Create a new external account. api_response = api_instance.create_account(body=body) pprint(api_response) except ApiException as e: print("Exception when calling ExternalAccountsResourceApi->create_account: %s\n" % e)
Step 2. Create the Replication Schedule
CLUSTER_NAME='Cluster-tgt-1' HDFS_NAME='HDFS-tgt-1' CLOUD_ACCOUNT='cloudAccount1' YARN_SERVICE='YARN-1' hdfs = api_root.get_cluster(CLUSTER_NAME).get_service(HDFS_NAME) hdfs_cloud_args = ApiHdfsCloudReplicationArguments(None) hdfs_cloud_args.sourceService = ApiServiceRef(None, peerName=None, clusterName=CLUSTER_NAME, serviceName=HDFS_NAME) hdfs_cloud_args.sourcePath = '/src/path' hdfs_cloud_args.destinationPath = 's3a://bucket/target/path/' hdfs_cloud_args.destinationAccount = CLOUD_ACCOUNT hdfs_cloud_args.mapreduceServiceName = YARN_SERVICE # creating a schedule with daily frequency start = datetime.datetime.now() # The time at which the scheduled activity is triggered for the first time. end = start + datetime.timedelta(days=365) # The time after which the scheduled activity will no longer be triggered. schedule = hdfs.create_replication_schedule(start, end, "DAY", 1, True, hdfs_args)
The example creates ApiHdfsCloudReplicationArguments, populates it, and creates an HDFS to S3 backup schedule. In addition to specifying attributes such as the source path and destination path, the example provides destinationAccount as CLOUD_ACCOUNT and peerName as None in sourceService. The peerName is None since there is no peer for cloud replication schedules.
hdfs_cloud_args is then used to create a HDFS-S3 replication schedule with a frequency of 1 day.
Step 3. Run the Replication Schedule
The replication schedule created in step 2 has a frequency of 1 DAY, so the schedule will run at the initial start time every day. You can also manually run the schedule using the following:
cmd = hdfs.trigger_replication_schedule(schedule.id)
Step 4. Monitor the Schedule
cmd = cmd.wait() result = hdfs.get_replication_schedule(schedule.id).history[0].hdfsResult
Maintaining Replication Schedules
The following actions can be performed on replication schedules that are cluster-to-cluster or cluster to/from a cloud provider:
- Get all replication schedules for a given service:
-
schs = hdfs.get_replication_schedules()
- Get a given replication schedule by schedule id for a given service:
-
sch = hdfs.get_replication_schedule(schedule_id)
- Delete a given replication schedule by schedule id for a given service:
-
sch = hdfs.delete_replication_schedule(schedule_id)
- Update a given replication schedule by schedule id for a given service:
-
sch.hdfsArguments.removeMissingFiles = True sch = hdfs.update_replication_schedule(sch.id, sch)
- Debugging failures during replication
- If a replication job fails, you can download replication diagnostic data for the replication command to troubleshoot and diagnose any issues.
The diagnostic data includes all the logs generated, including the MapReduce logs. You can also upload the logs to a support case for further analysis. Collecting a replication diagnostic bundle is available for API v11+ and Cloudera Manager version 5.5+.
args = {} resp = hdfs.collect_replication_diagnostic_data(schedule_id=schedule.id, args) # Download replication diagnostic bundle to a temp directory tmpdir = tempfile.mkdtemp(prefix="support-bundle-replication") support_bundle_path = os.path.join(tmpdir, "support-bundle.zip") cm.download_from_url(resp.resultDataUrl, support_bundle_path)