Python数据库连接池DBUtils(以pymysql模块为例连接数据库)

pip3 install DBUtils

DBUtils是Python的一个用于实现数据库连接池的模块。

此连接池有两种连接模式:

# DBUtils数据库连接池:
模式一:基于threading.local实现为每一个线程创建一个连接,关闭是伪关闭,当前线程可以重复使用
模式二:连接池原理
    如果有三个线程来数据库中获取连接:
        如果三个同时来的,一人给一个连接;
        如果一个一个来,有时间间隔,用一个连接就可以为三个线程提供服务;
        其他情况:
            有可能:1个连接就可以为三个线程提供服务
            有可能:2个连接就可以为三个线程提供服务
            有可能:3个连接就可以为三个线程提供服务
    PS:maxshared参数在使用pymysql时无效;只有连接数据库的模块threadsafety>1的时候才有用。

模式一:为每个线程创建一个连接,线程即使调用了close方法,也不会关闭,只是把连接重新放到连接池,供自己线程再次使用。当线程终止时,连接自动关闭。(如果线程比较多还是会创建很多连接,推荐使用模式二)

import pymysql
from DBUtils.PersistentDB import PersistentDB
 
# Mode 1 pool: one dedicated connection per thread (PersistentDB).
POOL = PersistentDB(
    creator=pymysql,  # module used to create connections
    maxusage=None,    # max number of times one connection is reused; None = unlimited
    setsession=[],    # SQL commands run at session start, e.g. ["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping the MySQL server to check availability: 0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    closeable=False,
    # if False, conn.close() is effectively ignored and the connection is kept for reuse by this thread; it is really closed only when the thread terminates. If True, conn.close() actually closes the connection and a later pool.connection() call fails because the connection is gone (pool.steady_connection() can fetch a brand-new one)
    threadlocal=None, # thread-local object used to store this thread's connection
    host="127.0.0.1",
    port=3306,
    user="root",
    password="",
    database="code_record",
    charset="utf8"
)
 
def func():
    """Fetch and print every row of ``userinfo`` via the per-thread pool."""
    conn = POOL.connection(shareable=False)
    cur = conn.cursor()
    cur.execute("select * from userinfo")
    rows = cur.fetchall()
    print(rows)
    cur.close()
    # Pseudo-close: the connection goes back to this thread's slot.
    conn.close()


func()

模式二:创建一批连接到连接池,供所有线程共享使用,使用完毕后再放回到连接池。(由于pymysql、MySQLdb等threadsafety值为1,所以该模式连接池中的连接会被所有线程共享。)

import pymysql
from DBUtils.PooledDB import PooledDB
# Mode 2 pool: a batch of connections shared by all threads (PooledDB).
POOL = PooledDB(
    creator=pymysql,   # module used to create connections
    maxconnections=6,  # maximum number of connections allowed; 0/None = unlimited
    mincached=2,  # number of idle connections created at startup; 0 = create none
    maxcached=5,  # maximum number of idle connections kept in the pool; 0/None = unlimited
    maxshared=3,
    # maximum number of shared connections; 0/None = share all. NOTE: ineffective here, because pymysql/MySQLdb report threadsafety == 1, so whatever this is set to, all connections end up shared anyway
    blocking=True,  # when no connection is available: True = block and wait, False = raise an error
    maxusage=None,  # max number of times one connection is reused; None = unlimited
    setsession=[],  # SQL commands run at session start, e.g. ["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping the MySQL server to check availability: 0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    host="127.0.0.1",
    port=3306,
    user="root",
    password="",
    database="code_record",
)
 
def func():
    """Fetch and print every row of ``userinfo`` via the shared pool.

    POOL.connection() first checks that the number of live connections is
    below the maximum (otherwise it blocks or raises TooManyConnections).
    It then prefers one of the SteadyDBConnection objects created at
    startup, wraps it in a PooledDedicatedDBConnection and returns it;
    when none are left a fresh SteadyDBConnection is created and wrapped.
    Closing the wrapper returns the connection to the pool for reuse.
    """
    conn = POOL.connection()
    cur = conn.cursor()
    cur.execute('select * from userinfo')
    rows = cur.fetchall()
    print(rows)
    # Return the connection to the pool so other threads can use it.
    conn.close()


