Как заменить цикл for в Quantstrat на mclapply [parallelized]?

Aug 16 2020

Я хочу распараллелить квантстрат. Мой код не совсем такой, но он демонстрирует проблему. Я считаю, что проблема заключается в том, что .blotter env инициализируется адресом памяти указателя, и я не могу инициализировать массив / матрицу new.env ().

Я хотел бы заменить цикл for на mclapply, чтобы я мог запускать несколько applyStrategies с разными датами / символами (здесь показаны только разные символы). Моя конечная цель - кластер beowulf (makeCluster), и я планирую запускать их параллельно, используя до 252 торговых дней (скользящее окно) с различными символами на итерацию (но мне все это не нужно. Я просто спрашиваю, есть ли способ работы с назначением портфолио и последующего объекта памяти .blotter таким образом, чтобы я мог использовать mclapply)

#Load quantstrat in your R environment.

rm(list = ls())

local()

library(quantstrat) 
library(parallel)

# The search command lists all attached packages.
search()

symbolstring1 <- c('QQQ','GOOG')
#symbolstring <- c('QQQ','GOOG')

#for(i in 1:length(symbolstring1))
  mlapply(symbolstring1, function(symbolstring)
{
  #local()
  #i=2
  #symbolstring=as.character(symbolstring1[i])
  
  .blotter <- new.env()
  .strategy <- new.env()
  
  try(rm.strat(strategyName),silent=TRUE)
  try(rm(envir=FinancialInstrument:::.instrument),silent=TRUE)
  for (name in ls(FinancialInstrument:::.instrument)){rm_instruments(name,keep.currencies = FALSE)}
  print(symbolstring)

currency('USD')

stock(symbolstring,currency='USD',multiplier=1)

# Currency and trading instrument objects stored in the 
# .instrument environment

print("FI")
ls(envir=FinancialInstrument:::.instrument)

# blotter functions used for instrument initialization 
# quantstrat creates a private storage area called .strategy

ls(all=T)

# The initDate should be lower than the startDate. The initDate will be used later while initializing the strategy.

initDate <- '2010-01-01'

startDate <- '2011-01-01'

endDate <- '2019-08-10'

init_equity <- 50000

# Set UTC TIME

Sys.setenv(TZ="UTC")

getSymbols(symbolstring,from=startDate,to=endDate,adjust=TRUE,src='yahoo')

# Define names for portfolio, account and strategy. 

#portfolioName <- accountName <- strategyName <- "FirstPortfolio"
portfolioName <- accountName <- strategyName <- paste0("FirstPortfolio",symbolstring)

print(portfolioName)
# The function rm.strat removes any strategy, portfolio, account, or order book object with the given name. This is important

#rm.strat(strategyName)

print("port")
initPortf(name = portfolioName,
          symbols = symbolstring,
          initDate = initDate)

initAcct(name = accountName,
         portfolios = portfolioName,
         initDate = initDate,
         initEq = init_equity)

initOrders(portfolio = portfolioName,
           symbols = symbolstring,
           initDate = initDate)



# name: the string name of the strategy

# assets: optional list of assets to apply the strategy to.  

# Normally these are defined in the portfolio object

# contstrains: optional portfolio constraints

# store: can be True or False. If True store the strategy in the environment. Default is False
print("strat")
strategy(strategyName, store = TRUE)

ls(all=T)

# .blotter holds the portfolio and account object 

ls(.blotter)

# .strategy holds the orderbook and strategy object

print(ls(.strategy))

print("ind")
add.indicator(strategy = strategyName, 
              name = "EMA", 
              arguments = list(x = quote(Cl(mktdata)), 
                               n = 10), label = "nFast")

add.indicator(strategy = strategyName, 
              name = "EMA", 
              arguments = list(x = quote(Cl(mktdata)), 
                               n = 30), 
              label = "nSlow")

# Add long signal when the fast EMA crosses over slow EMA.

print("sig")
add.signal(strategy = strategyName,
           name="sigCrossover",
           arguments = list(columns = c("nFast", "nSlow"),
                            relationship = "gte"),
           label = "longSignal")

# Add short signal when the fast EMA goes below slow EMA.

add.signal(strategy = strategyName, 
           name = "sigCrossover",
           arguments = list(columns = c("nFast", "nSlow"),
                            relationship = "lt"),
           label = "shortSignal")

# go long when 10-period EMA (nFast) >= 30-period EMA (nSlow)

print("rul")
add.rule(strategyName,
         name= "ruleSignal",
         arguments=list(sigcol="longSignal",
                        sigval=TRUE,
                        orderqty=100,
                        ordertype="market",
                        orderside="long",
                        replace = TRUE, 
                        TxnFees = -10),
         type="enter",
         label="EnterLong") 

# go short when 10-period EMA (nFast) < 30-period EMA (nSlow)

add.rule(strategyName, 
         name = "ruleSignal", 
         arguments = list(sigcol = "shortSignal", 
                          sigval = TRUE, 
                          orderside = "short", 
                          ordertype = "market", 
                          orderqty = -100, 
                          TxnFees = -10,                     
                          replace = TRUE), 
         type = "enter", 
         label = "EnterShort")

# Close long positions when the shortSignal column is True

add.rule(strategyName, 
         name = "ruleSignal", 
         arguments = list(sigcol = "shortSignal", 
                          sigval = TRUE, 
                          orderside = "long", 
                          ordertype = "market", 
                          orderqty = "all", 
                          TxnFees = -10, 
                          replace = TRUE), 
         type = "exit", 
         label = "ExitLong")

# Close Short positions when the longSignal column is True

add.rule(strategyName, 
         name = "ruleSignal", 
         arguments = list(sigcol = "longSignal", 
                          sigval = TRUE, 
                          orderside = "short", 
                          ordertype = "market", 
                          orderqty = "all", 
                          TxnFees = -10, 
                          replace = TRUE), 
         type = "exit", 
         label = "ExitShort")

print("summary")
summary(getStrategy(strategyName))

# Summary results are produced below

print("results")
results <- applyStrategy(strategy= strategyName, portfolios = portfolioName,symbols=symbolstring)

# The applyStrategy() outputs all transactions(from the oldest to recent transactions)that the strategy sends. The first few rows of the applyStrategy() output are shown below

getTxns(Portfolio=portfolioName, Symbol=symbolstring)

mktdata

updatePortf(portfolioName)

dateRange <- time(getPortfolio(portfolioName)$summary)[-1] updateAcct(portfolioName,dateRange) updateEndEq(accountName) print(plot(tail(getAccount(portfolioName)$summary$End.Eq,-1), main = "Portfolio Equity"))

#cleanup
for (name in symbolstring) rm(list = name)
#rm(.blotter)
rm(.stoploss)
rm(.txnfees)
#rm(.strategy)
rm(symbols)

}
)

