Wie ersetze ich die for-Schleife in quantstrat durch mclapply (parallelisiert)?

Aug 16 2020

Ich möchte quantstrat parallelisieren. Mein tatsächlicher Code sieht etwas anders aus, aber das folgende Beispiel zeigt das Problem. Ich glaube, das Problem ist, dass die Umgebung .blotter als Zeiger auf eine Speicheradresse initialisiert wird und ich kein Array bzw. keine Matrix aus new.env()-Objekten anlegen kann.

Ich möchte die for-Schleife durch ein mclapply ersetzen, damit ich mehrere applyStrategy-Aufrufe mit unterschiedlichen Datumsbereichen/Symbolen ausführen kann (hier werden nur unterschiedliche Symbole gezeigt). Mein Endziel ist ein Beowulf-Cluster (makeCluster); ich plane, die Läufe parallel mit bis zu 252 Handelstagen (rollierendes Fenster) und unterschiedlichen Symbolen pro Iteration auszuführen. (Das alles brauche ich hier aber nicht. Ich frage lediglich, ob es einen Weg gibt, mit der Zuweisung des Portfolios und dem anschließenden .blotter-Speicherobjekt so zu arbeiten, dass ich mclapply verwenden kann.)

# Session setup for the per-symbol back-test below: clear the workspace,
# attach the packages and define the tickers to iterate over.
#Load quantstrat in your R environment.

# Remove every object that ls() reports in the current workspace.
rm(list = ls())

# NOTE(review): local() is called without an expression; its purpose here is
# unclear -- confirm whether this call is needed at all.
local()

library(quantstrat) 
library(parallel)

# The search command lists all attached packages.
search()

