标题:多进程异步非堵塞的调用问题急
只看楼主
cbd666
Rank: 1
等 级:新手上路
帖 子:29
专家分:0
注 册:2019-9-26
结帖率:66.67%
 问题点数:0 回复次数:5 
多进程异步非堵塞的调用问题急
程序代码:
    time.sleep(3)
    queue = Queue()
    New_start = input(('导入批量url文本:'))
    levels = int(input(('设置扫描等级:')))
    countss = int(input(('设置扫描进程数:')))
    p = multiprocessing.Pool(countss)
    list_ = list(set(
        [x.replace('\n', '') if x.startswith('http') else 'http://' + x.replace('\n', '') for x in
         open(New_start, 'r', encoding='UTF-8').readlines()]))
    for i in list_:
        queue.put(i)
        print(i + " -------> 加入队列 当前队列总任务数:%s" % queue.qsize())
    while not queue.empty():
        # 子进程原来跑 进行测试
        p.apply_async(get_url_sql, args=(queue.get(), levels))
    p.close()
    p.join()
    print("当前队列中的任务数为:%s" % queue.qsize())
    while True:
        if queue.qsize() < 10:
            opm = OPMysql()
            sql = "select url from url_index limit 1,20"
            res = opm.op_select(sql)
            _list = [i[j].replace('\r\n', '').replace(' ', '')
                     for i in res for j in i]
            url_list = list(set(_list))
            for i in url_list:
                time.sleep(0.5)
                queue.put(i)
                print(i + " -------> 加入队列 当前队列总任务数:%s" % queue.qsize())
            opm.dispose()  # 释放资源


哥哥们帮帮忙  现在的情况是这样的。。。 自己做了一个下午了
我自己写的想实现多进程异步  子进程取出队列的url跑sql  主进程就是数据库的交互把url放入队列中  但是这样写好像不行 是哪里错了吗


[此贴子已经被作者于2019-9-26 16:54编辑过]

搜索更多相关主题的帖子: 进程 队列 queue replace for 
2019-09-26 16:50
cbd666
Rank: 1
等 级:新手上路
帖 子:29
专家分:0
注 册:2019-9-26
得分:0 
现在的情况就是一直卡住 动了不动 我用的是from multiprocessing Queue
2019-09-26 16:53
fall_bernana
Rank: 11Rank: 11Rank: 11Rank: 11
等 级:贵宾
威 望:17
帖 子:240
专家分:2086
注 册:2019-8-16
得分:0 
以下是引用cbd666在2019-9-26 16:50:53的发言:

    time.sleep(3)
    queue = Queue()
    New_start = input(('导入批量url文本:'))
    levels = int(input(('设置扫描等级:')))
    countss = int(input(('设置扫描进程数:')))
    p = multiprocessing.Pool(countss)
    list_ = list(set(
        [x.replace('\n', '') if x.startswith('http') else 'http://' + x.replace('\n', '') for x in
         open(New_start, 'r', encoding='UTF-8').readlines()]))
    for i in list_:
        queue.put(i)
        print(i + " -------> 加入队列 当前队列总任务数:%s" % queue.qsize())
    while not queue.empty():
        # 子进程原来跑 进行测试
        p.apply_async(get_url_sql, args=(queue.get(), levels))
    p.close()
    p.join()
    print("当前队列中的任务数为:%s" % queue.qsize())
    while True:
        if queue.qsize() < 10:
            opm = OPMysql()
            sql = "select url from url_index limit 1,20"
            res = opm.op_select(sql)
            _list = .replace('\r\n', '').replace(' ', '')
                     for i in res for j in i]
            url_list = list(set(_list))
            for i in url_list:
                time.sleep(0.5)
                queue.put(i)
                print(i + " -------> 加入队列 当前队列总任务数:%s" % queue.qsize())
            opm.dispose()  # 释放资源


哥哥们帮帮忙  现在的情况是这样的。。。 自己做了一个下午了
我自己写的想实现多进程异步  子进程取出队列的url跑sql  主进程就是数据库的交互把url放入队列中  但是这样写好像不行 是哪里错了吗


卡住了是因为你已经运行到最下面的读取数据库的while True:因为队列值已经大于10了,你一直在循环
要么你从 p.close(),p.join()这里分成2个程序.一个往queue里写入.一个专门用于多进程读取queue数据做处理.
要么把p.close(),p.join()放最后.在数据库读取结果循环这里添加p.apply_async(get_url_sql, args=(queue.get(), levels)),不过这样queue就没必要了
2019-09-26 17:47
cbd666
Rank: 1
等 级:新手上路
帖 子:29
专家分:0
注 册:2019-9-26
得分:0 
回复 3楼 fall_bernana
老哥我按照你的这样写了
程序代码:
    while not queue.empty():
        # 子进程原来跑 进行测试
        p.apply_async(get_url_sql, args=(queue.get(), levels))
        p.apply_async(tttttta, args=(queue,))
    p.close()
    p.join()

还是不行 很着急 弄了一个下午都没好 可能我太笨了 你能不能给个联系QQ 我好问你问题
上面的这样子 结果是加入了队列就直接结束了
2019-09-26 18:54
cbd666
Rank: 1
等 级:新手上路
帖 子:29
专家分:0
注 册:2019-9-26
得分:0 
回复 4楼 cbd666
tttttta函数封装的是数据库存入队列的操作
程序代码:
def tttttta(qqqq):
    print("当前队列中的任务数为:%s" % qqqq.qsize())
    while True:
        if qqqq.qsize() < 10:
            opm = OPMysql()
            sql = "select url from url_index limit 1,20"
            res = opm.op_select(sql)
            _list = [i[j].replace('\r\n', '').replace(' ', '')
                     for i in res for j in i]
            url_list = list(set(_list))
            for i in url_list:
                time.sleep(0.5)
                qqqq.put(i)
                print(i + " -------> 加入队列 当前队列总任务数:%s" % qqqq.qsize())
            opm.dispose()  # 释放资源
2019-09-26 18:55
fall_bernana
Rank: 11Rank: 11Rank: 11Rank: 11
等 级:贵宾
威 望:17
帖 子:240
专家分:2086
注 册:2019-8-16
得分:0 
回复 5楼 cbd666
程序代码:
# -*- coding:utf-8 -*-
from multiprocessing import Pool, Queue, Manager
import time
import os


def run(q):
    
    print("子进程开始,进程ID:%d" % (os.getpid()))
    while True:
        print("循环一次")
        print(" -------> 当前队列 %d 总任务数:%d" % (os.getpid(),q.qsize()))
        if q.qsize() > 0:
            
            qqqq_info = q.get()
            print(" -------> 队列 %d 获取任务:%s" % (os.getpid(),qqqq_info))
            print(" -------> 当前队列 %d 总任务数:%d" %  (os.getpid(),q.qsize()))
            time.sleep(1)
        else:
            print("队列已经取完,跳出循环,子进程结束")
            break
    print("子进程结束,进程ID:%d。" % (os.getpid()))


if __name__ == "__main__":
    print("父进程开始")
    #创建Queue队列管理Manager,有了Manager才能在apply_async里使用参数传递Queue对象,
    m=Manager()
    # 创建Queue队列
    q = m.Queue()
    # 创建多个进程,表示可以同时执行的进程数量。默认大小是CPU的核心数
    p = Pool(4)
    New_start = ["http://www.baidu.com","http://www."]
    for news in New_start:
        q.put(news)
    #for i in range(10):
    
    for i in range(5):
        # 创建进程,放入进程池统一管理,并使用异步非阻塞方式运行
        p.apply_async(run, args=(q,))
    # 如果我们用的是进程池,在调用join()之前必须要先close(),并且在close()之后不能再继续往进程池添加新的进程
    p.close()
    # 进程池对象调用join,会等待进程吃中所有的子进程结束完毕再去结束父进程
    p.join()
    
    print("父进程结束。")

希望对你有帮组

[此贴子已经被作者于2019-9-27 11:00编辑过]

2019-09-27 10:56



参与讨论请移步原网站贴子:https://bbs.bccn.net/thread-497026-1-1.html




关于我们 | 广告合作 | 编程中国 | 清除Cookies | TOP | 手机版

编程中国 版权所有,并保留所有权利。
Powered by Discuz, Processed in 0.117245 second(s), 8 queries.
Copyright©2004-2024, BCCN.NET, All Rights Reserved