《影之刃3》转入礼包兑换规则
2023年12月31日00:00前购买过《影之刃3》产品并且账户里有代币留存的入局者,可在补偿活动期间根据自己的历史充值金额,领取《灵魂潮汐》游戏的相关礼包。领...
2024-11-11
先声明一下:封面的妹子和本文毫无关系。我是程序猿出身,多少理解IT界阴气不足的遗憾,所以选了靓丽的妹子,大家先审美后撸代码。不多说了,言归正传。
之前把网盘搜索引擎的代码开源了,收到了很多朋友的感谢,所以打算把之前已经开源的DHT磁力的源码也搬到知乎上来(这个源码是之前ok搜搜的爬虫源码,后来这个网站关闭了,域名解析到了去转盘网)。磁力技术其实很好玩,但由于会涉及一些“净网行动”打击的内容,至少国内目前还没有做得很大的网站,我们也是因为这个原因选择了暂时关闭!好了,也许很多人还不知道磁力,那就百度一下吧,我这里把源代码公开一下:
不会写程序的朋友也可以使用,不过请先装个linux系统,具备公网条件,然后运行:
python startCrawler.py
有必要提醒你:数据库字段在代码中都有,请自己建一张表(代码连接的数据库名为 oksousou,表名为 search_hash),这个太简单了,就不多说了。同时我也提供一下下载地址,源码都在:下载地址1、下载地址2
#!/usr/bin/env python
# encoding: utf-8
"""
author:haoning
create time:2015.8.1
"""
import hashlib
import os
import time
import datetime
import traceback
import sys
import random
import json
import socket
import threading
from hashlib import sha1 #进行hash加密
from random import randint
from struct import unpack
from socket import inet_ntoa
from threading import Timer, Thread
from time import sleep
from collections import deque
from Queue import Queue
import MySQLdb as mdb #数据库连接器
import metautils
import downloadTorrent
from bencode import bencode, bdecode
import pygeoip
# MySQL connection settings used by Master (which connects to database 'oksousou').
DB_HOST = '127.0.0.1'
DB_USER = 'root'
DB_PASS = 'root'
# Bootstrap nodes contacted by join_DHT() whenever the known-node queue is empty.
BOOTSTRAP_NODES = (
    ("67.215.246.10", 6881),
    ("82.221.103.244", 6881),
    ("23.21.224.150", 6881)
)
RATE = 1  # query sampling divisor; DHTServer.on_message re-draws it as 1 or 10 to throttle CPU use
TID_LENGTH = 2  # length of the random KRPC transaction ids we generate
RE_JOIN_DHT_INTERVAL = 3  # seconds between re_join_DHT() checks
TOKEN_LENGTH = 2  # get_peers tokens are the first TOKEN_LENGTH bytes of the info_hash
INFO_HASH_LEN = 500000  # cap on Master.queue size (500k entries) so memory use stays bounded
CACHE_LEN = 100  # Master flushes its cache to the DB once it holds more than CACHE_LEN/2 entries
WAIT_DOWNLOAD = 80  # metadata downloads are launched once more than this many hashes are pending
# Country lookup used by is_ip_allowed(). Opened at import time from a relative path,
# so GeoIP.dat must be in the current working directory.
geoip = pygeoip.GeoIP('GeoIP.dat')
def is_ip_allowed(ip):
    """Return True when *ip* geolocates to one of the crawled regions.

    Only CN, TW, JP, HK and KR are accepted. Any other lookup result
    (including an empty or None country code) yields False.
    """
    allowed_regions = ('CN', 'TW', 'JP', 'HK', 'KR')
    return geoip.country_code_by_addr(ip) in allowed_regions
def entropy(length):
    """Return a string of *length* random characters, each with code 0-255.

    Used for KRPC transaction ids and as input to random_id(). It relies on
    the ``random`` module, so it is not cryptographically secure.
    """
    chars = [chr(randint(0, 255)) for _ in range(length)]
    return "".join(chars)
def random_id():
    """Return a fresh random node id: the SHA-1 digest of 20 random characters."""
    return sha1(entropy(20)).digest()