# Tickers to back-test; one worker call is made per element.
symbolstring1 <- c('QQQ','GOOG')
#symbolstring <- c('QQQ','GOOG')
# Sequential form this call replaces:
#for(i in 1:length(symbolstring1))
#
# Back-test a 10/30-period EMA crossover strategy separately for every ticker
# in symbolstring1, one mclapply() task per ticker (mclapply is provided by
# the parallel package attached above).
#
# symbolstring: one element of symbolstring1 (a single ticker).
# Value: a list with one element per ticker. The worker ends with clean-up
#   calls, so the elements carry no back-test results; results are printed
#   and plotted as side effects.
mclapply(symbolstring1, function(symbolstring) {
  #local()
  #i=2
  #symbolstring=as.character(symbolstring1[i])

  # Function-local environments named like the blotter/quantstrat stores.
  # NOTE(review): whether blotter and quantstrat use these local objects
  # rather than their own stores is not visible here -- confirm.
  .blotter <- new.env()
  .strategy <- new.env()

  # Define names for portfolio, account and strategy. They are built before
  # the clean-up below so that rm.strat() receives a defined name.
  #portfolioName <- accountName <- strategyName <- "FirstPortfolio"
  portfolioName <- accountName <- strategyName <- paste0("FirstPortfolio", symbolstring)

  # The function rm.strat removes any strategy, portfolio, account, or order
  # book object with the given name. This is important
  try(rm.strat(strategyName), silent = TRUE)
  try(rm(envir = FinancialInstrument:::.instrument), silent = TRUE)
  # Drop every instrument currently registered, currencies included.
  for (name in ls(FinancialInstrument:::.instrument)) {
    rm_instruments(name, keep.currencies = FALSE)
  }
  print(symbolstring)

  currency('USD')

  stock(symbolstring, currency = 'USD', multiplier = 1)

  # Currency and trading instrument objects stored in the
  # .instrument environment

  print("FI")
  ls(envir = FinancialInstrument:::.instrument)

  # blotter functions used for instrument initialization
  # quantstrat creates a private storage area called .strategy

  ls(all.names = TRUE)

  # The initDate should be lower than the startDate. The initDate will be
  # used later while initializing the strategy.

  initDate <- '2010-01-01'

  startDate <- '2011-01-01'

  endDate <- '2019-08-10'

  init_equity <- 50000

  # Set UTC TIME

  Sys.setenv(TZ = "UTC")

  # NOTE(review): getSymbols() is called without an explicit environment, so
  # the downloaded series presumably lands in this function's frame rather
  # than the global environment. The "get(symbol, envir = envir): object
  # 'QQQ' not found" error reported for this script may come from
  # applyStrategy() looking the data up elsewhere -- confirm.
  getSymbols(symbolstring, from = startDate, to = endDate, adjust = TRUE, src = 'yahoo')

  print(portfolioName)

  #rm.strat(strategyName)

  print("port")
  initPortf(name = portfolioName,
            symbols = symbolstring,
            initDate = initDate)

  initAcct(name = accountName,
           portfolios = portfolioName,
           initDate = initDate,
           initEq = init_equity)

  initOrders(portfolio = portfolioName,
             symbols = symbolstring,
             initDate = initDate)

  # name: the string name of the strategy

  # assets: optional list of assets to apply the strategy to.

  # Normally these are defined in the portfolio object

  # constraints: optional portfolio constraints

  # store: can be True or False. If True store the strategy in the
  # environment. Default is False
  print("strat")
  strategy(strategyName, store = TRUE)

  ls(all.names = TRUE)

  # .blotter holds the portfolio and account object

  ls(.blotter)

  # .strategy holds the orderbook and strategy object

  print(ls(.strategy))

  print("ind")
  add.indicator(strategy = strategyName,
                name = "EMA",
                arguments = list(x = quote(Cl(mktdata)),
                                 n = 10),
                label = "nFast")

  add.indicator(strategy = strategyName,
                name = "EMA",
                arguments = list(x = quote(Cl(mktdata)),
                                 n = 30),
                label = "nSlow")

  # Add long signal when the fast EMA crosses over slow EMA.

  print("sig")
  add.signal(strategy = strategyName,
             name = "sigCrossover",
             arguments = list(columns = c("nFast", "nSlow"),
                              relationship = "gte"),
             label = "longSignal")

  # Add short signal when the fast EMA goes below slow EMA.

  add.signal(strategy = strategyName,
             name = "sigCrossover",
             arguments = list(columns = c("nFast", "nSlow"),
                              relationship = "lt"),
             label = "shortSignal")

  # go long when 10-period EMA (nFast) >= 30-period EMA (nSlow)

  print("rul")
  add.rule(strategyName,
           name = "ruleSignal",
           arguments = list(sigcol = "longSignal",
                            sigval = TRUE,
                            orderqty = 100,
                            ordertype = "market",
                            orderside = "long",
                            replace = TRUE,
                            TxnFees = -10),
           type = "enter",
           label = "EnterLong")

  # go short when 10-period EMA (nFast) < 30-period EMA (nSlow)

  add.rule(strategyName,
           name = "ruleSignal",
           arguments = list(sigcol = "shortSignal",
                            sigval = TRUE,
                            orderside = "short",
                            ordertype = "market",
                            orderqty = -100,
                            TxnFees = -10,
                            replace = TRUE),
           type = "enter",
           label = "EnterShort")

  # Close long positions when the shortSignal column is True

  add.rule(strategyName,
           name = "ruleSignal",
           arguments = list(sigcol = "shortSignal",
                            sigval = TRUE,
                            orderside = "long",
                            ordertype = "market",
                            orderqty = "all",
                            TxnFees = -10,
                            replace = TRUE),
           type = "exit",
           label = "ExitLong")

  # Close Short positions when the longSignal column is True

  add.rule(strategyName,
           name = "ruleSignal",
           arguments = list(sigcol = "longSignal",
                            sigval = TRUE,
                            orderside = "short",
                            ordertype = "market",
                            orderqty = "all",
                            TxnFees = -10,
                            replace = TRUE),
           type = "exit",
           label = "ExitShort")

  print("summary")
  summary(getStrategy(strategyName))

  # Summary results are produced below

  print("results")
  results <- applyStrategy(strategy = strategyName, portfolios = portfolioName, symbols = symbolstring)

  # The applyStrategy() outputs all transactions(from the oldest to recent
  # transactions)that the strategy sends. The first few rows of the
  # applyStrategy() output are shown below

  getTxns(Portfolio = portfolioName, Symbol = symbolstring)

  # NOTE(review): mktdata is not assigned anywhere in this function.
  mktdata

  updatePortf(portfolioName)

  # Every timestamp of the portfolio summary except the first row.
  dateRange <- time(getPortfolio(portfolioName)$summary)[-1]

  updateAcct(portfolioName, dateRange)

  updateEndEq(accountName)

  print(plot(tail(getAccount(portfolioName)$summary$End.Eq, -1), main = "Portfolio Equity"))

  #cleanup
  for (name in symbolstring) rm(list = name)
  #rm(.blotter)
  # NOTE(review): .stoploss, .txnfees and symbols are not created in this
  # function -- confirm these rm() calls are still wanted.
  rm(.stoploss)
  rm(.txnfees)
  #rm(.strategy)
  rm(symbols)

}
)

