current position:Home>Generate password based on fast token and fast token

Generate password based on fast token and fast token

2022-05-15 03:36:09debugfeng

The last article developed two interfaces , But the registered interface password is not encrypted , Return for login interface token No encryption, no expiration time, etc …

So the main narrative , Processing for these two interfaces , The first is based on python-jose Library to generate token, Then based on the passlib Library to do password encryption and decryption .

Remember to install the dependency library first :

pip install passlib

pip install python-jose

….. Okay , Don't talk much , Let's move on to the text …..

sc_app/dependencies.py

# -*- coding: utf-8 -*-
# @Time    : 2022/4/12 20:20
# @Author  : Lifeng
# @File    : dependencies.py
# @Software: PyCharm

from typing import Optional
from jose import JWTError, jwt
from sc_app.redispy import redispy
from sc_app import settings as sp
from datetime import timedelta, datetime
from passlib.context import CryptContext
from fastapi import status, Header, HTTPException


def verify_x_token(x_token: str = Header(default="debugfeng")):
    """
     Verification and authentication 
    :param x_token:
    :return:
    """
    if x_token != "debugfeng":
        raise HTTPException(status_code=400, detail="not x_token in header !")


def generate_access_token(data: dict, expiration: Optional[timedelta] = None):
    """
     Generate token And encrypt 
    :param data:
    :param expiration:
    :return:
    """
    to_encode = data.copy()
    if expiration:
        expire = datetime.utcnow() + expiration
    else:
        expire = datetime.utcnow() + timedelta(days=3)
    to_encode.update({"exp": expire})
    to_encode_jwt = jwt.encode(to_encode, key=sp.KEY, algorithm=sp.ALGORITHM)
    return to_encode_jwt


