netrender: first draft for process jobs, to be able to run arbitrary commands on slaves. This could be used to bake physics on network or whatnot.
This commit is contained in:
@@ -3,12 +3,12 @@ import sys, os, re
|
||||
import http, http.client, http.server, urllib
|
||||
import subprocess, shutil, time, hashlib
|
||||
|
||||
import netrender.model
|
||||
import netrender.slave as slave
|
||||
import netrender.master as master
|
||||
from netrender.utils import *
|
||||
|
||||
|
||||
def clientSendJob(conn, scene, anim = False, chunks = 5):
|
||||
def clientSendJob(conn, scene, anim = False):
|
||||
netsettings = scene.network_render
|
||||
job = netrender.model.RenderJob()
|
||||
|
||||
|
||||
@@ -42,9 +42,10 @@ class MRenderSlave(netrender.model.RenderSlave):
|
||||
self.job = None
|
||||
|
||||
class MRenderJob(netrender.model.RenderJob):
|
||||
def __init__(self, job_id, name, files, chunks = 1, priority = 1, blacklist = []):
|
||||
def __init__(self, job_id, job_type, name, files, chunks = 1, priority = 1, blacklist = []):
|
||||
super().__init__()
|
||||
self.id = job_id
|
||||
self.type = job_type
|
||||
self.name = name
|
||||
self.files = files
|
||||
self.frames = []
|
||||
@@ -53,6 +54,10 @@ class MRenderJob(netrender.model.RenderJob):
|
||||
self.usage = 0.0
|
||||
self.blacklist = blacklist
|
||||
self.last_dispatched = time.time()
|
||||
|
||||
# force one chunk for process jobs
|
||||
if self.type == netrender.model.JOB_PROCESS:
|
||||
self.chunks = 1
|
||||
|
||||
# special server properties
|
||||
self.last_update = 0
|
||||
@@ -93,8 +98,8 @@ class MRenderJob(netrender.model.RenderJob):
|
||||
if frame:
|
||||
frame.log_path = log_path
|
||||
|
||||
def addFrame(self, frame_number):
|
||||
frame = MRenderFrame(frame_number)
|
||||
def addFrame(self, frame_number, command):
|
||||
frame = MRenderFrame(frame_number, command)
|
||||
self.frames.append(frame)
|
||||
return frame
|
||||
|
||||
@@ -114,12 +119,14 @@ class MRenderJob(netrender.model.RenderJob):
|
||||
return frames
|
||||
|
||||
class MRenderFrame(netrender.model.RenderFrame):
|
||||
def __init__(self, frame):
|
||||
def __init__(self, frame, command):
|
||||
super().__init__()
|
||||
self.number = frame
|
||||
self.slave = None
|
||||
self.time = 0
|
||||
self.status = QUEUED
|
||||
self.command = command
|
||||
|
||||
self.log_path = None
|
||||
|
||||
def reset(self, all):
|
||||
@@ -368,10 +375,10 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
|
||||
|
||||
job_id = self.server.nextJobID()
|
||||
|
||||
job = MRenderJob(job_id, job_info.name, job_info.files, chunks = job_info.chunks, priority = job_info.priority, blacklist = job_info.blacklist)
|
||||
job = MRenderJob(job_id, job_info.type, job_info.name, job_info.files, chunks = job_info.chunks, priority = job_info.priority, blacklist = job_info.blacklist)
|
||||
|
||||
for frame in job_info.frames:
|
||||
frame = job.addFrame(frame.number)
|
||||
frame = job.addFrame(frame.number, frame.command)
|
||||
|
||||
self.server.addJob(job)
|
||||
|
||||
@@ -538,17 +545,18 @@ class RenderHandler(http.server.BaseHTTPRequestHandler):
|
||||
frame = job[job_frame]
|
||||
|
||||
if frame:
|
||||
if job_result == DONE:
|
||||
length = int(self.headers['content-length'])
|
||||
buf = self.rfile.read(length)
|
||||
f = open(job.save_path + "%04d" % job_frame + ".exr", 'wb')
|
||||
f.write(buf)
|
||||
f.close()
|
||||
if job.type == netrender.model.JOB_BLENDER:
|
||||
if job_result == DONE:
|
||||
length = int(self.headers['content-length'])
|
||||
buf = self.rfile.read(length)
|
||||
f = open(job.save_path + "%04d" % job_frame + ".exr", 'wb')
|
||||
f.write(buf)
|
||||
f.close()
|
||||
|
||||
del buf
|
||||
elif job_result == ERROR:
|
||||
# blacklist slave on this job on error
|
||||
job.blacklist.append(slave.id)
|
||||
del buf
|
||||
elif job_result == ERROR:
|
||||
# blacklist slave on this job on error
|
||||
job.blacklist.append(slave.id)
|
||||
|
||||
self.server.stats("", "Receiving result")
|
||||
|
||||
|
||||
@@ -32,9 +32,8 @@ def get(handler):
|
||||
def endTable():
|
||||
output("</table>")
|
||||
|
||||
handler.send_head(content = "text/html")
|
||||
|
||||
if handler.path == "/html" or handler.path == "/":
|
||||
handler.send_head(content = "text/html")
|
||||
output("<html><head><title>NetRender</title></head><body>")
|
||||
|
||||
output("<h2>Master</h2>")
|
||||
@@ -86,6 +85,7 @@ def get(handler):
|
||||
output("</body></html>")
|
||||
|
||||
elif handler.path.startswith("/html/job"):
|
||||
handler.send_head(content = "text/html")
|
||||
job_id = handler.path[9:]
|
||||
|
||||
output("<html><head><title>NetRender</title></head><body>")
|
||||
@@ -108,10 +108,9 @@ def get(handler):
|
||||
output("</body></html>")
|
||||
|
||||
elif handler.path.startswith("/html/log"):
|
||||
handler.send_head(content = "text/plain")
|
||||
pattern = re.compile("([a-zA-Z0-9]+)_([0-9]+)")
|
||||
|
||||
output("<html><head><title>NetRender</title></head><body>")
|
||||
|
||||
match = pattern.match(handler.path[9:])
|
||||
if match:
|
||||
job_id = match.groups()[0]
|
||||
@@ -125,12 +124,8 @@ def get(handler):
|
||||
if frame:
|
||||
f = open(frame.log_path, 'rb')
|
||||
|
||||
output("<pre>")
|
||||
|
||||
shutil.copyfileobj(f, handler.wfile)
|
||||
|
||||
output("</pre>")
|
||||
|
||||
f.close()
|
||||
else:
|
||||
output("no such frame")
|
||||
@@ -138,5 +133,3 @@ def get(handler):
|
||||
output("no such job")
|
||||
else:
|
||||
output("malformed url")
|
||||
|
||||
output("</body></html>")
|
||||
|
||||
@@ -72,9 +72,18 @@ class RenderSlave:
|
||||
|
||||
return slave
|
||||
|
||||
JOB_BLENDER = 1
|
||||
JOB_PROCESS = 2
|
||||
|
||||
JOB_TYPES = {
|
||||
JOB_BLENDER: "Blender",
|
||||
JOB_PROCESS: "Process"
|
||||
}
|
||||
|
||||
class RenderJob:
|
||||
def __init__(self):
|
||||
self.id = ""
|
||||
self.type = JOB_BLENDER
|
||||
self.name = ""
|
||||
self.files = []
|
||||
self.frames = []
|
||||
@@ -87,8 +96,8 @@ class RenderJob:
|
||||
def addFile(self, file_path, start=-1, end=-1):
|
||||
self.files.append((file_path, start, end))
|
||||
|
||||
def addFrame(self, frame_number):
|
||||
frame = RenderFrame(frame_number)
|
||||
def addFrame(self, frame_number, command = ""):
|
||||
frame = RenderFrame(frame_number, command)
|
||||
self.frames.append(frame)
|
||||
return frame
|
||||
|
||||
@@ -138,6 +147,7 @@ class RenderJob:
|
||||
max_frame = max((f.number for f in frames)) if frames else -1
|
||||
return {
|
||||
"id": self.id,
|
||||
"type": self.type,
|
||||
"name": self.name,
|
||||
"files": [f for f in self.files if f[1] == -1 or not frames or (f[1] <= min_frame <= f[2] or f[1] <= max_frame <= f[2])],
|
||||
"frames": [f.serialize() for f in self.frames if not frames or f in frames],
|
||||
@@ -155,6 +165,7 @@ class RenderJob:
|
||||
|
||||
job = RenderJob()
|
||||
job.id = data["id"]
|
||||
job.type = data["type"]
|
||||
job.name = data["name"]
|
||||
job.files = data["files"]
|
||||
job.frames = [RenderFrame.materialize(f) for f in data["frames"]]
|
||||
@@ -167,11 +178,12 @@ class RenderJob:
|
||||
return job
|
||||
|
||||
class RenderFrame:
|
||||
def __init__(self, number = 0):
|
||||
def __init__(self, number = 0, command = ""):
|
||||
self.number = number
|
||||
self.time = 0
|
||||
self.status = QUEUED
|
||||
self.slave = None
|
||||
self.command = command
|
||||
|
||||
def statusText(self):
|
||||
return STATUS_TEXT[self.status]
|
||||
@@ -181,7 +193,8 @@ class RenderFrame:
|
||||
"number": self.number,
|
||||
"time": self.time,
|
||||
"status": self.status,
|
||||
"slave": None if not self.slave else self.slave.serialize()
|
||||
"slave": None if not self.slave else self.slave.serialize(),
|
||||
"command": self.command
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
@@ -194,5 +207,6 @@ class RenderFrame:
|
||||
frame.time = data["time"]
|
||||
frame.status = data["status"]
|
||||
frame.slave = RenderSlave.materialize(data["slave"])
|
||||
frame.command = data["command"]
|
||||
|
||||
return frame
|
||||
|
||||
@@ -99,37 +99,46 @@ def render_slave(engine, scene):
|
||||
if not os.path.exists(JOB_PREFIX):
|
||||
os.mkdir(JOB_PREFIX)
|
||||
|
||||
job_path = job.files[0][0] # data in files have format (path, start, end)
|
||||
main_path, main_file = os.path.split(job_path)
|
||||
|
||||
job_full_path = testFile(conn, job.id, slave_id, JOB_PREFIX, job_path)
|
||||
print("Fullpath", job_full_path)
|
||||
print("File:", main_file, "and %i other files" % (len(job.files) - 1,))
|
||||
engine.update_stats("", "Render File", main_file, "for job", job.id)
|
||||
|
||||
for file_path, start, end in job.files[1:]:
|
||||
print("\t", file_path)
|
||||
testFile(conn, job.id, slave_id, JOB_PREFIX, file_path, main_path)
|
||||
|
||||
frame_args = []
|
||||
|
||||
for frame in job.frames:
|
||||
print("frame", frame.number)
|
||||
frame_args += ["-f", str(frame.number)]
|
||||
|
||||
if job.type == netrender.model.JOB_BLENDER:
|
||||
job_path = job.files[0][0] # data in files have format (path, start, end)
|
||||
main_path, main_file = os.path.split(job_path)
|
||||
|
||||
job_full_path = testFile(conn, job.id, slave_id, JOB_PREFIX, job_path)
|
||||
print("Fullpath", job_full_path)
|
||||
print("File:", main_file, "and %i other files" % (len(job.files) - 1,))
|
||||
engine.update_stats("", "Render File", main_file, "for job", job.id)
|
||||
|
||||
for file_path, start, end in job.files[1:]:
|
||||
print("\t", file_path)
|
||||
testFile(conn, job.id, slave_id, JOB_PREFIX, file_path, main_path)
|
||||
|
||||
# announce log to master
|
||||
logfile = netrender.model.LogFile(job.id, [frame.number for frame in job.frames])
|
||||
conn.request("POST", "/log", bytes(repr(logfile.serialize()), encoding='utf8'), headers={"slave-id":slave_id})
|
||||
response = conn.getresponse()
|
||||
|
||||
first_frame = job.frames[0].number
|
||||
|
||||
first_frame = job.frames[0].number
|
||||
|
||||
# start render
|
||||
start_t = time.time()
|
||||
|
||||
val = SetErrorMode()
|
||||
process = subprocess.Popen([sys.argv[0], "-b", job_full_path, "-o", JOB_PREFIX + "######", "-E", "BLENDER_RENDER", "-F", "MULTILAYER"] + frame_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
||||
RestoreErrorMode(val)
|
||||
|
||||
if job.type == netrender.model.JOB_BLENDER:
|
||||
frame_args = []
|
||||
|
||||
for frame in job.frames:
|
||||
print("frame", frame.number)
|
||||
frame_args += ["-f", str(frame.number)]
|
||||
|
||||
val = SetErrorMode()
|
||||
process = subprocess.Popen([sys.argv[0], "-b", job_full_path, "-o", JOB_PREFIX + "######", "-E", "BLENDER_RENDER", "-F", "MULTILAYER"] + frame_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
||||
RestoreErrorMode(val)
|
||||
elif job.type == netrender.model.JOB_PROCESS:
|
||||
command = job.frames[0].command
|
||||
val = SetErrorMode()
|
||||
process = subprocess.Popen(command.split(" "), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
||||
RestoreErrorMode(val)
|
||||
|
||||
headers = {"job-id":job.id, "slave-id":slave_id}
|
||||
|
||||
@@ -155,6 +164,9 @@ def render_slave(engine, scene):
|
||||
if testCancel(conn, job.id, first_frame):
|
||||
cancelled = True
|
||||
|
||||
# read leftovers if needed
|
||||
stdout += process.stdout.read()
|
||||
|
||||
if cancelled:
|
||||
# kill process if needed
|
||||
if process.poll() == None:
|
||||
@@ -182,11 +194,16 @@ def render_slave(engine, scene):
|
||||
headers["job-result"] = str(DONE)
|
||||
for frame in job.frames:
|
||||
headers["job-frame"] = str(frame.number)
|
||||
# send result back to server
|
||||
f = open(JOB_PREFIX + "%06d" % frame.number + ".exr", 'rb')
|
||||
conn.request("PUT", "/render", f, headers=headers)
|
||||
f.close()
|
||||
response = conn.getresponse()
|
||||
|
||||
if job.type == netrender.model.JOB_BLENDER:
|
||||
# send image back to server
|
||||
f = open(JOB_PREFIX + "%06d" % frame.number + ".exr", 'rb')
|
||||
conn.request("PUT", "/render", f, headers=headers)
|
||||
f.close()
|
||||
response = conn.getresponse()
|
||||
elif job.type == netrender.model.JOB_PROCESS:
|
||||
conn.request("PUT", "/render", headers=headers)
|
||||
response = conn.getresponse()
|
||||
else:
|
||||
headers["job-result"] = str(ERROR)
|
||||
for frame in job.frames:
|
||||
|
||||
Reference in New Issue
Block a user