Comment remplacer Quantstrat 'for loop' par mclapply [parallélisé]?

Aug 16 2020

Je voudrais paralléliser quantstrat. Mon code n'est pas exactement comme ça, mais cela montre le problème. Le problème que je crois est que le .blotter env est initialisé à une adresse mémoire de pointeur et je suis incapable d'initialiser un tableau/matrice de new.env().

Ce que je voudrais faire, c'est remplacer la boucle for par un mclapply afin de pouvoir exécuter plusieurs applyStrategies avec des dates/symboles variables (seuls des symboles variables sont affichés ici). Mon objectif final est un cluster beowulf (makeCluster) et je prévois de les exécuter en parallèle en utilisant jusqu'à 252 jours de bourse (fenêtre glissante) avec des symboles variables par itération (mais je n'ai pas besoin de tout cela. Je demande simplement s'il y a un façon de travailler avec l'attribution du portefeuille et l'objet mémoire .blotter suivant de manière à pouvoir utiliser mclapply)

#Load quantstrat in your R environment.

rm(list = ls())

local()

library(quantstrat) 
library(parallel)

# The search command lists all attached packages.
search()

symbolstring1 <- c('QQQ','GOOG')
#symbolstring <- c('QQQ','GOOG')

#for(i in 1:length(symbolstring1))
  mlapply(symbolstring1, function(symbolstring)
{
  #local()
  #i=2
  #symbolstring=as.character(symbolstring1[i])
  
  .blotter <- new.env()
  .strategy <- new.env()
  
  try(rm.strat(strategyName),silent=TRUE)
  try(rm(envir=FinancialInstrument:::.instrument),silent=TRUE)
  for (name in ls(FinancialInstrument:::.instrument)){rm_instruments(name,keep.currencies = FALSE)}
  print(symbolstring)

currency('USD')

stock(symbolstring,currency='USD',multiplier=1)

# Currency and trading instrument objects stored in the 
# .instrument environment

print("FI")
ls(envir=FinancialInstrument:::.instrument)

# blotter functions used for instrument initialization 
# quantstrat creates a private storage area called .strategy

ls(all=T)

# The initDate should be lower than the startDate. The initDate will be used later while initializing the strategy.

initDate <- '2010-01-01'

startDate <- '2011-01-01'

endDate <- '2019-08-10'

init_equity <- 50000

# Set UTC TIME

Sys.setenv(TZ="UTC")

getSymbols(symbolstring,from=startDate,to=endDate,adjust=TRUE,src='yahoo')

# Define names for portfolio, account and strategy. 

#portfolioName <- accountName <- strategyName <- "FirstPortfolio"
portfolioName <- accountName <- strategyName <- paste0("FirstPortfolio",symbolstring)

print(portfolioName)
# The function rm.strat removes any strategy, portfolio, account, or order book object with the given name. This is important

#rm.strat(strategyName)

print("port")
initPortf(name = portfolioName,
          symbols = symbolstring,
          initDate = initDate)

initAcct(name = accountName,
         portfolios = portfolioName,
         initDate = initDate,
         initEq = init_equity)

initOrders(portfolio = portfolioName,
           symbols = symbolstring,
           initDate = initDate)



# name: the string name of the strategy

# assets: optional list of assets to apply the strategy to.  

# Normally these are defined in the portfolio object

# contstrains: optional portfolio constraints

# store: can be True or False. If True store the strategy in the environment. Default is False
print("strat")
strategy(strategyName, store = TRUE)

ls(all=T)

# .blotter holds the portfolio and account object 

ls(.blotter)

# .strategy holds the orderbook and strategy object

print(ls(.strategy))

print("ind")
add.indicator(strategy = strategyName, 
              name = "EMA", 
              arguments = list(x = quote(Cl(mktdata)), 
                               n = 10), label = "nFast")

add.indicator(strategy = strategyName, 
              name = "EMA", 
              arguments = list(x = quote(Cl(mktdata)), 
                               n = 30), 
              label = "nSlow")

# Add long signal when the fast EMA crosses over slow EMA.

print("sig")
add.signal(strategy = strategyName,
           name="sigCrossover",
           arguments = list(columns = c("nFast", "nSlow"),
                            relationship = "gte"),
           label = "longSignal")

# Add short signal when the fast EMA goes below slow EMA.

add.signal(strategy = strategyName, 
           name = "sigCrossover",
           arguments = list(columns = c("nFast", "nSlow"),
                            relationship = "lt"),
           label = "shortSignal")

# go long when 10-period EMA (nFast) >= 30-period EMA (nSlow)

print("rul")
add.rule(strategyName,
         name= "ruleSignal",
         arguments=list(sigcol="longSignal",
                        sigval=TRUE,
                        orderqty=100,
                        ordertype="market",
                        orderside="long",
                        replace = TRUE, 
                        TxnFees = -10),
         type="enter",
         label="EnterLong") 

# go short when 10-period EMA (nFast) < 30-period EMA (nSlow)

add.rule(strategyName, 
         name = "ruleSignal", 
         arguments = list(sigcol = "shortSignal", 
                          sigval = TRUE, 
                          orderside = "short", 
                          ordertype = "market", 
                          orderqty = -100, 
                          TxnFees = -10,                     
                          replace = TRUE), 
         type = "enter", 
         label = "EnterShort")

# Close long positions when the shortSignal column is True

add.rule(strategyName, 
         name = "ruleSignal", 
         arguments = list(sigcol = "shortSignal", 
                          sigval = TRUE, 
                          orderside = "long", 
                          ordertype = "market", 
                          orderqty = "all", 
                          TxnFees = -10, 
                          replace = TRUE), 
         type = "exit", 
         label = "ExitLong")

# Close Short positions when the longSignal column is True

add.rule(strategyName, 
         name = "ruleSignal", 
         arguments = list(sigcol = "longSignal", 
                          sigval = TRUE, 
                          orderside = "short", 
                          ordertype = "market", 
                          orderqty = "all", 
                          TxnFees = -10, 
                          replace = TRUE), 
         type = "exit", 
         label = "ExitShort")

print("summary")
summary(getStrategy(strategyName))

# Summary results are produced below

print("results")
results <- applyStrategy(strategy= strategyName, portfolios = portfolioName,symbols=symbolstring)

# The applyStrategy() outputs all transactions(from the oldest to recent transactions)that the strategy sends. The first few rows of the applyStrategy() output are shown below

getTxns(Portfolio=portfolioName, Symbol=symbolstring)

mktdata

updatePortf(portfolioName)

dateRange <- time(getPortfolio(portfolioName)$summary)[-1]

updateAcct(portfolioName,dateRange)

updateEndEq(accountName)

print(plot(tail(getAccount(portfolioName)$summary$End.Eq,-1), main = "Portfolio Equity"))

#cleanup
for (name in symbolstring) rm(list = name)
#rm(.blotter)
rm(.stoploss)
rm(.txnfees)
#rm(.strategy)
rm(symbols)

}
)