def verify_token(x_token: str = Header(...), token: str = Header(...)):
    """
     Get the user and decrypt token
    :param x_token:
    :param token:
    :return:
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail={"Message": "  The voucher is wrong or invalid ...... "},
    )
    credentials_exception_token = HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail={"Message": "  The user is not logged in or logged in  token  It's invalid !"}
    )
    try:
        #    analysis token value 
        payload = jwt.decode(token, key=sp.KEY, algorithms=sp.ALGORITHM)
        username: str = payload["username"]
        #    Judge whether the user is null 
        if username is None:
            raise credentials_exception
        #   redis Read token value 
        redis_token = redispy.get_value(username, is_data=True)
        #    If the conditions are not met, an error is thrown 
        if not username and not redis_token and x_token != "debugfeng" and redis_token != token:
            raise credentials_exception_token
        return
    except JWTError:
        raise credentials_exception


def encryption_password_or_decode(*, pwd: str, hashed_password: str = None):
    """
     Password encryption or decryption 
    :param pwd:
    :param hashed_password:
    :return:
    """
    encryption_pwd = CryptContext(
        schemes=["sha256_crypt", "md5_crypt", "des_crypt"]
    )

    def encryption_password():
        password = encryption_pwd.hash(pwd)
        return password

    def decode_password():
        password = encryption_pwd.verify(pwd, hashed_password)
        return password

    return decode_password() if hashed_password else encryption_password()

Information notes :

  • datetime.utcnow() + expiration: Is to set the time for the signature .

  • jwt.encode(to_encode, key=sp.KEY, algorithm=sp.ALGORITHM): encryption ,to_encode It's a required statement ,sp.KEY It's the key passed ,sp.ALGORITHM It's a signature algorithm , The default is HS256.

  • jwt.decode(token, key=sp.KEY, algorithms=sp.ALGORITHM): Decrypt , The parameters in it , except token Outside , Other parameters are consistent with encryption .

  • redispy.get_value(username, is_data=True): From... According to user name redis Read from token, And assign to variable redis_token.

  • CryptContext(schemes=["sha256_crypt", "md5_crypt", "des_crypt"]): Initialize an object ,schemes The parameters in are three ways of encryption .

  • encryption_pwd.hash(pwd): Is to perform password encryption

  • encryption_pwd.verify(pwd, hashed_password): Is to perform decryption , If the decryption is consistent, it returns True

sc_app/routers/register.py

# -*- encoding: utf-8 -*-
"""
@__Author__: lifeng
@__Software__: PyCharm
@__File__: register.py
@__Date__: 2022/5/3 14:48
"""

from fastapi import Depends
from fastapi import APIRouter
from fastapi import HTTPException
from sc_app.databases import get_db
from sqlalchemy.orm import Session
from sc_app.model import user_models
from sc_app.schemas.users.register import User, UserPwd
from sc_app.dependencies import encryption_password_or_decode


class DatabaseUser:
    def __init__(self, db: Session):
        self.db = db

    def get_user_username(self, *, username: str):
        return self.db.query(
            user_models.Users
        ).filter(user_models.Users.username == username).first()

    def register_user(self, *, user: UserPwd):
        hashed_password = encryption_password_or_decode(pwd=user.password)
        db_user = user_models.Users(
            username=user.username, password=hashed_password
        )
        self.db.add(db_user)
        self.db.commit()
        self.db.refresh(db_user)
        return db_user

    def __del__(self):
        self.db.close()


router = APIRouter(
    prefix="/user"
)


@router.post("/register/", response_model=User)
def api_register_user(user: UserPwd, db: Session = Depends(get_db)):
    """
     Student registration interface 
    :param user:
    :param db:
    :return:
    """
    connect = DatabaseUser(db)
    data = connect.get_user_username(username=user.username)

    if data:
        raise HTTPException(
            status_code=400,
            detail="  The data does not exist or the name is duplicate  !"
        )

    get_user = connect.register_user(user=user)
    print(get_user.password)
    return get_user

Information notes :

  • encryption_password_or_decode(pwd=user.password): Call the encryption function to encrypt the password .

sc_app/routers/login.py

# -*- encoding: utf-8 -*-
"""
@__Author__: lifeng
@__Software__: PyCharm
@__File__: login.py
@__Date__: 2022/5/3 15:19
"""

from datetime import timedelta
from sc_app.databases import get_db
from sqlalchemy.orm import Session
from sc_app.model import user_models
from sc_app.redispy import redispy
from sc_app.dependencies import generate_access_token
from fastapi import APIRouter, HTTPException, Depends, status
from sc_app.schemas.users.login import UserPwd, UserToken
from sc_app.dependencies import encryption_password_or_decode

ACCESS_TOKEN_EXPIRE_DAYS = 3


def check_user(user: str, pwd: str, db: Session = Depends(get_db)):
    """
     According to the entered user information , Query and compare the account and password in the database , And decrypt the password 
    :param user:
    :param pwd:
    :param db:
    :return:
    """
    username, password = db.query(
        user_models.Users.username, user_models.Users.password
    ).filter(user_models.Users.username == user).first()

    if not username and not encryption_password_or_decode(pwd=pwd, hashed_password=password):
        return False
    return True


router = APIRouter(
    prefix="/user"
)


@router.post("/login/", response_model=UserToken)
def api_login_user(user: UserPwd, db: Session = Depends(get_db)):
    """
     Login interface 
    :param user:
    :param db:
    :return:
    """

    #    If you return False Throws an error 
    if not check_user(user.username, user.password, db):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail={
                "status": 0,
                "data": {},
                "error_msg": " Wrong password or account number  !",
                "error_code": 1000
            }
        )
    #   Judge redis in key Whether there is 
    if not redispy.get_exists(user.username):
        #   Set the time to 3 God 
        access_token_expires = timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS)

        #    establish token
        access_token = generate_access_token(
            data={"username": user.username}, expiration=access_token_expires
        )
        #    Go to redis writes token
        redispy.set_value(user.username, access_token, is_data=True)
        return {"token": access_token, "username": user.username}

    #    from redis Read from token
    access_token = redispy.get_value(user.username, is_data=True)
    return {"token": access_token, "username": user.username}

Information notes :

  • ACCESS_TOKEN_EXPIRE_DAYS = 3: Set up token Failure time .

    #  If the return is false, the creation time , And create token
    if not redispy.get_exists(user.username):
        #   Set the time to 3 God 
        access_token_expires = timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS)

        #    establish token
        access_token = generate_access_token(
            data={"username": user.username}, expiration=access_token_expires
        )
        #    Go to redis writes token
        redispy.set_value(user.username, access_token, is_data=True)

    #  If the user name is false and the password is false after decryption, false is returned 
    if not username and not encryption_password_or_decode(pwd=pwd, hashed_password=password):
        return False

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2022/4/12 20:20
# @Author  : Lifeng
# @Site    : 
# @File    : main.py
# @Software: PyCharm

import uvicorn
from fastapi import FastAPI, Depends
from sc_app.routers import register, login
from sc_app.dependencies import verify_token
from sc_app.dependencies import verify_x_token

app = FastAPI()
app.include_router(router=register.router, dependencies=[Depends(verify_x_token)])
app.include_router(router=login.router, dependencies=[Depends(verify_x_token)])


@app.get("/index/", dependencies=[Depends(verify_token)])
def index():
    """
     home page 
    :return:
    """
    return {"message": "Welcome to the home page !"}


if __name__ == '__main__':
    uvicorn.run(app="main:app", reload=True, debug=True)

Information notes :

#  relation token Sign in 
@app.get("/index/", dependencies=[Depends(verify_token)])

Now start the service , And then use it postman Make an interface request , As shown below :

Registered interface

  • Request interface
POST  http://127.0.0.1:8000/user/register/
  • Request header :
{
    x-token: debugfeng
}
  • Request parameters :
{
    "username": "[email protected]",
    "password": "123456"
}
  • Request the results
{
    "username": "[email protected]",
    "id": 16,
    "is_active": true
}

After the registration interface request is successful , Because I printed the password , The console will display as follows :

$5$rounds=535000$.jZyKjLiJRINVMdT$n8.wo3yfUQ2tdAdYMkipPAGXrlQ9.lXSN6iB8PiR162
INFO:     127.0.0.1:58328 - "POST /user/register/ HTTP/1.1" 200 OK

Login interface

  • Request interface :
POST  http://127.0.0.1:8000/user/login/
  • Request header :
{
    x-token: debugfeng
}
  • Request parameters :
{
    "username": "[email protected]",
    "password": "123456"
}
  • Request the results :
{
    "username": "[email protected]",
    "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6ImRlYnVnZmVuZzE2QHFxLmNvbSIsImV4cCI6MTY1MjQ1ODc5OH0.JyuA1u9JNkvR6rjMZ8ZvxzLmz_kEfaH-8U3nbM9JKn8",
    "x_token": "debugfeng"
}

Home interface

  • Request interface
GET  http://127.0.0.1:8000/user/login/
  • Request header :
{
    x-token: debugfeng
    token: 123456
}
  • Request the results :
{
    "detail": {
        "Message": "  The voucher is wrong or invalid ...... "
    }
}

This is because it is wrong token, That's why the error message is thrown … Now? , We are header It's really token Parameters , as follows :

  • Request header :
{
    "x-token": "debugfeng",
    "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6ImRlYnVnZmVuZzE2QHFxLmNvbSIsImV4cCI6MTY1MjQ1ODc5OH0.JyuA1u9JNkvR6rjMZ8ZvxzLmz_kEfaH-8U3nbM9JKn8",
}
  • Request the results :
{
    "message": "Welcome to the home page !"
}

Let's stop here today , The above summary may help you , Maybe I can't help you , But I still hope it can help you , If you have any questions 、 Ambiguity , Direct private messages will be revised and released in time ; I'm looking forward to your one click 3 even 【 give the thumbs-up 、 Collection 、 Share 】 yo , thank you !

Hang in the air , To be continued ……

Keep trying , I hope you too !

WeChat search official account : Just use python

copyright notice
author[debugfeng],Please bring the original link to reprint, thank you.
https://en.pythonmana.com/2022/131/202205111536376372.html

Random recommended