class ProcessDisasterScheduler:
'''
interval scheduler
'''
def __init__(self,scalr_conn, influence_scope, \
affect_number, interval,max_open_files, \
system_call_failure_rate,process_name,scenario, max_processes,\
script_id, timeout, async, disaster_id,\
disaster_sub_type):
self.sched = Scheduler(daemonic = False)
self.pool = threadpool.ThreadPool(5)
self.scalr_conn = scalr_conn
self.influence_scope = influence_scope
self.old_affect_vm_list = []
self.affect_number = affect_number
self.interval = interval
self.script_id = script_id
self.timeout = timeout
self.async = async
self.disaster_id = disaster_id
self.msg={"action":"start","type":disaster_sub_type,\
"process_name":process_name,"max_open_files":max_open_files,\
"system_call_failure_rate":system_call_failure_rate,\
"scenario":scenario,"max_processes":max_processes}
logger.info("............... self.influence_scope %s " % \
self.influence_scope)
self.affect_vm_list = random_select(self.influence_scope, \
self.affect_number)
logger.info("............... self.affect_vm_list %s " % \
self.affect_vm_list)
def run(self):
'''
the scheduler main function
'''
current_time = (datetime.now()+timedelta(seconds=1))\
.strftime("%Y-%m-%d %X")
self.sched.add_interval_job(self.job_function, seconds=int(self.interval), start_date=str(current_time))
self.sched.add_listener(self.err_listener, apscheduler.events.EVENT_JOB_ERROR | apscheduler.events.EVENT_JOB_MISSED)
self.sched.start()
def stop(self):
'''
function to stop the interval scheduler
'''
current_time = datetime.now()
data=[]
self.msg["action"]="stop"
for each_vm in self.influence_scope:
data.append([self.scalr_conn, each_vm,\
self.msg,self.script_id, \
self.timeout, self.async, self.disaster_id])
requests = threadpool.makeRequests(time_process_disaster,data)
[self.pool.putRequest(req) for req in requests]
self.pool.wait()
self.sched.shutdown()
def job_function(self):
data=[]
if len(self.old_affect_vm_list) > 0:
self.msg["action"]="stop"
for each_vm in self.old_affect_vm_list:
data.append([self.scalr_conn, each_vm,\
self.msg,self.script_id, \
self.timeout, self.async, self.disaster_id])
print "stop",len(data),data
requests = threadpool.makeRequests(time_process_disaster,data)
[self.pool.putRequest(req) for req in requests]
self.pool.wait()
self.affect_vm_list = random_select(self.influence_scope, \
self.affect_number)
self.old_affect_vm_list=self.affect_vm_list
if not self.affect_vm_list:
time.sleep(int(self.interval))
else:
self.msg["action"]="start"
data=[]
for each_vm in self.affect_vm_list:
data.append([self.scalr_conn, each_vm,\
self.msg,self.script_id, \
self.timeout, self.async, self.disaster_id])
print "start",len(data),data
requests = threadpool.makeRequests(time_process_disaster,data)
[self.pool.putRequest(req) for req in requests]
self.pool.wait()
def err_listener(self,ev):
err_logger = logging.getLogger('schedErrJob')
if ev.exception:
err_logger.exception('%s error.', str(ev.job))
else:
err_logger.info('%s miss', str(ev.job))
if __name__ == '__main__':
    # Manual run: feed a hard-coded sample request to TimeProcessDisaster
    # (defined outside this section).
    scalr_test = ""

    # Sample request whose sub-type is "time_disaster"; this is the one
    # actually submitted below.
    JsonString = {
        "disaster": {
            "disaster_type": "SYSTEM",
            "disaster_id": "520c60ee607ecf7bcedee007",
            "disaster_sub_type": "time_disaster",
            "level": "customized",
            "customized_parameter": {
                "affect_rate": 100,
                "machine_time": 123213423,
            },
            "influence_scope": [
                {
                    "ip_addr": "192.168.27.112",
                    "local_ipv4": "10.168.27.112",
                    "farm_id": "10174",
                    "instance_id": "i-f0a5ccaa",
                    "server_id": "339b73a5-8217-4e4c-848b-3b1cccb62b70",
                },
                {
                    "ip_addr": "192.168.27.113",
                    "local_ipv4": "10.168.27.113",
                    "farm_id": "10174",
                    "instance_id": "i-3edb5964",
                    "server_id": "3af78f18-54c3-431d-952f-15aef8178a1e",
                },
                {
                    "ip_addr": "192.168.27.114",
                    "local_ipv4": "10.168.27.114",
                    "farm_id": "10174",
                    "instance_id": "i-19decd42",
                    "server_id": "cdc60c82-596e-40f9-a41a-a2a25ca66ad3",
                },
            ],
        },
        "interval": 3,
    }

    # Sample request whose sub-type is "process"; only referenced by the
    # disabled lines further down.
    Json = {
        "disaster": {
            "disaster_type": "SYSTEM",
            "disaster_id": "520c60ee607ecf7bcedee007",
            "disaster_sub_type": "process",
            "level": "customized",
            "customized_parameter": {
                "affect_rate": 10,
                "process_name": "mysqld",
                "system_call_failure_rate": 10,
                "max_open_files": 20,
                "max_processes": 20,
                "scenario": "resource_limitation",
            },
            "influence_scope": [
                {
                    "ip_addr": "192.168.27.112",
                    "local_ipv4": "10.168.27.112",
                    "farm_id": "10174",
                    "instance_id": "i-f0a5ccaa",
                    "server_id": "339b73a5-8217-4e4c-848b-3b1cccb62b70",
                },
                {
                    "ip_addr": "192.168.27.113",
                    "local_ipv4": "10.168.27.113",
                    "farm_id": "10174",
                    "instance_id": "i-3edb5964",
                    "server_id": "3af78f18-54c3-431d-952f-15aef8178a1e",
                },
                {
                    "ip_addr": "192.168.27.114",
                    "local_ipv4": "10.168.27.114",
                    "farm_id": "10174",
                    "instance_id": "i-19decd42",
                    "server_id": "cdc60c82-596e-40f9-a41a-a2a25ca66ad3",
                },
            ],
        },
        "interval": 3,
    }

    # Disabled alternative: submit the "process" request instead.
    # p=TimeProcessDisaster(scalr_test,Json)
    # p.start()
    TimeProcessDisaster(scalr_test, JsonString).start()

    # Disabled example of driving a scheduler object directly:
    # p=TimeDisasterScheduler("scalr_conn", JsonString["disaster"]["influence_scope"], \
    #     1,1,JsonString["disaster"]["customized_parameter"]["machine_time"],\
    #     2895, 10, 1, "520c60ee607ecf7bcedee007", \
    #     "customized")
    # p.run()
    # pass
    # pass
    # p.stop()