匿名提问箱
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

275 lines
7.7 KiB

import datetime
import random
import re
import secrets
import string

from dateutil.tz import tzlocal
from flask import Flask, request, render_template, send_from_directory, abort, redirect
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy
from mastodon import Mastodon
def _read_secret(path):
    """Return the contents of the file at *path*, stripped of surrounding whitespace.

    The file is read inside a ``with`` block so its handle is closed
    immediately instead of being left open until garbage collection.
    """
    with open(path, 'r') as secret_file:
        return secret_file.read().strip()


# Handle of the bot account.
BOT_NAME = '@ask_me_bot'
# OAuth application credentials used by the /askMe/auth login flow.
CLIENT_ID = 'WQHzKKvfahkkcFm_iErT6ZdYvczi8L6Uunsoa88bCKA'
CLIENT_SEC = _read_secret('client.secret')
# Host name of the Mastodon instance every API client here connects to.
DOMAIN = 'thu.closed.social'
# HTML that a mention of the bot renders to; render_content() strips it from replies.
MENTION_BOT_TEMP = re.compile(r'<span class=\"h-card\"><a href=\"https://thu.closed.social/@ask_me_bot\" class=\"u-url mention\">@<span>.*?</span></a></span>')
# Reply HTML that starts with a paragraph holding only the word "删除" ("delete").
DELETE_TEMP = re.compile(r'<p>\s*删除\s*</p>')
# Public base URL of this service; swap in the line below for local development.
WORK_URL = 'https://closed.social'
# WORK_URL = 'http://127.0.0.1:5000'
REDIRECT_URI = WORK_URL + '/askMe/auth'
# Access token of the bot account.
token = _read_secret('token.secret')
# API client authenticated as the bot account; used for all direct messages.
th = Mastodon(
    access_token=token,
    api_base_url='https://' + DOMAIN,
)

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///ask.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['JSON_AS_ASCII'] = False

# Everything is passed by keyword: Flask-Limiter 3.x takes ``key_func`` as its
# only positional argument, so ``Limiter(app, key_func=...)`` raises there,
# while the keyword form is accepted by both older and newer releases.
limiter = Limiter(
    key_func=get_remote_address,
    app=app,
    default_limits=["50 / minute"],
)

db = SQLAlchemy(app)
migrate = Migrate(app, db)
class User(db.Model):
    """Owner of a question box, identified by Mastodon account name."""

    id = db.Column(db.Integer, primary_key=True)
    # Account name as reported by the Mastodon API (``account.acct``).
    acct = db.Column(db.String(64))
    # Display name, avatar URL and profile URL copied from the Mastodon account.
    disp = db.Column(db.String(64))
    avat = db.Column(db.String(256))
    url = db.Column(db.String(128))
    # Second path segment of the box URL: /askMe/<acct>/<secr>/.
    secr = db.Column(db.String(16))
    # Id of the bot's welcome direct message; later question notices reply to it.
    root = db.Column(db.BigInteger)

    def __init__(self, acct):
        """Create a user that has only its account name set."""
        self.acct = acct

    def __repr__(self):
        """Return ``@acct[display name]``."""
        return "@{}[{}]".format(self.acct, self.disp)
class Question(db.Model):
    """A question submitted to one user's box."""

    id = db.Column(db.Integer, primary_key=True)
    # Account name of the box owner the question was sent to.
    acct = db.Column(db.String(64))
    # Question text as submitted.
    content = db.Column(db.String(400))
    # Creation time; a naive local timestamp taken in __init__.
    time = db.Column(db.DateTime)
    # Id of the direct message in which the bot relayed the question.
    toot = db.Column(db.BigInteger)

    def __init__(self, acct, content, toot):
        """Store the question and stamp it with the current local time."""
        self.acct, self.content, self.toot = acct, content, toot
        self.time = datetime.datetime.now()

    def __repr__(self):
        """Return ``[acct]content(toot id)``."""
        return "[{}]{}({})".format(self.acct, self.content, self.toot)
@app.route('/js/<path:path>')
def send_js(path):
    """Serve the file at *path* from the ``static/js`` directory."""
    js_dir = 'static/js'
    return send_from_directory(js_dir, path)
@app.route('/img/<path:path>')
def send_img(path):
    """Serve the file at *path* from the ``static/img`` directory."""
    img_dir = 'static/img'
    return send_from_directory(img_dir, path)
