|
|
- # There are two ways to run this app:
- #
- # run the web server
- # $ export FLASK_APP=app.py
- # $ flask run
- #
- # create or refresh the database
- # $ python3 app.py
-
- from flask import Flask, request, render_template, send_from_directory, abort, redirect, session, url_for, send_file, flash
- from flask_sqlalchemy import SQLAlchemy
- from sqlalchemy import func
- from flask_limiter import Limiter
- from flask_limiter.util import get_remote_address
-
- import ipfshttpclient
- from mastodon import Mastodon
-
- import pypinyin
-
- from string import ascii_uppercase as AZ
- from datetime import date, datetime
- from functools import wraps
- import hashlib
- import shutil
- import random
- import os
- import re
- from config import C
-
# ---------------------------------------------------------------------------
# Application wiring. Everything below runs once, at import time.
# ---------------------------------------------------------------------------

# The Flask application; settings come from the ``C`` object in ``config``.
app = Flask(__name__)
app.config.from_object('config.C')
app.secret_key = C.session_key

# Request rate limiting, keyed by ``get_remote_address``. Routes without an
# explicit ``@limiter.limit`` get the default limit below.
limiter = Limiter(
    app,
    default_limits=["50 / minute"],
    key_func=get_remote_address,
)

# ORM handle bound to the application.
db = SQLAlchemy(app)

# IPFS client (default connection settings) and the identity info it reports,
# fetched once and reused by the views.
ipfs_client = ipfshttpclient.connect()
ipfs_info = ipfs_client.id()

# Authorization URL that the Mastodon login route redirects to.
MAST_LOGIN_URL = Mastodon(api_base_url=C.mast_base_uri).auth_request_url(
    client_id=C.mast_client_id,
    redirect_uris=C.mast_redirect_uri,
    scopes=['read:accounts'],
)
-
class Paper(db.Model):
    """Database record describing one uploaded paper bundle.

    Besides the descriptive metadata, the row keeps running like/download
    counters and the hash under which the uploaded files are stored.
    """

    id = db.Column(db.Integer, primary_key=True)
    course = db.Column(db.String(length=30), index=True)
    teacher = db.Column(db.String(length=30), index=True)
    year = db.Column(db.Integer, index=True)
    author = db.Column(db.String(length=30), index=True)
    notes = db.Column(db.String(length=200))
    anon = db.Column(db.Boolean)
    create_date = db.Column(db.Date)
    like_num = db.Column(db.Integer, index=True, default=0)
    down_num = db.Column(db.Integer, index=True, default=0)
    file_hash = db.Column(db.String(length=64))

    # Ask SQLite for AUTOINCREMENT so a new record always gets the next id.
    __table_args__ = {'sqlite_autoincrement': True}

    def is_downloaded(self):
        """Return True when the session's user has a download record for this paper."""
        viewer = session.get('username')
        records = DownloadRelation.query.filter_by(paper_id=self.id, username=viewer)
        return records.count() > 0

    def is_liked(self):
        """Return True when the session's user has a like record for this paper."""
        viewer = session.get('username')
        records = LikeRelation.query.filter_by(paper_id=self.id, username=viewer)
        return records.count() > 0
-
class DownloadRelation(db.Model):
    """Link row recording that ``username`` has downloaded paper ``paper_id``."""

    id = db.Column(db.Integer, primary_key=True)
    # Plain indexed integer; no foreign-key constraint is declared.
    paper_id = db.Column(db.Integer, index=True)
    username = db.Column(db.String(length=30), index=True)
-
class LikeRelation(db.Model):
    """Link row recording that ``username`` has liked paper ``paper_id``."""

    id = db.Column(db.Integer, primary_key=True)
    # Plain indexed integer; no foreign-key constraint is declared.
    paper_id = db.Column(db.Integer, index=True)
    username = db.Column(db.String(length=30), index=True)
-
if __name__ == "__main__":
    # Executing the module as a script creates the tables for the models
    # declared so far. Flask-SQLAlchemy 3.x raises RuntimeError when the
    # engine is used outside an application context, so push one explicitly;
    # doing so is harmless on older releases.
    with app.app_context():
        db.create_all()
-
@app.route('/pastExam/img/<path:path>')
def send_img(path):
    """Serve the file at ``path`` from the ``static/img`` directory."""
    image_root = 'static/img'
    return send_from_directory(image_root, path)
