这篇文章中包含了基础python中的常用代码小抄,包括:

  • python中文件的按顺序列出;
  • 写入、写出txt,json和pkl、移动文件的常用代码;
  • 常用正则表达;
  • 进程池的示例代码。

按顺序列出文件

在os.path相关使用中,我们可以看到有几个关于获取文件时间的函数:

os.path.getctime():获取文件创建(create)时间
os.path.getatime():获取文件最近访问(access)时间
os.path.getmtime():获取文件最近修改(modify)时间

import os

dir_name = "./tmp"

file_list = os.listdir(dir_name)
# 获取按照文件时间创建排序的列表,默认是按时间升序
new_file_list = sorted(file_list, key=lambda file: os.path.getctime(os.path.join(dir_name, file)))

文件写入写出移动

1. txt

numpy数组写入到txt

g = np.array([1,2,3])
np.savetxt(path, np.c_[g], fmt='%d', delimiter=',')

# or
with open(path,'w') as f: 
    np.savetxt(f, np.c_[g], fmt='%d', delimiter=',')

读取到列表

# 方法一
with open("train.txt", 'r') as f:
    img_name = [x.strip() for x in f]
# 方法二
with open("train.txt", 'r', encoding='utf-8') as f:
    lines = f.readlines()

列表写入txt

# 如果filename不存在会自动创建, 'w'表示写数据,写之前会清空文件中的原有数据!
filename = "info.txt"
l=["A","B","C","D"]
with open(filename,'w') as f: 
    f.writelines(l)

# 如果要格式化输出列表的内容,需要把原来的数据转化为字符串组成的列表,再用上述方法写入
data = [[1,2][3,4]]
l = []
for i in range(len(data)):
    l.append(','.join([str(data[i][0]), str(data[i][1]), '\n']))

2. json

json 模块提供了一种很简单的方式来编码和解码JSON数据。 其中两个主要的函数是 json.dumps() 和 json.loads()。

json.dumps将一个Python数据结构转换为JSON:

import json
data = {
    'name' : 'myname',
    'age' : 100,
}
json_str = json.dumps(data)

json.loads将一个JSON编码的字符串转换回一个Python数据结构:

data = json.loads(json_str)

而json.dump() 和 json.load() 来编码和解码JSON数据,用于处理文件。

json.dump(json_dict, f, indent=4)可以增加缩进效果

with open('test.json', 'w', encoding="utf-8") as f:
    json.dump(data, f, ensure_ascii=False, indent=1)
with open('test.json', 'r', encoding="utf-8") as f:
    data = json.load(f) 

对于GB级别的大json,直接load对内存的占用比较大,可以通过ijson流式读取,示例如下:

import ijson
with open(anno_file_path, 'r', encoding='utf-8') as f:
    objects = ijson.items(f, 'item')
    while True:
        try:
            imgcase = objects.__next__()
        except StopIteration as e:
            print("数据读取完成")
            break

JSONL(JSON Lines)是一种将多个JSON对象按行分隔存储的格式,每行都是一个独立的JSON对象。这样方便逐行读取:

import json

data = [
    {
        'name': 'John',
        'age': 30,
        'city': 'New York'
    },
    {
        'name': 'Alice',
        'age': 25,
        'city': 'London'
    },
    {
        'name': 'Bob',
        'age': 35,
        'city': 'Paris'
    }
]

# 将数据保存为JSONL文件
with open('data.jsonl', 'w') as f:
    for item in data:
        f.write(json.dumps(item) + '\n')

# 从JSONL文件逐行读取数据
with open('data.jsonl', 'r') as f:
    for line in f:
        data = json.loads(line)
        print(data)

3. 保存成 excel文件.xlsx

import xlsxwriter

workbook = xlsxwriter.Workbook(xlsx_file)
worksheet = workbook.add_worksheet()
bold = workbook.add_format({'bold': True})
normal = workbook.add_format({'bold': False})

titles = ["logid", "lang", "history_massage"]
# 写入标题行
for col_num, title in enumerate(titles):
    worksheet.write(0, col_num, title)

# 写入数据
for row_num, row_data in enumerate(data, start=1):   # start=1 表示从第二行开始写入数据
    case_list = []
    case_list.append(row_data["logid"])
    case_list.append(row_data["lang"])
    case_list.append(row_data["text"])

    for col_num, cell_data in enumerate(case_list):
        if col_num == 2:  # 第三列
            # 创建富文本字符串
            msg = cell_data.split('current_query')
            rich_text = (
                normal, msg[0],
                bold, 'Current_query' + msg[1]
            )
            worksheet.write_rich_string(row_num, col_num, *rich_text)
        else:
            worksheet.write(row_num, col_num, cell_data)


workbook.close()

4. 序列化文件

import pickle
with open('a.pkl', 'rb') as f:
    a = pickle.load(f)

with open(path_i, 'wb') as f:
    pickle.dump((train_tasks[i], test_tasks[i]), f)

5. 移动、删除、复制文件夹

import shutil

if os.path.exists(target_path):
    shutil.rmtree(target_path)

# 复制和移动文件,如果目的文件夹不存在,或者要复制和移动的文件不存在,则都会报错。
# 如果同名文件存在,文件可以成功复制,并覆盖目标文件。而移动文件的话,同名文件存在,则报错。
shutil.move('file1.txt','新的文件夹')
shutil.move('file2.txt','新的文件夹/new2.txt')
shutil.copy('demo.txt','新的文件夹')
shutil.copy('demo.txt','新的文件夹/new1.txt')
# 复制文件夹,目标路径下有同名文件夹,会报错。
shutil.copytree('要复制的文件夹', '目的文件夹/要复制的文件夹')
shutil.copytree('要复制的文件夹', '目的文件夹/新文件夹')

正则表达式(youtube 视频讲解):

推荐的测试页面:https://regex101.com/

  • 限定符:前面的字符出现0次或1次?, 匹配0个或多个前面的字符*, 前面的字符出现1次或多次+
  • 括号的区别:用()扩起需要匹配的字符串, 输入匹配次数的范围用{}, 匹配字符集合用[]
  • 常用元字符:数字字符\d, 单词字符(英文、数字、下划线)\w, 空白符(空格、Tab、换行)\s,对应的大写\D为非数字字符, \W为非单词字符, \S为非空白符。.代表不包括换行符的任意字符,\b标柱字符的边界。^匹配行首, $匹配行尾。
  • 贪婪匹配与懒惰匹配:例如<.+>与<.+?>
简单正则表达

多进程与进程池

import os
import math
import argparse
from tqdm import tqdm
from multiprocessing import Pool

@staticmethod
def err_call_back(err):
    print(f'出错啦~ error:{str(err)}')

def split_list(lst, n):
    """Split a list into n (roughly) equal-sized chunks"""
    chunk_size = math.ceil(len(lst) / n)  # integer division
    return [lst[i:i+chunk_size] for i in range(0, len(lst), chunk_size)]


def get_chunk(lst, n, k):
    chunks = split_list(lst, n)
    return chunks[k]

def run_demo(chunk_id, num_chunks):
    pages = os.listdir(root_path)
    pages.sort()
    pages_chunk = get_chunk(pages, num_chunks, chunk_id)

    for page in tqdm(pages_chunk):
        # do your code


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--num-chunks", type=int, default=8)
    args = parser.parse_args()

    num_chunks = args.num_chunks

    with Pool(num_chunks) as p:
        for i in range(num_chunks):
            chunk_id = i
            p.apply_async(run_demo, (chunk_id, num_chunks), error_callback=err_call_back)
        p.close()
        p.join()

标签: python

添加新评论