Worker Network Communication
This section demonstrates some trivial examples of how two worker engines communicate with the master engine.
Workers are a low-level feature to help use higher level libraries that can operate across multiple hosts. As such, you will generally want to use workers only to launch the backends for these libraries.
To help you get your workers or distributed computing
framework components talking to one another, every worker engine run
includes an environmental variable CML_MASTER_IP
with the fully addressable IP of the master
engine. Every engine has a dedicated IP access with no possibility of port
conflicts.
For instance, the following are trivial examples of two worker engines talking to the master engine.
R
From the master engine, the following master.r
script will launch
two workers and accept incoming connections from them.
# master.r
library("cdsw")
# Launch two Cloudera AI workers. These are engines that will run in
# the same project, run a given code or script, and exit.
workers <- launch.workers(n=2, cpu=0.2, memory=0.5, env="", script="worker.r")
# Accept two connections, one from each worker. Workers will
# run worker.r.
for(i in c(1,2)) {
# Receive a message from each worker and return a response.
con <- socketConnection(host="0.0.0.0", port = 6000, blocking=TRUE, server=TRUE, open="r+")
data <- readLines(con, 1)
print(paste("Server received:", data))
writeLines("Hello from master!", con)
close(con)
}
The workers will run the following worker.r
script and respond
to the master.
# worker.r
print(Sys.getenv("CML_MASTER_IP"))
con <- socketConnection(host=Sys.getenv("CML_MASTER_IP"), port = 6000, blocking=TRUE, server=FALSE, open="r+")
write_resp <- writeLines("Hello from Worker", con)
server_resp <- readLines(con, 1)
print(paste("Worker received: ", server_resp))
close(con)
Python
From the master engine, the following master.py
script will launch
two workers and accept incoming connections from them.
# master.py
import cdsw, socket
# Launch two CDSW workers. These are engines that will run in
# the same project, run a given code or script, and exit.
workers = cdsw.launch_workers(n=2, cpu=0.2, memory=0.5, script="worker.py")
# Listen on TCP port 6000
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("0.0.0.0", 6000))
s.listen(1)
# Accept two connections, one from each worker. Workers will
# run worker.py.
conn, addr = s.accept()
for i in range(2):
# Receive a message from each worker and return a response.
data = conn.recv(20)
if not data: break
print("Master received:", data)
conn.send("Hello From Server!".encode())
conn.close()
The workers will run the following worker.py
script and respond
to the master.
# worker.py
import os, socket
# Open a TCP connection to the master.
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((os.environ["CDSW_MASTER_IP"], 6000))
# Send some data and receive a response.
s.send("Hello From Worker!".encode())
data = s.recv(1024)
s.close()
print("Worker received:", data)