Numpy
Basic
| 类别 | 方法列表 | 
|---|---|
| 创建 | array, zeros, ones, empty, arange, linspace, logspace, eye, diag, frombuffer, fromfunction | 
| 变形 | reshape, resize, flatten, ravel, transpose, swapaxes, broadcast_to | 
| 数学 | add, subtract, multiply, divide, power, sqrt, exp, log, sin, cos, tan, arcsin | 
| 统计 | sum, mean, std, var, min, max, argmin, argmax, median, percentile, corrcoef | 
| 线性代数 | dot, matmul, inner, outer, linalg.inv, linalg.det, linalg.eig, linalg.svd | 
| 逻辑 | logical_and, logical_or, logical_not, all, any, isnan, isinf | 
| 集合 | unique, in1d, intersect1d, union1d, setdiff1d, setxor1d | 
| 文件 | save, load, savez, savetxt, loadtxt, genfromtxt | 
| 随机 | random.rand, random.randn, random.randint, random.choice, random.shuffle | 
创建
import numpy as np
 
# ---------------- 创建 ----------------
a  = np.array([1, 2, 3])            # 列表 → 数组
z  = np.zeros((2, 3))               # 全 0
o  = np.ones((2, 3))                # 全 1
e  = np.empty((2, 2))               # 未初始化
ar = np.arange(0, 10, 2)            # 等差
li = np.linspace(0, 1, 5)           # 均分
lo = np.logspace(0, 2, 3)           # 10^0, 10^1, 10^2
I  = np.eye(3)                      # 单位矩阵
d  = np.diag([1, 2, 3])             # 对角矩阵
buf = np.frombuffer(b'\x01\x02\x03\x04', dtype='uint8')  # bytes → 数组
fun = np.fromfunction(lambda i, j: i + j, (3, 3))        # 函数生成
 
print("创建示例汇总:\n", a, z.shape, ar, li, lo, d, buf, sep="\n")数组操作
拼接分割
a = np.array([[1,2], [3,4]])
b = np.array([[5,6]])
 
# 垂直拼接
np.vstack((a, b))    # [[1,2],[3,4],[5,6]]
 
# 水平拼接
np.hstack((a, b.T))  # [[1,2,5],[3,4,6]]
 
# 分割数组
first, second = np.hsplit(a, 2)索引切片
arr = np.array([[1,2,3], [4,5,6], [7,8,9]])
 
# 基本索引
arr[1, 2]    # 6 (第1行第2列)
arr[0:2, 1:] # [[2,3],[5,6]]
 
# 布尔索引
mask = arr > 4
arr[mask]     # [5,6,7,8,9]
 
# 花式索引
arr[[0,2], [1,0]]  # [2,7] (0行1列, 2行0列)变形
x = np.arange(12).reshape(3, 4)
 
# ---------------- 变形 ----------------
r1 = x.reshape(4, 3)          # 改变形状(返回视图/复制)
r2 = np.resize(x, (2, 2))     # 强制裁剪/填充
f1 = x.flatten()              # 展平(拷贝)
f2 = x.ravel()                # 展平(视图)
t  = x.transpose()            # 转置
sw = x.swapaxes(0, 1)         # 轴交换
bt = np.broadcast_to(x, (2, 3, 4))  # 广播到目标形状
 
print("变形后形状:", r1.shape, t.shape, bt.shape)数学
a = np.array([1, 4, 9])
b = np.array([2, 2, 2])
 
# ---------------- 数学 ----------------
add = np.add(a, b)         # 加
sub = np.subtract(a, b)    # 减
mul = np.multiply(a, b)    # 乘
div = np.divide(a, b)      # 除
pow = np.power(a, 2)       # 幂
sq  = np.sqrt(a)           # 平方根
exp = np.exp(a[:2])        # e^x
log = np.log(a[1:])        # 自然对数
sin = np.sin(np.pi/6 * np.ones(3))
cos = np.cos(np.pi/3 * np.ones(3))
tan = np.tan(np.array([0, np.pi/4]))
arcs = np.arcsin([0, 1])
 