Но возникает ошибка Ошибка в get (symbol, envir = envir): объект QQQ не найден

В частности, проблема заключается в том, что инструмент FinancialInstrument :::. Указывает на адрес памяти, который не обновляется с моими инкапсулированными вызовами переменных (строка символов)

Ответы

3 BrianG.Peterson Aug 17 2020 at 20:37

apply.paramsetin quantstratуже использует foreachконструкцию для распараллеливания выполнения applyStrategy.

apply.paramset необходимо проделать изрядный объем работы, чтобы убедиться, что среды доступны в рабочих процессах для выполнения работы, и собрать правильные результаты, чтобы отправить их обратно в вызывающий процесс.

Вероятно, самое простое, что вам нужно сделать, - это использовать apply.paramset. Задайте параметры дат и символов, и функция будет работать нормально.

В качестве альтернативы я предлагаю вам взглянуть на шаги, необходимые для использования параллельной foreachконструкции, apply.paramsetчтобы изменить ее в соответствии с вашим предложенным случаем.

Также обратите внимание, что ваш вопрос касается использования кластера Beowulf и mclapply. Это не сработает. mclapplyработает только в одном пространстве памяти. Кластеры Beowulf обычно не разделяют единую память и пространство процессов. Обычно они распределяют задания через параллельные библиотеки, такие как MPI. apply.paramsetуже может распространяться в кластере Beowulf с помощью doMPIсерверной части foreach. Это одна из причин, по которой мы использовали foreach: доступно множество различных параллельных бэкендов. doMCБэкенд для foreachфактически использует mclapplyза кулисами.

1 thistleknot Aug 19 2020 at 20:43

Я считаю, что это распараллеливает код. Я поменял местами индикаторы и символы, но логика использования разных символов и дат там есть.

