造了个多线程代理的轮子

为什么要造轮子?

之前分享的多线程播放的方案存在许多局限性,不支持自定义header、cookie,也不支持多源、不能自定义连接数。联系一下我最近的帖子,你猜我为什么要照这个轮子 :sweat_smile:

这个轮子有什么用?

服务端提供的直链往往会有一些限制:

  • 需要校验请求头(可能是UA或者cookie)
  • 限制了单线程下载速度
  • 限制线程数量
  • 限制分片大小

服务端做这些限制的主要目的是防止你使用播放器播放,下载限速,通过降低用户体验的方式将用户导向付费通道。
为了突破上述的限制,我造了这个轮子,主要有这些功能:

  • 支持多源地址
  • 自定义线程数量加速
  • 自定义请求头/cookie
  • 自定义分片大小
  • 支持代理url,可以使得播放器无感使用多线程

老规矩,直接贴源码:
python依赖包

httpx
fastapi
humanfriendly
tqdm

源码

import concurrent
import re
from abc import abstractmethod
# from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO
from multiprocessing import Pipe
from multiprocessing import Process

# import requests
import httpx
import uvicorn
from fastapi import FastAPI
from fastapi import HTTPException
from fastapi import Request
from fastapi.responses import StreamingResponse
from humanfriendly import parse_size
from tqdm import tqdm


# 多源地址、多线程、播放代理

# todo
# 负载均衡、代理池、边下边播

class Source:
    @abstractmethod
    def get(self, begin, end):
        pass

    @abstractmethod
    def info(self):
        pass


class URLSource(Source):
    def __init__(self, url, headers, cookies, conns=8):
        self.url = url
        self.headers = headers
        self.cookies = cookies
        # request 客户端
        # self.session = requests.Session()
        # from requests.packages.urllib3.util.retry import Retry
        # retries = Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
        # adapter = HTTPAdapter(pool_connections=100, pool_maxsize=10, max_retries=retries)
        # self.session.mount('http://', adapter)
        # self.session.mount('https://', adapter)
        # httpx 客户端
        self.session = httpx.Client(
            limits=httpx.Limits(max_connections=conns, max_keepalive_connections=conns)
        )

        self.session.headers.update(self.headers)
        self.session.cookies.update(self.cookies)

    def get(self, begin, end):
        return self.session.get(
            self.url,
            headers={'Range': f'bytes={begin}-{end}'}
        ).content, begin, end

    def info(self):
        resp = self.session.get(self.url, headers={'Range': 'bytes=0-0'})
        # 获取响应类型
        content_type = resp.headers['Content-Type']
        # 获取文件名称
        file_name = resp.headers['Content-Disposition'].split('"')[-2]
        try:
            file_name = file_name.encode('iso-8859-1').decode()
        except UnicodeEncodeError:
            pass
        # 获取文件大小
        length = int(resp.headers['Content-Range'].split('/')[-1])
        return content_type, file_name, length


class Selector:
    def __init__(self, targets):
        self.targets = targets

        # 轮询
        def loop():
            while True:
                for target in targets:
                    yield target

        self.gen = loop()

    def select(self):
        return next(self.gen)


class SourceGroup(Source):
    def __init__(self, selector: Selector):
        self.selector = selector

    def get(self, begin, end):
        return self.selector.select().get(begin, end)

    def info(self):
        return self.selector.select().info()


class Spliter:
    def __init__(self, *, begin=None, end=None, length=None):
        if begin is not None and end is not None:
            self.begin = begin
            self.length = end - begin + 1
        elif length:
            self.begin = 0
            self.length = length
        else:
            raise Exception('切片器参数不全')

    def iter(self, *, split: int | str = '5M', begin=None, end=None):
        if isinstance(split, str):
            split = parse_size(split, True)
        if not begin:
            begin = self.begin
        if not end:
            end = self.begin + self.length - 1

        def gen():
            left, right = begin, begin + split - 1

            while right <= end:
                yield left, right
                left, right = right + 1, right + split
            if left <= end:
                yield left, end

        return gen()

    def sub_split(self, trunk: int | str = '10M'):
        if isinstance(trunk, str):
            trunk = parse_size(trunk, True)

        def gen():
            for begin, end in self.iter(split=trunk):
                yield Spliter(begin=begin, end=end)

        return gen()


def write_task(pipe, file_name):
    msg = pipe.recv()
    with open(file_name, "rb+") as f:
        while msg is not None:
            data, index = msg
            f.seek(index)
            f.write(data)
            msg = pipe.recv()
    pipe.send(None)


