标题:python任务调度
取消只看楼主
sweet6hero
Rank: 1
等 级:新手上路
帖 子:87
专家分:0
注 册:2013-6-9
结帖率:40%
已结贴  问题点数:20 回复次数:3 
python任务调度
from apscheduler.scheduler import Scheduler
import apscheduler
import time
from datetime import *
sched = Scheduler(daemonic = False)  
def job_function():
    print "Hello World"

current_time = (datetime.now()+timedelta(seconds=1))\
                     .strftime("%Y-%m-%d %X")
sched.add_interval_job(job_function,start_date=str(current_time))  

sched.start()
搜索更多相关主题的帖子: seconds python import False 
2013-09-11 09:49
sweet6hero
Rank: 1
等 级:新手上路
帖 子:87
专家分:0
注 册:2013-6-9
得分:0 
# -*- coding:utf-8 -*-
import pymongo
 
from apscheduler.scheduler import Scheduler
 
from apscheduler.jobstores.mongodb_store import MongoDBJobStore
import apscheduler
import time
import logging
import threadpool


 
sched = Scheduler(daemonic = False)
 

 
mongo = pymongo.Connection(host='localhost', port=27017)
 
store = MongoDBJobStore(connection=mongo)
pool = threadpool.ThreadPool(5)
sched.add_jobstore(store, 'mongo')        # 别名是mongo


 
@sched.cron_schedule(second='*', day_of_week='*', hour='*')        # 向别名为mongo的jobstore添加job
 
def job():
    print 'a job'
    data=[]
    msg={}
    msg["action"]="stop"
    for each_vm in range(10):
        
        data.append(["self.scalr_conn", each_vm,\
                                msg])
              
    requests = threadpool.makeRequests(time_process_disaster,data)
    [pool.putRequest(req) for req in requests]
    pool.wait()
 
def err_listener(ev):
    err_logger = logging.getLogger('schedErrJob')
    if ev.exception:
        err_logger.exception('%s error.', str(ev.job))
    else:
        err_logger.info('%s miss', str(ev.job))
 
def time_process_disaster(data):
    print data
   
sched.add_listener(err_listener, apscheduler.events.EVENT_JOB_ERROR | apscheduler.events.EVENT_JOB_MISSED)

 
sched.start()
2013-09-11 09:49
sweet6hero
Rank: 1
等 级:新手上路
帖 子:87
专家分:0
注 册:2013-6-9
得分:0 
class TimeDisasterScheduler:
    '''
        interval scheduler
    '''
    def __init__(self, scalr_conn, influence_scope, \
                affect_number,interval,machine_time,\
                script_id, timeout, async, disaster_id, \
                disaster_sub_type):
        self.sched = Scheduler(daemonic = False)
        self.pool = threadpool.ThreadPool(5)
        
        self.scalr_conn = scalr_conn
        self.influence_scope = influence_scope
        self.old_affect_vm_list = []
        self.affect_number = affect_number

        self.interval = interval
        
        self.script_id = script_id
        self.timeout = timeout
        self.async = async
        self.disaster_id = disaster_id
        self.msg={"action":"start","type":disaster_sub_type,"machinetime":machine_time}
        logger.info("...............  self.influence_scope %s  " % \
                     self.influence_scope)
        self.affect_vm_list = random_select(self.influence_scope, \
                                               self.affect_number)
        logger.info("...............  self.affect_vm_list %s  " % \
                     self.affect_vm_list)
        
    def job_function(self):
        print "hhhhhhh"
        data=[]
        if len(self.old_affect_vm_list) > 0:
            for each_vm in self.old_affect_vm_list:
                self.msg["action"]="stop"
                data.append([self.scalr_conn, each_vm,\
                                self.msg,self.script_id, \
                                self.timeout, self.async, self.disaster_id])
              
            requests = threadpool.makeRequests(time_process_disaster,data)
            [self.pool.putRequest(req) for req in requests]
            pool.wait()
           
           
        self.affect_vm_list = random_select(self.influence_scope, \
                                    self.affect_number)
        self.old_affect_vm_list=self.affect_vm_list
        if not self.affect_vm_list:
            time.sleep(int(self.interval))
        else:            
            for each_vm in self.affect_vm_list:   
                logger.info("...............  vm %s is ready to %s " % \
                                (each_vm["instance_id"]))
                data.append([self.scalr_conn, each_vm,\
                                 self.msg,self.script_id, \
                                 self.timeout, self.async, self.disaster_id])
                print data
                affect_vm_list.remove(each_vm)
        requests = threadpool.makeRequests(time_process_disaster,data)
        [self.pool.putRequest(req) for req in requests]
        pool.wait()
                           
    def err_listener(self,ev):
        err_logger = logging.getLogger('schedErrJob')
        if ev.exception:
            err_logger.exception('%s error.', str(ev.job))
        else:
            err_logger.info('%s miss', str(ev.job))
        
    def run(self):
        '''
            the scheduler main function
        '''
        current_time = (datetime.now()+timedelta(seconds=1))\
                     .strftime("%Y-%m-%d %X")
        print current_time
        print int(self.interval)
        self.sched.add_interval_job(self.job_function, seconds=int(self.interval), start_date=str(current_time))
        #self.sched.add_listener(self.err_listener, apscheduler.events.EVENT_JOB_ERROR | apscheduler.events.EVENT_JOB_MISSED)
        self.sched.start()

         
    def stop(self):
        '''
            function to stop the interval scheduler
        '''
        data=[]
        for each_vm in self.influence_scope:
            self.msg["action"]="stop"
            data.append([self.scalr_conn, each_vm,\
                        self.msg,self.script_id, \
                        self.timeout, self.async, self.disaster_id])
        requests = threadpool.makeRequests(time_process_disaster,data)
        [self.pool.putRequest(req) for req in requests]
        pool.wait()
        self.sched.shutdown()
