|
- import numpy as np
- import pandas as pd
- from easy_forecast.config.naming_specification import DATETIME_STAMP, OBJ_NO, Y, Y_HAT, Y_HAT_LOWER, Y_HAT_UPPER, BT_DP_PARAMS, \
- BT_MP_PARAMS
- from easy_forecast.log.global_logger import log, func_log
- from .basic_acc_metric import BasicAccMetric
- from easy_forecast.validation.model_selection_strategies import ModelSelectionStrategies
- from easy_forecast.batch.simple_data_process_strategies import SimpleBatchDataProcess
-
-
class BackValidation(object):
    """Back-test one data-process class / forecast-strategy class pair.

    The last ``test_period_n`` distinct dates of the (optionally preprocessed)
    data are held out, the strategy is fitted on the rows before them, and its
    output is joined with the actual ``Y`` values of those dates.
    """
    OUTPUT_COLS = [DATETIME_STAMP, Y, Y_HAT, OBJ_NO]

    def __init__(self, df, data_process_class, forecast_strategies_class, cutoff_date, forecast_freq, df_x=None,
                 test_period_n=1, **kwargs):
        """
        :param df: main (endogenous) data frame containing the target ``Y``; must
            contain the ``DATETIME_STAMP`` and ``OBJ_NO`` columns
        :param data_process_class: preprocessing class, instantiated as
            ``data_process_class(df, cutoff_date=..., freq=..., **kwargs[BT_DP_PARAMS])``;
            ``None`` skips preprocessing
        :param forecast_strategies_class: forecast strategy class, instantiated as
            ``forecast_strategies_class(df, df_x=..., freq=..., forecast_periods=..., **kwargs[BT_MP_PARAMS])``
        :param cutoff_date: cutoff date forwarded to the preprocessing class
        :param forecast_freq: forecast frequency forwarded to both classes
        :param df_x: optional exogenous data frame; must contain ``DATETIME_STAMP``
        :param test_period_n: number of trailing distinct dates to back-test (> 0)
        :param kwargs: only ``BT_DP_PARAMS`` (kwargs for the preprocessing class)
            and ``BT_MP_PARAMS`` (kwargs for the forecast strategy class) are read;
            both default to ``{}`` and any other key is ignored
        :raises TypeError: if ``df`` is not a pandas DataFrame
        :raises ValueError: if ``df`` is None or empty, a required column is
            missing, or ``test_period_n`` is not positive
        """
        self._df = df
        self._df_x = df_x
        self._data_process_class = data_process_class
        self._forecast_strategies_class = forecast_strategies_class
        self._cutoff_date = cutoff_date
        self._test_period_n = test_period_n
        self._forecast_freq = forecast_freq
        self._kwargs_ds = kwargs.get(BT_DP_PARAMS, {})
        self._kwargs_ms = kwargs.get(BT_MP_PARAMS, {})
        self.__init_data()

    @staticmethod
    def _require_columns(frame, columns, label):
        """Raise ValueError naming every entry of ``columns`` missing from ``frame``."""
        missing = [col for col in columns if col not in frame]
        if missing:
            raise ValueError(
                "{0} data frame must have column(s) {1} which will be "
                "used to split data frame into a number of batches!".format(label, missing))

    def __init_data(self):
        """Validate the constructor inputs (see ``__init__`` for the errors raised)."""
        # Check the type before touching ``.shape`` so non-DataFrame input raises TypeError.
        if self._df is None:
            raise ValueError("data frame should not be none!")
        if not isinstance(self._df, pd.DataFrame):
            raise TypeError("data frame type should be pandas.DataFrame!")
        if self._df.shape[0] == 0:
            raise ValueError("data frame should not be empty!")

        # Both columns are needed: back dates come from DATETIME_STAMP and the
        # actual values are joined back on [OBJ_NO, DATETIME_STAMP].
        self._require_columns(self._df, [DATETIME_STAMP, OBJ_NO], "endogenous")
        if self._df_x is not None:
            # Exogenous data is split by date only.
            self._require_columns(self._df_x, [DATETIME_STAMP], "exogenous")

        # The back-test length must be positive.
        if self._test_period_n <= 0:
            raise ValueError("length of testing periods should be greater than 0!")

    @func_log
    def back_testing(self, use_pool=True, loop=False):
        """Run the back test and return the strategy output joined with actuals.

        :param use_pool: forwarded to the ``compute`` calls of the preprocessing
            and forecast strategy objects
        :param loop: if truthy, make rolling one-step forecasts (each back date is
            forecast from all rows strictly before it, so actual values are used
            while rolling); otherwise make one multi-step forecast of
            ``test_period_n`` periods from the first back date
        :return: DataFrame sorted by ``[OBJ_NO, DATETIME_STAMP]``
        """
        lines = self._df
        if self._data_process_class is not None:
            log.info("数据预处理开始")
            raw_data_process = self._data_process_class(lines, cutoff_date=self._cutoff_date,
                                                        freq=self._forecast_freq, **self._kwargs_ds)
            lines = raw_data_process.compute(use_pool=use_pool)
            log.info("数据预处理结束")
        # NOTE(review): exogenous data is never forwarded during back testing, even
        # when ``df_x`` was supplied (lines_x is hard-wired to None); confirm this is intended.
        lines_x = None
        back_dates = np.sort(lines[DATETIME_STAMP].unique())[-self._test_period_n:]
        log.debug(back_dates)
        log.info("回测建模开始")
        if loop:
            rs = self._rolling_forecast(lines, lines_x, back_dates, use_pool)
        else:
            rs = self._multi_step_forecast(lines, lines_x, back_dates, use_pool)
        log.info("回测建模结束")
        return rs.sort_values([OBJ_NO, DATETIME_STAMP], ascending=True)

    def _fit_and_forecast(self, lines, lines_x, cutoff, periods, use_pool):
        """Fit the forecast strategy on rows dated strictly before ``cutoff`` and forecast ``periods`` steps."""
        train = lines[lines[DATETIME_STAMP] < cutoff]
        train_x = None
        if lines_x is not None:
            train_x = lines_x[lines_x[DATETIME_STAMP] < cutoff]
        batch = self._forecast_strategies_class(train, df_x=train_x, freq=self._forecast_freq,
                                                forecast_periods=periods, **self._kwargs_ms)
        return batch.compute(use_pool=use_pool)

    def _rolling_forecast(self, lines, lines_x, back_dates, use_pool):
        """One-step rolling forecast over ``back_dates``, merged with all columns of ``lines``.

        Dates on both sides are converted with ``pd.to_datetime`` before the merge;
        the conversion of ``lines`` happens on a copy so the caller's frame
        (``lines`` may be ``self._df``) is left untouched.
        """
        rs_list = [self._fit_and_forecast(lines, lines_x, date_tmp, 1, use_pool) for date_tmp in back_dates]
        if not rs_list:
            raise ValueError("result is wrong!")
        # The first result is kept whole; each later one contributes only the rows of its own back date.
        # NOTE(review): the first result is not filtered to back_dates[0]; confirm it cannot hold other dates.
        parts = [rs_list[0]]
        parts.extend(rs_tmp[rs_tmp[DATETIME_STAMP] == date_tmp]
                     for date_tmp, rs_tmp in zip(back_dates[1:], rs_list[1:]))
        rs = pd.concat(parts)
        if Y in rs:
            rs = rs.drop([Y], axis=1)
        rs[DATETIME_STAMP] = pd.to_datetime(rs[DATETIME_STAMP])
        truth = lines.copy()
        truth[DATETIME_STAMP] = pd.to_datetime(truth[DATETIME_STAMP])
        cols = [OBJ_NO, DATETIME_STAMP]
        return pd.merge(rs, truth, on=cols, how="left")

    def _multi_step_forecast(self, lines, lines_x, back_dates, use_pool):
        """Forecast all back dates at once from the first one and join ``lines[[OBJ_NO, DATETIME_STAMP, Y]]``."""
        rs = self._fit_and_forecast(lines, lines_x, back_dates[0], self._test_period_n, use_pool)
        if Y in rs:
            rs = rs.drop([Y], axis=1)
        cols = [OBJ_NO, DATETIME_STAMP]
        return pd.merge(rs, lines[[OBJ_NO, DATETIME_STAMP, Y]], on=cols, how="left")