print("数学示例:\n", add, sub, mul, div, pow, sq, exp, log, sin, cos, tan, arcs)统计
x = np.array([1, 2, 3, 4, 5])
 
# ---------------- 统计 ----------------
s  = x.sum()
m  = x.mean()
st = x.std()
v  = x.var()
mn = x.min()
mx = x.max()
ami = x.argmin()
amx = x.argmax()
med = np.median(x)
pct = np.percentile(x, 75)
cor = np.corrcoef(x, x**2)[0, 1]
 
print("统计结果:sum", s, "mean", m, "std", st, "corr", cor)线性代数
A = np.array([[1, 2], [3, 4]])
B = np.array([[2, 0], [1, 2]])
v = np.array([1, 2])
 
# ---------------- 线性代数 ----------------
dot  = np.dot(A, v)          # 矩阵-向量乘
mat  = A @ B                 # 矩阵乘
inn  = np.inner(v, v)        # 内积
out  = np.outer(v, v)        # 外积
inv  = np.linalg.inv(A)      # 逆矩阵
det  = np.linalg.det(A)      # 行列式
eig  = np.linalg.eigvals(A)  # 特征值
svd  = np.linalg.svd(A)      # 奇异值分解
 
print("线性代数示例:dot", dot, "det", det, "eig", eig)逻辑
a = np.array([1, np.nan, 3])
b = np.array([2, 2, 2])
 
# ---------------- 逻辑 ----------------
land = np.logical_and(a>1, b>1)
lor  = np.logical_or(a>2, b>2)
lnot = np.logical_not(a>1)
alla = np.all(b>0)
anya = np.any(np.isnan(a))
nan  = np.isnan(a)
inf  = np.isinf(np.array([1, np.inf]))
 
print("逻辑结果:and", land, "or", lor, "not", lnot, "all", alla, "any", anya)集合
x = np.array([1, 2, 3, 3, 4])
y = np.array([3, 4, 5])
 
# ---------------- 集合 ----------------
un  = np.unique(x)              # 唯一值
in1 = np.in1d(x, y)             # x 的元素是否在 y
int1 = np.intersect1d(x, y)     # 交集
uni1 = np.union1d(x, y)         # 并集
diff = np.setdiff1d(x, y)       # 差集
xor  = np.setxor1d(x, y)        # 对称差
 
print("集合示例:\n unique", un, "in1d", in1, "intersect", int1, "union", uni1)文件
x = np.arange(6).reshape(2, 3)
 
# ---------------- 文件 ----------------
np.save('tmp.npy', x)                   # 二进制 .npy
np.savetxt('tmp.csv', x, fmt='%d')      # csv
np.savez('tmp.npz', x=x)                # 压缩多数组
 
y1 = np.load('tmp.npy')
y2 = np.loadtxt('tmp.csv', dtype=int)
y3 = np.load('tmp.npz')['x']
 
print("文件读回:\n", y1, y2, y3)随机
# ---------------- 随机 ----------------
r1 = np.random.rand(2, 2)               # 均匀 [0,1)
r2 = np.random.randn(2, 2)              # 标准正态
r3 = np.random.randint(0, 10, 5)        # 随机整数
r4 = np.random.choice(10, 5, replace=False)  # 无放回抽样
arr = np.arange(10)
np.random.shuffle(arr)                   # 原地洗牌
 
print("随机示例:\n", r1, r2, r3, r4, arr)广播机制
A = np.ones((3,3))
B = np.array([1,2,3])
A + B  # 自动扩展为:[[2,3,4],[2,3,4],[2,3,4]]结构化数组
# 创建表格数据
dtype = [('name', 'S10'), ('age', 'i4'), ('height', 'f8')]
people = np.array([('Alice', 25, 1.65), ('Bob', 30, 1.8)], dtype=dtype)
 