func()

为什么要使用数据库连接池呢?

如果没有连接池,使用pymysql来连接数据库时,单线程应用完全没有问题;但如果涉及到多线程应用那么就需要加锁,一旦加锁那么连接势必就会排队等待(无法并发),当请求比较多时,性能就会降低了。

加锁

import pymysql
import threading
from threading import RLock
 
# Demo: one global connection guarded by a re-entrant lock. Queries are
# forced to run one at a time, so threads serialize (no concurrency).
LOCK = RLock()
CONN = pymysql.connect(host='127.0.0.1',
                       port=3306,
                       user='root',
                       password='',
                       database='code_record',
                       charset='utf8')

def task(arg):
    # Hold the lock for the whole query so no two threads use CONN at once.
    with LOCK:
        cursor = CONN.cursor()
        cursor.execute('select * from userinfo')
        ret = cursor.fetchall()
        cursor.close()
        print(ret)

for i in range(10):
    t = threading.Thread(target=task, args=(i,))
    t.start()

不加锁(报错)

import pymysql
import threading
# Deliberately broken demo: one pymysql connection shared by many threads
# with no lock. pymysql connections are not thread-safe (threadsafety == 1),
# so concurrent queries interleave on the same socket and raise errors.
CONN = pymysql.connect(host='127.0.0.1',
                       port=3306,
                       user='root',
                       password='',
                       database='code_record',
                       charset='utf8')

def task(arg):
    # No lock here: multiple threads hit CONN simultaneously and crash.
    cursor = CONN.cursor()
    cursor.execute('select * from userinfo')
    ret = cursor.fetchall()
    cursor.close()
    print(ret)

for i in range(10):
    t = threading.Thread(target=task, args=(i,))
    t.start()

 

UNIQUE 约束 pymysql 插入数据

# -*- coding: utf-8 -*-
import pymysql


def insert_db(items_list):
    """Bulk-insert (sellerId, shopName) rows, skipping UNIQUE-key duplicates.

    :param items_list: iterable of (sellerId, shopName) tuples
    :return: number of affected rows on success, None on failure
    """
    db = pymysql.connect(host='localhost',
                         database='shops1',
                         user='shops1',
                         port=3306,
                         password='123456')
    cursor = db.cursor()
    # INSERT IGNORE: rows violating the UNIQUE constraint are silently skipped.
    insert_sql = "INSERT IGNORE INTO tp_shops(sellerId,shopName) VALUES (%s,%s)"
    try:
        result = cursor.executemany(insert_sql, items_list)
        db.commit()
        return result
    except Exception as e:
        print(e)
        # 如果发生错误则回滚
        db.rollback()
    finally:
        # BUG FIX: the original called db.close() *after* the try block, so
        # the `return` on the success path skipped it and leaked the
        # connection; `finally` runs on every exit path.
        cursor.close()
        # 关闭数据库连接
        db.close()


if __name__ == '__main__':
    # Demo rows: duplicates are skipped via INSERT IGNORE; removed the
    # unused `shop` variable the original left behind.
    shops = [(894111711, 'aaaaaaaaaa'),(895746871, '6568878787887'),(565656565656666666666, '6568878787887')]
    print(insert_db(shops))

 

文件夹下csv数据写入mysql数据库

import os
import pandas as pd
import csv
import pymysql


