Python封装Redis通用工具类
1.django项目settings.py文件关于redis的配置# Redis缓存库配置CACHES = {"default": {"BACKEND": "django_redis.cache.RedisCache","LOCATION": "redis://192.168.10.2:6379/0",...
·
1.django项目settings.py文件关于redis的配置
# Redis缓存库配置
CACHES = {
"default": {
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": "redis://192.168.10.2:6379/0",
"OPTIONS": {
"CLIENT_CLASS": "django_redis.client.DefaultClient",
"COMPRESSOR": "django_redis.compressors.zlib.ZlibCompressor",
"CONNECTION_POOL_KWARGS": {"max_connections": 512},
"IGNORE_EXCEPTIONS": True,
"SOCKET_CONNECT_TIMEOUT": 5, # in seconds
"SOCKET_TIMEOUT": 5, # in seconds
}
},
"authority": {
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": "redis://192.168.10.2:6379/1",
"OPTIONS": {
"CLIENT_CLASS": "django_redis.client.DefaultClient",
"COMPRESSOR": "django_redis.compressors.zlib.ZlibCompressor",
"CONNECTION_POOL_KWARGS": {"max_connections": 512},
"IGNORE_EXCEPTIONS": True,
"SOCKET_CONNECT_TIMEOUT": 5, # in seconds
"SOCKET_TIMEOUT": 5, # in seconds
}
},
"feedflow": {
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": "redis://192.168.10.2:6379/2",
"OPTIONS": {
"CLIENT_CLASS": "django_redis.client.DefaultClient",
"COMPRESSOR": "django_redis.compressors.zlib.ZlibCompressor",
"CONNECTION_POOL_KWARGS": {"max_connections": 512},
"IGNORE_EXCEPTIONS": True,
"SOCKET_CONNECT_TIMEOUT": 5, # in seconds
"SOCKET_TIMEOUT": 5, # in seconds
}
},
}
2.封装redis工具类
"""
全局管理缓存的调用,封装django自带的cache
主要用于小数据量的调用
"""
import json
import re
import redis
from contextlib import contextmanager
from functools import reduce
from django.core import signals
from django.core.cache import _create_cache
from django_redis import get_redis_connection
def query_cache(dbname):
"""
redis 扩展redis 原生 功能用此连接方式
"""
conn_redis = get_redis_connection(dbname)
return conn_redis
def get_cache(backend, **kwargs):
"""django-redis 自带方法请用这个"""
cache = _create_cache(backend, **kwargs)
signals.request_finished.connect(cache.close)
return cache
class CacheAdpter():
@classmethod
def hset(cls, cache_name, mainkey, subkey, values, second=None):
"""
单个添加 redis hash 值
:param cache_name: db
:param mainkey: 主key
:param subkey: 子key
:param second: 不建议设置过期时间,此过期时间针对mainkey设置,对subkey无效
:return
CacheAdpter.hmset('authority', CacheKey.AUTHY_CUSTOMER_ROLE, 20, key_1='values_11', key_2='values_662')
"""
use_cache = query_cache(cache_name)
use_cache.hset(mainkey, subkey, values)
if second:
use_cache.expire(mainkey, second)
@classmethod
def hmset(cls, cache_name, mainkey, second=None, **kwargs):
"""
批量添加 redis hash 值
:param cache_name: db
:param mainkey: 主key
:param **kwargs: 批量subkey
:param second: 不建议设置过期时间,此过期时间针对mainkey设置,对subkey无效
:return
CacheAdpter.hmset('authority', CacheKey.AUTHY_CUSTOMER_ROLE, 20, key_1='values_11', key_2='values_662')
"""
use_cache = query_cache(cache_name)
use_cache.hmset(mainkey, kwargs)
if second:
use_cache.expire(mainkey, second)
@classmethod
def hdel(cls, cache_name, mainkey, key):
"""
删除 redis hash 数据
:param cache_name: db
:param mainkey: 主key
:param subkey: 子key
:return:
"""
use_cache = query_cache(cache_name)
use_cache.hdel(mainkey, key)
@classmethod
def hget(cls, cache_name, mainkey, subkey, type=False):
"""
获取 redis hash 数据 扩展redis hash hget 调用
:param cache_name: db
:param mainkey: 主key
:param subkey: 子key
:return:
"""
use_cache = query_cache(cache_name)
result = use_cache.hget(mainkey, subkey)
# bytes 转 str
if type:
return None if result is None else str(result, encoding='utf-8')
else:
result = json.loads(result) if result else None
return result
@classmethod
def hkeys(cls, cache_name, mainkey, default=None):
"""
获取主key中所有子key的列表
:param cache_name: 缓存库
:param mainkey: 主key
:return: 返回列表
"""
default = default if default else []
use_cache = query_cache(cache_name)
keys_list = use_cache.hkeys(mainkey)
if not keys_list:
return default
return keys_list
@classmethod
def get_time_out(cls, cache, timeout):
"""
设置db 过期时间
:param cache: db
:param timeout: 过期时间
:return:
"""
if not timeout:
timeout = cache.default_timeout
return timeout
@classmethod
def get(cls, cache_name, key, default=None):
"""
获取 redis 数据
:param cache_name: db
:param key: 主key
:return:
"""
use_cache = get_cache(cache_name)
value = use_cache.get(key)
if value is None:
return default
return value
@classmethod
def set(cls, cache_name, key, value, timeout=0):
"""
存储 redis 数据
:param cache_name :db
:param key: key
:param value: values
:param timeout:设置过期时间 单位秒
:return:
"""
use_cache = get_cache(cache_name)
use_cache.set(key, value, cls.get_time_out(use_cache, timeout))
@classmethod
def get_many(cls, cache_name, keys, default=None):
"""
批量获取 redis 数据
:param cache_name :db
:param keys: key list []
:return:
"""
default = default if default else {}
use_cache = get_cache(cache_name)
value_dict = use_cache.get_many(keys)
if value_dict is None:
return default
return value_dict
@classmethod
def set_many(cls, cache_name, keys, timeout=0):
"""
批量设置 redis 过期数据
:param cache_name :db
:param keys: key list []
:param timeout 60*24
:return:
"""
use_cache = get_cache(cache_name)
use_cache.set_many(keys, cls.get_time_out(use_cache, timeout))
@classmethod
def delete(cls, cache_name, key):
"""
删除 redis set 数据
:param cache_name :db
:param key: key
:return:
"""
use_cache = get_cache(cache_name)
use_cache.delete(key)
@classmethod
def delete_many(cls, cache_name, keys):
use_cache = get_cache(cache_name)
for key in keys:
use_cache.delete(key)
@classmethod
def get_large_list(cls, cache_name, key):
use_cache = get_cache(cache_name)
value_list = []
grp_num = use_cache.get(key)
if grp_num is None:
return value_list
keys = [key + '_' + str(i) for i in range(grp_num)]
value_dict = use_cache.get_many(keys)
if not value_dict:
return value_list
else:
return reduce(list.__add__, value_dict.values())
@classmethod
def sadd(cls, cache_name, key, values):
"""
添加元素到redis集合中,添加多个元素需要封装为可迭代类型
:param key: 键
:param values: 值
:param cache_name: 缓存库
:return:
"""
use_cache = query_cache(cache_name)
if isinstance(values, str or int):
return use_cache.sadd(key, values)
return use_cache.sadd(key, *values)
@classmethod
def smembers(cls, cache_name, key):
"""
获取缓存中集合的所有元素
:param key: 键
:param cache_name: 缓存库
:return: 以列表返回所有结果
"""
use_cache = query_cache(cache_name)
value = use_cache.smembers(key)
if value:
return sorted([i.decode() for i in value])
return None
@classmethod
def delete_set_key(cls, cache_name, key):
"""
由于django自带的redis无法删除set(集合)中的键,所以调用此方法
:param key: 键
:param cache_name: 缓存库
:return:
"""
use_cache = query_cache(cache_name)
return use_cache.delete(key)
@classmethod
def incr(cls, cache_name, key, count, timeout=0):
use_cache = query_cache(cache_name)
use_cache.incr(key, count)
if timeout:
use_cache.expire(key, timeout)
@classmethod
def get_8origin(cls, cache_name, key, default=None):
"""
获取 redis 数据, 通过get_redis_connection原生连接获取数据
:param cache_name: db
:param key: 主key
:return:
"""
use_cache = query_cache(cache_name)
value = use_cache.get(key)
result = json.loads(value) if value else default
return result
@classmethod
def add(cls, cache_name, key, value, timeout=None):
"""
当缓存中键不存在时向缓存中写入键值,可以设置有效期
:param key: 键
:param value: 值
:param cache_name: 缓存库
:param timeout: 有效期时间
:return:
"""
use_cache = get_cache(cache_name)
if isinstance(timeout, int):
return use_cache.add(key, value, timeout)
else:
return use_cache.add(key, value, timeout=cls.get_time_out(use_cache, None))
更多推荐
所有评论(0)