Dabei wird jedoch ein Fehler ausgegeben: „Fehler in get(symbol, envir = envir) : object 'QQQ' not found“

Das Problem ist insbesondere, dass FinancialInstrument:::.instrument auf eine Speicheradresse zeigt, die durch meine gekapselten Aufrufe mit der Variablen symbolstring nicht aktualisiert wird.

Antworten

3 BrianG.Peterson Aug 17 2020 at 20:37

apply.paramset in quantstrat verwendet bereits ein foreach-Konstrukt, um die Ausführung von applyStrategy zu parallelisieren.

apply.paramset muss ziemlich viel Arbeit leisten, um sicherzustellen, dass die Umgebungen in den Workern verfügbar sind, damit diese die Arbeit erledigen können, und um die richtigen Ergebnisse einzusammeln und an den aufrufenden Prozess zurückzusenden.

Das Einfachste für Sie wäre wahrscheinlich die Verwendung von apply.paramset. Definieren Sie Ihre Datumsangaben und Symbole als Parameter und lassen Sie die Funktion normal laufen.

Alternativ schlage ich vor, dass Sie sich die Schritte ansehen, die in apply.paramset erforderlich sind, um ein paralleles foreach-Konstrukt zu verwenden, und diese an Ihren vorgeschlagenen Fall anpassen.

Beachten Sie auch, dass Sie in Ihrer Frage einen Beowulf-Cluster zusammen mit mclapply verwenden möchten. Das wird nicht funktionieren. mclapply funktioniert nur innerhalb eines einzigen Speicherraums. Beowulf-Cluster teilen sich normalerweise keinen gemeinsamen Speicher- und Prozessraum. Sie verteilen Jobs typischerweise über parallele Bibliotheken wie MPI. apply.paramset lässt sich bereits auf einem Beowulf-Cluster verteilen, indem ein doMPI-Backend für foreach verwendet wird. Das ist einer der Gründe, warum wir foreach verwendet haben: die Vielzahl der verfügbaren parallelen Backends. Das doMC-Backend für foreach verwendet hinter den Kulissen tatsächlich mclapply.

1 thistleknot Aug 19 2020 at 20:43

Ich glaube, der folgende Code parallelisiert die Ausführung. Ich habe sowohl die Indikatoren als auch die Symbole ausgetauscht, aber die Logik für die Verwendung unterschiedlicher Symbole und Datumsbereiche ist darin enthalten.

Im Grunde habe ich hinzugefügt

Dates=paste0(startDate,"::",endDate)

# Start from an empty workspace: remove every object that ls() reports.
rm(list = ls())

# lubridate supplies the %m-% / years() date arithmetic used further down.
library(lubridate)
library(parallel)