-
def login_required(allow_guest=True):
    """Create a view decorator that enforces a logged-in session.

    The wrapped view is called with an extra ``username`` keyword argument
    taken from the session. Requests without a username are redirected to
    the ``login`` endpoint; so are guest accounts (usernames starting with
    ``guest<``) when ``allow_guest`` is false.
    """
    def decorate(view):
        @wraps(view)
        def guarded(*args, **kwargs):
            username = session.get('username')
            if not username:
                return redirect(url_for('login'))
            if username.startswith('guest<') and not allow_guest:
                return redirect(url_for('login'))
            return view(*args, **kwargs, username=username)
        return guarded
    return decorate
-
@app.route('/pastExam/login/')
def login():
    """Serve the static login landing page."""
    page = 'login/index.html'
    return app.send_static_file(page)
-
@app.route('/pastExam/login/count-info.html')
def login_count_info():
    """Serve the static ``count-info.html`` page that sits next to the login page."""
    page = 'login/count-info.html'
    return app.send_static_file(page)
-
@app.route('/pastExam/logout')
def logout():
    """Drop the username from the session and send the client to the login page."""
    session.pop('username', None)
    # Build the target with url_for instead of the relative 'login': the
    # relative form only works for this exact URL shape and makes the client
    # follow a second redirect to add the trailing slash.
    return redirect(url_for('login'))
-
@app.route('/pastExam/login/guest/')
def guest_login():
    """Render the guest login form with the configured verification entries."""
    context = {
        'vs': C.verify,
        'allow_guest_upload': C.allow_guest_upload,
    }
    return render_template('guest-login.html', **context)
-
@app.route('/pastExam/login/guest/verify', methods=['POST'])
@limiter.limit("5 / hour")
def guest_login_verify():
    """Check the submitted verification answers and start a guest session.

    Every entry in ``C.verify`` must be answered exactly; otherwise a 401
    is returned. On success the session gets a ``guest<uid>`` username,
    reusing the ``uid`` already stored in the session if there is one.
    """
    all_correct = all(
        request.form.get(field) == expected
        for field, _question, _hint, expected in C.verify
    )
    if not all_correct:
        return '错误!', 401

    # Keep an existing uid so the same browser keeps the same guest name.
    if 'uid' not in session:
        session['uid'] = random.randint(0, 10000000)

    session.pop('avatar', None)
    session['username'] = f"guest<{session['uid']}>"
    session.permanent = True
    return {'r': 0}
-
@app.route('/pastExam/login/mast/')
def mast_login():
    """Redirect the browser to the precomputed Mastodon authorization URL."""
    destination = MAST_LOGIN_URL
    return redirect(destination)
-
@app.route('/pastExam/login/mast/auth')
def mast_login_auth():
    """Finish the Mastodon login: exchange the code and fill the session.

    Expects the authorization ``code`` as a query parameter. Responds with
    400 when it is absent; otherwise stores the account's ``acct`` and
    ``avatar`` in the session and redirects to the paper list.
    """
    code = request.args.get('code')
    if not code:
        # Without a code there is nothing to exchange; fail as a client
        # error instead of letting the Mastodon client raise.
        abort(400)

    client = Mastodon(
        client_id=C.mast_client_id,
        client_secret=C.mast_client_sec,
        api_base_url=C.mast_base_uri
    )
    # The returned token is not needed here; the client is used right away
    # for the credentials lookup.
    client.log_in(code=code, redirect_uri=C.mast_redirect_uri, scopes=['read:accounts'])
    info = client.account_verify_credentials()
    session['username'] = info.acct
    session['avatar'] = info.avatar
    session.permanent = True

    return redirect(url_for('list'))
-
def by_abc(l):
    """Group items by the initial letter of their first element.

    Each item is indexed with ``item[0]`` (a name); the pinyin first letter
    of that name, upper-cased, selects a bucket ``A``-``Z``. Anything that
    does not map to a single ASCII capital, including an empty or missing
    name, goes to the ``/`` bucket.

    Returns a dict of the non-empty buckets, ordered ``A``..``Z`` then ``/``.
    """
    buckets = {letter: [] for letter in AZ + '/'}
    for item in l:
        name = item[0]
        # lazy_pinyin('') yields an empty list and None is not a string, so
        # only call it for a non-empty name.
        initials = pypinyin.lazy_pinyin(
            name,
            style=pypinyin.Style.FIRST_LETTER,
            errors=lambda x: x[0],
        ) if name else []
        k = initials[0].upper() if initials else '/'
        # Test against the bucket keys, not ``k in AZ``: the latter is a
        # substring test, so a multi-character key such as 'ST' (what
        # str.upper() produces for some ligatures) would pass it and then
        # raise KeyError on the lookup.
        if k not in buckets:
            k = '/'
        buckets[k].append(item)

    return {k: v for k, v in buckets.items() if v}
