summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMilo Casagrande <milo.casagrande@linaro.org>2015-03-23 09:56:32 (GMT)
committerMilo Casagrande <milo.casagrande@linaro.org>2015-03-23 13:42:17 (GMT)
commit0ec8f9cf8d2d35901f85706117a439a4b181239e (patch)
tree26441ac03a7cbd95067c47c3934d3793b152500c
parentf21f974dfe21394f3c8dbba9aa02ecb07a01dbb3 (diff)
downloadlava-dispatcher-0ec8f9cf8d2d35901f85706117a439a4b181239e.tar.gz
lava-dispatcher-0ec8f9cf8d2d35901f85706117a439a4b181239e.tar.xz
More pylint checks and cleanup.
* Use Python tempfile module to obtain temp directory. * Use os.path.join to create paths. * Cleanup docstrings and variable name. Change-Id: I59bf530537ec38592f026d85354740254fce52a3
-rwxr-xr-xlava/dispatcher/lava-dispatcher-slave145
1 files changed, 71 insertions, 74 deletions
diff --git a/lava/dispatcher/lava-dispatcher-slave b/lava/dispatcher/lava-dispatcher-slave
index 164cf57..650ec1e 100755
--- a/lava/dispatcher/lava-dispatcher-slave
+++ b/lava/dispatcher/lava-dispatcher-slave
@@ -31,6 +31,7 @@ import signal
import socket
import subprocess
import sys
+import tempfile
import time
import traceback
import yaml
@@ -48,18 +49,18 @@ import zmq
TIMEOUT = 5
END_OK_TIMEOUT = 5
SEND_QUEUE = 10
-TMP_DIR = "/tmp/lava-dispatcher/slave/"
+# FIXME: This is a temporary fix and the dir contents need to be assessed
+# whether they are useful (logs should go into /tmp or tmpfs) or not.
+TMP_DIR = os.path.join(tempfile.gettempdir(), "lava-dispatcher/slave/")
-# Setup the logger
+# Setup the log.
FORMAT = "%(asctime)-15s %(levelname)s %(message)s"
logging.basicConfig(format=FORMAT)
-logger = logging.getLogger("dispatcher-slave")
+LOG = logging.getLogger("dispatcher-slave")
def mkdir(path):
- """
- Create a directory only if needed.
- """
+ """Create a directory only if needed."""
try:
os.makedirs(path)
except OSError as exc:
@@ -70,43 +71,39 @@ def mkdir(path):
class Master(object):
- """
- Store information about the master status
- """
+ """Store information about the master status."""
def __init__(self):
self.last_msg = 0
self.last_ping = 0
self.online = True
def received_msg(self):
- """ We received a valid message from the master """
+ """We received a valid message from the master."""
self.last_msg = time.time()
if not self.online:
- logger.info("Master back ONLINE")
+ LOG.info("Master back ONLINE")
self.online = True
class Job(object):
- """
- Wrapper around a job process
- """
+ """Wrapper around a job process."""
def __init__(self, job_id, definition, device_definition, env, log_socket):
self.job_id = job_id
self.log_socket = log_socket
self.env = env
self.proc = None
self.running = False
- self.base_dir = "%s%s" % (TMP_DIR, self.job_id)
+ self.base_dir = os.path.join(TMP_DIR, "%s/" % self.job_id)
mkdir(self.base_dir)
# Write back the job and device cofniguration
- with open("%s/job.yaml" % self.base_dir, "w") as f_job:
+ with open(os.path.join(self.base_dir, "job.yaml"), "w") as f_job:
f_job.write(definition)
- with open("%s/device.yaml" % self.base_dir, "w") as f_device:
+ with open(os.path.join(self.base_dir, "device.yaml"), "w") as f_device:
f_device.write(device_definition)
def create_environ(self):
- """ Generate the env variables for the job """
+ """Generate the env variables for the job."""
conf = yaml.load(self.env)
if conf.get("purge", False):
environ = {}
@@ -125,41 +122,41 @@ class Job(object):
return environ
def start(self):
- """ Start the process """
+ """Start the process."""
try:
- logger.debug("start %s", self.job_id)
+ LOG.debug("start %s", self.job_id)
env = self.create_environ()
args = [
"lava-dispatch",
"--target",
- "%s/device.yaml" % self.base_dir,
- "%s/job.yaml" % self.base_dir,
- "--output-dir=%s%s/logs/" % (TMP_DIR, self.job_id),
+ os.path.join(self.base_dir, "device.yaml"),
+ os.path.join(self.base_dir, "job.yaml"),
+ "--output-dir=%s" % os.path.join(self.base_dir, "logs/"),
"--job-id=%s" % self.job_id,
"--socket-addr=%s" % self.log_socket
]
- # TODO: maybe using something else than this basic functions to
- # control the processes?
+
+ out_file = os.path.join(self.base_dir, "out")
+ err_file = os.path.join(self.base_dir, "err")
self.proc = subprocess.Popen(
args,
- stdout=open("%s/out" % self.base_dir, "w"),
- stderr=open("%s/err" % self.base_dir, "w"),
- env=env)
+ stdout=open(out_file, "w"),
+ stderr=open(err_file, "w"), env=env)
self.running = True
except Exception as exc:
# daemon must always continue running even if the job crashes
if hasattr(exc, "child_traceback"):
- logger.exception(
+ LOG.exception(
{exc.strerror: exc.child_traceback.split("\n")})
else:
- logger.exception(exc)
- with open("%s/err" % self.base_dir, "a") as errlog:
- # TODO: send something to the zmq logger
+ LOG.exception(exc)
+ with open(err_file, "a") as errlog:
+ # TODO: send something to the zmq LOG
errlog.write("%s\n%s\n" % (exc, traceback.format_exc()))
self.cancel()
def cancel(self):
- """ Cancel the job and kill the process """
+ """Cancel the job and kill the process."""
if self.proc is not None:
self.proc.kill()
# TODO: be sure not to block here
@@ -169,9 +166,7 @@ class Job(object):
def get_fqdn():
- """
- Returns the fully qualified domain name.
- """
+ """Return the fully qualified domain name."""
host = socket.getfqdn()
try:
if bool(re.match("[-_a-zA-Z0-9.]+$", host)):
@@ -183,7 +178,7 @@ def get_fqdn():
def main():
- # Parse command line
+ """Set up and start the dispatcher slave."""
parser = argparse.ArgumentParser(description="LAVA Dispatcher Slave")
parser.add_argument(
"--hostname", default=get_fqdn(), type=str, help="Name of the slave")
@@ -196,15 +191,15 @@ def main():
help="Log level (ERROR, WARN, INFO, DEBUG)", default="INFO")
args = parser.parse_args()
- # Set-up the logger level
+ # Set-up the LOG level
if args.level == "ERROR":
- logger.setLevel(logging.ERROR)
+ LOG.setLevel(logging.ERROR)
elif args.level == "WARN":
- logger.setLevel(logging.WARN)
+ LOG.setLevel(logging.WARN)
elif args.level == "INFO":
- logger.setLevel(logging.INFO)
+ LOG.setLevel(logging.INFO)
else:
- logger.setLevel(logging.DEBUG)
+ LOG.setLevel(logging.DEBUG)
# Connect to the master dispatcher
context = zmq.Context()
@@ -219,15 +214,15 @@ def main():
jobs = {}
# Say hello to the server
- logger.info("Connecting to master as <%s>", args.hostname)
+ LOG.info("Connecting to master as <%s>", args.hostname)
sock.send_multipart(["HELLO"])
# Waiting for an 'HELLO_OK'
# TODO: add a loop to retry
msg = sock.recv_multipart()
if msg[0] != "HELLO_OK":
- logger.error("Invalid answer from server")
- logger.debug("Received: '%s'", msg)
+ LOG.error("Invalid answer from server")
+ LOG.debug("Received: '%s'", msg)
sys.exit(1)
# Mark the master as alive
@@ -250,7 +245,7 @@ def main():
poller.register(pipe_r, zmq.POLLIN)
# Loop for server instructions
- logger.info("Waiting for master instructions")
+ LOG.info("Waiting for master instructions")
while True:
# TODO: compute the right timeout on the socket
try:
@@ -259,7 +254,7 @@ def main():
continue
if sockets.get(pipe_r) == zmq.POLLIN:
- logger.info("Received a signal, leaving")
+ LOG.info("Received a signal, leaving")
break
if sockets.get(sock) == zmq.POLLIN:
@@ -269,13 +264,13 @@ def main():
try:
action = msg[0]
except (IndexError, TypeError):
- logger.error("Invalid message '%s'", msg)
+ LOG.error("Invalid message '%s'", msg)
continue
- logger.debug("Received action=%s, args=(%s)", action, msg[1:])
+ LOG.debug("Received action=%s, args=(%s)", action, msg[1:])
# Parse the action
if action == "PONG":
- logger.debug("Connection to master ok")
+ LOG.debug("Connection to master ok")
# Mark the master as alive
master.received_msg()
@@ -287,24 +282,25 @@ def main():
device_definition = msg[3]
env = msg[4]
except (IndexError, ValueError):
- logger.error("Invalid message '%s'", msg)
+ LOG.error("Invalid message '%s'", msg)
continue
- logger.info("[%d] Starting job", job_id)
- logger.debug("[%d] : %s", job_id, job_definition)
- logger.debug("[%d] device: %s", job_id, device_definition)
- logger.debug("[%d] env : %s", job_id, env)
+
+ LOG.info("[%d] Starting job", job_id)
+ LOG.debug("[%d] : %s", job_id, job_definition)
+ LOG.debug("[%d] device: %s", job_id, device_definition)
+ LOG.debug("[%d] env : %s", job_id, env)
# Check if the job is known and started. In this case, send
# back the right signal (ignoring the duplication or signaling
# the end of the job).
if job_id in jobs:
if jobs[job_id].running:
- logger.info(
+ LOG.info(
"[%d] Job has already been started", job_id)
sock.send_multipart(["START_OK", str(job_id)])
else:
- logger.warning("[%d] Job has already ended", job_id)
- sock.send_multipart(["END", str(job_id), 0])
+ LOG.warning("[%d] Job has already ended", job_id)
+ sock.send_multipart(["END", str(job_id)])
else:
jobs[job_id] = Job(job_id, job_definition,
device_definition, env,
@@ -319,9 +315,9 @@ def main():
try:
job_id = int(msg[1])
except (IndexError, ValueError):
- logger.error("Invalid message '%s'", msg)
+ LOG.error("Invalid message '%s'", msg)
continue
- logger.info("[%d] Canceling", job_id)
+ LOG.info("[%d] Canceling", job_id)
# Check if the job is known and started. In this case, send
# back the right signal (ignoring the duplication or signaling
@@ -330,10 +326,10 @@ def main():
if jobs[job_id].running:
jobs[job_id].cancel()
else:
- logger.info(
+ LOG.info(
"[%d] Job has already been canceled", job_id)
else:
- logger.debug("[%d] Unknown job, sending END", job_id)
+ LOG.debug("[%d] Unknown job, sending END", job_id)
jobs[job_id] = Job(job_id, "", "", None, None)
jobs[job_id].running = False
# Send the END message anyway
@@ -346,13 +342,13 @@ def main():
try:
job_id = int(msg[1])
except (IndexError, ValueError):
- logger.error("Invalid message '%s'", msg)
+ LOG.error("Invalid message '%s'", msg)
continue
if job_id in jobs:
- logger.debug("[%d] Job END acked", job_id)
+ LOG.debug("[%d] Job END acked", job_id)
del jobs[job_id]
else:
- logger.debug("[%d] Unknown job END acked", job_id)
+ LOG.debug("[%d] Unknown job END acked", job_id)
# Do not mark the master as alive. In fact we are not sending
# back any data so the master will not be able to mark the
@@ -362,7 +358,7 @@ def main():
try:
job_id = int(msg[1])
except (IndexError, ValueError):
- logger.error("Invalid message '%s'", msg)
+ LOG.error("Invalid message '%s'", msg)
continue
if job_id in jobs:
if jobs[job_id].running:
@@ -373,7 +369,8 @@ def main():
sock.send_multipart(["END", str(job_id), 0])
else:
# Unknown job: return END anyway
- logger.debug("[%d] Unknown job, sending END after STATUS", job_id)
+ LOG.debug(
+ "[%d] Unknown job, sending END after STATUS", job_id)
jobs[job_id] = Job(job_id, "", "", None, None)
jobs[job_id].running = False
sock.send_multipart(["END", str(job_id), 0])
@@ -382,7 +379,7 @@ def main():
master.received_msg()
else:
- logger.error(
+ LOG.error(
"Unknown action: '%s', args=(%s)", action, msg[1:])
# Do not write the master as alive as the message does not mean
# anything.
@@ -393,10 +390,10 @@ def main():
ret = jobs[job_id].proc.poll()
# Job has finished
if ret is not None:
- logger.info("[%d] Job END", job_id)
+ LOG.info("[%d] Job END", job_id)
job_status = jobs[job_id].proc.returncode
if job_status:
- logger.info("[%d] Job returned non-zero", job_id)
+ LOG.info("[%d] Job returned non-zero", job_id)
jobs[job_id].running = False
sock.send_multipart(["END", str(job_id), str(job_status)])
@@ -407,18 +404,18 @@ def main():
if now - max(master.last_msg, master.last_ping) > TIMEOUT:
# Is the master offline ?
if master.online and now - master.last_msg > 4 * TIMEOUT:
- logger.warning("Master goes OFFLINE")
+ LOG.warning("Master goes OFFLINE")
master.online = False
- logger.debug(
+ LOG.debug(
"Sending PING to the master (last message %ss ago)",
int(now - master.last_msg))
sock.send_multipart(["PING"])
master.last_ping = now
- # Closing sockets and droping messages.
- logger.info("Closing the socket and dropping messages")
+ # Closing sockets and dropping messages.
+ LOG.info("Closing the socket and dropping messages")
sock.close(linger=0)
context.term()