class URLProxy:
    def __init__(
            self,
            urls,
            headers=None,
            cookies=None,
            trunk: str | int = '8M',
            split: str | int = '1M',
            conns=8
    ):
        if cookies is None:
            cookies = dict()
        if headers is None:
            headers = dict()
        if isinstance(trunk, str):
            trunk = parse_size(trunk, True)
        self.trunk = trunk
        if isinstance(split, str):
            split = parse_size(split, True)
        self.split = split
        if isinstance(urls, list):
            self.source = SourceGroup(Selector([URLSource(url, headers, cookies, conns) for url in urls]))
            self.workers = conns * len(urls)
        else:
            self.source = URLSource(urls, headers, cookies, conns)
            self.workers = conns
        self.content_type, self.file_name, self.length = self.source.info()

    def stream(self, *, begin=None, end=None, split=None):
        if not begin:
            begin = 0
        if not end:
            end = self.length - 1
        if not split:
            split = self.split
        spliter = Spliter(begin=begin, end=end)
        # executor = ProcessPoolExecutor(max_workers=self.workers)
        executor = ThreadPoolExecutor(max_workers=self.workers)

        for future in concurrent.futures.as_completed(
                [executor.submit(self.source.get, b, e) for b, e in spliter.iter(split=split)]
        ):
            content, b, e = future.result()
            yield content, b, e

    def sorted_stream(self, *, begin=None, end=None, trunk=None, split=None):
        if not begin:
            begin = 0
        if not end:
            end = self.length - 1
        if not trunk:
            trunk = self.trunk
        if not split:
            split = self.split
        spliter = Spliter(begin=begin, end=end)
        for l, r in spliter.iter(split=trunk):  # noqa: E741
            content = BytesIO()
            for data, b, e in self.stream(begin=l, end=r, split=split):
                content.seek(b - l)
                content.write(data)
            yield content.getvalue()

    def download(self):
        print('开始下载', self.file_name, '...')
        with open(self.file_name, "wb"):
            pass

        main, sub = Pipe()

        sub_task = Process(target=write_task, args=(sub, self.file_name))
        sub_task.start()
        tqdm_obj = tqdm(total=self.length + 100, unit_scale=True)
        for content, b, e in self.stream(split=self.split):
            main.send((content, b))
            tqdm_obj.update(len(content))
        main.send(None)
        main.recv()
        tqdm_obj.update(100)
        print('下载完成')

    def proxy(self, host='127.0.0.1', port=9999):

        app = FastAPI()

        @app.get("/")
        async def play(request: Request):
            range_str = request.headers.get("Range")
            size = self.length
            if not range_str:
                end = min(self.split, size) - 1
                return StreamingResponse(self.sorted_stream(begin=0, end=end), headers={
                    'Content-Range': f'bytes 0-{end}/{size}',
                })
            match = re.compile(r'bytes=(\d+)-(\d*)').match(range_str)
            begin, end = match.groups()
            begin = int(begin) if begin else 0
            # end = int(end) if end else size - 1
            # 调整缓冲范围
            # begin = max(0, begin - self.split)
            end = min(begin + self.trunk, size) - 1
            try:
                return StreamingResponse(self.sorted_stream(begin=begin, end=end), headers={
                    'Content-Range': f'bytes {begin}-{end}/{size}',
                })
            except Exception:
                raise HTTPException(status_code=404)

        uvicorn.run(app, host=host, port=port)


if __name__ == '__main__':
    # urls = 'your url link'
    # obj = URLProxy(urls=urls, trunk='8M', split='1M', conns=2, headers=headers)
    # obj.download()
    # obj.proxy()
    pass

贴个源码仓库

未实现的功能(后面有时间加)
  • 负载均衡
  • 代理池
  • 边下边播
使用提示
  • 同样的资源从多个网盘(也可以同一个网盘多个账号)分别提取直链,然后使用本工具代理得到一个url,接下来就可以使用播放器直接在线播放这个代理地址了。
31 个赞

牛!

1 个赞

大佬牛:beer:

1 个赞

感谢大佬

1 个赞

Qiang!

11 个赞

不多也就一,备注你是阿里云盘专家了

大佬强

游走在封号边缘

不明觉厉

太强了!

点个赞 :heart:

给大佬点赞

大佬们真的牛,来学习一下

厉害,要是能集成到alist里就更好了 :rofl:

牛!

牛!

我试着拿来放pikpak的视频,感觉不大好使,速度还没直接加载原来的地址快 :cry:

From 软件开发 to 开发调优