|
|
- from flask import Flask, request, render_template, send_from_directory, abort, redirect
- from flask_sqlalchemy import SQLAlchemy
- from flask_limiter import Limiter
- from flask_limiter.util import get_remote_address
- from flask_migrate import Migrate
- from mastodon import Mastodon
- import re
- import random
- import string
- import datetime
- from dateutil.tz import tzlocal
-
- BOT_NAME = '@ask_me_bot'
- CLIENT_ID = 'WQHzKKvfahkkcFm_iErT6ZdYvczi8L6Uunsoa88bCKA'
- CLIENT_SEC = open('client.secret', 'r').read().strip()
-
- DOMAIN = 'thu.closed.social'
-
- MENTION_BOT_TEMP = re.compile(r'<span class=\"h-card\"><a href=\"https://thu.closed.social/@ask_me_bot\" class=\"u-url mention\">@<span>.*?</span></a></span>')
- DELETE_TEMP = re.compile(r'<p>\s*删除\s*</p>')
-
- WORK_URL = 'https://closed.social'
- # WORK_URL = 'http://127.0.0.1:5000'
-
- REDIRECT_URI = WORK_URL + '/askMe/auth'
-
- token = open('token.secret', 'r').read().strip()
- th = Mastodon(
- access_token=token,
- api_base_url='https://' + DOMAIN
- )
-
-
- app = Flask(__name__)
- app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///ask.db'
- app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
- app.config['JSON_AS_ASCII'] = False
-
- limiter = Limiter(
- app,
- key_func=get_remote_address,
- default_limits=["50 / minute"],
- )
-
- db = SQLAlchemy(app)
- migrate = Migrate(app, db)
-
-
- class User(db.Model):
- id = db.Column(db.Integer, primary_key=True)
- acct = db.Column(db.String(64))
- disp = db.Column(db.String(64))
- avat = db.Column(db.String(256))
- url = db.Column(db.String(128))
- secr = db.Column(db.String(16))
- root = db.Column(db.BigInteger)
-
- def __init__(self, acct):
- self.acct = acct
-
- def __repr__(self):
- return f"@{self.acct}[{self.disp}]"
-
-
- class Question(db.Model):
- id = db.Column(db.Integer, primary_key=True)
- acct = db.Column(db.String(64))
- content = db.Column(db.String(400))
- time = db.Column(db.DateTime)
- toot = db.Column(db.BigInteger)
-
- def __init__(self, acct, content, toot):
- self.acct = acct
- self.content = content
- self.toot = toot
- self.time = datetime.datetime.now()
-
- def __repr__(self):
- return f"[{self.acct}]{self.content}({self.toot})"
-
-
- @app.route('/js/<path:path>')
- def send_js(path):
- return send_from_directory('static/js', path)
-
-
- @app.route('/img/<path:path>')
- def send_img(path):
- return send_from_directory('static/img', path)
-
-
- @app.route('/askMe/')
- def root():
- return app.send_static_file('ask.html')
-
-
- @app.route('/askMe/footer.html')
- def root_footer():
- return app.send_static_file('footer.html')
-
-
- @app.route('/askMe/auth')
- @limiter.limit("10 / minute")
- def set_inbox_auth():
- code = request.args.get('code')
- autoSend = request.args.get('autoSend')
- secr = request.args.get('secr')
-
- if secr and not re.match('[a-z]{0,16}', secr):
- abort(422)
-
- client = Mastodon(
- client_id=CLIENT_ID,
- client_secret=CLIENT_SEC,
- api_base_url='https://' + DOMAIN
- )
- client.log_in(
- code=code,
- redirect_uri=f"{REDIRECT_URI}?autoSend={autoSend or ''}&secr={secr or ''}",
- scopes=[
- 'read:accounts',
- 'write:statuses'
- ] if autoSend else ['read:accounts']
- )
-
- info = client.account_verify_credentials()
-
- acct = info.acct
- u = User.query.filter_by(acct=acct).first()
- if not u:
- u = User(acct)
- db.session.add(u)
-
- u.secr = secr or u.secr or ''.join(random.choice(
- string.ascii_lowercase) for i in range(16))
-
- u.disp = info.display_name
- u.url = info.url
- u.avat = info.avatar
-
- if autoSend:
- client.status_post(
- f"[自动发送] 我创建了一个匿名提问箱,欢迎提问~\n{WORK_URL}/askMe/{acct}/{u.secr}",
- visibility='public')
-
- db.session.commit()
-
- return redirect(f"/askMe/{acct}/{u.secr}")
-
-
- @app.route('/askMe/inbox', methods=['POST'])
- @limiter.limit("10 / minute")
- def set_inbox():
- acct = request.form.get('username')
- if not re.match('[A-Za-z0-9_]{1,30}(@[a-z\\.-_]+)?', acct):
- return '无效的闭社id', 422
-
- r = th.conversations()
- for conv in r:
- status = conv.last_status
- account = status.account
- if acct == account.acct:
- pt = status.content.strip()
-
- x = re.findall(r'新建(\[[a-z]{1,32}\])?', pt)
- if not x:
- return '私信格式无效,请检查并重新发送', 422
-
- secr = x[0][1:-1] if x[0] else ''.join(
- random.choice(string.ascii_lowercase) for i in range(16))
-
- u = User.query.filter_by(acct=acct).first()
- if not u:
- u = User(acct)
- db.session.add(u)
-
- u.disp = account.display_name
- u.url = account.url
- u.avat = account.avatar
- u.secr = secr
- db.session.commit()
-
- th.status_post(f"@{acct} 设置成功! 当前提问箱链接 {WORK_URL}/askMe/{acct}/{secr}\n(如需在微信等无链接预览的平台分享,建议先发给自己,点开,再点击分享到朋友圈等)",
- in_reply_to_id=status.id,
- visibility='direct'
- )
-
- return acct + '/' + secr
-
- return '未找到私信,请确认已发送且是最近发送', 404
-
-
- @app.route('/askMe/<acct>/<secr>/')
- def inbox(acct, secr):
- u = User.query.filter_by(acct=acct, secr=secr).first_or_404()
-
- qs = [{
- 'content': q.content,
- 'toot': q.toot,
- 'time': q.time.replace(tzinfo=tzlocal())
- } for q in Question.query.filter_by(acct=acct).all()
- ]
-
- return render_template('inbox.html', acct=u.acct, disp=(
- u.disp or u.acct), url=u.url, avat=u.avat, qs=qs)
-
-
- @app.route('/askMe/<acct>/<secr>/new', methods=['POST'])
- @limiter.limit("50 / hour; 1 / 2 second")
- def new_question(acct, secr):
- u = User.query.filter_by(acct=acct, secr=secr).first()
- if not u:
- abort(404)
-
- content = request.form.get('question')
- if not content or len(content) > 400:
- abort(422)
-
- if not Question.query.filter_by(acct=acct, content=content).first():
-
- if not u.root:
- toot = th.status_post(
- f"@{acct} 欢迎使用匿名提问箱。未来的新提问会集中显示在这里,方便管理。戳我头像了解如何回复。",
- visibility='direct')
- u.root = toot.id
-
- toot = th.status_post(f"@{acct} 叮~ 有新提问:\n\n{content}",
- in_reply_to_id=u.root,
- visibility='direct'
- )
-
- q = Question(acct, content, toot.id)
- db.session.add(q)
- db.session.commit()
-
- return redirect(".")
-
-
- def render_content(text, emojis):
- text = MENTION_BOT_TEMP.sub('', text)
- for emoji in emojis:
- text = text.replace(':%s:' % emoji.shortcode, '<img class="emoji" src="%s">' % emoji.url)
-
- return text
-
-
- @app.route('/askMe/<acct>/<secr>/<int:toot>')
- def question_info(acct, secr, toot):
- q = Question.query.filter_by(acct=acct, toot=toot).first()
- if not q or not User.query.filter_by(acct=acct, secr=secr).first():
- abort(404)
-
- context = th.status_context(toot)
- replies = [
- {
- 'disp': (t.account.display_name or t.account.acct),
- 'url': t.account.url,
- 'content': render_content(t.content, t.emojis),
- 'time': str(t.created_at),
- 'media': t.media_attachments,
- }
- for t in context.descendants
- ]
-
- if replies and DELETE_TEMP.match(replies[-1].get('content')):
- db.session.delete(q)
- db.session.commit()
- th.status_delete(toot)
- return '该提问已被删除', 404
-
- return {'replies': replies}
-
-
- if __name__ == '__main__':
- app.run(debug=True)
|