python multithreading data race -
i`m making multithread system on python 2.7. basically, has 3 thread , 1 singleton-class shared data.
red arrow - invoke; blue arrow - access
every thread separate class in file. file main.py import working , communication files, , shared data. main thread invoke working class in 1 thread , communication in 1 thread. herewith shared data, 1 instance of singleton, passed in constructors of working class , communication class.
file main.py
import communication import worker import data app_data = data.instance() #........... srv = communication.server(app_data) srv.setdaemon(true) srv.start() #........... while true #........... # must locker.acquire() if condition1: if condition2: job = worker(app_data, srv.taskresultsendtoslaves, app_data.ip_table[app_data.cfg.my_ip]['tasks'].pop()) job.setdaemon(true) job.start() # must locker.release()
file communication.py
class server(threading.thread): # ................. def __init__(self, data): self.data = data # ................. threading.thread.__init__(self) def run(self): srv = socket.socket(socket.af_inet, socket.sock_stream) srv.settimeout(self.data.cfg.timeout) srv.bind((self.my_addr, self.my_port)) srv.listen(self.data.cfg.number_of_clients) print "start server" while true: # handling messages other pc # .................
file worker.py
class worker(threading.thread): def __init__(self, data, sender, taskname): self.data = data self.sender = sender self.taskname = taskname threading.thread.__init__(self) def run(self): import thread self.data.complete_task.clear() tick_before = time.time() startupinfo = subprocess.startupinfo() startupinfo.dwflags |= subprocess.startf_useshowwindow startupinfo.wshowwindow = subprocess.sw_hide p = subprocess.popen(self.data.cfg.path_interpreter + " " + self.data.cfg.path_tasks + self.taskname, startupinfo=startupinfo, shell=false, stdout=subprocess.pipe) job_result, err = p.communicate() tick_after = time.time() work_time = tick_after - tick_before # must locker.acquire() self.data.task_table[self.taskname]['status'] = 'complete' self.data.task_table[self.taskname]['result'] = job_result self.data.task_table[self.taskname]['time'] = work_time # must locker.release() logging.debug("%s task done" % self.taskname) tr = threading.thread(target=self.sender, name="sender", args=(self.taskname, )) tr.setdaemon(true) tr.start() tr.join() logging.debug("%s task sent" % self.taskname) self.data.complete_task.set() thread.exit()
singletone.py
class singleton: def __init__(self, decorated): self._decorated = decorated def instance(self): try: return self._instance except attributeerror: self._instance = self._decorated() return self._instance def __call__(self): raise typeerror('singletons must accessed through `instance()`.') def __instancecheck__(self, inst): return isinstance(inst, self._decorated)
data.py
#-*- coding: utf-8 -*- singletone import singleton configs import configurations import threading import logging @singleton class data: def __init__(self): logging.basicconfig(format=u'%(filename)-10s[line:%(lineno)d] <%(funcname)-15s> # %(levelname)-8s [%(asctime)s] %(message)s'.encode('cp1251', 'ignore'), level=logging.debug, filename='mylog.log') logging.log(100, '='*120) self.cfg = configurations() self.ip_table = self.getiptable() self.task_table = self.gettasktable() self.locker = threading.lock() self.initialization = threading.event() self.initialization.clear() self.identification = threading.event() self.identification.clear() self.complete_task = threading.event() self.complete_task.set() self.flag_of_close = false def __str__(self): return "\ {0}\n\ \n\ {1}\n\ \n\ {2}\n\ ".format(str(self.cfg), self.striptable(), self.strtasktable()) def striptable(self): #return str(self.ip_table) result = ["%s = %s" % (key, str(value)) key, value in self.ip_table.items()] result.sort() return "\n\t\t".join(result) def strtasktable(self): #return str(self.task_table) result = ["%s = %s" % (key, str(value)) key, value in self.task_table.items()] result.sort() return "\n\t\t".join(result) def getiptable(self): result = {} if self.cfg.ips: result = dict((item.strip(), {'status': true, 'port': 8000, 'tasks': []}) item in self.cfg.ips.split(',')) # result = dict((item.strip(), {'status': false, 'port': 8000, 'tasks': []}) item in self.cfg.ips.split(',')) result[self.cfg.my_ip] = {'status': true, 'port': 8000, 'tasks': []} return result def gettasktable(self): result = {} if self.cfg.tasks: result = dict((item.strip(), {'status': 'uncomplete', 'result': '', 'time': 0}) item in self.cfg.tasks.split(',')) return result def gettotalcompletetasks(self): result = 0 taskname in self.task_table.keys(): if self.task_table[taskname]['status'] == 'complete': result += 1 return result if __name__ == '__main__': data = data.instance() print data
singleton has stolen stackoverflow
after start system, have data race. when working , main thread @ same time read shared data. think need threading.lock here. did mistake, put lock object in shared data , have used separate access. shortly have understand mistake.
filenames has changed, pieces of code has removed.
but don't know must put lock object, every thread can , in right way access , use it. can give me advice?
english not good, tolerant. hope understand question...
p.s.
besides that, tried pass lock()
object in constructors of classes. , have same trouble. application has fallen somewhere, data accessed. , can't find out is. every start can fall down application 50% probability.
i found bug. singleton class, don't know how fix it.
Comments
Post a Comment