def save_db(row):
    """Insert keyword rows into tp_keyword.

    :param row: iterable of 1-element sequences, one keyword per element
    """
    db = pymysql.connect(host='127.0.0.1', user='shijuan', password='123456', database='shijuan',
                         charset="utf8")
    cursor = db.cursor()
    insert_sql = 'insert into tp_keyword(keyword) values(%s)'
    try:
        cursor.executemany(insert_sql, row)
        db.commit()
    except Exception:
        # BUG FIX: narrowed the bare `except:` so KeyboardInterrupt etc.
        # are not swallowed; roll back the failed batch.
        db.rollback()
    finally:
        # BUG FIX: the original closed the cursor/connection only in the
        # except branch, leaking a connection on every successful insert.
        cursor.close()
        db.close()


# Load every CSV file in the folder and insert its rows into MySQL.
path = 'E:\网站\关键词\\12'
for file in os.listdir(path):
    # os.path.join is safer than manual backslash concatenation.
    url = os.path.join(path, file)
    df = pd.read_csv(url, low_memory=False, quoting=csv.QUOTE_NONE)
    save_db(df.values.tolist())

 

转换csv编码格式为utf-8

# -*- coding: utf-8 -*-
import os
from chardet.universaldetector import UniversalDetector


def get_filelist(path):
    """Return the paths of all CSV files under *path*, searched recursively.

    :param path: root directory to walk
    :return: list of paths joined from the walked directories
    """
    filelist = []
    for home, dirs, files in os.walk(path):
        for filename in files:
            # BUG FIX: the original tested `".csv" in filename`, which also
            # matched names like "a.csv.bak" or "b.csvx"; match the file
            # extension explicitly instead.
            if filename.endswith(".csv"):
                filelist.append(os.path.join(home, filename))
    return filelist


def read_file(file):
    """Return the raw byte content of *file*."""
    with open(file, 'rb') as handle:
        data = handle.read()
    return data


def get_encode_info(file):
    """Detect and return the character encoding of *file*.

    Feeds the file line by line to chardet's UniversalDetector and stops
    as soon as the detector is confident.
    """
    detector = UniversalDetector()
    with open(file, 'rb') as handle:
        for chunk in handle:
            detector.feed(chunk)
            if detector.done:
                break
    detector.close()
    return detector.result['encoding']


def convert_encode2utf8(file, original_encode, des_encode):
    """Rewrite *file* in place, converting its text from *original_encode*
    to *des_encode* (undecodable bytes are dropped via 'ignore')."""
    raw = read_file(file)
    converted = raw.decode(original_encode, 'ignore').encode(des_encode)
    with open(file, 'wb') as out:
        out.write(converted)


def read_and_convert(path):
    """Convert every non-UTF-8 CSV file under *path* to UTF-8 in place.

    Prints a running count of converted files and a warning for any file
    that could not be processed.
    """
    fileNum = 0
    for filename in get_filelist(path=path):
        try:
            # FIX: the original also called read_file(filename) here and
            # discarded the result — a wasted full read of every file.
            encode_info = get_encode_info(filename)
            if encode_info != 'utf-8':
                fileNum += 1
                convert_encode2utf8(filename, encode_info, 'utf-8')
                print('成功转换 %s 个文件 %s ' % (fileNum, filename))
        except Exception:
            # FIX: narrowed from BaseException so Ctrl-C is not swallowed.
            print(filename, '存在问题,请检查!')


def recheck_again(path):
    """Re-scan *path* and report any CSV files that are still not UTF-8."""
    print('---------------------以下文件仍存在问题---------------------')
    for filename in get_filelist(path):
        encoding = get_encode_info(filename)
        if encoding != 'utf-8':
            print(filename, '的编码方式是:', encoding)

    print('--------------------------检查结束--------------------------')


if __name__ == "__main__":
    """
    输入文件路径
    """
    path = 'E:\网站\关键词\\123'
    read_and_convert(path)
    recheck_again(path)
    print('转换结束!')

 

基于selenium godaddy域名历史利用聚名自动化检查

"""
godaddy域名自动化检查
"""
from selenium import webdriver
import time
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
import json
import re
from importcsv import importCsv


