import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import cauchy, gamma, expon, lognorm, loglaplace
# Define the x range for the plot
= np.linspace(0, 15, 1000)
x
# Create a single row with 5 subplots
= plt.subplots(1, 5, figsize=(10, 3))
fig, axs
# Cauchy Distribution
= cauchy.pdf(x)
cauchy_pdf 0].plot(x, cauchy_pdf)
axs[0].set_title("Cauchy")
axs[
# Gamma Distribution
= gamma.pdf(x, a=2)
gamma_pdf 1].plot(x, gamma_pdf)
axs[1].set_title("Gamma")
axs[
# Exponential Distribution
= expon.pdf(x)
exponential_pdf 2].plot(x, exponential_pdf)
axs[2].set_title("Exponential")
axs[
# Log Normal Distribution
= lognorm.pdf(x, s=0.7)
log_normal_pdf 3].plot(x, log_normal_pdf)
axs[3].set_title("Log Normal")
axs[
# Log Logistic Distribution
= loglaplace.pdf(x, c=0.5)
log_logistic_pdf 4].plot(x, log_logistic_pdf)
axs[4].set_title("Log Logistic")
axs[
# Adjust layout
plt.tight_layout()
# Display the plot
plt.show()
Fit Distribution
Dạng hàm phân phối
Phân phối bao gồm: Cauchy, Gamma, Exponential, Log-Logistic, Log-Normal Hàm mât đô xác suất (PDF) của phân phối Cauchy:
\[ f\left(x ; x_{0} ; \gamma\right)=\frac{1}{\pi \gamma\left[1+\left(\frac{x-x_{0}}{\gamma}\right)^{2}\right]}=\frac{1}{\pi \gamma}\left[\frac{\gamma^{2}}{\left(x-x_{0}\right)^{2}+\gamma^{2}}\right] \]
Trong đó:
- \(x_{0}\) : là thông số vị trí, chỉ định vị trí đỉnh của phân phối
- \(\gamma\) : là thông số tỷ lệ chỉ định nửa chiều rộng ở nửa tối đa (HWHM), \(2 \gamma\) là toàn bộ chiều rộng ở mức tối đa một nửa (FWHM). \(\gamma\) là một nửa phạm vi liên phần và đôi khi được gọi là lỗi có thể xảy ra
Hàm mât đô xác suất (PDF) của phân phối Gamma:
\[ f(x ; k ; \theta)=\frac{x^{k-1} e^{-\frac{x}{\theta}}}{\theta^{k} \Gamma(k)}, \text { for } x>0 \text { and } k, \theta>0 \]
- \(k\) : là tham số hình dạng
- \(\theta\) : là tham số tỷ lệ
- \(x\) : là biến ngẫu nhiên
- \(\Gamma(k)\) : là hàm gamma được đánh giá tại \(\mathrm{k}\)
Hàm mật đô xác suất (PDF) của phân phối Exponential:
\[ f(x ; \lambda)=\left\{\begin{array}{rr} 0, & x<0 \\ \lambda e^{-\lambda x}, & x \geq 0 \end{array}\right. \]
- \(\lambda\) : là tham số của phân phối, thường được gọi là tham số tỷ lệ.
- \(x\) : là biến ngẫu nhiên
Hàm mật độ xác suất (PDF) của phân phối Log-Logistic:
\[ f(x ; \alpha ; \beta)=\frac{\left(\frac{\beta}{\alpha}\right)\left(\frac{x}{\alpha}\right)^{\beta-1}}{\left(1+\left(\frac{x}{\alpha}\right)^{\beta}\right)^{2}}, \text { where } x>0, \alpha>0, \beta>0 \]
- \(\alpha\) : là tham số tỷ lệ và là giá trị trung bình của phân phối
- \(\beta\) : là tham số hình dạng
Hàm mât độ xác suất (PDF) của phân phối Lognormal \(\left(\mu, \sigma^{2}\right)\) :
\[ f_{x}(x)=\frac{d}{d x} \operatorname{Pr}(X \leq x)=\frac{1}{x \sigma \sqrt{2 \pi}} e^{\left(-\frac{(\ln x-\mu)^{2}}{2 \sigma^{2}}\right)}, \text { where } x>0, \sigma>0 \]
- \(x\) là biến ngẫu nhiên
- Một biến ngẫu nhiên \(X\) tuân theo phân phối Log-normal \(\left(X \sim \operatorname{Lognormal}\left(\mu_{x}, \sigma_{x}^{2}\right)\right)\) nếu \(\ln (X)\) tuân theo phân phối chuẩn với giá trị trung bình là \(\mu\) và phương sai \(\sigma^{2}\)
Để lựa chọn phân phối phù hợp cho từng phân nhóm, Nhóm mô hình lựa chọn mô hình có SSE (Sum of square error - tổng của sai số bình phương) nhỏ nhất với SSE được tính theo công thức sau:
\[ \left.S S E=\sum \text { (Giá trị thực tế - Giá trị được tính ra từ mô hình phân phối }\right)^{2} \]
Cuối cùng, từ mô hình phân phối vừa được lựa chọn (là mô hình có SSE nhỏ nhất).
Python Example
import numpy as np
import pandas as pd
import scipy
from scipy import stats
import scipy.optimize as optimize
1. Các hàm liên quan
1.1 Hàm phân phối gamma
class opt_gamma:
def __init__(self, actual_pd):
self.actual_pd = actual_pd
self.x_input = range(1, len(actual_pd) + 1)
def func(self, x, scale, a, b):
= stats.gamma.pdf(x, a = a, scale = b) * scale
predict return predict
def sse(self, params, xobs, yobs):
= self.func(xobs, *params)
ynew = np.sum((ynew - yobs) ** 2)
mse return mse
def solver(self):
= [1,1,1]
p0 = [(0.0001, 2), (0.0001, 10), (0.0001, 10)]
bounds = scipy.optimize.minimize(self.sse, p0, args=(self.x_input, self.actual_pd), bounds= bounds)
res # res = scipy.optimize.minimize(self.sse, p0, args=(self.x_input, self.actual_pd), method='Nelder-Mead')
return res
def predict(self, t=30):
= self.solver()
res = self.func(range(1, t+1), *res.x)
ypred return ypred
1.2 Hàm Phân phối mũ Exponential
class opt_exponential:
def __init__(self, actual_pd):
self.actual_pd = actual_pd
self.x_input = range(1, len(actual_pd) + 1)
def func(self, x, scale, a):
# change function here
= stats.expon.pdf(x, scale = 1/a) * scale
pred return pred
def sse(self, params, xobs, yobs):
= self.func(xobs, *params)
ynew = np.sum((ynew - yobs) ** 2)
mse return mse
def solver(self):
# change initial guess here
= [1,2]
p0 = [(0.0001, 2), (0.0001, 0.5)]
bounds = scipy.optimize.minimize(self.sse, p0, args=(self.x_input, self.actual_pd), bounds = bounds)
res return res
def predict(self, t=30):
= self.solver()
res = self.func(range(1, t+1), *res.x)
ypred return ypred
1.3 Hàm phân phối Cauchy
class opt_cauchy:
def __init__(self, actual_pd):
self.actual_pd = actual_pd
self.x_input = range(1, len(actual_pd) + 1)
def func(self, x, scale, a, b):
# change function here
= stats.cauchy.pdf(x, loc = b, scale = a) * scale
predict return predict
def sse(self, params, xobs, yobs):
= self.func(xobs, *params)
ynew = np.sum((ynew - yobs) ** 2)
mse return mse
def solver(self):
# change initial guess here
= [1,1,1]
p0 = [(0.0001, 2), (0.0001, 15), (-30, 30)]
bounds = scipy.optimize.minimize(self.sse, p0, args=(self.x_input, self.actual_pd), bounds = bounds)
res return res
def predict(self, t=30):
= self.solver()
res = self.func(range(1, t+1), *res.x)
ypred return ypred
1.4 Hàm phân phối log logistic
class opt_log_logistic:
def __init__(self, actual_pd):
self.actual_pd = actual_pd
self.x_input = range(1, len(actual_pd) + 1)
def func(self, x, scale, a, b):
# change function here
= scale*(a/b)*(x/b)**(a-1)/(1+(x/b)**a)**2
predict return predict
def sse(self, params, xobs, yobs):
= self.func(xobs, *params)
ynew = np.sum((ynew - yobs) ** 2)
mse return mse
def solver(self):
# change initial guess here
= [2,3,1]
p0 = [(0.0001, 2), (0.0001, 10), (0.0001, 10)]
bounds = scipy.optimize.minimize(self.sse, p0, args=(self.x_input, self.actual_pd), bounds=bounds)
res return res
def predict(self, t=30):
= self.solver()
res = self.func(range(1, t+1), *res.x)
ypred return ypred
1.5 Hàm phân phối log normal
class opt_log_normal:
def __init__(self, actual_pd):
self.actual_pd = actual_pd
self.x_input = range(1, len(actual_pd) + 1)
def func(self, x, scale, a, b):
# change function here
= (np.exp(-(np.log(x) - a)**2 / (2 * b**2)) / (x * b * np.sqrt(2 * np.pi)))*scale
predict return predict
def sse(self, params, xobs, yobs):
= self.func(xobs, *params)
ynew = np.sum((ynew - yobs) ** 2)
mse return mse
def solver(self):
# change initial guess here
= [1,1,1]
p0 = [(0.0001, 2), (0.0001, 0.5), (0.0001, 0.5)]
bounds = scipy.optimize.minimize(self.sse, p0, args=(self.x_input, self.actual_pd), bounds=bounds)
res return res
def predict(self, t=30):
= self.solver()
res = self.func(range(1, t+1), *res.x)
ypred return ypred
2. Fit models
2.1 Đặt giá trị initial
Gamma = 1,1,1
Cauchy = 1,1,1
Expo = 1,2
Log Logistic = 2,3,1
Log Normal = 1,1,1
2.2 Đọc dữ liệu
= pd.read_excel('results/Cohort Analysis.xlsb', engine='pyxlsb', sheet_name='1.2 Visualize', usecols = 'O:Z')
data data.tail()
2.3 Fit models
def fn_export_by_segment(segment, period):
# segment = segment.upper()
= data[data.Segment_Group.isin([segment])]
actual_pd = actual_pd.iloc[0, 2:]
actual_pd = actual_pd.values
actual_pd
= opt_gamma(actual_pd)
res_gamma = opt_exponential(actual_pd)
res_exponential = opt_cauchy(actual_pd)
res_cauchy = opt_log_logistic(actual_pd)
res_log_logistic = opt_log_normal(actual_pd)
res_log_normal
= pd.DataFrame({
params 'Segment': segment,
'Distribution': ['Gamma', 'Exponential', 'Cauchy', 'Log-Logistic', 'Log-Normal'],
'SSE': [res_gamma.solver().fun, res_exponential.solver().fun, res_cauchy.solver().fun, res_log_logistic.solver().fun, res_log_normal.solver().fun],
'params': [res_gamma.solver().x, res_exponential.solver().x, res_cauchy.solver().x, res_log_logistic.solver().x, res_log_normal.solver().x]
})
'scale', 'a', 'b']] = pd.DataFrame(params.params.to_list())
params[[del params['params']
= {
dict_predict 'Segment': segment,
'Period': range(1, period+1),
'Gamma': res_gamma.predict(period),
'Exponential': res_exponential.predict(period),
'Cauchy': res_cauchy.predict(period),
'Log-Logistic': res_log_logistic.predict(period),
'Log-Normal': res_log_normal.predict(period)
}
= params.sort_values('SSE').head(1)
best_distribution 'Tag best distribution'] = 'Best distribution'
best_distribution[
print('Best distribution of ' + segment + ' is '+ best_distribution['Distribution'].item())
return params, best_distribution, pd.DataFrame(dict_predict)
= []
pred_best_fit = []
params_frame = []
pred_all_curve for seg in data.Segment_Group:
= fn_export_by_segment(seg, 30)
params, df_best_fit, df_all
pred_best_fit.append(df_best_fit)
params_frame.append(params)
pred_all_curve.append(df_all)
= pd.concat(pred_best_fit, axis=0)
pred_best_fit = pd.concat(params_frame, axis=0)
params_frame = pd.concat(pred_all_curve, axis=0) pred_all_curve
= pd.melt(pred_all_curve,
pivot_pred_all_curve =['Segment', 'Period'],
id_vars=['Gamma', 'Exponential', 'Cauchy', 'Log-Logistic', 'Log-Normal'],
value_vars= 'Distribution').pivot(
var_name = ['Segment', 'Distribution'], columns='Period'
index
)=True, drop=False)
pivot_pred_all_curve.reset_index(inplace= ['Segment', 'Distribution', *range(1,31)]
pivot_pred_all_curve.columns = params_frame.merge(pivot_pred_all_curve, how='inner', on=['Segment', 'Distribution']).merge(
pivot_pred_all_curve 'Segment', 'Distribution', 'Tag best distribution']], how='left', on=['Segment', 'Distribution']
pred_best_fit[[ )
Xuất OutPut ra Excel
=True, drop=True)
pred_best_fit.reset_index(inplace=True, drop=True)
params_frame.reset_index(inplace=True, drop=True)
pivot_pred_all_curve.reset_index(inplacewith pd.ExcelWriter('cohort_fitted.xlsx') as writer:
='pred_best_fit')
pred_best_fit.to_excel(writer, sheet_name='params_frame')
params_frame.to_excel(writer, sheet_name='pivot_pred_all_curve') pivot_pred_all_curve.to_excel(writer, sheet_name