В основном я добавил

Dates=paste0(startDate,"::",endDate)

rm(list = ls())

library(lubridate)
library(parallel)

autoregressor1  = function(x){
  if(NROW(x)<12){ result = NA} else{
    y = Vo(x)*Ad(x)
    #y = ROC(Ad(x))
    y = ROC(y)
    y = na.omit(y)
    step1 = ar.yw(y)
    step2 = predict(step1,newdata=y,n.ahead=1)
    step3 = step2$pred[1]+1 step4 = (step3*last(Ad(x))) - last(Ad(x)) result = step4 } return(result) } autoregressor = function(x){ ans = rollapply(x,26,FUN = autoregressor1,by.column=FALSE) return (ans)} ########################indicators############################# library(quantstrat) library(future.apply) library(scorecard) reset_quantstrat <- function() { if (! exists(".strategy")) .strategy <<- new.env(parent = .GlobalEnv) if (! exists(".blotter")) .blotter <<- new.env(parent = .GlobalEnv) if (! exists(".audit")) .audit <<- new.env(parent = .GlobalEnv) suppressWarnings(rm(list = ls(.strategy), pos = .strategy)) suppressWarnings(rm(list = ls(.blotter), pos = .blotter)) suppressWarnings(rm(list = ls(.audit), pos = .audit)) FinancialInstrument::currency("USD") } reset_quantstrat() initDate <- '2010-01-01' endDate <- as.Date(Sys.Date()) startDate <- endDate %m-% years(3) symbolstring1 <- c('SSO','GOLD') getSymbols(symbolstring1,from=startDate,to=endDate,adjust=TRUE,src='yahoo') #symbolstring1 <- c('SP500TR','GOOG') .orderqty <- 1 .txnfees <- 0 #random <- sample(1:2, 2, replace=FALSE) random <- (1:2) equity <- lapply(random, function(x) {#x=1 try(rm("account.Snazzy","portfolio.Snazzy",pos=.GlobalEnv$.blotter),silent=TRUE)
  rm(.blotter)
  rm(.strategy)
  portfolioName <- accountName <- strategyName <- paste0("FirstPortfolio",x+2)
  #endDate <- as.Date(Sys.Date())
  startDate <- endDate %m-% years(1+x)
 
  #Load quantstrat in your R environment.
  reset_quantstrat()
  
  # The search command lists all attached packages.
  search()

  symbolstring=as.character(symbolstring1[x])
  print(symbolstring)
  
  try(rm.strat(strategyName),silent=TRUE)
  try(rm(envir=FinancialInstrument:::.instrument),silent=TRUE)
  for (name in ls(FinancialInstrument:::.instrument)){rm_instruments(name,keep.currencies = FALSE)}
  print(symbolstring)
  
  currency('USD')
  
  stock(symbolstring,currency='USD',multiplier=1)
  
  # Currency and trading instrument objects stored in the 
  # .instrument environment
  
  print("FI")
  ls(envir=FinancialInstrument:::.instrument)
  
  # blotter functions used for instrument initialization 
  # quantstrat creates a private storage area called .strategy
  
  ls(all=T)
  
  init_equity <- 10000
  
  Sys.setenv(TZ="UTC")
  
  print(portfolioName)
 
  print("port")

  try(initPortf(name = portfolioName,
            symbols = symbolstring,
            initDate = initDate))
  
 
  try(initAcct(name = accountName,
           portfolios = portfolioName,
           initDate = initDate,
           initEq = init_equity))
  
  try(initOrders(portfolio = portfolioName,
             symbols = symbolstring,
             initDate = initDate))
  
  # name: the string name of the strategy
  
  # assets: optional list of assets to apply the strategy to.  
  
  # Normally these are defined in the portfolio object
  
  # contstrains: optional portfolio constraints
  
  # store: can be True or False. If True store the strategy in the environment. Default is False
  print("strat")
  strategy(strategyName, store = TRUE)
  
  ls(all=T)
  
  # .blotter holds the portfolio and account object 
  
  ls(.blotter)
  
  # .strategy holds the orderbook and strategy object
  
  print(ls(.strategy))
  
  print("ind")
  #ARIMA
    
    add.indicator(
      strategy  =   strategyName, 
      name      =   "autoregressor", 
      arguments =   list(
        x       =   quote(mktdata)),
      label     =   "arspread")
    
    ################################################ Signals #############################
    
    add.signal(
      strategy          = strategyName,
      name              = "sigThreshold",
      arguments         = list(
        threshold       = 0.25,
        column          = "arspread",
        relationship    = "gte",
        cross           = TRUE),
      label             = "Selltime")
    
    add.signal(
      strategy          = strategyName,
      name              = "sigThreshold",
      arguments         = list(
        threshold       = 0.1,
        column          = "arspread",
        relationship    = "lt",
        cross           = TRUE),
      label             = "cashtime")
    
    add.signal(
      strategy          = strategyName,
      name              = "sigThreshold",
      arguments         = list(
        threshold       = -0.1,
        column          = "arspread",
        relationship    = "gt",
        cross           = TRUE),
      label             = "cashtime")
    
    add.signal(
      strategy          = strategyName,
      name              = "sigThreshold",
      arguments         = list(
        threshold       = -0.25,
        column          = "arspread",
        relationship    = "lte",
        cross           = TRUE),
      label             = "Buytime")
    
    ######################################## Rules #################################################
    
    #Entry Rule Long
    add.rule(strategyName,
             name               =   "ruleSignal",
             arguments          =   list(
               sigcol           =   "Buytime",
               sigval           =   TRUE,
               orderqty     =   .orderqty,
               ordertype        =   "market",
               orderside        =   "long",
               pricemethod      =   "market",
               replace          =   TRUE,
               TxnFees              =   -.txnfees
               #,
               #osFUN               =   osMaxPos
             ), 
             type               =   "enter",
             path.dep           =   TRUE,
             label              =   "Entry")
    
    #Entry Rule Short
    
    add.rule(strategyName,
             name           =   "ruleSignal",
             arguments          =   list(
               sigcol           =   "Selltime",
               sigval           =   TRUE,
               orderqty     =   .orderqty,
               ordertype        =   "market",
               orderside        =   "short",
               pricemethod      =   "market",
               replace          =   TRUE,
               TxnFees              =   -.txnfees
               #,
               #osFUN               =   osMaxPos
             ), 
             type               =   "enter",
             path.dep           =   TRUE,
             label              =   "Entry")
    
    #Exit Rules
    
  print("summary")
  summary(getStrategy(strategyName))
  
  # Summary results are produced below
  
  print("results")
  
  results <- applyStrategy(strategy= strategyName, portfolios = portfolioName)
  
  # The applyStrategy() outputs all transactions(from the oldest to recent transactions)that the strategy sends. The first few rows of the applyStrategy() output are shown below
  
  getTxns(Portfolio=portfolioName, Symbol=symbolstring)
  
  mktdata
  
  updatePortf(portfolioName,Dates=paste0(startDate,"::",endDate))
  
  dateRange <- time(getPortfolio(portfolioName)$summary) updateAcct(portfolioName,dateRange[which(dateRange >= startDate & dateRange <= endDate)]) updateEndEq(accountName, Dates=paste0(startDate,"::",endDate)) print(plot(tail(getAccount(portfolioName)$summary$End.Eq,-1), main = symbolstring)) tStats <- tradeStats(Portfolios = portfolioName, use="trades", inclZeroDays=FALSE,Dates=paste0(startDate,"::",endDate)) final_acct <- getAccount(portfolioName) #final_acct #View(final_acct) options(width=70) print(plot(tail(final_acct$summary$End.Eq,-1), main = symbolstring)) #dev.off() tail(final_acct$summary$End.Eq) rets <- PortfReturns(Account = accountName) #rownames(rets) <- NULL tab.perf <- table.Arbitrary(rets, metrics=c( "Return.cumulative", "Return.annualized", "SharpeRatio.annualized", "CalmarRatio"), metricsNames=c( "Cumulative Return", "Annualized Return", "Annualized Sharpe Ratio", "Calmar Ratio")) tab.perf tab.risk <- table.Arbitrary(rets, metrics=c( "StdDev.annualized", "maxDrawdown" ), metricsNames=c( "Annualized StdDev", "Max DrawDown")) tab.risk return (as.numeric(tail(final_acct$summary$End.Eq,1))-init_equity)

  #reset_quantstrat()
  
}
)

кажется, что он параллелен, но он не обновляет init_equity правильно