@app.route('/askMe/')
def root():
    """Serve the static page ``ask.html``."""
    page = 'ask.html'
    return app.send_static_file(page)
@app.route('/askMe/footer.html')
def root_footer():
    """Serve the static fragment ``footer.html``."""
    page = 'footer.html'
    return app.send_static_file(page)
@app.route('/askMe/auth')
@limiter.limit("10 / minute")
def set_inbox_auth():
    """OAuth redirect target: create or refresh the caller's question box.

    Query parameters:
        code: authorization code to exchange through ``Mastodon.log_in``.
        autoSend: when non-empty, also request ``write:statuses`` and post a
            public status advertising the box link.
        secr: optional custom secret for the box URL, 1-16 lowercase letters.

    Returns a redirect to ``/askMe/<acct>/<secr>``. Aborts with 422 when
    ``secr`` is malformed and with 400 when ``code`` is missing.
    """
    code = request.args.get('code')
    autoSend = request.args.get('autoSend')
    secr = request.args.get('secr')

    # fullmatch is required here: re.match('[a-z]{0,16}', ...) succeeds on an
    # empty match for every string, so it never rejected anything.
    if secr and not re.fullmatch('[a-z]{1,16}', secr):
        abort(422)
    # Without a code the token exchange cannot succeed; fail as a client
    # error instead of letting log_in raise.
    if not code:
        abort(400)

    client = Mastodon(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SEC,
        api_base_url='https://' + DOMAIN
    )
    scopes = ['read:accounts']
    if autoSend:
        scopes.append('write:statuses')
    client.log_in(
        code=code,
        redirect_uri=f"{REDIRECT_URI}?autoSend={autoSend or ''}&secr={secr or ''}",
        scopes=scopes
    )

    info = client.account_verify_credentials()
    acct = info.acct

    u = User.query.filter_by(acct=acct).first()
    if not u:
        u = User(acct)
        db.session.add(u)

    # Precedence: secret from the request, then the stored one, then a fresh
    # random one. The secret is the only thing guarding the box URL, so it is
    # drawn from the `secrets` module rather than `random`.
    u.secr = secr or u.secr or ''.join(
        secrets.choice(string.ascii_lowercase) for _ in range(16))
    u.disp = info.display_name
    u.url = info.url
    u.avat = info.avatar

    if autoSend:
        client.status_post(
            f"[自动发送] 我创建了一个匿名提问箱,欢迎提问~\n{WORK_URL}/askMe/{acct}/{u.secr}",
            visibility='public')

    db.session.commit()
    return redirect(f"/askMe/{acct}/{u.secr}")
@app.route('/askMe/inbox', methods=['POST'])
@limiter.limit("10 / minute")
def set_inbox():
    """Create or reset a question box from a direct message sent to the bot.

    Reads the ``username`` form field, looks through the bot's conversations
    for one whose last status was written by that account, and parses it for
    ``新建`` optionally followed by ``[secret]`` (1-32 lowercase letters).

    Returns ``"<acct>/<secr>"`` on success, or a ``(message, status)`` pair:
    422 for an invalid account name or message format, 404 when no
    conversation from that account is found.
    """
    acct = request.form.get('username')
    # A missing field is treated as invalid rather than handed to the regex
    # (None would raise TypeError). fullmatch anchors the pattern, and the
    # domain class is spelled out because the old `\.-_` inside the brackets
    # was an accidental character range.
    if not acct or not re.fullmatch(
            r'[A-Za-z0-9_]{1,30}(@[A-Za-z0-9_.-]+)?', acct):
        return '无效的闭社id', 422

    for conv in th.conversations():
        status = conv.last_status
        # The Mastodon API documents last_status as nullable; skip those.
        if status is None:
            continue
        account = status.account
        if acct != account.acct:
            continue

        pt = status.content.strip()
        x = re.findall(r'新建(\[[a-z]{1,32}\])?', pt)
        if not x:
            return '私信格式无效,请检查并重新发送', 422

        # x[0] is the bracketed group ('' when absent); drop the brackets,
        # or fall back to a random 16-letter secret.
        secr = x[0][1:-1] if x[0] else ''.join(
            secrets.choice(string.ascii_lowercase) for _ in range(16))

        u = User.query.filter_by(acct=acct).first()
        if not u:
            u = User(acct)
            db.session.add(u)
        u.disp = account.display_name
        u.url = account.url
        u.avat = account.avatar
        u.secr = secr
        db.session.commit()

        th.status_post(f"@{acct} 设置成功! 当前提问箱链接 {WORK_URL}/askMe/{acct}/{secr}\n(如需在微信等无链接预览的平台分享,建议先发给自己,点开,再点击分享到朋友圈等)",
                       in_reply_to_id=status.id,
                       visibility='direct'
                       )
        return acct + '/' + secr

    return '未找到私信,请确认已发送且是最近发送', 404
