Browse Source

pep8

master
欧醚 2 years ago
parent
commit
50856d1f78
1 changed files with 81 additions and 54 deletions
  1. +81
    -54
      ask.py

+ 81
- 54
ask.py View File

@ -4,7 +4,10 @@ from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from flask_migrate import Migrate
from mastodon import Mastodon
import re, random, string, datetime
import re
import random
import string
import datetime
from dateutil.tz import tzlocal
import html2text
@ -12,17 +15,17 @@ BOT_NAME = '@ask_me_bot'
CLIENT_ID = 'tKqbqe-KfV6n2HUnSyjnBbsuzGCgshW5ICu-YTIoeSU'
CLIENT_SEC = open('client.secret', 'r').read().strip()
DOMAIN = 'thu.closed.social'
DOMAIN = 'thu.closed.social'
WORK_URL = 'https://closed.social'
#WORK_URL = 'http://127.0.0.1:5000'
# WORK_URL = 'http://127.0.0.1:5000'
REDIRECT_URI = WORK_URL + '/askMe/auth'
token = open('token.secret','r').read().strip()
token = open('token.secret', 'r').read().strip()
th = Mastodon(
access_token = token,
api_base_url = 'https://' + DOMAIN
access_token=token,
api_base_url='https://' + DOMAIN
)
@ -43,12 +46,13 @@ h2t.ignore_links = True
db = SQLAlchemy(app)
migrate = Migrate(app, db)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
acct = db.Column(db.String(64))
disp = db.Column(db.String(64))
avat = db.Column(db.String(256))
url = db.Column(db.String(128))
url = db.Column(db.String(128))
secr = db.Column(db.String(16))
root = db.Column(db.BigInteger)
@ -58,13 +62,14 @@ class User(db.Model):
def __repr__(self):
return f"@{self.acct}[{self.disp}]"
class Question(db.Model):
id = db.Column(db.Integer, primary_key=True)
acct = db.Column(db.String(64))
content = db.Column(db.String(400))
time = db.Column(db.DateTime)
toot = db.Column(db.BigInteger)
def __init__(self, acct, content, toot):
self.acct = acct
self.content = content
@ -74,17 +79,22 @@ class Question(db.Model):
def __repr__(self):
return f"[{self.acct}]{self.content}({self.toot})"
@app.route('/js/<path:path>')
def send_js(path):
return send_from_directory('static/js', path)
@app.route('/img/<path:path>')
def send_img(path):
return send_from_directory('static/img', path)
@app.route('/askMe/')
def root():
return app.send_static_file('ask.html')
@app.route('/askMe/footer.html')
def root_footer():
return app.send_static_file('footer.html')
@ -98,16 +108,21 @@ def set_inbox_auth():
secr = request.args.get('secr')
#print(code,autoSend, secr)
if secr and not re.match('[a-z]{0,16}', secr):
abort(422)
client = Mastodon(
client_id = CLIENT_ID,
client_secret = CLIENT_SEC,
api_base_url = 'https://' + DOMAIN
)
token = client.log_in(code=code, redirect_uri=f"{REDIRECT_URI}?autoSend={autoSend or ''}&secr={secr or ''}", scopes=['read:accounts', 'write:statuses'] if autoSend else ['read:accounts'])
client_id=CLIENT_ID,
client_secret=CLIENT_SEC,
api_base_url='https://' + DOMAIN
)
token = client.log_in(
code=code,
redirect_uri=f"{REDIRECT_URI}?autoSend={autoSend or ''}&secr={secr or ''}",
scopes=[
'read:accounts',
'write:statuses'] if autoSend else ['read:accounts'])
info = client.account_verify_credentials()
@ -116,40 +131,45 @@ def set_inbox_auth():
if not u:
u = User(acct)
db.session.add(u)
u.secr = secr or u.secr or ''.join(random.choice(string.ascii_lowercase) for i in range(16))
u.secr = secr or u.secr or ''.join(random.choice(
string.ascii_lowercase) for i in range(16))
u.disp = info.display_name
u.url = info.url
u.url = info.url
u.avat = info.avatar
if autoSend:
client.status_post(f"[自动发送] 我创建了一个匿名提问箱,欢迎提问~\n{WORK_URL}/askMe/{acct}/{u.secr}", visibility='public')
client.status_post(
f"[自动发送] 我创建了一个匿名提问箱,欢迎提问~\n{WORK_URL}/askMe/{acct}/{u.secr}",
visibility='public')
db.session.commit()
return redirect(f"/askMe/{acct}/{u.secr}")
@app.route('/askMe/inbox', methods=['POST'])
@limiter.limit("10 / minute")
def set_inbox():
acct = request.form.get('username')
if not re.match('[A-Za-z0-9_]{1,30}(@[a-z\.-_]+)?', acct):
if not re.match('[A-Za-z0-9_]{1,30}(@[a-z\\.-_]+)?', acct):
return '无效的闭社id', 422
r = th.conversations()
for conv in r:
status = conv.last_status
account = status.account
#print(account)
# print(account)
if acct == account.acct:
pt = h2t.handle(status.content).strip()
x = re.findall('新建(\[[a-z]{1,32}\])?', pt)
x = re.findall('新建(\\[[a-z]{1,32}\\])?', pt)
if not x:
return '私信格式无效,请检查并重新发送', 422
secr = x[0][1:-1] if x[0] else ''.join(random.choice(string.ascii_lowercase) for i in range(16))
secr = x[0][1:-1] if x[0] else ''.join(
random.choice(string.ascii_lowercase) for i in range(16))
u = User.query.filter_by(acct=acct).first()
if not u:
@ -157,34 +177,37 @@ def set_inbox():
db.session.add(u)
u.disp = account.display_name
u.url = account.url
u.url = account.url
u.avat = account.avatar
u.secr = secr
db.session.commit()
th.status_post(f"@{acct} 设置成功! 当前提问箱链接 {WORK_URL}/askMe/{acct}/{secr}\n(如需在微信等无链接预览的平台分享,建议先发给自己,点开,再点击分享到朋友圈等)",
in_reply_to_id=status.id,
visibility='direct'
)
th.status_post(f"@{acct} 设置成功! 当前提问箱链接 {WORK_URL}/askMe/{acct}/{secr}\n(如需在微信等无链接预览的平台分享,建议先发给自己,点开,再点击分享到朋友圈等)",
in_reply_to_id=status.id,
visibility='direct'
)
return acct + '/' + secr
return '未找到私信,请确认已发送且是最近发送', 404
@app.route('/askMe/<acct>/<secr>/')
def inbox(acct, secr):
u = User.query.filter_by(acct=acct, secr=secr).first()
if not u:
abort(404)
qs = [{
'content': q.content,
'toot': q.toot,
'time': q.time.replace(tzinfo=tzlocal())
} for q in Question.query.filter_by(acct=acct).all()
]
'content': q.content,
'toot': q.toot,
'time': q.time.replace(tzinfo=tzlocal())
} for q in Question.query.filter_by(acct=acct).all()
]
return render_template('inbox.html', acct=u.acct, disp=(
u.disp or u.acct), url=u.url, avat=u.avat, qs=qs)
return render_template('inbox.html', acct=u.acct, disp=(u.disp or u.acct), url=u.url, avat=u.avat, qs=qs)
@app.route('/askMe/<acct>/<secr>/new', methods=['POST'])
@limiter.limit("50 / hour; 1 / 2 second")
@ -195,19 +218,21 @@ def new_question(acct, secr):
content = request.form.get('question')
print(content)
if not content or len(content)>400:
if not content or len(content) > 400:
abort(422)
if not Question.query.filter_by(acct=acct, content=content).first():
if not u.root:
toot = th.status_post(f"@{acct} 欢迎使用匿名提问箱。未来的新提问会集中显示在这里,方便管理。戳我头像了解如何回复。", visibility='direct')
toot = th.status_post(
f"@{acct} 欢迎使用匿名提问箱。未来的新提问会集中显示在这里,方便管理。戳我头像了解如何回复。",
visibility='direct')
u.root = toot.id
toot = th.status_post(f"@{acct} 叮~ 有新提问:\n\n{content}",
in_reply_to_id=u.root,
visibility='direct'
)
in_reply_to_id=u.root,
visibility='direct'
)
q = Question(acct, content, toot.id)
db.session.add(q)
@ -215,6 +240,7 @@ def new_question(acct, secr):
return redirect(".")
@app.route('/askMe/<acct>/<secr>/<int:toot>')
def question_info(acct, secr, toot):
q = Question.query.filter_by(acct=acct, toot=toot).first()
@ -223,15 +249,15 @@ def question_info(acct, secr, toot):
context = th.status_context(toot)
replies = [
{
'disp': (t.account.display_name or t.account.acct),
'url': t.account.url,
'content': h2t.handle(t.content).replace(BOT_NAME,'').strip(),
'time': str(t.created_at)
}
for t in context.descendants
]
#print(replies)
{
'disp': (t.account.display_name or t.account.acct),
'url': t.account.url,
'content': h2t.handle(t.content).replace(BOT_NAME, '').strip(),
'time': str(t.created_at)
}
for t in context.descendants
]
# print(replies)
if replies and replies[-1].get('content') == '删除':
db.session.delete(q)
db.session.commit()
@ -240,5 +266,6 @@ def question_info(acct, secr, toot):
return {'replies': replies}
if __name__ == '__main__':
app.run(debug=True)

Loading…
Cancel
Save