Python中的单变量线性回归
单变量数据是结果仅取决于一个变量的数据类型。例如,一条线上的点数据集可以被认为是一个单变量数据,其中横坐标可以被认为是输入特征,纵坐标可以被认为是输出/结果。例如:
对于线Y = 2X + 3 ;
输入特征将是 X,Y 将是结果。
X | Y |
---|---|
1 | 5 |
2 | 7 |
3 | 9 |
4 | 11 |
5 | 13 |
概念:
对于单变量线性回归,只有一个输入特征向量。回归线将采用以下形式:
Y = b0 + b1 * X
Where,
b0 and b1 are the coefficients of regression.
因此,正在尝试通过训练模型来预测回归系数 b0 和 b1。
实用功能
- 预测
def predict(x, b0, b1): """Predicts the value of prediction based on current value of regression coefficients when input is x""" # Y = b0 + b1 * X return b0 + b1 * x
- 成本函数:
成本函数使用回归系数的当前值计算误差百分比。它定量地定义了模型与实际回归系数的误差率最低的程度。def cost(x, y, b0, b1): # y is a list of expected value errors = [] for x, y in zip(x, y): prediction = predict(x, b0, b1) expected = y difference = prediction-expected errors.append(difference) # Now, we have errors for all the observations, # for some input, the value of error might be positive # and for some input might be negative, # and if we directly add them up, # the values might cancel out leading to wrong output." # Hence, we use concept of mean squared error. # in mse, we return mean of square of all the errors. mse = sum([e * e for e in errors])/len(errors) return mse
- 成本导数
每次迭代后,成本与误差成正比升级。错误的性质对数据非常敏感。数据敏感是指误差值变化非常快,因为我们在误差函数中有平方。因此,为了使其更能容忍高误差值,我们推导了误差函数。
数学如下:代码:
def cost_derivative(x, y, b0, b1, i): return sum([ 2*(predict(xi, b0, b1)-yi)*1 if i == 0 else 2*(predict(xi, b0, b1)-yi)*xi for xi, yi in zip(x, y) ])/len(x)
- 更新系数:
在每次迭代(epoch)中,回归系数的值都会更新一个特定的值 wrt 到上一次迭代的误差。此更新非常重要,并且是您编写的机器学习应用程序的关键。
使用系数的精确更新来更新系数是通过用其先前值引起的一小部分误差来惩罚其值来完成的。
这个分数称为学习率。这定义了我们的模型达到收敛点的速度(理想情况下误差为 0 的点)。相同的Python函数如下:
def update_coeff(x, y, b0, b1, i, alpha): bi -= alpha * cost_derivative(x, y, b0, b1, i) return bi
- 停止迭代:
这是用于指定何时应该停止迭代的函数。
根据用户,算法 stop_iteration 通常在以下情况下返回 true:- Max Iteration :模型经过指定迭代次数的训练。
- 错误值:根据先前错误的值,算法决定是继续还是停止。
- Accuracy :取决于模型的最后一个准确度,如果它大于提到的准确度,则算法返回 True,
- 混合:这更常用。这结合了多个上述条件以及一个特殊的中断选项。特殊休息是指训练一直持续到坏事发生的情况。不好的事情可能包括结果溢出、超出时间限制等。
定义了所有实用函数后,让我们看看伪代码及其实现:
代码 :
x, y is the given data.
(b0, b1) <-- (0, 0)
i = 0
while True:
if stop_iteration(i):
break
else:
b0 = update_coeff(x, y, b0, b1, 0, alpha)
b1 = update_coeff(x, y, b0, b1, 1, alpha)
最终的 Oop 实现:
class LinearRegressor:
def __init__(self, x, y, alpha = 0.01, b0 = 0, b1 = 0):
"""
x: input feature
y: result / target
alpha: learning rate, default is 0.01
b0, b1: linear regression coefficient.
"""
self.i = 0
self.x = x
self.y = y
self.alpha = alpha
self.b0 = b0
self.b1 = b1
if len(x) != len(y):
raise TypeError("x and y should have same number of rows.")
def predict(model, x):
"""Predicts the value of prediction based on
current value of regression coefficients when input is x"""
# Y = b0 + b1 * X
return model.b0 + model.b1 * x
def cost_derivative(model, i):
x, y, b0, b1 = model.x, model.y, model.b0, model.b1
predict = model.predict
return sum([
2 * (predict(xi) - yi) * 1
if i == 0
else (predict(xi) - yi) * xi
for xi, yi in zip(x, y)
]) / len(x)
def update_coeff(model, i):
cost_derivative = model.cost_derivative
if i == 0:
model.b0 -= model.alpha * cost_derivative(i)
elif i == 1:
model.b1 -= model.alpha * cost_derivative(i)
def stop_iteration(model, max_epochs = 1000):
model.i += 1
if model.i == max_epochs:
return True
else:
return False
def fit(model):
update_coeff = model.update_coeff
model.i = 0
while True:
if model.stop_iteration():
break
else:
update_coeff(0)
update_coeff(1)
if __name__ == '__main__':
linearRegressor = LinearRegressor(
x =[i for i in range(12)],
y =[2 * i + 3 for i in range(12)],
alpha = 0.03
)
linearRegressor.fit()
print(linearRegressor.predict(12))
# expects 2 * 12 + 3 = 27