Mais une erreur est renvoyée Erreur dans get(symbol, envir = envir) : object 'QQQ' not found

Plus précisément, le problème est que FinancialInstrument ::: .instrument pointe vers une adresse mémoire qui n'est pas mise à jour avec mes appels de variables encapsulées (chaîne de symboles)

Réponses

3 BrianG.Peterson Aug 17 2020 at 20:37

apply.paramsetin quantstratutilise déjà une foreachconstruction pour paralléliser l'exécution de applyStrategy.

apply.paramsetdoit effectuer une bonne quantité de travail pour s'assurer que les environnements sont disponibles dans les nœuds de calcul pour effectuer le travail, et pour collecter les résultats appropriés pour les renvoyer au processus d'appel.

La chose la plus simple à faire pour vous serait probablement d'utiliser apply.paramset. Définissez vos paramètres de dates et de symboles et faites en sorte que la fonction s'exécute normalement.

Alternativement, je vous suggère de regarder les étapes nécessaires pour utiliser une foreachconstruction parallèle apply.paramsetafin de la modifier selon votre cas suggéré.

Notez également que votre question porte sur l'utilisation d'un cluster Beowulf et mclapply. Cela ne fonctionnera pas. mclapplyne fonctionne que dans un seul espace mémoire. Les clusters Beowulf ne partagent normalement pas un seul espace de mémoire et de processus. Ils distribuent généralement les travaux via des bibliothèques parallèles telles que MPI. apply.paramsetpourrait déjà distribuer sur un cluster Beowulf en utilisant un doMPIbackend pour foreach. C'est l'une des raisons que nous avons utilisées foreach: la multitude de backends parallèles différents qui sont disponibles. Le doMCbackend foreachutilise réellement mclapplyles coulisses.

1 thistleknot Aug 19 2020 at 20:43

Je crois que cela parallélise le code. J'ai échangé les indicateurs ainsi que les symboles, mais la logique d'utiliser différents symboles et dates est là