def decode_nodes(nodes):
    """Decode a compact node-info string into a list of (nid, ip, port) tuples.

    Each 26-byte record is a 20-byte node id, a 4-byte IPv4 address and a
    2-byte big-endian port. Input whose length is not a multiple of 26 is
    treated as malformed and yields an empty list.
    """
    size = len(nodes)
    if size % 26:
        return []
    records = (nodes[pos:pos + 26] for pos in range(0, size, 26))
    return [
        (rec[:20], inet_ntoa(rec[20:24]), unpack("!H", rec[24:26])[0])
        for rec in records
    ]
def timer(t, f):
    """Call *f* once, *t* seconds from now, on a new threading.Timer thread."""
    pending = Timer(t, f)
    pending.start()
def get_neighbor(target, nid, end=10):
    """Build an id that shares its first *end* bytes with *target*.

    The remaining bytes are taken from *nid* (normally our own id). Other
    nodes therefore see us as a close neighbour of *target*.
    """
    prefix = target[:end]
    suffix = nid[end:]
    return prefix + suffix
class KNode(object):
    """A DHT node we learned about: its node id, IP address string and UDP port."""

    def __init__(self, nid, ip, port):
        self.nid, self.ip, self.port = nid, ip, port
class DHTClient(Thread):
    """Daemon thread that spreads our presence in the DHT via find_node queries.

    Discovered nodes are kept in a bounded deque (the oldest entries fall off
    once it is full) and drained by auto_send_find_node(). The UDP socket
    ``self.ufd`` and ``self.bind_ip`` are not assigned here; a subclass
    (DHTServer) must provide them.
    """

    def __init__(self, max_node_qsize):
        Thread.__init__(self)
        self.setDaemon(True)
        self.max_node_qsize = max_node_qsize
        self.nid = random_id()
        self.nodes = deque(maxlen=max_node_qsize)

    def send_krpc(self, msg, address):
        """bencode *msg* and send it to *address*; any failure is silently ignored."""
        try:
            packet = bencode(msg)
            self.ufd.sendto(packet, address)
        except Exception:
            pass  # fire-and-forget: unreachable peers and unencodable messages are dropped

    def send_find_node(self, address, nid=None):
        """Send a find_node query for a random target to *address*.

        When the remote node's id *nid* is known, we present an id sharing
        its first bytes (see get_neighbor) so we look close to that node.
        """
        sender_id = self.nid if not nid else get_neighbor(nid, self.nid)
        query = {
            "t": entropy(TID_LENGTH),
            "y": "q",
            "q": "find_node",
            "a": {"id": sender_id, "target": random_id()},
        }
        self.send_krpc(query, address)

    def join_DHT(self):
        """Send find_node to every bootstrap node."""
        for bootstrap in BOOTSTRAP_NODES:
            self.send_find_node(bootstrap)

    def re_join_DHT(self):
        """Re-bootstrap if no nodes are queued, then schedule the next check."""
        if not self.nodes:
            self.join_DHT()
        timer(RE_JOIN_DHT_INTERVAL, self.re_join_DHT)

    def auto_send_find_node(self):
        """Loop forever: query the oldest queued node, then pause briefly.

        Ctrl-C during the pause terminates the whole process via os._exit.
        """
        delay = 1.0 / self.max_node_qsize
        while True:
            try:
                peer = self.nodes.popleft()
                self.send_find_node((peer.ip, peer.port), peer.nid)
            except IndexError:
                pass  # queue is empty right now
            try:
                sleep(delay)
            except KeyboardInterrupt:
                os._exit(0)

    def process_find_node_response(self, msg, address):
        """Queue the nodes from a find_node response.

        Entries whose id is not 20 bytes long, or whose IP equals our bind
        IP, are skipped.
        """
        for nid, ip, port in decode_nodes(msg["r"]["nodes"]):
            if len(nid) != 20 or ip == self.bind_ip:
                continue
            self.nodes.append(KNode(nid, ip, port))
