serv00邮箱自主申请系统整理完毕开源,但是运行不了🥲

从这个继续

玄学之在我的服务器上能跑,换了一台就不行了,但是大致的内容以及大差不差,给出的版本是我改进过的,把所有的敏感信息改到了.env里面。但是可能因为在改的过程中没注意缩进,有一些语法错误跑不起来。所以这里再放一个原来的版本仅供参考。

from flask import Flask, render_template, request, redirect, session, url_for, flash
import requests
from bs4 import BeautifulSoup
import os
from dotenv import load_dotenv
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime

# 加载 .env 文件中的环境变量
load_dotenv()

app = Flask(__name__)
app.secret_key = os.urandom(24)  # 用于保护会话

# 配置数据库连接
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://email_user3_biz:数据库密码@localhost/email_user3_biz'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)

# 配置 OAuth 相关信息
client_id = os.getenv('OAUTH_CLIENT_ID')
client_secret = os.getenv('OAUTH_CLIENT_SECRET')  # 从环境变量获取 Client Secret
token_url = 'https://connect.linux.do/oauth2/token'
user_info_url = 'https://connect.linux.do/api/user'

# 定义用户模型
class User(db.Model):
    id = db.Column(db.String(50), primary_key=True)  # 假设用户 ID 是字符串类型
    username = db.Column(db.String(100), nullable=False)
    trust_level = db.Column(db.Integer, nullable=False)
    emails = db.relationship('Email', backref='user', lazy=True)

    def __init__(self, id, username, trust_level):
        self.id = id
        self.username = username
        self.trust_level = trust_level