En gros j'ai ajouté

Dates=paste0(startDate,"::",endDate)

rm(list = ls())

library(lubridate)
library(parallel)

autoregressor1  = function(x){
  if(NROW(x)<12){ result = NA} else{
    y = Vo(x)*Ad(x)
    #y = ROC(Ad(x))
    y = ROC(y)
    y = na.omit(y)
    step1 = ar.yw(y)
    step2 = predict(step1,newdata=y,n.ahead=1)
    step3 = step2$pred[1]+1
    step4 = (step3*last(Ad(x))) - last(Ad(x))
    
    result = step4
  }
  return(result)
}

autoregressor = function(x){
  ans = rollapply(x,26,FUN = autoregressor1,by.column=FALSE)
  return (ans)}

########################indicators#############################

library(quantstrat) 
library(future.apply)
library(scorecard)

reset_quantstrat <- function() {
  if (! exists(".strategy")) .strategy <<- new.env(parent = .GlobalEnv)
  if (! exists(".blotter")) .blotter <<- new.env(parent = .GlobalEnv)
  if (! exists(".audit")) .audit <<- new.env(parent = .GlobalEnv)
  suppressWarnings(rm(list = ls(.strategy), pos = .strategy))
  suppressWarnings(rm(list = ls(.blotter), pos = .blotter))
  suppressWarnings(rm(list = ls(.audit), pos = .audit))
  FinancialInstrument::currency("USD")
}

reset_quantstrat()

initDate <- '2010-01-01'

endDate <- as.Date(Sys.Date())
startDate <- endDate %m-% years(3)

symbolstring1 <- c('SSO','GOLD')

getSymbols(symbolstring1,from=startDate,to=endDate,adjust=TRUE,src='yahoo')

#symbolstring1 <- c('SP500TR','GOOG')

.orderqty <- 1
.txnfees <- 0

#random <- sample(1:2, 2, replace=FALSE)

random <- (1:2)