if __name__ == '__main__':
    scalr_test = ""
    JsonString={
                "disaster"  : {
                                  "disaster_type":  "SYSTEM",
                                  "disaster_id": "520c60ee607ecf7bcedee007",
                                  "disaster_sub_type": "time_disaster",
                                  "level":  "customized",
                                  "customized_parameter": {
                                                           "affect_rate":30  ,      
                                                           "machine_time":123213423
                                                           },
                                  "influence_scope": [
                                                 {"ip_addr":"192.168.27.112","local_ipv4":"10.168.27.112","farm_id":"10174","instance_id":"i-f0a5ccaa","server_id":"339b73a5-8217-4e4c-848b-3b1cccb62b70"},
                                                 {"ip_addr":"192.168.27.113","local_ipv4":"10.168.27.113","farm_id":"10174","instance_id":"i-3edb5964","server_id":"3af78f18-54c3-431d-952f-15aef8178a1e"},
                                                 {"ip_addr":"192.168.27.114","local_ipv4":"10.168.27.114","farm_id":"10174","instance_id":"i-19decd42","server_id":"cdc60c82-596e-40f9-a41a-a2a25ca66ad3"}
                                                    ],
                               },
                "interval"  : 3,     
            }
   
    #TimeProcessDisaster(scalr_test,JsonString).start()
    TimeDisasterScheduler("scalr_conn", JsonString["disaster"]["influence_scope"], \
                1,1,JsonString["disaster"]["customized_parameter"]["machine_time"],\
                2895, 10, 1, "520c60ee607ecf7bcedee007", \
                "customized").run()
2013-09-11 09:51
sweet6hero
Rank: 1
等 级:新手上路
帖 子:87
专家分:0
注 册:2013-6-9
得分:0 
class ProcessDisasterScheduler:
    '''
        interval scheduler
    '''
    def __init__(self,scalr_conn, influence_scope, \
                affect_number, interval,max_open_files, \
                system_call_failure_rate,process_name,scenario, max_processes,\
                script_id, timeout, async, disaster_id,\
                disaster_sub_type):
        self.sched = Scheduler(daemonic = False)
        self.pool = threadpool.ThreadPool(5)
        
        self.scalr_conn = scalr_conn
        self.influence_scope = influence_scope
        self.old_affect_vm_list = []

      

        self.affect_number = affect_number
        self.interval = interval
        
        self.script_id = script_id
        self.timeout = timeout
        self.async = async
        self.disaster_id = disaster_id
        self.msg={"action":"start","type":disaster_sub_type,\
                  "process_name":process_name,"max_open_files":max_open_files,\
                  "system_call_failure_rate":system_call_failure_rate,\
                  "scenario":scenario,"max_processes":max_processes}
        logger.info("...............  self.influence_scope %s  " % \
                     self.influence_scope)
        self.affect_vm_list =  random_select(self.influence_scope, \
                                    self.affect_number)

        logger.info("...............  self.affect_vm_list %s  " % \
                     self.affect_vm_list)
        
        
    def run(self):
        '''
            the scheduler main function
        '''
        current_time = (datetime.now()+timedelta(seconds=1))\
                     .strftime("%Y-%m-%d %X")
        self.sched.add_interval_job(self.job_function, seconds=int(self.interval), start_date=str(current_time))
        self.sched.add_listener(self.err_listener, apscheduler.events.EVENT_JOB_ERROR | apscheduler.events.EVENT_JOB_MISSED)
        self.sched.start()
   
    def stop(self):
        '''
            function to stop the interval scheduler
        '''
        current_time = datetime.now()
        data=[]
        self.msg["action"]="stop"
        for each_vm in self.influence_scope:
            data.append([self.scalr_conn, each_vm,\
                                 self.msg,self.script_id, \
                                 self.timeout, self.async, self.disaster_id])
        requests = threadpool.makeRequests(time_process_disaster,data)
        [self.pool.putRequest(req) for req in requests]
        self.pool.wait()
        self.sched.shutdown()
      
    def job_function(self):
        data=[]
        if len(self.old_affect_vm_list) > 0:
            
            self.msg["action"]="stop"
            for each_vm in self.old_affect_vm_list:
                data.append([self.scalr_conn, each_vm,\
                                 self.msg,self.script_id, \
                                 self.timeout, self.async, self.disaster_id])
            print "stop",len(data),data
            requests = threadpool.makeRequests(time_process_disaster,data)
            [self.pool.putRequest(req) for req in requests]
            self.pool.wait()
        self.affect_vm_list = random_select(self.influence_scope, \
                                    self.affect_number)
        self.old_affect_vm_list=self.affect_vm_list
        if not self.affect_vm_list:
            time.sleep(int(self.interval))
        else:
            self.msg["action"]="start"
            data=[]            
            for each_vm in self.affect_vm_list:   
                data.append([self.scalr_conn, each_vm,\
                                 self.msg,self.script_id, \
                                 self.timeout, self.async, self.disaster_id])
            print "start",len(data),data
        requests = threadpool.makeRequests(time_process_disaster,data)
        [self.pool.putRequest(req) for req in requests]
        self.pool.wait()                    
        
    def err_listener(self,ev):
        err_logger = logging.getLogger('schedErrJob')
        if ev.exception:
            err_logger.exception('%s error.', str(ev.job))
        else:
            err_logger.info('%s miss', str(ev.job))
            




