blender-archive/release/io/netrender/slave.py
Commit 22274d3807 by Martin Poirier (2009-09-15 19:53:18 +00:00): More automatic stuff.

The server can now be set to broadcast its existence on the local network (every 10s, approximately 20 bytes of data), so that clients and slaves can pick it up. This is on by default.

The default IP address is now "[default]", which means the master will listen on all interfaces, while clients and slaves will automatically work out the master's address from its broadcast.
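Since the announcement format itself isn't shown in this file, the following is only a minimal sketch of how a slave might listen for the master's broadcast. The UDP port (8000) and the treatment of the payload as an opaque byte string are assumptions for illustration, not the actual netrender protocol.

import socket

BROADCAST_PORT = 8000  # hypothetical port, not taken from the netrender source

def discover_master(timeout=30.0):
    # Listen for the master's periodic UDP broadcast (~20 bytes every 10s)
    # and return the sender's address. Raises socket.timeout if nothing
    # is heard within `timeout` seconds.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.settimeout(timeout)
    sock.bind(("", BROADCAST_PORT))
    try:
        payload, (address, port) = sock.recvfrom(64)
        return address, payload
    finally:
        sock.close()

An address recovered this way is what would stand in for the "[default]" placeholder when the slave opens its HTTP connection to the master.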


import sys, os
import http, http.client, http.server, urllib
import subprocess, time

from netrender.utils import *
import netrender.model

CANCEL_POLL_SPEED = 2
MAX_TIMEOUT = 10
INCREMENT_TIMEOUT = 1

def slave_Info():
    sysname, nodename, release, version, machine = os.uname()
    slave = netrender.model.RenderSlave()
    slave.name = nodename
    slave.stats = sysname + " " + release + " " + machine
    return slave

def testCancel(conn, job_id):
    conn.request("HEAD", "status", headers={"job-id": job_id})
    response = conn.getresponse()

    # cancelled if the job isn't found anymore
    if response.status == http.client.NO_CONTENT:
        return True
    else:
        return False

def testFile(conn, job_id, slave_id, JOB_PREFIX, file_path, main_path=None):
    # job_id and slave_id are passed in explicitly: the request headers below
    # need them, and they are not otherwise visible in this function's scope
    job_full_path = prefixPath(JOB_PREFIX, file_path, main_path)

    if not os.path.exists(job_full_path):
        temp_path = JOB_PREFIX + "slave.temp.blend"
        conn.request("GET", "file", headers={"job-id": job_id, "slave-id": slave_id, "job-file": file_path})
        response = conn.getresponse()

        if response.status != http.client.OK:
            return None # file for job not returned by server, need to return an error code to server

        f = open(temp_path, "wb")
        buf = response.read(1024)

        while buf:
            f.write(buf)
            buf = response.read(1024)

        f.close()

        os.renames(temp_path, job_full_path)

    return job_full_path

def render_slave(engine, scene):
    netsettings = scene.network_render
    timeout = 1

    engine.update_stats("", "Network render node initiation")

    conn = clientConnection(scene)

    if conn:
        conn.request("POST", "slave", repr(slave_Info().serialize()))
        response = conn.getresponse()

        slave_id = response.getheader("slave-id")

        NODE_PREFIX = netsettings.path + "slave_" + slave_id + os.sep
        if not os.path.exists(NODE_PREFIX):
            os.mkdir(NODE_PREFIX)

        while not engine.test_break():
            conn.request("GET", "job", headers={"slave-id": slave_id})
            response = conn.getresponse()

            if response.status == http.client.OK:
                timeout = 1 # reset timeout on new job

                job = netrender.model.RenderJob.materialize(eval(str(response.read(), encoding='utf8')))

                JOB_PREFIX = NODE_PREFIX + "job_" + job.id + os.sep
                if not os.path.exists(JOB_PREFIX):
                    os.mkdir(JOB_PREFIX)

                job_path = job.files[0][0] # data in files have format (path, start, end)
                main_path, main_file = os.path.split(job_path)

                job_full_path = testFile(conn, job.id, slave_id, JOB_PREFIX, job_path)
                print("Fullpath", job_full_path)
                print("File:", main_file, "and %i other files" % (len(job.files) - 1,))
                engine.update_stats("", "Render File", main_file, "for job", job.id)

                for file_path, start, end in job.files[1:]:
                    print("\t", file_path)
                    testFile(conn, job.id, slave_id, JOB_PREFIX, file_path, main_path)

                frame_args = []

                for frame in job.frames:
                    print("frame", frame.number)
                    frame_args += ["-f", str(frame.number)]

                start_t = time.time()

                # launch a background Blender instance to render the job's frames
                process = subprocess.Popen([sys.argv[0], "-b", job_full_path, "-o", JOB_PREFIX + "######", "-E", "BLENDER_RENDER", "-F", "MULTILAYER"] + frame_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

                headers = {"job-id": job.id, "slave-id": slave_id}

                cancelled = False
                stdout = bytes()
                run_t = time.time()
                while process.poll() is None and not cancelled:
                    stdout += process.stdout.read(32)

                    current_t = time.time()
                    cancelled = engine.test_break()

                    if current_t - run_t > CANCEL_POLL_SPEED:
                        # update logs. Eventually, it should support one log file for many frames
                        for frame in job.frames:
                            headers["job-frame"] = str(frame.number)
                            conn.request("PUT", "log", stdout, headers=headers)
                            response = conn.getresponse()

                        stdout = bytes()
                        run_t = current_t

                        if testCancel(conn, job.id):
                            cancelled = True

                if cancelled:
                    # kill the render process if it is still running
                    if process.poll() is None:
                        process.terminate()

                    continue # to the next job

                total_t = time.time() - start_t
                avg_t = total_t / len(job.frames)

                status = process.returncode
                print("status", status)

                # flush the rest of the logs
                if stdout:
                    for frame in job.frames:
                        headers["job-frame"] = str(frame.number)
                        conn.request("PUT", "log", stdout, headers=headers)
                        response = conn.getresponse()

                headers = {"job-id": job.id, "slave-id": slave_id, "job-time": str(avg_t)}

                if status == 0: # a non-zero status means the render failed
                    headers["job-result"] = str(DONE)
                    for frame in job.frames:
                        headers["job-frame"] = str(frame.number)
                        # send the rendered frame back to the server
                        f = open(JOB_PREFIX + "%06d" % frame.number + ".exr", 'rb')
                        conn.request("PUT", "render", f, headers=headers)
                        f.close()
                        response = conn.getresponse()
                else:
                    headers["job-result"] = str(ERROR)
                    for frame in job.frames:
                        headers["job-frame"] = str(frame.number)
                        # report the error back to the server (no image to send)
                        conn.request("PUT", "render", headers=headers)
                        response = conn.getresponse()

            else:
                if timeout < MAX_TIMEOUT:
                    timeout += INCREMENT_TIMEOUT

                # sleep in one-second slices so a user break is noticed quickly
                for i in range(timeout):
                    time.sleep(1)
                    if engine.test_break():
                        conn.close()
                        return

        conn.close()