# 按字段查询
people[people['age'] > 27]  # [('Bob', 30, 1.8)]向量化函数
# 创建自定义向量函数
def custom_func(x):
    return x**2 + 2*x + 1
 
vectorized_func = np.vectorize(custom_func)
vectorized_func(np.array([1,2,3]))  # [4,9,16]内存映射
# 处理超大文件
large_data = np.memmap('bigdata.dat', dtype='float32', mode='r', shape=(10000, 10000))应用
金融量化:分钟线批量因子
数据:随机生成 4 年、3000 只股票、1 分钟 K 线(OHLCV),约 2 GB→ 这里只用 10 只 × 1 天做演示。 因子:日内波动率、10 分钟 VWAP、相对开盘涨跌幅。
import numpy as np, pandas as pd, datetime as dt
 
# 1. 生成 mock 数据 ----------------------------------------------------------
n_stocks, days = 10, 1
mins = 240 * days
rng = np.random.default_rng(42)
 
dates = pd.date_range('2024-07-01', periods=mins, freq='1min')
stock_ids = [f'S{i:04d}' for i in range(n_stocks)]
 
# 用结构化数组一次生成
dtype = np.dtype([('open','f4'),('high','f4'),('low','f4'),('close','f4'),('vol','i4')])
bars = np.zeros((n_stocks, mins), dtype=dtype)
 
for i in range(n_stocks):
    drift = rng.normal(0, 0.0005, mins).cumsum()
    noise = rng.normal(0, 0.001, mins)
    close = 10 * np.exp(drift + noise)
    high  = np.maximum(close, close*rng.uniform(1, 1.02))
    low   = np.minimum(close, close*rng.uniform(0.98, 1))
    open_ = np.roll(close, 1); open_[0] = close[0]
    bars[i]['open']=open_; bars[i]['high']=high
    bars[i]['low']=low; bars[i]['close']=close
    bars[i]['vol']=rng.integers(1000, 10000, mins)
 
# 2. 向量化计算因子 ---------------------------------------------------------
close = bars['close']            # shape (10, 240)
vol   = bars['vol']
 
# 日内波动率
intraday_vol = close.std(axis=1)
 
# 10 分钟 VWAP
window = 10
vwap = (close * vol).cumsum(axis=1) / vol.cumsum(axis=1)
vwap10 = vwap[:, window-1]
 
# 相对开盘涨幅
ret_vs_open = (close[:, -1] / close[:, 0]) - 1
 
result = pd.DataFrame({
    'stock': stock_ids,
    'ivol': intraday_vol,
    'vwap10': vwap10,
    'ret_open': ret_vs_open
})
print(result.head())电商 RFM
数据:1 亿条订单记录(user_id, amount, ts),用内存映射避免一次性读入。 RFM:R=最近一次距今天数,F=90 天订单数,M=90 天金额。
import numpy as np, time, datetime as dt
 
# 0. 生成 1 亿条 mock 数据 ----------------------------------------------------
N = 100_000_000
rng = np.random.default_rng(42)
user_ids = rng.integers(0, 5_000_000, N)      # 500 万用户
amounts  = rng.lognormal(4, 1.2, N).astype('float32')
# 时间戳:最近 90 天
ts_max = np.datetime64('2024-07-28')
timestamps = ts_max - rng.integers(0, 90*24*3600, N).astype('timedelta64[s]')
 
# 1. 内存映射方式保存 *.npy ---------------------------------------------------
np.save('orders_user.npy', user_ids)
np.save('orders_amt.npy', amounts)
np.save('orders_ts.npy', timestamps.view('int64'))
 
# 2. 使用 NumPy 计算 RFM ------------------------------------------------------
tic = time.time()
uids = np.load('orders_user.npy', mmap_mode='r')
amts = np.load('orders_amt.npy',  mmap_mode='r')
tss  = np.load('orders_ts.npy',   mmap_mode='r').view('datetime64[s]')
 
