import backtrader as bt
import backtrader.feeds as btfeeds
import datetime
import pandas as pd
import numpy as np # 用于生成更真实的数据
# ----------------------------------------------------
# 1. 定义策略 (保持不变)
# ----------------------------------------------------
class SMACross(bt.Strategy):
params = (
('fast_period', 10),
('slow_period', 30),
)
def __init__(self):
self.dataclose = self.datas[0].close
self.order = None
# 指标线
self.sma_fast = bt.indicators.SimpleMovingAverage(self.datas[0], period=self.p.fast_period)
self.sma_slow = bt.indicators.SimpleMovingAverage(self.datas[0], period=self.p.slow_period)
self.crossover = bt.indicators.CrossOver(self.sma_fast, self.sma_slow)
# 简化 next 和 notify_order 方法以专注于回测核心
def next(self):
if self.order: return
if not self.position:
if self.crossover > 0: self.order = self.buy()
else:
if self.crossover < 0: self.order = self.sell()
def notify_order(self, order):
if order.status in [order.Completed]:
# print(f'订单执行: {"买入" if order.isbuy() else "卖出"}, 价格: {order.executed.price:.2f}')
pass
self.order = None
def notify_trade(self, trade):
# 打印交易信息 (可选)
if trade.isclosed:
print(f'交易已平仓. 毛利润: {trade.pnl:.2f}, 净利润: {trade.pnlcomm:.2f}')
# ----------------------------------------------------
# 2. 准备模拟数据
# ----------------------------------------------------
def generate_simulated_data():
dates = pd.date_range(start='2020-01-01', periods=200)
# 模拟价格随机游走
base_price = 100
price_changes = np.random.randn(200).cumsum() * 0.5
close_prices = base_price + price_changes
data = {
# 注意:这里我们使用小写的列名
'open': close_prices - np.random.rand(200) * 0.5,
'high': close_prices + np.random.rand(200) * 0.5,
'low': close_prices - np.random.rand(200) * 1.0,
'close': close_prices,
'volume': np.random.randint(50000, 150000, 200)
}
# 将日期设置为索引
data_df = pd.DataFrame(data, index=dates)
# **关键修正 1:** 将索引名称设为 backtrader 兼容的 'datetime'
data_df.index.name = 'datetime'
print("已生成模拟数据 (200 天).")
return data_df
# ----------------------------------------------------
# ----------------------------------------------------
def run_backtest_with_simulated_data():
data_df = generate_simulated_data()
cerebro = bt.Cerebro()
cerebro.addstrategy(SMACross)
# ** 使用 PandasData 加载数据 **
data = btfeeds.PandasData(
dataname=data_df,
# 确保 OHLCV 列名与 DataFrame 的列名一致(这里是小写)
open='open',
high='high',
low='low',
close='close',
volume='volume',
openinterest=-1
)
cerebro.adddata(data)
# 资金和佣金设置
cerebro.broker.setcash(100000.0)
cerebro.broker.setcommission(commission=0.001)
# 添加一个分析器来计算总收益
cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')
print(f'起始资金: {cerebro.broker.getvalue():.2f}')
# ** 运行回测 **
strategies = cerebro.run()
# 打印最终结果
final_value = cerebro.broker.getvalue()
returns = strategies[0].analyzers.returns.get_analysis()
print("-" * 30)
print(f'最终资金: {final_value:.2f}')
print(f'总收益 (百分比): {returns["rtot"] * 100:.2f}%')
print("-" * 30)
return cerebro, strategies
cerebro, _ = run_backtest_with_simulated_data()