equity <- lapply(random, function(x)
{#x=1
  try(rm("account.Snazzy","portfolio.Snazzy",pos=.GlobalEnv$.blotter),silent=TRUE)
  rm(.blotter)
  rm(.strategy)
  portfolioName <- accountName <- strategyName <- paste0("FirstPortfolio",x+2)
  #endDate <- as.Date(Sys.Date())
  startDate <- endDate %m-% years(1+x)
 
  #Load quantstrat in your R environment.
  reset_quantstrat()
  
  # The search command lists all attached packages.
  search()

  symbolstring=as.character(symbolstring1[x])
  print(symbolstring)
  
  try(rm.strat(strategyName),silent=TRUE)
  try(rm(envir=FinancialInstrument:::.instrument),silent=TRUE)
  for (name in ls(FinancialInstrument:::.instrument)){rm_instruments(name,keep.currencies = FALSE)}
  print(symbolstring)
  
  currency('USD')
  
  stock(symbolstring,currency='USD',multiplier=1)
  
  # Currency and trading instrument objects stored in the 
  # .instrument environment
  
  print("FI")
  ls(envir=FinancialInstrument:::.instrument)
  
  # blotter functions used for instrument initialization 
  # quantstrat creates a private storage area called .strategy
  
  ls(all=T)
  
  init_equity <- 10000
  
  Sys.setenv(TZ="UTC")
  
  print(portfolioName)
 
  print("port")

  try(initPortf(name = portfolioName,
            symbols = symbolstring,
            initDate = initDate))
  
 
  try(initAcct(name = accountName,
           portfolios = portfolioName,
           initDate = initDate,
           initEq = init_equity))
  
  try(initOrders(portfolio = portfolioName,
             symbols = symbolstring,
             initDate = initDate))
  
  # name: the string name of the strategy
  
  # assets: optional list of assets to apply the strategy to.  
  
  # Normally these are defined in the portfolio object
  
  # contstrains: optional portfolio constraints
  
  # store: can be True or False. If True store the strategy in the environment. Default is False
  print("strat")
  strategy(strategyName, store = TRUE)
  
  ls(all=T)
  
  # .blotter holds the portfolio and account object 
  
  ls(.blotter)
  
  # .strategy holds the orderbook and strategy object
  
  print(ls(.strategy))
  
  print("ind")
  #ARIMA
    
    add.indicator(
      strategy  =   strategyName, 
      name      =   "autoregressor", 
      arguments =   list(
        x       =   quote(mktdata)),
      label     =   "arspread")
    
    ################################################ Signals #############################
    
    add.signal(
      strategy          = strategyName,
      name              = "sigThreshold",
      arguments         = list(
        threshold       = 0.25,
        column          = "arspread",
        relationship    = "gte",
        cross           = TRUE),
      label             = "Selltime")
    
    add.signal(
      strategy          = strategyName,
      name              = "sigThreshold",
      arguments         = list(
        threshold       = 0.1,
        column          = "arspread",
        relationship    = "lt",
        cross           = TRUE),
      label             = "cashtime")
    
    add.signal(
      strategy          = strategyName,
      name              = "sigThreshold",
      arguments         = list(
        threshold       = -0.1,
        column          = "arspread",
        relationship    = "gt",
        cross           = TRUE),
      label             = "cashtime")
    
    add.signal(
      strategy          = strategyName,
      name              = "sigThreshold",
      arguments         = list(
        threshold       = -0.25,
        column          = "arspread",
        relationship    = "lte",
        cross           = TRUE),
      label             = "Buytime")
    
    ######################################## Rules #################################################
    
    #Entry Rule Long
    add.rule(strategyName,
             name               =   "ruleSignal",
             arguments          =   list(
               sigcol           =   "Buytime",
               sigval           =   TRUE,
               orderqty     =   .orderqty,
               ordertype        =   "market",
               orderside        =   "long",
               pricemethod      =   "market",
               replace          =   TRUE,
               TxnFees              =   -.txnfees
               #,
               #osFUN               =   osMaxPos
             ), 
             type               =   "enter",
             path.dep           =   TRUE,
             label              =   "Entry")
    
    #Entry Rule Short
    
    add.rule(strategyName,
             name           =   "ruleSignal",
             arguments          =   list(
               sigcol           =   "Selltime",
               sigval           =   TRUE,
               orderqty     =   .orderqty,
               ordertype        =   "market",
               orderside        =   "short",
               pricemethod      =   "market",
               replace          =   TRUE,
               TxnFees              =   -.txnfees
               #,
               #osFUN               =   osMaxPos
             ), 
             type               =   "enter",
             path.dep           =   TRUE,
             label              =   "Entry")
    
    #Exit Rules
    
  print("summary")
  summary(getStrategy(strategyName))
  
  # Summary results are produced below
  
  print("results")
  
  results <- applyStrategy(strategy= strategyName, portfolios = portfolioName)
  
  # The applyStrategy() outputs all transactions(from the oldest to recent transactions)that the strategy sends. The first few rows of the applyStrategy() output are shown below
  
  getTxns(Portfolio=portfolioName, Symbol=symbolstring)
  
  mktdata
  
  updatePortf(portfolioName,Dates=paste0(startDate,"::",endDate))
  
  dateRange <- time(getPortfolio(portfolioName)$summary)
  
  updateAcct(portfolioName,dateRange[which(dateRange >= startDate & dateRange <= endDate)])
  
  updateEndEq(accountName, Dates=paste0(startDate,"::",endDate))
  
  print(plot(tail(getAccount(portfolioName)$summary$End.Eq,-1), main = symbolstring))
  
  tStats <- tradeStats(Portfolios = portfolioName, use="trades", inclZeroDays=FALSE,Dates=paste0(startDate,"::",endDate))
  
  final_acct <- getAccount(portfolioName)
  
  #final_acct
  #View(final_acct)
  
  options(width=70)
  
  print(plot(tail(final_acct$summary$End.Eq,-1), main = symbolstring))
  #dev.off()
  
  tail(final_acct$summary$End.Eq)
  
  rets <- PortfReturns(Account = accountName)
  
  #rownames(rets) <- NULL
  
  tab.perf <- table.Arbitrary(rets,
                              metrics=c(
                                "Return.cumulative",
                                "Return.annualized",
                                "SharpeRatio.annualized",
                                "CalmarRatio"),
                              metricsNames=c(
                                "Cumulative Return",
                                "Annualized Return",
                                "Annualized Sharpe Ratio",
                                "Calmar Ratio"))
  tab.perf
  
  tab.risk <- table.Arbitrary(rets,
                              metrics=c(
                                "StdDev.annualized",
                                "maxDrawdown"
                              ),
                              metricsNames=c(
                                "Annualized StdDev",
                                "Max DrawDown"))
  
  tab.risk
  
  return (as.numeric(tail(final_acct$summary$End.Eq,1))-init_equity)

  #reset_quantstrat()
  
}
)

il semble être parallélisé mais il ne met pas correctement à jour init_equity