# One-step-ahead autoregressive forecast for a single window of market data.
#
# x: a price/volume object understood by Vo() and Ad() (presumably an OHLCV
#    xts series with volume and adjusted-close columns -- confirm).
# Returns NA when x has fewer than 12 rows. Otherwise an AR model is fitted
# (stats::ar.yw) to the rate of change of Vo(x) * Ad(x), the next value is
# predicted, and the result is (prediction + 1) * last(Ad(x)) - last(Ad(x)).
autoregressor1 <- function(x) {
  # Guard: windows shorter than 12 rows get no forecast.
  if (NROW(x) < 12) {
    return(NA)
  }

  # Alternative input that was tried: ROC(Ad(x))
  activity <- na.omit(ROC(Vo(x) * Ad(x)))

  fitted_ar <- ar.yw(activity)
  forecast <- predict(fitted_ar, newdata = activity, n.ahead = 1)
  growth_factor <- forecast$pred[1] + 1

  last_adjusted <- last(Ad(x))
  (growth_factor * last_adjusted) - last_adjusted
}

# Rolling indicator: applies autoregressor1() to every 26-row window of x,
# handing each window over as a whole (by.column = FALSE).
#
# x: the market data object forwarded to rollapply().
# Returns whatever rollapply() produces for those windows.
autoregressor <- function(x) {
  rollapply(x, 26, FUN = autoregressor1, by.column = FALSE)
}

########################indicators#############################

library(quantstrat) 
library(future.apply)
library(scorecard)

# Put the .strategy, .blotter and .audit stores into a clean state.
#
# Any of the three that cannot be found is created as a new environment in
# the global environment; afterwards the objects listed by ls() in each store
# are removed (warnings suppressed) and the USD currency is registered again.
# Takes no arguments; returns the value of FinancialInstrument::currency().
reset_quantstrat <- function() {
  store_names <- c(".strategy", ".blotter", ".audit")

  # Create whichever stores are missing.
  for (store in store_names) {
    if (!exists(store)) {
      assign(store, new.env(parent = .GlobalEnv), envir = .GlobalEnv)
    }
  }

  # Empty every store.
  for (store in store_names) {
    store_env <- get(store)
    suppressWarnings(rm(list = ls(store_env), pos = store_env))
  }

  FinancialInstrument::currency("USD")
}

# Script-level configuration read by the per-run worker further down.

# Start with empty strategy/blotter/audit stores.
reset_quantstrat()

# Initialisation date handed to initPortf/initAcct/initOrders.
initDate <- "2010-01-01"

# Download window: the three years ending today.
endDate <- as.Date(Sys.Date())
startDate <- endDate %m-% years(3)

# Tickers to back-test (previously tried: 'SP500TR', 'GOOG').
symbolstring1 <- c("SSO", "GOLD")

getSymbols(
  symbolstring1,
  from = startDate,
  to = endDate,
  adjust = TRUE,
  src = "yahoo"
)

# Order size and transaction fee used by the entry rules.
.orderqty <- 1
.txnfees <- 0

# Run indices, one per ticker.
# A shuffled order was tried earlier: sample(1:2, 2, replace=FALSE)
random <- 1:2

