如何用mclapply [parallelized]替换Quantstrat'for loop'?

我想并行化Quantstrat。我的代码并不完全像这样,但这可以说明问题。我相信的问题是.blotter env初始化为指针内存地址,而我无法初始化new.env()的数组/矩阵。

我想做的是用mclapply替换for循环,这样我可以运行具有不同日期/符号的多个applyStrategies(此处仅显示不同的符号)。我的最终目标是一个beowulf群集(makecluster),并计划在最多252个交易日(滚动窗口)中并行运行它们,并且每次迭代使用不同的符号(但我并不需要所有这些。我只是问是否有一个可以使用mclapply的方式来分配投资组合和随后的.blotter内存对象的方法

#Load quantstrat in your R environment.

rm(list = ls())

local()

library(quantstrat) 
library(parallel)

# The search command lists all attached packages.
search()

symbolstring1 <- c('qqQ','GOOG')
#symbolstring <- c('qqQ','GOOG')

#for(i in 1:length(symbolstring1))
  mlapply(symbolstring1,function(symbolstring)
{
  #local()
  #i=2
  #symbolstring=as.character(symbolstring1[i])
  
  .blotter <- new.env()
  .strategy <- new.env()
  
  try(rm.strat(strategyName),silent=TRUE)
  try(rm(envir=FinancialInstrument:::.instrument),silent=TRUE)
  for (name in ls(FinancialInstrument:::.instrument)){rm_instruments(name,keep.currencies = FALSE)}
  print(symbolstring)

currency('USD')

stock(symbolstring,currency='USD',multiplier=1)

# Currency and trading instrument objects stored in the 
# .instrument environment

print("FI")
ls(envir=FinancialInstrument:::.instrument)

# blotter functions used for instrument initialization 
# quantstrat creates a private storage area called .strategy

ls(all=T)

# The initDate should be lower than the startDate. The initDate will be used later while initializing the strategy.

initDate <- '2010-01-01'

startDate <- '2011-01-01'

endDate <- '2019-08-10'

init_equity <- 50000

# Set UTC TIME

Sys.setenv(TZ="UTC")

getSymbols(symbolstring,from=startDate,to=endDate,adjust=TRUE,src='yahoo')

# Define names for portfolio,account and strategy. 

#portfolioName <- accountName <- strategyName <- "FirstPortfolio"
portfolioName <- accountName <- strategyName <- paste0("FirstPortfolio",symbolstring)

print(portfolioName)
# The function rm.strat removes any strategy,portfolio,account,or order book object with the given name. This is important

#rm.strat(strategyName)

print("port")
initPortf(name = portfolioName,symbols = symbolstring,initDate = initDate)

initacct(name = accountName,portfolios = portfolioName,initDate = initDate,initEq = init_equity)

initOrders(portfolio = portfolioName,initDate = initDate)



# name: the string name of the strategy

# assets: optional list of assets to apply the strategy to.  

# Normally these are defined in the portfolio object

# contstrains: optional portfolio constraints

# store: can be True or False. If True store the strategy in the environment. Default is False
print("strat")
strategy(strategyName,store = TRUE)

ls(all=T)

# .blotter holds the portfolio and account object 

ls(.blotter)

# .strategy holds the orderbook and strategy object

print(ls(.strategy))

print("ind")
add.indicator(strategy = strategyName,name = "EMA",arguments = list(x = quote(Cl(mktdata)),n = 10),label = "nFast")

add.indicator(strategy = strategyName,n = 30),label = "nSlow")

# Add long signal when the fast EMA crosses over slow EMA.

print("sig")
add.signal(strategy = strategyName,name="sigCrossover",arguments = list(columns = c("nFast","nSlow"),relationship = "gte"),label = "longSignal")

# Add short signal when the fast EMA goes below slow EMA.

add.signal(strategy = strategyName,name = "sigCrossover",relationship = "lt"),label = "shortSignal")

# go long when 10-period EMA (nFast) >= 30-period EMA (nSlow)

print("rul")
add.rule(strategyName,name= "ruleSignal",arguments=list(sigcol="longSignal",sigval=TRUE,orderqty=100,ordertype="market",orderside="long",replace = TRUE,TxnFees = -10),type="enter",label="EnterLong") 

# go short when 10-period EMA (nFast) < 30-period EMA (nSlow)

add.rule(strategyName,name = "ruleSignal",arguments = list(sigcol = "shortSignal",sigval = TRUE,orderside = "short",ordertype = "market",orderqty = -100,TxnFees = -10,replace = TRUE),type = "enter",label = "EnterShort")

# Close long positions when the shortSignal column is True

add.rule(strategyName,orderside = "long",orderqty = "all",type = "exit",label = "ExitLong")

# Close Short positions when the longSignal column is True

add.rule(strategyName,arguments = list(sigcol = "longSignal",label = "ExitShort")

print("summary")
summary(getStrategy(strategyName))

# Summary results are produced below

print("results")
results <- applyStrategy(strategy= strategyName,symbols=symbolstring)

# The applyStrategy() outputs all transactions(from the oldest to recent transactions)that the strategy sends. The first few rows of the applyStrategy() output are shown below

getTxns(Portfolio=portfolioName,Symbol=symbolstring)

mktdata

updatePortf(portfolioName)

dateRange <- time(getPortfolio(portfolioName)$summary)[-1]

updateacct(portfolioName,dateRange)

updateEndEq(accountName)

print(plot(tail(getaccount(portfolioName)$summary$End.Eq,-1),main = "Portfolio Equity"))

#cleanup
for (name in symbolstring) rm(list = name)
#rm(.blotter)
rm(.stoploss)
rm(.txnfees)
#rm(.strategy)
rm(symbols)

}
)