class DHTServer(DHTClient):
    """DHT node that answers other peers' queries in order to harvest info_hashes.

    Hashes seen in get_peers / announce_peer requests are handed to ``master``
    via ``master.log`` / ``master.log_announce``. Every other query type is
    answered with a KRPC 202 "Server Error".
    """

    def __init__(self, master, bind_ip, bind_port, max_node_qsize):
        DHTClient.__init__(self, max_node_qsize)
        self.master = master
        self.bind_ip = bind_ip
        self.bind_port = bind_port
        self.speed = 0  # running count of incoming queries, used for sampling in on_message
        self.process_request_actions = {
            "get_peers": self.on_get_peers_request,
            "announce_peer": self.on_announce_peer_request,
        }
        self.ufd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        self.ufd.bind((self.bind_ip, self.bind_port))
        # re_join_DHT() reschedules itself forever, so it must be started exactly
        # once. run() starts it; scheduling it here as well would create a second
        # independent timer chain and double the bootstrap traffic.

    def run(self):
        """Start the periodic re-join loop, then receive and dispatch datagrams forever."""
        self.re_join_DHT()
        while True:
            try:
                (data, address) = self.ufd.recvfrom(65536)
                msg = bdecode(data)
                self.on_message(msg, address)
            except Exception:
                # Malformed datagrams from arbitrary peers are expected; drop them.
                pass

    def on_message(self, msg, address):
        """Dispatch one decoded KRPC message.

        Responses carrying "nodes" feed the node queue. Queries are sampled:
        only every RATE-th query is handled. Every 10000 queries RATE is
        re-drawn as 1 (probability 2/3) or 10 (probability 1/3) to limit CPU
        use. Unsupported query types get a 202 error reply.
        """
        global RATE
        try:
            if msg["y"] == "r":
                if "nodes" in msg["r"]:
                    self.process_find_node_response(msg, address)  # discovered nodes
            elif msg["y"] == "q":
                try:
                    self.speed += 1
                    if self.speed % 10000 == 0:
                        RATE = random.randint(1, 3)
                        if RATE == 2:
                            RATE = 1
                        if RATE == 3:
                            RATE = 10
                    if self.speed > 100000:
                        self.speed = 0
                    if self.speed % RATE == 0:
                        # Handle the peer's request; this is where info_hashes are captured.
                        self.process_request_actions[msg["q"]](msg, address)
                except KeyError:
                    self.play_dead(msg, address)
        except KeyError:
            pass

    def on_get_peers_request(self, msg, address):
        """Answer get_peers with an empty node list and report the info_hash to master.

        The token is the first TOKEN_LENGTH bytes of the info_hash, so a later
        announce_peer can be validated without keeping state. The reply uses
        an id close to the requested info_hash.
        """
        try:
            infohash = msg["a"]["info_hash"]
            tid = msg["t"]
            msg["a"]["id"]  # required field; a request without it is ignored
            token = infohash[:TOKEN_LENGTH]
            reply = {
                "t": tid,
                "y": "r",
                "r": {
                    "id": get_neighbor(infohash, self.nid),
                    "nodes": "",
                    "token": token
                }
            }
            self.master.log(infohash, address)
            self.send_krpc(reply, address)
        except KeyError:
            pass

    def on_announce_peer_request(self, msg, address):
        """Record an announced info_hash (if the token matches) and always acknowledge.

        Per BEP 5, a non-zero "implied_port" means the peer's UDP source port
        should be used instead of the "port" argument.
        """
        try:
            args = msg["a"]
            infohash = args["info_hash"]
            token = args["token"]
            args["id"]  # required fields; a missing one aborts via the handler below
            msg["t"]
            if infohash[:TOKEN_LENGTH] == token:
                if args.get("implied_port", 0) != 0:
                    port = address[1]
                else:
                    port = args["port"]
                self.master.log_announce(infohash, (address[0], port))
        except Exception:
            print('error')
        finally:
            self.ok(msg, address)

    def play_dead(self, msg, address):
        """Reply to *msg* with KRPC error 202 "Server Error"."""
        try:
            tid = msg["t"]
            reply = {
                "t": tid,
                "y": "e",
                "e": [202, "Server Error"]
            }
            self.send_krpc(reply, address)
        except KeyError:
            pass

    def ok(self, msg, address):
        """Send a minimal success response to query *msg*, using an id close to the sender's."""
        try:
            tid = msg["t"]
            nid = msg["a"]["id"]
            reply = {
                "t": tid,
                "y": "r",
                "r": {
                    "id": get_neighbor(nid, self.nid)
                }
            }
            self.send_krpc(reply, address)
        except KeyError:
            pass