# Run one back-test per element of `random` and collect the profit of each.
#
# x: an index into symbolstring1; it also sets the length of the evaluation
#    window (1 + x years before endDate) and the portfolio name suffix (x + 2).
# Value: `equity` is a list with one number per run -- the last End.Eq value
#    of the account minus init_equity.
# NOTE(review): the iteration uses base lapply(); no parallel apply function
# is called in this block.
equity <- lapply(random, function(x)
{#x=1
  try(rm("account.Snazzy","portfolio.Snazzy",pos=.GlobalEnv$.blotter),silent=TRUE)
  # NOTE(review): these two rm() calls give no envir/pos, and .blotter /
  # .strategy are not created inside this function -- confirm what they are
  # meant to remove.
  rm(.blotter)
  rm(.strategy)
  # One shared name for portfolio, account and strategy of this run.
  portfolioName <- accountName <- strategyName <- paste0("FirstPortfolio",x+2)
  #endDate <- as.Date(Sys.Date())
  # Local startDate: the evaluation window begins 1 + x years before endDate.
  startDate <- endDate %m-% years(1+x)
 
  #Load quantstrat in your R environment.
  reset_quantstrat()
  
  # The search command lists all attached packages.
  search()

  # Ticker handled by this run.
  symbolstring=as.character(symbolstring1[x])
  print(symbolstring)
  
  try(rm.strat(strategyName),silent=TRUE)
  try(rm(envir=FinancialInstrument:::.instrument),silent=TRUE)
  # Drop every instrument currently registered, currencies included.
  for (name in ls(FinancialInstrument:::.instrument)){rm_instruments(name,keep.currencies = FALSE)}
  print(symbolstring)
  
  currency('USD')
  
  stock(symbolstring,currency='USD',multiplier=1)
  
  # Currency and trading instrument objects stored in the 
  # .instrument environment
  
  print("FI")
  ls(envir=FinancialInstrument:::.instrument)
  
  # blotter functions used for instrument initialization 
  # quantstrat creates a private storage area called .strategy
  
  ls(all=T)
  
  # Starting equity of the account; also subtracted from the final equity
  # to form the return value.
  init_equity <- 10000
  
  Sys.setenv(TZ="UTC")
  
  print(portfolioName)
 
  print("port")

  # Each initialisation is wrapped in try(), so a failure is reported but
  # does not stop the run.
  try(initPortf(name = portfolioName,
            symbols = symbolstring,
            initDate = initDate))
  
 
  try(initAcct(name = accountName,
           portfolios = portfolioName,
           initDate = initDate,
           initEq = init_equity))
  
  try(initOrders(portfolio = portfolioName,
             symbols = symbolstring,
             initDate = initDate))
  
  # name: the string name of the strategy
  
  # assets: optional list of assets to apply the strategy to.  
  
  # Normally these are defined in the portfolio object
  
  # constraints: optional portfolio constraints
  
  # store: can be True or False. If True store the strategy in the environment. Default is False
  print("strat")
  strategy(strategyName, store = TRUE)
  
  ls(all=T)
  
  # .blotter holds the portfolio and account object 
  
  ls(.blotter)
  
  # .strategy holds the orderbook and strategy object
  
  print(ls(.strategy))
  
  print("ind")
  #ARIMA
    
    # Indicator column "arspread": output of autoregressor() on mktdata.
    add.indicator(
      strategy  =   strategyName, 
      name      =   "autoregressor", 
      arguments =   list(
        x       =   quote(mktdata)),
      label     =   "arspread")
    
    ################################################ Signals #############################
    
    # Four threshold signals on "arspread": >= 0.25 ("Selltime"), < 0.1 and
    # > -0.1 (both labelled "cashtime"), <= -0.25 ("Buytime").
    add.signal(
      strategy          = strategyName,
      name              = "sigThreshold",
      arguments         = list(
        threshold       = 0.25,
        column          = "arspread",
        relationship    = "gte",
        cross           = TRUE),
      label             = "Selltime")
    
    add.signal(
      strategy          = strategyName,
      name              = "sigThreshold",
      arguments         = list(
        threshold       = 0.1,
        column          = "arspread",
        relationship    = "lt",
        cross           = TRUE),
      label             = "cashtime")
    
    # NOTE(review): same label "cashtime" as the previous signal, and no rule
    # below refers to "cashtime" -- confirm this is intended.
    add.signal(
      strategy          = strategyName,
      name              = "sigThreshold",
      arguments         = list(
        threshold       = -0.1,
        column          = "arspread",
        relationship    = "gt",
        cross           = TRUE),
      label             = "cashtime")
    
    add.signal(
      strategy          = strategyName,
      name              = "sigThreshold",
      arguments         = list(
        threshold       = -0.25,
        column          = "arspread",
        relationship    = "lte",
        cross           = TRUE),
      label             = "Buytime")
    
    ######################################## Rules #################################################
    
    #Entry Rule Long
    add.rule(strategyName,
             name               =   "ruleSignal",
             arguments          =   list(
               sigcol           =   "Buytime",
               sigval           =   TRUE,
               orderqty     =   .orderqty,
               ordertype        =   "market",
               orderside        =   "long",
               pricemethod      =   "market",
               replace          =   TRUE,
               TxnFees              =   -.txnfees
               #,
               #osFUN               =   osMaxPos
             ), 
             type               =   "enter",
             path.dep           =   TRUE,
             label              =   "Entry")
    
    #Entry Rule Short
    
    # NOTE(review): orderqty is the same positive .orderqty as in the long
    # rule, and the label "Entry" is reused -- confirm both are intended.
    add.rule(strategyName,
             name           =   "ruleSignal",
             arguments          =   list(
               sigcol           =   "Selltime",
               sigval           =   TRUE,
               orderqty     =   .orderqty,
               ordertype        =   "market",
               orderside        =   "short",
               pricemethod      =   "market",
               replace          =   TRUE,
               TxnFees              =   -.txnfees
               #,
               #osFUN               =   osMaxPos
             ), 
             type               =   "enter",
             path.dep           =   TRUE,
             label              =   "Entry")
    
    #Exit Rules
    # (no exit rules are added in this block)
    
  print("summary")
  summary(getStrategy(strategyName))
  
  # Summary results are produced below
  
  print("results")
  
  results <- applyStrategy(strategy= strategyName, portfolios = portfolioName)
  
  # The applyStrategy() outputs all transactions(from the oldest to recent transactions)that the strategy sends. The first few rows of the applyStrategy() output are shown below
  
  getTxns(Portfolio=portfolioName, Symbol=symbolstring)
  
  # NOTE(review): mktdata is not assigned anywhere in this function.
  mktdata
  
  # Portfolio, account and ending equity are updated only for the
  # startDate::endDate window of this run.
  updatePortf(portfolioName,Dates=paste0(startDate,"::",endDate))
  
  dateRange <- time(getPortfolio(portfolioName)$summary)
  
  updateAcct(portfolioName,dateRange[which(dateRange >= startDate & dateRange <= endDate)])
  
  updateEndEq(accountName, Dates=paste0(startDate,"::",endDate))
  
  print(plot(tail(getAccount(portfolioName)$summary$End.Eq,-1), main = symbolstring))
  
  tStats <- tradeStats(Portfolios = portfolioName, use="trades", inclZeroDays=FALSE,Dates=paste0(startDate,"::",endDate))
  
  final_acct <- getAccount(portfolioName)
  
  #final_acct
  #View(final_acct)
  
  options(width=70)
  
  # Same equity curve as the plot printed a few lines above.
  print(plot(tail(final_acct$summary$End.Eq,-1), main = symbolstring))
  #dev.off()
  
  tail(final_acct$summary$End.Eq)
  
  rets <- PortfReturns(Account = accountName)
  
  #rownames(rets) <- NULL
  
  # Performance and risk tables are built but neither printed nor returned.
  tab.perf <- table.Arbitrary(rets,
                              metrics=c(
                                "Return.cumulative",
                                "Return.annualized",
                                "SharpeRatio.annualized",
                                "CalmarRatio"),
                              metricsNames=c(
                                "Cumulative Return",
                                "Annualized Return",
                                "Annualized Sharpe Ratio",
                                "Calmar Ratio"))
  tab.perf
  
  tab.risk <- table.Arbitrary(rets,
                              metrics=c(
                                "StdDev.annualized",
                                "maxDrawdown"
                              ),
                              metricsNames=c(
                                "Annualized StdDev",
                                "Max DrawDown"))
  
  tab.risk
  
  # Result of the run: last End.Eq value minus the starting equity.
  return (as.numeric(tail(final_acct$summary$End.Eq,1))-init_equity)

  #reset_quantstrat()
  
}
)

Es scheint parallelisiert zu sein, aber init_equity wird dabei nicht korrekt aktualisiert.