OAuth: (开放授权)

OAuth的授权模式:

  • 授权码模式: 功能最完善,流程最严密

  • 简码模式: 不通过第三方应用程序服务器,直接在浏览器中向认证服务器申请指令

  • 密码模式:用户向客户端提供用户名和密码

  • 客户端模式:

OAuth授权服务器:

在logindemo.py中添加:

#!/usr/bin/env python# -*- coding: utf-8 -*-import base64import randomimport timefrom flask import Flask, request, redirectapp = Flask(__name__)users = {    "xxxx": ["xxxxx"]}auth_code = {}redirect_uri='http://localhost:5000/client/passport'# 给用户添加账号client_id = 'xxxxxx'users[client_id] = []# 授权服务器需要保存重定向urioauth_redirect_uri = []def gen_token(uid):    token = base64.b64encode(':'.join([str(uid), str(random.random()), str(time.time() + 7200)]))    users[uid].append(token)    return token# 生成授权码:def gen_auth_code(uri):    code = random.randint(0, 10000)    auth_code[code] = uri    return codedef verify_token(token):    _token = base64.b64decode(token)    if not users.get(_token.split(':')[0])[-1] == token:        return -1    if float(_token.split(':')[-1]) >= time.time():        return 1    else:        return 0@app.route('/', methods=['GET'])def index():    print request.headers    return 'hello'@app.route('/login', methods=['GET'])def login():    uid, pw = base64.b64decode(request.headers['Authorization'].split(' ')[-1]).split(':')    if users.get(uid)[0] == pw:        return gen_token(uid)    else:        return 'error'#授权码的发放:@app.route('/oauth', methods=['GET'])def oauth():    # 验证用户授权    if request.args.get('user'):        if users.get(request.args.get('user'))[0] == request.args.get('pw') and oauth_redirect_uri:            uri = oauth_redirect_uri[0] + '?code=%s' % gen_auth_code(oauth_redirect_uri[0])            return redirect(uri)    if request.args.get('code'): # 若请求中携带授权码,        # 比对uri        if auth_code.get(int(request.args.get('code'))) == request.args.get('redirect_uri'):            return gen_token(request.args.get('client_id'))# 发放token    if request.args.get('redirect_uri'):        oauth_redirect_uri.append(request.args.get('redirect_uri'))    return 'please login'# 重定向# 用户访问客户端的login目录,客户端将用户重定向到授权服务端的oauth@app.route('/client/login', methods=['GET'])def client_login():    uri = 'http://localhost:5000/oauth?response_type=code&client_id=%s&redirect_uri=%s' % (        client_id, redirect_uri)    return redirect(uri)@app.route('/client/passport', methods=['POST', 'GET'])def client_passport():    code = request.args.get('code')    uri = 'http://localhost:5000/oauth?grant_type=authorization_code&code=%s&redirect_uri=%s&client_id=%s' % (code, redirect_uri, client_id)    return redirect(uri)@app.route('/test1', methods=['GET'])def test():    token = request.args.get('token')    if verify_token(token) == 1:        return 'data'    else:        return 'error'if __name__ == '__main__':    app.run(debug=True)

在requests_t.py

#!/usr/bin/env python# -*- coding: utf-8 -*-import requestsr = requests.get('http://localhost:5000/client/login')print r.textprint r.historyprint r.urluri_login = r.url.split('?')[0] + '?user=zx&pw=thystar'r2 = requests.get(uri_login)print r2.textr = requests.get('http://127.0.0.1:5000/test1', params={'token': r2.text})print r.text

Flask渲染页面集Cookies;

Cookies的加密方法:

对源代码的修改:

logindemo.py