mask = (ts_max - tss).astype('int64') < 90*24*3600  # 最近 90 天
f = np.bincount(uids[mask], minlength=5_000_000)
m = np.bincount(uids[mask], weights=amts[mask], minlength=5_000_000)
r_days = (ts_max - np.minimum.reduceat(tss, np.unique(uids, return_index=True)[1])) / np.timedelta64(1,'D')
 
# 打包结果
rfm = np.vstack([r_days, f, m]).T
print('耗时:', time.time()-tic, '秒')  # ~60 s on 8-core医疗影像
CT 三维体素 Sobel 边缘
数据:使用 scikit-image 自带 3D 体数据(512×512×300)→ 演示用 128×128×64 子块。
from skimage import data
import numpy as np
from scipy.ndimage import sobel
 
# 1. 加载 3D 体数据 ----------------------------------------------------------
# 这里用 Shepp-Logan head phantom
vol = data.shepp_logan_phantom()
vol = vol[::4, ::4, ::4]          # 下采样 128x128x64
 
# 2. Sobel 边缘检测 ----------------------------------------------------------
gx = sobel(vol, axis=0)
gy = sobel(vol, axis=1)
gz = sobel(vol, axis=2)
g = np.sqrt(gx**2 + gy**2 + gz**2)
 
print('边缘体素占比:', (g>0.1).mean()*100, '%')
 工业振动
实时 FFT 故障频率 数据:模拟 10 kHz 采样,1 s 窗口,100 条通道。
import numpy as np, time
from numba import njit
 
fs = 10_000
N = 100_000
channels = 100
rng = np.random.default_rng(42)
 
# 1. 生成带故障频率的振动信号 -----------------------------------------------
t = np.arange(N)/fs
signal = rng.normal(0, 1, (channels, N))
# 加 250 Hz 故障谐波
signal += 0.3*np.sin(2*np.pi*250*t)
 
# 2. 实时 FFT ---------------------------------------------------------------
@njit(cache=True, fastmath=True)
def batch_fft(x):
    return np.abs(np.fft.rfft(x, axis=1))
 
tic = time.time()
spec = batch_fft(signal)     # 100×5001
print('100 通道 1 s FFT 耗时:', time.time()-tic, '秒')
 
# 3. 找出故障
freqs = np.fft.rfftfreq(N, 1/fs)
peak = freqs[np.argmax(spec, axis=1)]
print('故障频率检测均值:', peak.mean(), 'Hz')光伏预测
气象格点到电站插值
数据:ERA5 0.25° 网格 2 m 温度、10 m 风速,插值到 3000 个电站坐标。
import numpy as np
from scipy.interpolate import RegularGridInterpolator
 
# 1. mock 气象格点 -----------------------------------------------------------
lats = np.linspace(15, 55, 161)   # 0.25°
lons = np.linspace(70, 140, 281)
temp = 20 + 10*np.sin(np.pi*lats/90)[:,None] + 5*np.cos(2*np.pi*lons/180)[None,:]
wind = 3 + 1.5*np.random.rand(*temp.shape)
 
# 2. mock 电站坐标 -----------------------------------------------------------
n_plants = 3000
rng = np.random.default_rng(42)
plant_lat = rng.uniform(15, 55, n_plants)
plant_lon = rng.uniform(70, 140, n_plants)
 
# 3. 双线性插值 --------------------------------------------------------------
interp_T = RegularGridInterpolator((lats, lons), temp, bounds_error=False, fill_value=None)
interp_W = RegularGridInterpolator((lats, lons), wind, bounds_error=False, fill_value=None)
 
T_site = interp_T(np.c_[plant_lat, plant_lon])
W_site = interp_W(np.c_[plant_lat, plant_lon])
 
# 4. 简单线性回归预测功率(MW) ----------------------------------------------
power = 0.15*W_site + 0.4*T_site - 2  # 伪模型
print('预测功率均值:', power.mean(), 'MW')←  Previous postPython类库Pandas
Next post  →Python码农们最容易犯的错(千万要规避)