-
-
@app.route('/pastExam/')
@login_required()
def list(username):
    """Render the paper list, filtered by the query-string arguments.

    Recognized arguments: ``course``, ``teacher``, ``year`` (decimal
    integer), ``my_upload``, ``my_fav`` and the pagination arguments read by
    ``paginate``. Results are ordered by likes, then downloads, then id, all
    descending.

    NOTE: the function name doubles as the endpoint name used by
    ``url_for('list')``, and every local variable is handed to the template
    through ``locals()``, so the local names below are part of the template
    contract and must not be renamed.
    """
    avatar = session.get('avatar', C.guest_avatar)
    course = request.args.get('course')
    teacher = request.args.get('teacher')
    year_arg = request.args.get('year')
    # Only a decimal string becomes a year filter. str.isdigit() would also
    # accept characters such as superscript digits that int() rejects, and
    # the old ``a and b and int(a)`` form turned a non-numeric value into
    # False, which compares equal to 0 and silently filtered on year 0.
    year = int(year_arg) if year_arg and year_arg.isdecimal() else None

    is_my_upload = request.args.get('my_upload')
    is_my_fav = request.args.get('my_fav')

    has_course = course is not None
    has_teacher = teacher is not None
    # "has" means the argument was supplied at all, valid or not.
    has_year = year_arg is not None
    ept = not (has_course or has_teacher or has_year) and is_my_fav is None and is_my_upload is None and 'page' not in request.args

    ps = Paper.query

    if course:
        ps = ps.filter_by(course=course)
    if teacher:
        ps = ps.filter_by(teacher=teacher)
    if year is not None:
        # 0 is a legitimate stored value, so compare against None explicitly.
        ps = ps.filter_by(year=year)

    if is_my_upload:
        ps = ps.filter_by(author=username)
    if is_my_fav:
        ps = ps.join(LikeRelation, Paper.id==LikeRelation.paper_id).filter(LikeRelation.username==username)

    ps = ps.order_by(db.desc('like_num'), db.desc('down_num'), db.desc('id'))
    pagination = ps.paginate(max_per_page=100)
    curr_year = date.today().year
    # (value, count) pairs for the filter widgets.
    all_courses = db.session.query(Paper.course, func.count()).group_by(Paper.course).all()
    all_courses_abc = by_abc(all_courses)
    all_teachers = db.session.query(Paper.teacher, func.count()).group_by(Paper.teacher).all()
    all_years = db.session.query(Paper.year, func.count()).group_by(Paper.year).all()
    ipfs_version = hashlib.sha256(C.ipfs_base_url.encode('utf-8')).hexdigest()
    ipfs_id = ipfs_info.get('ID')
    disable_upload = not C.allow_guest_upload and username.startswith('guest<')
    contributor_link = C.contributor_link
    return render_template('list.html', **locals())
-
def check_length(x, limit=30, allow_null=False):
    """Validate the length of a form value.

    A non-empty ``x`` is accepted when it has at most ``limit`` characters.
    An empty or missing ``x`` is accepted only when ``allow_null`` is set.
    """
    if not x:
        # Empty string or None: acceptable only for optional fields.
        return allow_null and not x
    return len(x) <= limit