-
-
class CrossValidation(object):
    """Choose, per object, the best (data process, forecast strategy) pairs by
    back testing, then forecast or back-test with the chosen pairs."""
    OUTPUT_COLS = [DATETIME_STAMP, Y, Y_HAT, OBJ_NO]
    # Upper bound on how many models may be blended (``top_k``).
    MAX_TOP_K = 3

    def __init__(self, df, data_process_dict, forecast_strategy_dict, cutoff_date, ms_cutoff_date,
                 forecast_freq, acc_metric, enable_softmax=False,
                 df_x=None, ms_test_period_n=1):
        """
        :param df: main data frame containing the target ``Y``; must contain ``OBJ_NO``
        :param data_process_dict: data-preprocessing configurations keyed by name. Each entry
            needs a ``class_name``; an optional ``md_list`` restricts which forecast strategies
            it is paired with; every other entry is passed to the class as a keyword argument.
            Example::

                dp_dict = {
                    'dp_simple': {
                        'class_name': SimpleBatchDataProcess
                    },
                    'dp_season': {
                        'class_name': dp_season_old.SimpleBatchDataProcess,
                        'md_list': ['strong_season_old']
                    }
                }
        :param forecast_strategy_dict: forecast strategy configurations keyed by name. Each entry
            needs a ``class_name``; every other entry is passed to the class as a keyword argument.
            Example::

                md_dict = {
                    'md_simple': {
                        'class_name': SimpleStrategies,
                        'enable_log': False
                    },
                    'strong_season': {
                        'class_name': strategy_season.SimpleStrategies,
                        'cny_rec': False,
                        'sw_refer': w1
                    }
                }
        :param cutoff_date: data cutoff date for the final forecast / back test
        :param ms_cutoff_date: data cutoff date for the model-selection back tests
        :param forecast_freq: forecast frequency
        :param acc_metric: list of accuracy metric names passed to ``BasicAccMetric.compute``
        :param enable_softmax: whether ``forecast_cv`` blends models with softmax weights
        :param df_x: optional exogenous data
        :param ms_test_period_n: number of trailing periods back-tested during model selection
        """
        self._ms_cutoff_date = ms_cutoff_date
        self._df = df
        self._df_x = df_x
        self._data_process_dict = data_process_dict
        self._forecast_strategy_dict = forecast_strategy_dict
        self._cutoff_date = cutoff_date
        self._ms_test_period_n = ms_test_period_n
        self._forecast_freq = forecast_freq
        self._enable_softmax = enable_softmax
        self._acc_metric = acc_metric
        self.__init_data()

    def __init_data(self):
        """Validate the strategy configuration.

        :raises ValueError: if a configuration dict is None or empty
        :raises TypeError: if a configuration is not a dict
        """
        if self._data_process_dict is None:
            raise ValueError("data_process_dict is none.")
        if self._forecast_strategy_dict is None:
            raise ValueError("forecast_strategy_dict is none.")

        if not isinstance(self._data_process_dict, dict):
            raise TypeError("data process strategy dict type is {}, should be a dictionary type!".format(
                type(self._data_process_dict)))

        if not isinstance(self._forecast_strategy_dict, dict):
            raise TypeError("forecast strategy dict type is {}, should be a dictionary type!".format(
                type(self._forecast_strategy_dict)))

        # At least one data process and one forecast strategy are required.
        if len(self._data_process_dict) == 0:
            raise ValueError("at least one data process strategy is needed!")
        if len(self._forecast_strategy_dict) == 0:
            raise ValueError("at least one forecast strategy is needed!")

    @func_log
    def model_select(self, use_pool=True, loop=False):
        """
        Back-test every (data process, forecast strategy) pair and score it per object.

        A data process is paired with the strategies in its ``md_list`` if given, otherwise with
        every strategy. Pairs whose back test raises are logged and skipped.

        :param use_pool: forwarded to ``BackValidation.back_testing``
        :param loop: forwarded to ``BackValidation.back_testing`` (rolling one-step forecasts)
        :return: tuple ``(acc_df, model_bt_res)``:
            ``acc_df`` has columns ``[OBJ_NO, "dp_n", "mod_n", *acc_metric]``, one row per object and
            pair (metrics are None when the pair produced no rows for that object);
            ``model_bt_res`` has columns ``[OBJ_NO, DATETIME_STAMP, Y, Y_HAT, "mod_n"]`` and holds the
            back-test predictions of every successful pair
        """
        obj_no_list = np.unique(self._df[OBJ_NO])
        total_rs = []
        # Collected across ALL data processes (not reset per data process).
        bt_frames = []
        for dp_k, dp_tmp in self._data_process_dict.items():
            if "class_name" not in dp_tmp:
                log.warning("data process:{} do not be assigned class name.".format(dp_k))
                continue
            # Work on a copy: class_name / md_list are removed so the rest can be passed as kwargs.
            dp_tmp_param = dp_tmp.copy()
            dp_class = dp_tmp_param.pop('class_name')
            md_list = dp_tmp_param.pop('md_list', self._forecast_strategy_dict.keys())
            for md_tmp in md_list:
                if md_tmp not in self._forecast_strategy_dict:
                    log.warning("model strategy:{} do not be defined in strategy_dict.".format(md_tmp))
                    continue
                ms_tmp = self._forecast_strategy_dict[md_tmp]
                if "class_name" not in ms_tmp:
                    log.warning("model strategy:{} do not be assigned class name.".format(md_tmp))
                    continue
                ms_tmp_param = ms_tmp.copy()
                ms_class = ms_tmp_param.pop('class_name')
                other_params = {
                    BT_DP_PARAMS: dp_tmp_param,
                    BT_MP_PARAMS: ms_tmp_param
                }
                try:
                    bt = BackValidation(df=self._df, data_process_class=dp_class, forecast_strategies_class=ms_class,
                                        cutoff_date=self._ms_cutoff_date, forecast_freq=self._forecast_freq,
                                        df_x=self._df_x,
                                        test_period_n=self._ms_test_period_n, **other_params)
                    bt_rs = bt.back_testing(use_pool=use_pool, loop=loop)
                except Exception as e:
                    # Best effort: a failing pair is skipped so the remaining pairs are still evaluated.
                    log.error(e)
                    continue
                bt_rs = self._keep_last_periods(bt_rs, self._ms_test_period_n)
                bt_rs['mod_n'] = md_tmp
                bt_frames.append(bt_rs)
                for obj_no in obj_no_list:
                    total_rs.append(self._score_object(bt_rs, obj_no, dp_k, md_tmp))

        columns = [OBJ_NO, "dp_n", 'mod_n'] + list(self._acc_metric)
        log.debug("#####test total rs####")
        log.debug(total_rs)
        acc_df = pd.DataFrame(data=total_rs, columns=columns)
        bt_cols = [OBJ_NO, DATETIME_STAMP, Y, Y_HAT, 'mod_n']
        if bt_frames:
            model_bt_res = pd.concat(bt_frames).loc[:, bt_cols]
        else:
            model_bt_res = pd.DataFrame(columns=bt_cols)
        return acc_df, model_bt_res

    @staticmethod
    def _keep_last_periods(bt_rs, n):
        """Return a copy of ``bt_rs`` restricted to its last ``n`` distinct dates (all rows if fewer)."""
        dates = np.sort(bt_rs[DATETIME_STAMP].unique())[-n:]
        if len(dates) == 0:
            return bt_rs.copy()
        return bt_rs[bt_rs[DATETIME_STAMP] >= dates[0]].copy()

    def _score_object(self, bt_rs, obj_no, dp_k, md_tmp):
        """Build the accuracy row ``[obj_no, dp_k, md_tmp, *metric values]`` for one object."""
        acc_rs = [obj_no, dp_k, md_tmp]
        lines_tmp = bt_rs[bt_rs[OBJ_NO] == obj_no]
        if lines_tmp.shape[0] == 0:
            # No back-test rows for this object: every metric is missing.
            acc_rs.extend([None] * len(self._acc_metric))
        else:
            acc = BasicAccMetric(lines_tmp[Y], lines_tmp[Y_HAT])
            acc_rs.extend(acc.compute(acc_type) for acc_type in self._acc_metric)
        return acc_rs

    def _check_top_k(self, top_k):
        """Raise ValueError unless 1 <= top_k <= min(MAX_TOP_K, dp count * model count)."""
        dp_m_cc = len(self._data_process_dict) * len(self._forecast_strategy_dict)
        if top_k < 1 or top_k > dp_m_cc or top_k > self.MAX_TOP_K:
            raise ValueError(
                "top k is {}, which should be between 1 and min({}, dp count * model count = {})".format(
                    top_k, self.MAX_TOP_K, dp_m_cc))

    @staticmethod
    def _select_top_k(acc_df, acc_metric, top_k):
        """Per object, keep the ``top_k`` rows with the smallest ``acc_metric``.

        Rows with any missing value are dropped first; ties are broken by row order.
        The rank is stored in a ``row_number`` column.
        """
        ranked = acc_df.dropna()
        ranked = ranked.assign(
            row_number=ranked.groupby([OBJ_NO])[acc_metric].rank(ascending=True, method='first'))
        return ranked[ranked['row_number'] <= top_k]

    def _forecast_with_selection(self, model_selection, forecast_period, interval_width, use_pool):
        """Preprocess the full data, forecast with the selected models and replace NaN by 0."""
        dp = SimpleBatchDataProcess(self._df, self._cutoff_date, freq=self._forecast_freq)
        data_dp = dp.compute(use_pool=use_pool)
        mss = ModelSelectionStrategies(data_dp, freq=self._forecast_freq, data_process_dict=self._data_process_dict,
                                       forecast_strategy_dict=self._forecast_strategy_dict,
                                       model_selection=model_selection, df_x=self._df_x,
                                       forecast_periods=forecast_period, interval_width=interval_width)
        rs = mss.compute(use_pool=use_pool)
        return rs.replace(np.nan, 0)

    def forecast_cv(self, acc_metric, forecast_period, interval_width=0.8, use_pool=True, loop=False, top_k=1):
        """
        Forecast with the models chosen by cross validation, blended per object.

        :param acc_metric: accuracy metric used for ranking; must be one of the configured metrics
        :param forecast_period: forecast length (>= 1)
        :param interval_width: confidence-interval width forwarded to ModelSelectionStrategies
        :param use_pool: whether to use multi-process concurrency
        :param loop: whether model selection uses rolling one-step back tests
        :param top_k: number of best models blended per object (1..3)
        :return: DataFrame with columns ``[OBJ_NO, DATETIME_STAMP, Y, Y_HAT]``
        :raises ValueError: on an invalid ``forecast_period``, ``acc_metric`` or ``top_k``
        :raises ArithmeticError: if model selection produced no usable result
        """
        if forecast_period < 1:
            raise ValueError("forecast period is {}, it should not be less than 1.".format(forecast_period))
        if acc_metric not in self._acc_metric:
            raise ValueError("accuracy metric: {} is not in the validation accuracy metric list".format(acc_metric))
        self._check_top_k(top_k)
        acc_df, _ = self.model_select(use_pool=use_pool, loop=loop)
        ranked = self._select_top_k(acc_df, acc_metric, top_k)
        if ranked.empty:
            raise ArithmeticError("no result from model selection, please check your operation!")
        model_selection = ranked[[OBJ_NO, 'dp_n', 'mod_n']]
        log.debug("model_selection:{}".format(model_selection))
        rs = self._forecast_with_selection(model_selection, forecast_period, interval_width, use_pool)
        # The metric column must be kept for the ensemble weights.
        rs = self.ensemble_model(rs, ranked[[OBJ_NO, 'dp_n', 'mod_n', acc_metric]], acc_metric,
                                 enable_softmax=self._enable_softmax)
        return rs[[OBJ_NO, DATETIME_STAMP, Y, Y_HAT]]

    def fit(self, acc_metric, forecast_period, use_pool=True, loop=False, top_k=3):
        """
        Run model selection and return the back-test results of the selected models.

        :param acc_metric: accuracy metric used for ranking (smaller is better)
        :param forecast_period: forecast length (>= 1); only validated here
        :param use_pool: whether to use multi-process concurrency
        :param loop: whether model selection uses rolling one-step back tests
        :param top_k: number of best models kept per object
        :return: tuple ``(model_res, model_selected)``: back-test rows of every model selected for at
            least one object, and the per-object selection with columns ``['mod_n', OBJ_NO, 'dp_n']``
        :raises ValueError: if ``forecast_period`` < 1
        """
        if forecast_period < 1:
            raise ValueError("forecast period is {}, it should not be less than 1.".format(forecast_period))
        acc_df, model_res = self.model_select(use_pool=use_pool, loop=loop)
        model_selection = self._select_top_k(acc_df, acc_metric, top_k)
        model_selected = model_selection.loc[:, ['mod_n', OBJ_NO, 'dp_n']]
        model_list = model_selected['mod_n'].unique()
        model_res = model_res.loc[model_res['mod_n'].isin(model_list), :].copy()
        return model_res, model_selected

    def predict(self, acc_metric, forecast_period, model_selection: pd.DataFrame, interval_width=0.8, use_pool=True,
                top_k=1):
        """
        Forecast with an already-chosen model selection (e.g. the one returned by ``fit``).

        :param acc_metric: unused; kept for interface compatibility
        :param forecast_period: forecast length (>= 1)
        :param model_selection: DataFrame with at least ``[OBJ_NO, 'dp_n', 'mod_n']``
        :param interval_width: confidence-interval width forwarded to ModelSelectionStrategies
        :param use_pool: whether to use multi-process concurrency
        :param top_k: unused; kept for interface compatibility
        :return: DataFrame with columns ``[OBJ_NO, DATETIME_STAMP, Y_HAT, 'mod_n']`` (models are not
            blended; NaN replaced by 0)
        :raises ValueError: if ``forecast_period`` < 1
        :raises ArithmeticError: if ``model_selection`` is None or empty
        """
        if forecast_period < 1:
            raise ValueError("forecast period is {}, it should not be less than 1.".format(forecast_period))
        if model_selection is None or model_selection.empty:
            raise ArithmeticError("no result from model selection, please check your operation!")
        model_selection = model_selection[[OBJ_NO, 'dp_n', 'mod_n']]
        rs = self._forecast_with_selection(model_selection, forecast_period, interval_width, use_pool)
        return rs[[OBJ_NO, DATETIME_STAMP, Y_HAT, 'mod_n']]

    def backtest_cv(self, acc_metric, bt_period, interval_width=0.8, use_pool=True, loop=False, top_k=1):
        """
        Back-test the models chosen by cross validation, averaged per object and date.

        :param acc_metric: accuracy metric used for ranking; must be one of the configured metrics
        :param bt_period: back-test length (>= 1)
        :param interval_width: confidence-interval width forwarded to ModelSelectionStrategies
        :param use_pool: whether to use multi-process concurrency
        :param loop: whether back tests use rolling one-step forecasts
        :param top_k: number of best models averaged per object (1..3)
        :return: DataFrame with columns ``[OBJ_NO, DATETIME_STAMP, Y, Y_HAT]``
        :raises ValueError: on an invalid ``bt_period``, ``acc_metric`` or ``top_k``
        :raises ArithmeticError: if model selection produced no usable result
        """
        if bt_period < 1:
            raise ValueError("forecast period is {}, it should not be less than 1.".format(bt_period))
        if acc_metric not in self._acc_metric:
            raise ValueError("accuracy metric: {} is not in the validation accuracy metric list".format(acc_metric))
        self._check_top_k(top_k)
        acc_df, _ = self.model_select(use_pool=use_pool, loop=loop)
        log.debug("#######model selection all info:{}#########".format(acc_df))
        ranked = self._select_top_k(acc_df, acc_metric, top_k)
        if ranked.empty:
            raise ArithmeticError("no result from model selection, please check your operation!")
        model_selection = ranked[[OBJ_NO, 'dp_n', 'mod_n']]
        # BackValidation only forwards BT_MP_PARAMS to the strategy, so interval_width must go in there.
        other_params = {
            BT_MP_PARAMS: {
                'data_process_dict': self._data_process_dict,
                'forecast_strategy_dict': self._forecast_strategy_dict,
                'model_selection': model_selection,
                'interval_width': interval_width
            }
        }
        bt = BackValidation(df=self._df, data_process_class=SimpleBatchDataProcess,
                            forecast_strategies_class=ModelSelectionStrategies,
                            cutoff_date=self._cutoff_date, forecast_freq=self._forecast_freq, df_x=self._df_x,
                            test_period_n=bt_period, **other_params)
        bt_rs = bt.back_testing(use_pool=use_pool, loop=loop)
        rs = bt_rs.replace(np.nan, 0)
        # Simple (unweighted) mean of the selected models' predictions per object and date.
        rs = rs.groupby([OBJ_NO, DATETIME_STAMP, Y], as_index=False, sort=False)[Y_HAT].mean()
        return rs[[OBJ_NO, DATETIME_STAMP, Y, Y_HAT]]

    def ensemble_model(self, fcst_res, model_selection_res, acc_metric, enable_softmax=False):
        """
        Blend the forecasts of the selected models into one ``Y_HAT`` per object and date.

        :param fcst_res: forecasts with at least ``[OBJ_NO, DATETIME_STAMP, Y, Y_HAT, 'mod_n']``
        :param model_selection_res: selected models with at least ``[OBJ_NO, 'mod_n', acc_metric]``
        :param acc_metric: metric column used to derive weights (see ``get_model_weight``)
        :param enable_softmax: use softmax instead of sum-normalised weights
        :return: DataFrame ``[OBJ_NO, DATETIME_STAMP, Y, Y_HAT]`` where ``Y_HAT`` is the weighted sum
            of the models' ``Y_HAT``; forecast rows without a matching weight use weight 1.0
        """
        weight_cols = [OBJ_NO, 'mod_n', 'weight']
        # Iterate the groups explicitly: each sub-frame keeps its OBJ_NO column, which
        # groupby.apply is deprecating for grouping columns.
        weight_parts = [self.get_model_weight(group, acc_metric, enable_softmax)
                        for _, group in model_selection_res.groupby(OBJ_NO, sort=False)]
        if weight_parts:
            model_weight = pd.concat(weight_parts)[weight_cols]
            # NOTE(review): weights are joined on [OBJ_NO, 'mod_n'] only; if an object selects the same
            # model under two data processes, rows get duplicated — confirm whether 'dp_n' should be a key.
            fcst_res = pd.merge(fcst_res, model_weight, on=[OBJ_NO, 'mod_n'], how='left')
            fcst_res['weight'] = fcst_res['weight'].fillna(1.0)
        else:
            fcst_res = fcst_res.copy()
            fcst_res['weight'] = 1.0
        fcst_res[Y_HAT] = fcst_res[Y_HAT] * fcst_res['weight']
        return fcst_res.groupby([OBJ_NO, DATETIME_STAMP, Y], as_index=False, sort=False)[Y_HAT].sum()

    def get_model_weight(self, df, acc_metric, enable_softmax):
        """
        Compute blending weights for the candidate models of one object.

        Each metric value is mapped to a score ``1 / (metric + 1)`` so smaller metrics get larger
        scores. Scores are normalised with a softmax when ``enable_softmax`` is set, otherwise by their
        sum; if the scores do not sum to a positive value every model gets ``1 / len(df)``.

        :param df: rows of one object with at least ``[OBJ_NO, 'mod_n', acc_metric]``; not modified
        :return: DataFrame with columns ``[OBJ_NO, 'mod_n', 'weight']``
        """
        score = 1 / (df[acc_metric] + 1)
        if enable_softmax:
            # Shift by the max before exponentiating for numerical stability.
            exp_score = np.exp(score - score.max())
            weight = exp_score / exp_score.sum()
        else:
            weight_sum = score.sum()
            weight = score / weight_sum if weight_sum > 0 else 1 / len(df)
        rs = df[[OBJ_NO, 'mod_n']].copy()
        rs['weight'] = weight
        return rs

    def combine_yhat(self, df):
        """Return ``Series({Y_HAT: sum(Y_HAT * weight)})`` for one group of weighted forecasts."""
        weight_yhat = (df[Y_HAT] * df['weight']).sum()
        return pd.Series({Y_HAT: weight_yhat})
-
-
|