class Master(Thread): #解析info_hash
def __init__(self):
Thread.__init__(self)
self.setDaemon(True)
self.queue = Queue()
self.cache = Queue()
self.count=0
self.mutex = threading.RLock() #可重入锁,使单线程可以再次获得已经获得的?
self.waitDownload = Queue()
self.metadata_queue = Queue()
self.dbconn = mdb.connect(DB_HOST, DB_USER, DB_PASS, 'oksousou', charset='utf8')
self.dbconn.autocommit(False)
self.dbcurr = self.dbconn.cursor()
self.dbcurr.execute('SET NAMES utf8')
self.visited = set()
def lock(self): #加锁
self.mutex.acquire()
def unlock(self): #解锁
self.mutex.release()
def work(self,item):
print "start thread",item
while True:
self.prepare_download_metadata()
self.lock()
self.download_metadata()
self.unlock()
self.lock()
self.got_torrent()
self.unlock()
def start_work(self,max):
for item in xrange(max):
t = threading.Thread(target=self.work, args=(item,))
t.setDaemon(True)
t.start()
#入队的种子效率更高
def log_announce(self, binhash, address=None):
if self.queue.qsize() < INFO_HASH_LEN : #大于INFO_HASH_LEN就不要入队,否则后面来不及处理
if is_ip_allowed(address[0]):
self.queue.put([address, binhash]) #获得info_hash
def log(self, infohash, address=None):
if self.queue.qsize() < INFO_HASH_LEN: #大于INFO_HASH_LEN/2就不要入队,否则后面来不及处理
if is_ip_allowed(address[0]):
self.queue.put([address, infohash])
def prepare_download_metadata(self):
if self.queue.qsize() == 0:
sleep(2)
#从queue中获得info_hash用来下载
address, binhash= self.queue.get()
if binhash in self.visited:
return
if len(self.visited) > 100000: #大于100000重置队列,认为已经访问过了
self.visited = set()
self.visited.add(binhash)
#跟新已经访问过的info_hash
info_hash = binhash.encode('hex')
utcnow = datetime.datetime.utcnow()
self.cache.put((address,binhash,utcnow)) #装入缓存队列
def download_metadata(self):
if self.cache.qsize() > CACHE_LEN/2: #出队更新下载
while self.cache.qsize() > 0: #排空队列
address,binhash,utcnow = self.cache.get()
info_hash = binhash.encode('hex')
self.dbcurr.execute('SELECT id FROM search_hash WHERE info_hash=%s', (info_hash,))
y = self.dbcurr.fetchone()
if y:
# 更新最近发现时间,请求数
self.dbcurr.execute('UPDATE search_hash SET last_seen=%s, requests=requests+1 WHERE info_hash=%s', (utcnow, info_hash))
else:
self.waitDownload.put((address, binhash))
self.dbconn.commit()
if self.waitDownload.qsize() > WAIT_DOWNLOAD:
while self.waitDownload.qsize() > 0:
address,binhash = self.waitDownload.get()
t = threading.Thread(target=downloadTorrent.download_metadata, args=(address, binhash, self.metadata_queue))
t.setDaemon(True)
t.start()
def decode(self, s):
if type(s) is list:
s = ';'.join(s)
u = s
for x in (self.encoding, 'utf8', 'gbk', 'big5'):
try:
u = s.decode(x)
return u
except:
pass
return s.decode(self.encoding, 'ignore')
def decode_utf8(self, d, i):
if i+'.utf-8' in d:
return d[i+'.utf-8'].decode('utf8')
return self.decode(d[i])
def parse_metadata(self, data): #解析种子
info = {}
self.encoding = 'utf8'
try:
torrent = bdecode(data) #编码后解析
if not torrent.get('name'):
return None
except:
return None
detail = torrent
info['name'] = self.decode_utf8(detail, 'name')
if 'files' in detail:
info['files'] = []
for x in detail['files']:
if 'path.utf-8' in x:
v = {'path': self.decode('/'.join(x['path.utf-8'])), 'length': x['length']}
else:
v = {'path': self.decode('/'.join(x['path'])), 'length': x['length']}
if 'filehash' in x:
v['filehash'] = x['filehash'].encode('hex')
info['files'].append(v)
info['length'] = sum([x['length'] for x in info['files']])
else:
info['length'] = detail['length']
info['data_hash'] = hashlib.md5(detail['pieces']).hexdigest()
return info
def got_torrent(self):
if self.metadata_queue.qsize() == 0:
return
binhash, address, data,start_time = self.metadata_queue.get()
if not data:
return
try:
info = self.parse_metadata(data)
if not info:
return
except:
traceback.print_exc()
return
temp = time.time()
x = time.localtime(float(temp))
utcnow = time.strftime("%Y-%m-%d %H:%M:%S",x) # get time now
info_hash = binhash.encode('hex') #磁力
info['info_hash'] = info_hash
# need to build tags
info['tagged'] = False
info['classified'] = False
info['requests'] = 1
info['last_seen'] = utcnow
info['create_time'] = utcnow
info['source_ip'] = address[0]
if info.get('files'):
files = [z for z in info['files'] if not z['path'].startswith('_')]
if not files:
files = info['files']
else:
files = [{'path': info['name'], 'length': info['length']}]
files.sort(key=lambda z:z['length'], reverse=True)
bigfname = files[0]['path']
info['extension'] = metautils.get_extension(bigfname).lower()
info['category'] = metautils.get_category(info['extension'])
try:
try:
print '\n', 'Saved', info['info_hash'], info['name'], (time.time()-start_time), 's', address[0]
except:
print '\n', 'Saved', info['info_hash']
ret = self.dbcurr.execute('INSERT INTO search_hash(info_hash,category,data_hash,name,extension,classified,source_ip,tagged,' +
'length,create_time,last_seen,requests) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)',
(info['info_hash'], info['category'], info['data_hash'], info['name'], info['extension'], info['classified'],
info['source_ip'], info['tagged'], info['length'], info['create_time'], info['last_seen'], info['requests']))
if self.count %50 ==0:
self.dbconn.commit()
if self.count>100000:
self.count=0
except:
print self.name, 'save error', self.name, info
traceback.print_exc()
return
if __name__ == "__main__":
    # Start the metadata side first: 150 worker threads running Master.work.
    # Note: Master.start() is never called; only its worker threads run.
    master = Master()
    master.start_work(150)
    # Start the DHT server thread on UDP 0.0.0.0:6881; it feeds hashes to master.
    dht = DHTServer(master, "0.0.0.0", 6881, max_node_qsize=200)
    dht.start()
    # The main thread sends outgoing find_node queries forever (Ctrl-C exits via os._exit).
    dht.auto_send_find_node()