-
@app.route('/pastExam/upload', methods=['POST'])
@limiter.limit("10 / hour")
@login_required(allow_guest=C.allow_guest_upload)
def upload(username):
    """Store the uploaded files in IPFS and record a new ``Paper`` row.

    Form fields: ``name`` and ``teacher`` (required, up to 30 characters),
    ``year`` (decimal integer, 0 when missing or invalid), ``notes``
    (optional, up to 200 characters), ``anon`` (checkbox) and the uploaded
    ``files[]``.

    Aborts with 422 when a field fails validation, when no usable file was
    sent, or when a file name would land outside the staging directory.
    Aborts with 500 when the directory hash is missing from the IPFS
    response. On success flashes a message and redirects to the list page.
    """
    name = request.form.get('name')
    teacher = request.form.get('teacher')
    year = request.form.get('year')
    # isdecimal() rather than isdigit(): the latter also accepts characters
    # such as superscript digits that make int() raise.
    year = int(year) if year and year.isdecimal() else 0
    notes = request.form.get('notes', '')
    anon = request.form.get('anon') == 'on'

    if not (check_length(name) and check_length(teacher) and check_length(notes, 200, True)):
        abort(422)

    files = request.files.getlist('files[]')

    # Stage the files in a per-upload directory and add it to IPFS as a whole.
    dir_name = username + str(datetime.now())
    base_path = os.path.join('/tmp', dir_name)
    os.mkdir(base_path)
    try:
        saved = 0
        for f in files:
            # Client-supplied name: neutralize '..' and drop leading slashes,
            # because os.path.join discards base_path when given an absolute
            # component.
            filename = (f.filename or '').replace('..', '__').lstrip('/')
            if not filename:
                # Browsers send an empty part when no file was chosen.
                continue
            target = os.path.normpath(os.path.join(base_path, filename))
            # Second line of defense: the file must land inside base_path.
            if not target.startswith(base_path + os.sep):
                abort(422)
            os.makedirs(os.path.dirname(target), exist_ok=True)
            f.save(target)
            saved += 1

        if not saved:
            abort(422)

        res = ipfs_client.add(base_path, recursive=True)
    finally:
        # The staging copy is no longer needed once the add has finished
        # (or failed); do not let uploads pile up in /tmp.
        shutil.rmtree(base_path, ignore_errors=True)

    # The response lists every added entry; the one named after the staging
    # directory carries the hash of the whole upload.
    file_hash = ''
    for r in res:
        if r.get('Name') == dir_name:
            file_hash = r.get('Hash')

    if not file_hash:
        abort(500)

    paper = Paper(
        course=name,
        teacher=teacher,
        year=year,
        notes=notes,
        anon=anon,
        author=username,
        create_date=date.today(),
        file_hash=file_hash
    )
    db.session.add(paper)
    db.session.commit()

    flash('上传成功')
    return redirect('.#part2')
-
@app.route('/pastExam/<pid>/download')
@limiter.limit("100 / hour")
@login_required()
def download(pid, username):
    """Count the download and deliver paper ``pid``.

    The first download by a given user adds a ``DownloadRelation`` row and
    bumps the paper's ``down_num``. With ``?format=zip|tar|gztar`` the files
    are fetched from IPFS into ``/tmp`` if needed, archived (the archive is
    reused on later requests) and sent as an attachment. Without a known
    format the client is redirected to the paper on the configured IPFS
    base URL. Responds with 404 when the paper does not exist.
    """
    p = Paper.query.get_or_404(pid)

    # Use the id of the loaded row, not the raw URL string, for everything
    # that is stored or turned into a file-system path.
    already_counted = DownloadRelation.query.filter_by(paper_id=p.id, username=username).count()

    if not already_counted:
        db.session.add(DownloadRelation(paper_id=p.id, username=username))
        p.down_num += 1
        db.session.commit()

    dformat = request.args.get('format')
    suff = {
        'zip': '.zip',
        'tar': '.tar',
        'gztar': '.tar.gz'
    }

    if dformat in suff:
        target_file = os.path.join('/tmp', str(p.id))
        target_filename = target_file + suff[dformat]
        target_dir = os.path.join('/tmp', p.file_hash)

        # Build the archive only once; fetch the content only if it is not
        # already on disk.
        if not os.path.exists(target_filename):
            if not os.path.exists(target_dir):
                ipfs_client.get(p.file_hash, target='/tmp')
            shutil.make_archive(target_file, dformat, target_dir)

        # Raw string: '\w' in a plain literal is an invalid escape sequence.
        filename = re.sub(r'[^\w@_()()-]', '_', '%s_%s_共享计划_%d' % (p.course, p.teacher, p.id)) + suff[dformat]
        return send_file(target_filename, as_attachment=True, attachment_filename=filename)

    # 301 (permanent) so clients can cache the redirect, cutting unnecessary requests.
    return redirect(C.ipfs_base_url + p.file_hash, code=301)
-
@app.route('/pastExam/<pid>/like', methods=['POST', 'DELETE'])
@limiter.limit("100 / hour")
@login_required()
def like(pid, username):
    """Add (POST) or remove (DELETE) the current user's like on paper ``pid``.

    Both directions are no-ops when the like is already in the requested
    state. The response body is the paper's like count as text. Responds
    with 404 when the paper does not exist.
    """
    p = Paper.query.get_or_404(pid)
    existing = LikeRelation.query.filter_by(paper_id=pid, username=username).first()
    wants_like = request.method == 'POST'

    if wants_like and existing is None:
        existing = LikeRelation(paper_id=p.id, username=username)
        db.session.add(existing)
        p.like_num += 1
        db.session.commit()
    elif not wants_like and existing is not None:
        db.session.delete(existing)
        p.like_num -= 1
        db.session.commit()

    return str(p.like_num)
-
|