但是抛出一个错误 get(symbol,envir = envir)中的错误:找不到对象'qqQ'

具体来说,问题是FinancialInstrument :::。instrument指向的内存地址未使用我的封装变量调用(符号字符串)更新

iCMS 回答:如何用mclapply [parallelized]替换Quantstrat'for loop'?

apply.paramset中的

quantstrat已经使用foreach构造来并行执行applyStrategy

apply.paramset需要做大量工作,以确保工作人员可以使用环境进行工作,并收集适当的结果以将其发送回调用过程。

最简单的操作可能是使用apply.paramset。设置日期和符号参数,并使该功能正常运行。

或者,我建议您查看在foreach中使用并行apply.paramset构造将其修改为建议的情况所需的步骤。

还要注意,您的问题询问有关使用Beowulf群集和mclapply的问题。这行不通。 mclapply仅在单个内存空间中工作。 Beowulf群集通常不共享单个内存和进程空间。它们通常通过并行库(如MPI)分发作业。通过使用apply.paramset的{​​{1}}后端,doMPI已经可以在Beowulf群集上进行分发。这就是我们使用foreach的原因之一:众多可用的并行后端。 foreach的{​​{1}}后端实际上在幕后使用doMC

,

我相信这会使代码并行化。我已经交换了指标和符号,但是使用了不同的符号和日期的逻辑就在那里

基本上我添加了

Dates=paste0(startDate,"::",endDate)

rm(list = ls())

library(lubridate)
library(parallel)

autoregressor1  = function(x){
  if(NROW(x)<12){ result = NA} else{
    y = Vo(x)*Ad(x)
    #y = ROC(Ad(x))
    y = ROC(y)
    y = na.omit(y)
    step1 = ar.yw(y)
    step2 = predict(step1,newdata=y,n.ahead=1)
    step3 = step2$pred[1]+1
    step4 = (step3*last(Ad(x))) - last(Ad(x))
    
    result = step4
  }
  return(result)
}

autoregressor = function(x){
  ans = rollapply(x,26,FUN = autoregressor1,by.column=FALSE)
  return (ans)}

########################indicators#############################

library(quantstrat) 
library(future.apply)
library(scorecard)

reset_quantstrat <- function() {
  if (! exists(".strategy")) .strategy <<- new.env(parent = .GlobalEnv)
  if (! exists(".blotter")) .blotter <<- new.env(parent = .GlobalEnv)
  if (! exists(".audit")) .audit <<- new.env(parent = .GlobalEnv)
  suppressWarnings(rm(list = ls(.strategy),pos = .strategy))
  suppressWarnings(rm(list = ls(.blotter),pos = .blotter))
  suppressWarnings(rm(list = ls(.audit),pos = .audit))
  FinancialInstrument::currency("USD")
}

reset_quantstrat()

initDate <- '2010-01-01'

endDate <- as.Date(Sys.Date())
startDate <- endDate %m-% years(3)

symbolstring1 <- c('SSO','GOLD')

getSymbols(symbolstring1,from=startDate,to=endDate,adjust=TRUE,src='yahoo')

#symbolstring1 <- c('SP500TR','GOOG')

.orderqty <- 1
.txnfees <- 0

#random <- sample(1:2,2,replace=FALSE)

random <- (1:2)