其实下载种子还有一种方式,就是借助 libtorrent,但这个太耗费CPU了,所以我一般不用它,如下:
#coding: utf8
import threading
import traceback
import random
import time
import os
import socket
import libtorrent as lt
# Use a smaller per-thread stack (200 KiB) for threads created after import,
# presumably because callers run download_metadata in many threads at once.
threading.stack_size(200*1024)
# Default timeout (seconds) for sockets created without an explicit timeout.
socket.setdefaulttimeout(30)
def _remove_download(root, path):
    """Delete *path* (file or directory) only if it resolves strictly inside *root*.

    The path is derived from an untrusted torrent name, so it is never passed
    to a shell. Names that escape *root* (e.g. via '..') are refused.
    """
    import shutil
    base = os.path.realpath(root)
    target = os.path.realpath(path)
    if not target.startswith(base + os.sep):
        return
    if os.path.isdir(target):
        shutil.rmtree(target, ignore_errors=True)
    elif os.path.exists(target):
        try:
            os.remove(target)
        except OSError:
            pass  # best-effort cleanup, like the former `rm -rf`


def fetch_torrent(session, ih, timeout):
    """Fetch a torrent's metadata through its magnet link.

    :param session: libtorrent session with DHT started.
    :param ih: hex-encoded info_hash.
    :param timeout: number of 1-second polls to wait for metadata.
    :returns: ``torrent_info.metadata()`` of the torrent, or None if it could
        not be added or the metadata did not arrive in time.
    """
    save_path = '/tmp/downloads/'
    url = 'magnet:?xt=urn:btih:%s' % (ih.upper(),)
    params = {
        'save_path': save_path,
        'storage_mode': lt.storage_mode_t(2),
        'paused': False,
        'auto_managed': False,
        'duplicate_is_error': True}
    try:
        handle = lt.add_magnet_uri(session, url, params)
    except Exception:
        return None
    meta = None
    down_path = None
    try:
        handle.set_sequential_download(1)
        for _ in range(timeout):
            if handle.has_metadata():
                info = handle.get_torrent_info()
                down_path = save_path + info.name()
                meta = info.metadata()
                break
            time.sleep(1)
    finally:
        # Always drop the torrent from the session, even if polling raised.
        session.remove_torrent(handle)
    if down_path:
        # Remove any data libtorrent may already have written for this torrent.
        _remove_download(save_path, down_path)
    return meta
