微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Python操作redis普通操作,连接池,封装

安装python包

pip install redis==2.10.6

普通操作

import redis

abc = {"aa":{"bb":"cc"}}
r = redis.Redis(host='192.168.2.139',port=6379,db=1,password="111111",decode_responses=True)
try:
    r.ping()
except TimeoutError:
    print('redis connection timeout')
r.lpush("abccba",abc)

连接池

#-*-coding:utf-8-*-
import redis

# 连接池连接使用,节省了每次连接用的时间
conn_pool = redis.ConnectionPool(host='localhost',decode_responses=True)
# 第一个客户端访问
re_pool = redis.Redis(connection_pool=conn_pool,decode_responses=True)
# 第二个客户端访问
re_pool2 = redis.Redis(connection_pool=conn_pool,decode_responses=True)
# key value存储到redis数据库
try:
    re_pool.set('chinese1','hello_world')
    re_pool2.set('chinese2','hello_python')
except Exception as e:
    print(e)
# 根据key获取存的数据的内容
data_info = re_pool.get('chinese1')
data_info2 = re_pool.get('chinese2')
# 输出从redis库中取出来的数据的内容
print(data_info)
print(data_info2)
# 获取两个连接的信息
id1 = re_pool.client_list()
id2 = re_pool2.client_list()
# 输出两个连接的id,判断是否一致
print('re_pool_id{}======re_pool2_id{}'.format(id1[0]['id'],id2[0]['id']))

 

实例封装

#! /usr/bin/env python
# -*- coding: utf-8 -*-


import sys
import time
import logging

import redis

logger = logging.getLogger()


class RedisDB(object):
    def __init__(
        self,host,port,password,db,list_name,):
        self.host = host
        self.port = port
        self.password = password
        self.db = db
        self.list_name = list_name
        self.lock = False
        self.redis_conn = None
        #self.redis_conn = self.get_redis_connect()
        #if self.redis_conn is None:
        #    logger.error("Redis connect fail.")
        #    sys.exit()

    def get_redis_connect(self):
        #connect_num = 5
        #while connect_num:
        while True:
            try:
                self.redis_conn = redis.Redis(
                    self.host,self.port,self.db,self.password,decode_responses=True
                )
                self.redis_conn.ping()
                self.lock = True
                break
            except Exception as e:
                logger.error("Redis connection timeout,waiting for reconnection.")
                #if connect_num > 0:
                #    connect_num -= 1
                #    continue
                self.lock = False
                time.sleep(5)
                continue

    def pinging(self):
        try:
            self.redis_conn.ping()
            return True
        except Exception as e:
            logger.error("Redis connection timeout")
            return False

    def get_redis_data(self):
        try:
            data = self.redis_conn.rpop(self.list_name)
            return data
        except Exception as e:
            logger.error("Get redis data fail.")
            return None

    def send_result(self,result_list_name,result):
        try:
            self.redis_conn.lpush(result_list_name,result)
            return True
        except Exception as e:
            logger.error("Send ai result data to redis fail: {}.".format(e))
            return False

调用

def thread_get_data():
    global redis_obj

    try:
        while True:
            if not redis_obj.lock:
                time.sleep(1)
                continue

            if thread_lock:
                continue

            #data = json.loads(redis_obj.get_redis_data())
            data = ast.literal_eval(line.strip())   

            if data is None:
                time.sleep(1)
                continue
         
            event_type = data["event_type"]

            # dns
            if event_type.lower() == "dns":
                。。。。。。
            # icmp
            elif event_type.lower() == "icmp":
                。。。。。。
    except Exception as e:
        #logger.error("Get redis data thread error:{0},{1}".format(e,e.__traceback__.tb_lineno))
        print("Get redis data thread error:{0},e.__traceback__.tb_lineno))

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