equity <- lapply(random,function(x)
{#x=1
  try(rm("account.Snazzy","portfolio.Snazzy",pos=.GlobalEnv$.blotter),silent=TRUE)
  rm(.blotter)
  rm(.strategy)
  portfolioName <- accountName <- strategyName <- paste0("FirstPortfolio",x+2)
  #endDate <- as.Date(Sys.Date())
  startDate <- endDate %m-% years(1+x)
 
  #Load quantstrat in your R environment.
  reset_quantstrat()
  
  # The search command lists all attached packages.
  search()

  symbolstring=as.character(symbolstring1[x])
  print(symbolstring)
  
  try(rm.strat(strategyName),silent=TRUE)
  try(rm(envir=FinancialInstrument:::.instrument),silent=TRUE)
  for (name in ls(FinancialInstrument:::.instrument)){rm_instruments(name,keep.currencies = FALSE)}
  print(symbolstring)
  
  currency('USD')
  
  stock(symbolstring,currency='USD',multiplier=1)
  
  # Currency and trading instrument objects stored in the 
  # .instrument environment
  
  print("FI")
  ls(envir=FinancialInstrument:::.instrument)
  
  # blotter functions used for instrument initialization 
  # quantstrat creates a private storage area called .strategy
  
  ls(all=T)
  
  init_equity <- 10000
  
  Sys.setenv(TZ="UTC")
  
  print(portfolioName)
 
  print("port")

  try(initPortf(name = portfolioName,symbols = symbolstring,initDate = initDate))
  
 
  try(initAcct(name = accountName,portfolios = portfolioName,initDate = initDate,initEq = init_equity))
  
  try(initOrders(portfolio = portfolioName,initDate = initDate))
  
  # name: the string name of the strategy
  
  # assets: optional list of assets to apply the strategy to.  
  
  # Normally these are defined in the portfolio object
  
  # contstrains: optional portfolio constraints
  
  # store: can be True or False. If True store the strategy in the environment. Default is False
  print("strat")
  strategy(strategyName,store = TRUE)
  
  ls(all=T)
  
  # .blotter holds the portfolio and account object 
  
  ls(.blotter)
  
  # .strategy holds the orderbook and strategy object
  
  print(ls(.strategy))
  
  print("ind")
  #ARIMA
    
    add.indicator(
      strategy  =   strategyName,name      =   "autoregressor",arguments =   list(
        x       =   quote(mktdata)),label     =   "arspread")
    
    ################################################ Signals #############################
    
    add.signal(
      strategy          = strategyName,name              = "sigThreshold",arguments         = list(
        threshold       = 0.25,column          = "arspread",relationship    = "gte",cross           = TRUE),label             = "Selltime")
    
    add.signal(
      strategy          = strategyName,arguments         = list(
        threshold       = 0.1,relationship    = "lt",label             = "cashtime")
    
    add.signal(
      strategy          = strategyName,arguments         = list(
        threshold       = -0.1,relationship    = "gt",arguments         = list(
        threshold       = -0.25,relationship    = "lte",label             = "Buytime")
    
    ######################################## Rules #################################################
    
    #Entry Rule Long
    add.rule(strategyName,name               =   "ruleSignal",arguments          =   list(
               sigcol           =   "Buytime",sigval           =   TRUE,orderqty     =   .orderqty,ordertype        =   "market",orderside        =   "long",pricemethod      =   "market",replace          =   TRUE,TxnFees              =   -.txnfees
               #,#osFUN               =   osMaxPos
             ),type               =   "enter",path.dep           =   TRUE,label              =   "Entry")
    
    #Entry Rule Short
    
    add.rule(strategyName,name           =   "ruleSignal",arguments          =   list(
               sigcol           =   "Selltime",orderside        =   "short",label              =   "Entry")
    
    #Exit Rules
    
  print("summary")
  summary(getStrategy(strategyName))
  
  # Summary results are produced below
  
  print("results")
  
  results <- applyStrategy(strategy= strategyName,portfolios = portfolioName)
  
  # The applyStrategy() outputs all transactions(from the oldest to recent transactions)that the strategy sends. The first few rows of the applyStrategy() output are shown below
  
  getTxns(Portfolio=portfolioName,Symbol=symbolstring)
  
  mktdata
  
  updatePortf(portfolioName,Dates=paste0(startDate,endDate))
  
  dateRange <- time(getPortfolio(portfolioName)$summary)
  
  updateAcct(portfolioName,dateRange[which(dateRange >= startDate & dateRange <= endDate)])
  
  updateEndEq(accountName,endDate))
  
  print(plot(tail(getAccount(portfolioName)$summary$End.Eq,-1),main = symbolstring))
  
  tStats <- tradeStats(Portfolios = portfolioName,use="trades",inclZeroDays=FALSE,endDate))
  
  final_acct <- getAccount(portfolioName)
  
  #final_acct
  #View(final_acct)
  
  options(width=70)
  
  print(plot(tail(final_acct$summary$End.Eq,main = symbolstring))
  #dev.off()
  
  tail(final_acct$summary$End.Eq)
  
  rets <- PortfReturns(Account = accountName)
  
  #rownames(rets) <- NULL
  
  tab.perf <- table.Arbitrary(rets,metrics=c(
                                "Return.cumulative","Return.annualized","SharpeRatio.annualized","CalmarRatio"),metricsNames=c(
                                "Cumulative Return","Annualized Return","Annualized Sharpe Ratio","Calmar Ratio"))
  tab.perf
  
  tab.risk <- table.Arbitrary(rets,metrics=c(
                                "StdDev.annualized","maxDrawdown"
                              ),metricsNames=c(
                                "Annualized StdDev","Max DrawDown"))
  
  tab.risk
  
  return (as.numeric(tail(final_acct$summary$End.Eq,1))-init_equity)

  #reset_quantstrat()
  
}
)

它似乎是并行的,但不能正确更新init_equity

本文链接:https://www.f2er.com/1739385.html

大家都在问