def checkDomain(domain):
    """Look up *domain*'s history on jucha.com with a Selenium-driven Chrome.

    Logs in with cookies saved in ``cookie.txt``, submits the domain,
    drags the slider captcha, then scrapes the history API response out of
    Chrome's performance log.

    :param domain: domain name to check
    :return: *domain* when its history record says language '中文', age > 5
             and record count > 10; otherwise None
    """
    d = DesiredCapabilities.CHROME
    d['loggingPrefs'] = {'performance': 'ALL'}
    chrome_options = Options()
    # Important: without w3c=False the performance log cannot be read in
    # headless mode.
    chrome_options.add_experimental_option('w3c', False)
    # Hide the navigator.webdriver automation fingerprint.
    chrome_options.add_argument('--disable-blink-features=AutomationControlled')
    caps = {
        'browserName': 'chrome',
        'loggingPrefs': {
            'browser': 'ALL',
            'driver': 'ALL',
            'performance': 'ALL',
        },
        'goog:chromeOptions': {
            'perfLoggingPrefs': {
                'enableNetwork': True,
            },
            'w3c': False,
        },
    }
    # The chromedriver binary must be discoverable (or pass its path here).
    driver = webdriver.Chrome(desired_capabilities=caps, chrome_options=chrome_options)
    try:
        # Use the full url including the http/https scheme.
        driver.get('http://www.jucha.com/lishi/')

        # Drop any cookies the fresh browser session may already hold.
        driver.delete_all_cookies()

        time.sleep(3)

        # BUG FIX: the original left cookie.txt open; use a context manager.
        with open('cookie.txt') as cookie_file:
            cookie_list = json.loads(cookie_file.read())  # cookies stored as JSON
        for c in cookie_list:
            driver.add_cookie(c)  # replay each saved login cookie

        driver.refresh()

        # Fill the search box with the domain and click the query button.
        driver.find_element_by_xpath('/html/body/div[2]/div[1]/div[2]/div[2]/div[2]/div[1]/textarea').send_keys(domain)
        driver.find_element_by_xpath('/html/body/div[2]/div[1]/div[2]/div[2]/div[2]/div[2]/div[1]/div/button').click()

        time.sleep(2)

        # Solve the slider captcha: drag the handle across the full track.
        ele_button = driver.find_element_by_xpath('//*[@id="nc_1_n1z"]')
        ele = driver.find_element_by_xpath('//*[@id="nc_1__scale_text"]/span')
        ActionChains(driver).drag_and_drop_by_offset(ele_button, ele.size['width'], ele.size['height']).perform()

        time.sleep(3)

        # Scan the performance log for the history-API network request.
        request_log = driver.get_log('performance')
        for entry in request_log:
            message = json.loads(entry['message'])['message']['params']
            # .get() avoids KeyError for log entries without a request field.
            request = message.get('request')
            if request is None:
                continue
            url = request.get('url')
            if 'http://www.jucha.com:8866/item/search?domain' in url:
                print(message['requestId'])
                # Fetch the JSONP response body for that requestId via CDP.
                content = driver.execute_cdp_cmd('Network.getResponseBody', {'requestId': message['requestId']})
                pattern = re.compile(r'callback_\d+\((.+)\);')
                newstr = re.search(pattern, content['body'])
                jsonDate = json.loads(newstr.group(1))
                print(jsonDate)
                record = jsonDate['data']['lishi']['data']['data']
                if record['yy'] == '中文' and int(record['nl']) > 5 and int(record['jls']) > 10:
                    return domain
    finally:
        # BUG FIX: the original called driver.close() only on the no-match
        # path, leaking a Chrome process whenever a domain matched or an
        # exception was raised; quit() in finally always shuts it down.
        driver.quit()


if __name__ == '__main__':
    # Check every domain from the export file and print those that pass
    # all of the history criteria.
    for row in importCsv('Export.csv'):
        matched = checkDomain(row)
        if matched is not None:
            print(matched)