if __name__ == '__main__':
    scalr_test = ""
    JsonString={
                "disaster"  : {
                                  "disaster_type":  "SYSTEM",
                                  "disaster_id": "520c60ee607ecf7bcedee007",
                                  "disaster_sub_type": "time_disaster",
                                  "level":  "customized",
                                  "customized_parameter": {
                                                           "affect_rate":100  ,      
                                                           "machine_time":123213423
                                                           },
                                  "influence_scope": [
                                                 {"ip_addr":"192.168.27.112","local_ipv4":"10.168.27.112","farm_id":"10174","instance_id":"i-f0a5ccaa","server_id":"339b73a5-8217-4e4c-848b-3b1cccb62b70"},
                                                 {"ip_addr":"192.168.27.113","local_ipv4":"10.168.27.113","farm_id":"10174","instance_id":"i-3edb5964","server_id":"3af78f18-54c3-431d-952f-15aef8178a1e"},
                                                 {"ip_addr":"192.168.27.114","local_ipv4":"10.168.27.114","farm_id":"10174","instance_id":"i-19decd42","server_id":"cdc60c82-596e-40f9-a41a-a2a25ca66ad3"}
                                                    ],
                               },
                "interval"  : 3,     
            }
   
    Json={
                "disaster"  : {
                                  "disaster_type":  "SYSTEM",
                                  "disaster_id": "520c60ee607ecf7bcedee007",
                                  "disaster_sub_type": "process",
                                  "level":  "customized",
                                  "customized_parameter": {
                                                           "affect_rate":10  ,      
                                                           "process_name": "mysqld",
                                                           "system_call_failure_rate":10,
                                                           "max_open_files":20,
                                                           "max_processes":20,
                                                           "scenario":"resource_limitation"
                                                           },
                                  "influence_scope": [
                                                 {"ip_addr":"192.168.27.112","local_ipv4":"10.168.27.112","farm_id":"10174","instance_id":"i-f0a5ccaa","server_id":"339b73a5-8217-4e4c-848b-3b1cccb62b70"},
                                                 {"ip_addr":"192.168.27.113","local_ipv4":"10.168.27.113","farm_id":"10174","instance_id":"i-3edb5964","server_id":"3af78f18-54c3-431d-952f-15aef8178a1e"},
                                                 {"ip_addr":"192.168.27.114","local_ipv4":"10.168.27.114","farm_id":"10174","instance_id":"i-19decd42","server_id":"cdc60c82-596e-40f9-a41a-a2a25ca66ad3"}
                                                    ],
                               },
                "interval"  : 3,   
            }
    #p=TimeProcessDisaster(scalr_test,Json)
    #p.start()


   
    TimeProcessDisaster(scalr_test,JsonString).start()
    """ p=TimeDisasterScheduler("scalr_conn", JsonString["disaster"]["influence_scope"], \
                1,1,JsonString["disaster"]["customized_parameter"]["machine_time"],\
                2895, 10, 1, "520c60ee607ecf7bcedee007", \
                "customized")
      p.run()
    pass
    pass
    p.stop()"""
2013-09-11 13:11



参与讨论请移步原网站贴子:https://bbs.bccn.net/thread-420423-1-1.html




关于我们 | 广告合作 | 编程中国 | 清除Cookies | TOP | 手机版

编程中国 版权所有,并保留所有权利。
Powered by Discuz, Processed in 0.094039 second(s), 8 queries.
Copyright©2004-2025, BCCN.NET, All Rights Reserved