#!/usr/bin/env python# -*- coding: utf-8 -*-import base64import randomimport timeimport jsonimport hmacfrom datetime import datetime, timedeltafrom flask import Flask, request, redirect, make_responseapp = Flask(__name__)users = {    "zx": ["thystar"]}redirect_uri='http://localhost:5000/client/passport'client_id = 'thystar'users[client_id] = []auth_code = {}oauth_redirect_uri = []TIMEOUT = 3600 * 2# 新版本的token生成器def gen_token(data):    '''    :param data: dict type    :return: base64 str    '''    data = data.copy()    if "salt" not in data:        data["salt"] = unicode(random.random()).decode("ascii")    if "expires" not in data:        data["expires"] = time.time() + TIMEOUT    payload = json.dumps(data).encode("utf8")    # 生成签名    sig = _get_signature(payload)    return encode_token_bytes(payload + sig)# 授权码生成器def gen_auth_code(uri, user_id):    code = random.randint(0,10000)    auth_code[code] = [uri, user_id]    return code# 新版本的token验证def verify_token(token):    '''    :param token: base64 str    :return: dict type    '''    decoded_token = decode_token_bytes(str(token))    payload = decoded_token[:-16]    sig = decoded_token[-16:]    # 生成签名    expected_sig = _get_signature(payload)    if sig != expected_sig:        return {}    data = json.loads(payload.decode("utf8"))    if data.get('expires') >= time.time():        return data    return 0# 使用hmac为消息生成签名def _get_signature(value):    """Calculate the HMAC signature for the given value."""    return hmac.new('secret123456', value).digest()# 下面两个函数将base64编码和解码单独封装def encode_token_bytes(data):    return base64.urlsafe_b64encode(data)def decode_token_bytes(data):    return base64.urlsafe_b64decode(data)# 验证服务器端@app.route('/index', methods=['POST', 'GET'])def index():    print request.headers    return 'hello'@app.route('/login', methods=['POST', 'GET'])def login():    uid, pw = base64.b64decode(request.headers['Authorization'].split(' ')[-1]).split(':')    if users.get(uid)[0] == pw:        return gen_token(dict(user=uid, pw=pw))    else:        return 'error'@app.route('/oauth', methods=['POST', 'GET'])def oauth():    # 处理表单登录, 同时设置Cookie    if request.method == 'POST' and request.form['user']:        u = request.form['user']        p = request.form['pw']        if users.get(u)[0] == p and oauth_redirect_uri:            uri = oauth_redirect_uri[0] + '?code=%s' % gen_auth_code(oauth_redirect_uri[0], u)            expire_date = datetime.now() + timedelta(minutes=1)            resp = make_response(redirect(uri))            resp.set_cookie('login', '_'.join([u, p]), expires=expire_date)            return resp    # 验证授权码,发放token    if request.args.get('code'):        auth_info = auth_code.get(int(request.args.get('code')))        if auth_info[0] == request.args.get('redirect_uri'):            # 可以在授权码的auth_code中存储用户名,编进token中            return gen_token(dict(client_id=request.args.get('client_id'), user_id=auth_info[1]))    # 如果登录用户有Cookie,则直接验证成功,否则需要填写登录表单    if request.args.get('redirect_uri'):        oauth_redirect_uri.append(request.args.get('redirect_uri'))        if request.cookies.get('login'):            u, p = request.cookies.get('login').split('_')            if users.get(u)[0] == p:                uri = oauth_redirect_uri[0] + '?code=%s' % gen_auth_code(oauth_redirect_uri[0], u)                return redirect(uri)        return '''            
                

                

                

                    '''# 客户端@app.route('/client/login', methods=['POST', 'GET'])def client_login():    uri = 'http://localhost:5000/oauth?response_type=code&client_id=%s&redirect_uri=%s' % (client_id, redirect_uri)    return redirect(uri)@app.route('/client/passport', methods=['POST', 'GET'])def client_passport():    code = request.args.get('code')    uri = 'http://localhost:5000/oauth?grant_type=authorization_code&code=%s&redirect_uri=%s&client_id=%s' % (code, redirect_uri, client_id)    return redirect(uri)# 资源服务器端@app.route('/test1', methods=['POST', 'GET'])def test():    token = request.args.get('token')    ret = verify_token(token)    if ret:        return json.dumps(ret)    else:        return 'error'if __name__ == '__main__':    app.run(debug=True)

运行

登陆得到token,把token拼接到test1中测试

极客学院: