|
本帖最后由 bean2582004 于 2014-5-13 11:09 编辑
有用过论坛上这个脚本多拨成功的兄弟么?- # -*- coding: UTF-8 -*-
- '''
- author : lycecom#gmail.com
- date : 2013-11-01
- 为 ikuai 软路由编写的多拨脚本
- 适用于基于物理网卡的一号 一卡/多卡 多拨
- '''
- import threading
- import time,urllib,urllib2,sys,cookielib
- ip = "192.168.1.1:8000" #你自己的ikuai路由内网WEB管理地址
- user = "root" #修改为你自己的管理员名
- passwd = "XXX" #修改为自己的密码
- wans = 1 #物理网卡数
- vids = 2 #每物理网卡多拨数
- g_func_list = []
- cj = cookielib.CookieJar()
- def login():
- i = 0
- while True and i<10: #为防止开机后DHCP未能及时分配地址此处进行了多次尝试登录,i为重试次数,每次重试间隔20秒
- req = urllib2.Request("http://%s/cgi-bin/logProcess.php"%ip)
- data = urllib.urlencode({'username':user,'password':passwd})
- opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
- urllib2.install_opener(opener)
- try:
- response = opener.open(req, data,timeout=3)
- except urllib2.URLError, e:
- time.sleep(20)
- i+=1
- continue
- else:
- break
- def loopMe(): #多卡
- for i in range(wans):
- for n in range(1,vids+1):
- vid = i*vids+n
- req = urllib2.Request("http://%s/cgi-bin/wanSetProcess.php?old_internet=MAC_VLAN&interface=wan%s&vid_ty=%s"%(ip,i+1,vid))
- urllib2.urlopen(req,timeout=3)
- print "wan%s vid%s was closed %s"%(i+1,vid,time.time())
- g_func_list.append({"func":dial,"args":(i+1,vid)})
- def dial(wan,vid):
- req = urllib2.Request("http://%s/cgi-bin/wanSetProcess.php?old_internet=MAC_VLAN&interface=wan%s&vid_qy=%s"%(ip,wan,vid))
- urllib2.urlopen(req,timeout=3)
- print "starting up wan%s vid%s time:%s\n"%(wan,vid,time.time())
- #content = urllib2.urlopen(req).read()
- #type = sys.getfilesystemencoding()
- #print content.decode("UTF-8").encode(type)
- class MyThread(object):
- def __init__(self, func_list=None):
- #所有线程函数的返回值汇总,如果最后为0,说明全部成功
- self.ret_flag = 0
- self.func_list = func_list
- self.threads = []
-
- def set_thread_func_list(self, func_list):
- """
- @note: func_list是一个list,每个元素是一个dict,有func和args两个参数
- """
- self.func_list = func_list
-
- def start(self):
- """
- @note: 启动多线程执行,并阻塞到结束
- """
- self.threads = []
- self.ret_flag = 0
- for func_dict in self.func_list:
- if func_dict["args"]:
- t = threading.Thread(target=func_dict["func"], args=func_dict["args"])
- else:
- t = threading.Thread(target=func_dict["func"])
- self.threads.append(t)
-
- for thread_obj in self.threads:
- thread_obj.start()
-
- for thread_obj in self.threads:
- thread_obj.join()
-
- def ret_value(self):
- """
- @note: 所有线程函数的返回值之和,如果为0那么表示所有函数执行成功
- """
- return self.ret_flag
- login()
- mt = MyThread()
- mt.set_thread_func_list(g_func_list)
- loopMe()
- time.sleep(5)
- mt.start()
复制代码
|
|