@app.route('/askMe/<acct>/<secr>/')
def inbox(acct, secr):
    """Render the question box page for the user matching *acct* and *secr*.

    Responds with 404 when no such user exists.
    """
    owner = User.query.filter_by(acct=acct, secr=secr).first_or_404()

    qs = []
    for question in Question.query.filter_by(acct=acct).all():
        qs.append({
            'content': question.content,
            'toot': question.toot,
            # Stored timestamps are naive; tag them with the local zone.
            'time': question.time.replace(tzinfo=tzlocal()),
        })

    shown_name = owner.disp or owner.acct
    return render_template(
        'inbox.html',
        acct=owner.acct,
        disp=shown_name,
        url=owner.url,
        avat=owner.avat,
        qs=qs,
    )
@app.route('/askMe/<acct>/<secr>/new', methods=['POST'])
@limiter.limit("50 / hour; 1 / 2 second")
def new_question(acct, secr):
    """Accept a question for a box and relay it to its owner by direct message.

    Aborts with 404 when the box does not exist and with 422 when the
    ``question`` form field is empty or longer than 400 characters. A question
    whose text already exists for this account is silently ignored. Always
    finishes by redirecting to ``"."``.
    """
    owner = User.query.filter_by(acct=acct, secr=secr).first()
    if owner is None:
        abort(404)

    content = request.form.get('question')
    if not content or len(content) > 400:
        abort(422)

    duplicate = Question.query.filter_by(acct=acct, content=content).first()
    if duplicate is not None:
        return redirect(".")

    # First question for this owner: send the welcome message that all later
    # notices are threaded under.
    if not owner.root:
        welcome = th.status_post(
            f"@{acct} 欢迎使用匿名提问箱。未来的新提问会集中显示在这里,方便管理。戳我头像了解如何回复。",
            visibility='direct')
        owner.root = welcome.id

    notice = th.status_post(
        f"@{acct} 叮~ 有新提问:\n\n{content}",
        in_reply_to_id=owner.root,
        visibility='direct',
    )
    db.session.add(Question(acct, content, notice.id))
    db.session.commit()
    return redirect(".")
def render_content(text, emojis):
    """Return *text* with bot mentions removed and emoji shortcodes expanded.

    Every ``:shortcode:`` of an entry in *emojis* is replaced by an
    ``<img class="emoji">`` tag pointing at that entry's ``url``.
    """
    html = MENTION_BOT_TEMP.sub('', text)
    for emoji in emojis:
        shortcode_token = ':{}:'.format(emoji.shortcode)
        img_tag = '<img class="emoji" src="{}">'.format(emoji.url)
        html = html.replace(shortcode_token, img_tag)
    return html
@app.route('/askMe/<acct>/<secr>/<int:toot>')
def question_info(acct, secr, toot):
    """Return the replies posted under a question's direct message.

    Aborts with 404 when the question or the box does not exist. If the most
    recent reply matches ``DELETE_TEMP``, the question row and the bot's
    status are deleted and a 404 with a message is returned instead.
    Otherwise returns ``{'replies': [...]}``.
    """
    question = Question.query.filter_by(acct=acct, toot=toot).first()
    if not question or not User.query.filter_by(acct=acct, secr=secr).first():
        abort(404)

    thread = th.status_context(toot)
    replies = []
    for reply in thread.descendants:
        author = reply.account
        replies.append({
            'disp': author.display_name or author.acct,
            'url': author.url,
            'content': render_content(reply.content, reply.emojis),
            'time': str(reply.created_at),
            'media': reply.media_attachments,
        })

    # Only the last reply is checked for the delete command.
    if replies and DELETE_TEMP.match(replies[-1]['content']):
        db.session.delete(question)
        db.session.commit()
        th.status_delete(toot)
        return '该提问已被删除', 404

    return {'replies': replies}
if __name__ == "__main__":
    # Direct execution starts Flask's built-in server with debug mode on.
    app.run(debug=True)