def download_metadata(address, binhash, metadata_queue, timeout=20):
    """Download a torrent's metadata with a throw-away libtorrent session.

    Always posts exactly one ``(binhash, address, metadata, start_time)``
    tuple to *metadata_queue*. ``metadata`` is None on failure or timeout,
    and errors are printed rather than raised.

    :param address: (ip, port) of the peer the hash came from (passed through).
    :param binhash: raw 20-byte info_hash.
    :param metadata_queue: queue receiving the result tuple.
    :param timeout: seconds to wait for metadata (see fetch_torrent).
    """
    metadata = None
    session = None
    start_time = time.time()
    try:
        session = lt.session()
        port = random.randrange(10000, 50000)
        session.listen_on(port, port + 10)
        # Public DHT routers, plus 127.0.0.1:6881 (presumably the crawler's own DHTServer).
        for router in ('router.bittorrent.com', 'router.utorrent.com',
                       'dht.transmissionbt.com', '127.0.0.1'):
            session.add_dht_router(router, 6881)
        session.start_dht()
        metadata = fetch_torrent(session, binhash.encode('hex'), timeout)
    except Exception:
        traceback.print_exc()
    finally:
        session = None  # release the session before handing off the result
        metadata_queue.put((binhash, address, metadata, start_time))
整个代码其实耗费了很多互联网界的朋友大量心血,本人也只是做了一点工作,请看到这篇博客的朋友保持钻研精神,开源精神,多多交流,秉承分享!本文之前在其他网站也发布过,这次只是做了转移工作!
版权声明:本文内容由互联网用户自发贡献,本站不拥有所有权,不承担相关法律责任。如果发现本站有涉嫌抄袭的内容,请告知我们,本站将立刻删除涉嫌侵权内容。
相关文章
2023年12月31日00:00前购买过《影之刃3》产品并且账户里有代币留存的入局者,可在补偿活动期间根据自己的历史充值金额,领取《灵魂潮汐》游戏的相关礼包。领...
2024-11-11
Steam新品节隆重开幕!Steam新品节这一多日庆典为即将推出的游戏而打造,您可以在畅玩下一款新游戏中体验来自新老开发者的游戏。 即日起至6月22日上午10时...
2024-11-11
十万个大魔王是一款非常好玩的rpg动作游,支持收集各种魔法卡配合战斗。也可以加入公会,展开激战。在这个游戏中,我们可以解锁不同的风景,在魔法书的世界中探索,通过...
2024-11-11
吾名软件库app是一款很不错的资源软件库,在吾名软件库app中拥有许多正版的游戏资源,还有一些破解版游戏,用户想要的话可以直接下载,全是免费的,不需要一分钱...
2024-11-11
热评文章
2022年专属火龙之神途新版
1.80龙神合击传奇
1.76永恒小极品+5复古传奇
1.76双倍魔天大极品第三季单职业
1.76神梦传奇三职业
1.80聖统圣统合击三职业传奇