# 定义邮箱模型
class Email(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    email_address = db.Column(db.String(100), nullable=False)
    password = db.Column(db.String(100), nullable=False)
    user_id = db.Column(db.String(50), db.ForeignKey('user.id'), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)  # 记录邮箱创建时间

# OAuth 回调处理
@app.route('/oauth/callback')
def oauth_callback():
    code = request.args.get('code')
    if code is None:
        return "Authorization failed.", 400

    # 使用授权码交换访问令牌
    token_data = {
        'grant_type': 'authorization_code',
        'code': code,
        'redirect_uri': url_for('oauth_callback', _external=True),
        'client_id': client_id,
        'client_secret': client_secret,
    }

    token_response = requests.post(token_url, data=token_data)
    token_response_data = token_response.json()

    if 'access_token' not in token_response_data:
        return "Failed to retrieve access token.", 400

    access_token = token_response_data['access_token']

    # 使用访问令牌获取用户信息
    headers = {'Authorization': f'Bearer {access_token}'}
    user_info_response = requests.get(user_info_url, headers=headers)
    user_info = user_info_response.json()

    user_id = user_info.get('id')
    username = user_info.get('username')
    trust_level = user_info.get('trust_level')

    # 将用户信息保存到 session
    session['user'] = {
        'id': user_id,
        'username': username,
        'trust_level': trust_level
    }

    # 检查用户是否已存在
    user = User.query.get(user_id)
    if user is None:
        # 如果用户不存在,则创建新用户
        user = User(id=user_id, username=username, trust_level=trust_level)
        db.session.add(user)
    else:
        # 如果用户已存在,则更新信息
        user.username = username
        user.trust_level = trust_level

    try:
        db.session.commit()  # 提交更改
    except Exception as e:
        print(f"Error committing to the database: {e}")  # 错误调试信息

    # 重定向到主页面
    return redirect(url_for('index'))

# 创建邮箱的主页面
@app.route('/', methods=['GET', 'POST'])
def index():
    if 'user' not in session:
        # 如果用户未登录,显示一个提示,而不是直接跳转到 OAuth 页面
        flash("请先登录以创建邮箱。")
        return render_template('index.html', today_email_count=None, max_daily_email_limit=None)

    # 获取当前登录的用户
    user = User.query.get(session['user']['id'])

    # 获取今天的日期
    today = datetime.utcnow().date()

    # 查询今天创建的邮箱数量
    today_email_count = Email.query.filter(
        Email.user_id == user.id,
        Email.created_at >= today
    ).count()

    max_daily_email_limit = 6  # 每天最多创建 6 个邮箱

    if today_email_count >= max_daily_email_limit:
        flash(f'您今天已经创建了 {today_email_count} 个邮箱,达到每日创建上限 {max_daily_email_limit} 个。', 'warning')
        return render_template('index.html', today_email_count=today_email_count, max_daily_email_limit=max_daily_email_limit)

    if request.method == 'POST':
        email_username = request.form['username']
        email_password = request.form['password']
        domain = request.form['domain']  # 获取用户选择的域名

        # 检查数据库中是否已经存在相同的邮箱
        email_address = f"{email_username}{domain}"
        existing_email = Email.query.filter_by(email_address=email_address).first()

        if existing_email:
            flash('当前邮箱已存在,请选择其他邮箱名称。')
            return redirect(url_for('index'))

        account_info = {
            "username": "",
            "password": "*",
            "panel": ""
        }

        result = login_and_create_email(account_info, email_username, email_password, domain)
        if result["success"]:
            new_email = Email(email_address=result['email'], password=result['password'], user_id=user.id)
            db.session.add(new_email)
            db.session.commit()
            flash(f'邮箱 {new_email.email_address} 创建成功!', 'success')
            return redirect(url_for('index'))
        else:
            flash(f'邮箱创建失败: {result["message"]}', 'danger')

    return render_template('index.html', today_email_count=today_email_count, max_daily_email_limit=max_daily_email_limit)

# 登录处理
@app.route('/login')
def login():
    # 重定向到 OAuth 授权页面
    clientId = client_id
    redirectUri = url_for('oauth_callback', _external=True)
    authUrl = f"https://connect.linux.do/oauth2/authorize?client_id={clientId}&response_type=code&redirect_uri={redirectUri}"
    return redirect(authUrl)

import logging

@app.route('/user_center')
def user_center():
    if 'user' not in session:
        return redirect(url_for('login'))

    # 获取最新的用户信息和邮箱数据
    user = User.query.get(session['user']['id'])
    emails = Email.query.filter_by(user_id=user.id).all()

    # 添加调试信息
    print(f"User: {user.username}, Emails: {[email.email_address for email in emails]}")

    return render_template('user_center.html', user=user, emails=emails)


@app.route('/logout')
def logout():
    session.pop('user', None)  # 移除会话中的用户信息
    session.pop('email_username', None)
    session.pop('email_password', None)
    return redirect(url_for('index'))

# 邮箱创建逻辑
def login_and_create_email(account, email_username, email_password, domain):
    session = requests.Session()
    user_agent = "Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1"
    login_url = f"https://{account['panel']}/login/?next=/mail/add"
    email_creation_url = f"https://{account['panel']}/mail/add"

    try:
        # 获取登录页面和CSRF token
        login_page = session.get(login_url, headers={"User-Agent": user_agent})
        soup = BeautifulSoup(login_page.content, 'html.parser')
        csrf_token = soup.find('input', {'name': 'csrfmiddlewaretoken'})['value']

        # 提交登录表单
        login_data = {
            'username': account['username'],
            'password': account['password'],
            'csrfmiddlewaretoken': csrf_token,
            'next': '/mail/add'
        }
        login_response = session.post(login_url, data=login_data, headers={
            "User-Agent": user_agent,
            "Referer": login_url,
            "Content-Type": "application/x-www-form-urlencoded"
        })

        # 创建邮箱
        if "Dodaj nowy adres e-mail" in login_response.text or "/mail/add" in login_response.url:
            mail_add_page = session.get(email_creation_url, headers={"User-Agent": user_agent})
            soup = BeautifulSoup(mail_add_page.content, 'html.parser')
            new_csrf_token = soup.find('input', {'name': 'csrfmiddlewaretoken'})['value']

            email = f"{email_username}{domain}"

            email_data = {
                'csrfmiddlewaretoken': new_csrf_token,
                'email': email,
                'password1': email_password,
                'password2': email_password
            }

            email_creation_response = session.post(email_creation_url, data=email_data, headers={
                "User-Agent": user_agent,
                "Referer": email_creation_url,
                "Content-Type": "application/x-www-form-urlencoded"
            })

            if email_creation_response.status_code == 200:
                return {"success": True, "email": email, "password": email_password}
            else:
                raise Exception("邮箱创建失败")
        else:
            raise Exception("登录失败")

    except Exception as e:
        return {"success": False, "message": str(e)}

if __name__ == '__main__':
    with app.app_context():
        db.create_all()  # 创建表
    app.run(debug=True)

18 Likes

要是有教程就好了

我想写,但是他**跑不起来,我再改改吧

期待,,,,

大佬太强了 :tieba_087:

还得靠3b最开始的源码 :tieba_087:

感谢佬的分享

感谢大佬分享

技术贴感谢分享

不错,支持下大佬