Come posso sostituire Quantstrat 'for loop' con mclapply [parallelizzato]?
Vorrei parallelizzare quantstrat. Il mio codice non è esattamente così, ma questo mostra il problema. Il problema che credo sia che .blotter env è inizializzato su un indirizzo di memoria del puntatore e non sono in grado di inizializzare un array/matrice di new.env().
Quello che vorrei fare è sostituire il ciclo for con un mclapply in modo da poter eseguire più strategie di applicazione con date/simboli variabili (qui vengono mostrati solo simboli variabili). Il mio obiettivo finale è un cluster beowulf (makeCluster) e ho intenzione di eseguirli in parallelo utilizzando fino a 252 giorni di negoziazione (finestra mobile) con simboli variabili per iterazione (ma non ho bisogno di tutto ciò. Chiedo semplicemente se esiste un modo di lavorare con l'assegnazione del portfolio e il successivo oggetto di memoria .blotter in modo tale da poter usare mclapply)
#Load quantstrat in your R environment.
rm(list = ls())
local()
library(quantstrat)
library(parallel)
# The search command lists all attached packages.
search()
symbolstring1 <- c('QQQ','GOOG')
#symbolstring <- c('QQQ','GOOG')
#for(i in 1:length(symbolstring1))
mlapply(symbolstring1, function(symbolstring)
{
#local()
#i=2
#symbolstring=as.character(symbolstring1[i])
.blotter <- new.env()
.strategy <- new.env()
try(rm.strat(strategyName),silent=TRUE)
try(rm(envir=FinancialInstrument:::.instrument),silent=TRUE)
for (name in ls(FinancialInstrument:::.instrument)){rm_instruments(name,keep.currencies = FALSE)}
print(symbolstring)
currency('USD')
stock(symbolstring,currency='USD',multiplier=1)
# Currency and trading instrument objects stored in the
# .instrument environment
print("FI")
ls(envir=FinancialInstrument:::.instrument)
# blotter functions used for instrument initialization
# quantstrat creates a private storage area called .strategy
ls(all=T)
# The initDate should be lower than the startDate. The initDate will be used later while initializing the strategy.
initDate <- '2010-01-01'
startDate <- '2011-01-01'
endDate <- '2019-08-10'
init_equity <- 50000
# Set UTC TIME
Sys.setenv(TZ="UTC")
getSymbols(symbolstring,from=startDate,to=endDate,adjust=TRUE,src='yahoo')
# Define names for portfolio, account and strategy.
#portfolioName <- accountName <- strategyName <- "FirstPortfolio"
portfolioName <- accountName <- strategyName <- paste0("FirstPortfolio",symbolstring)
print(portfolioName)
# The function rm.strat removes any strategy, portfolio, account, or order book object with the given name. This is important
#rm.strat(strategyName)
print("port")
initPortf(name = portfolioName,
symbols = symbolstring,
initDate = initDate)
initAcct(name = accountName,
portfolios = portfolioName,
initDate = initDate,
initEq = init_equity)
initOrders(portfolio = portfolioName,
symbols = symbolstring,
initDate = initDate)
# name: the string name of the strategy
# assets: optional list of assets to apply the strategy to.
# Normally these are defined in the portfolio object
# contstrains: optional portfolio constraints
# store: can be True or False. If True store the strategy in the environment. Default is False
print("strat")
strategy(strategyName, store = TRUE)
ls(all=T)
# .blotter holds the portfolio and account object
ls(.blotter)
# .strategy holds the orderbook and strategy object
print(ls(.strategy))
print("ind")
add.indicator(strategy = strategyName,
name = "EMA",
arguments = list(x = quote(Cl(mktdata)),
n = 10), label = "nFast")
add.indicator(strategy = strategyName,
name = "EMA",
arguments = list(x = quote(Cl(mktdata)),
n = 30),
label = "nSlow")
# Add long signal when the fast EMA crosses over slow EMA.
print("sig")
add.signal(strategy = strategyName,
name="sigCrossover",
arguments = list(columns = c("nFast", "nSlow"),
relationship = "gte"),
label = "longSignal")
# Add short signal when the fast EMA goes below slow EMA.
add.signal(strategy = strategyName,
name = "sigCrossover",
arguments = list(columns = c("nFast", "nSlow"),
relationship = "lt"),
label = "shortSignal")
# go long when 10-period EMA (nFast) >= 30-period EMA (nSlow)
print("rul")
add.rule(strategyName,
name= "ruleSignal",
arguments=list(sigcol="longSignal",
sigval=TRUE,
orderqty=100,
ordertype="market",
orderside="long",
replace = TRUE,
TxnFees = -10),
type="enter",
label="EnterLong")
# go short when 10-period EMA (nFast) < 30-period EMA (nSlow)
add.rule(strategyName,
name = "ruleSignal",
arguments = list(sigcol = "shortSignal",
sigval = TRUE,
orderside = "short",
ordertype = "market",
orderqty = -100,
TxnFees = -10,
replace = TRUE),
type = "enter",
label = "EnterShort")
# Close long positions when the shortSignal column is True
add.rule(strategyName,
name = "ruleSignal",
arguments = list(sigcol = "shortSignal",
sigval = TRUE,
orderside = "long",
ordertype = "market",
orderqty = "all",
TxnFees = -10,
replace = TRUE),
type = "exit",
label = "ExitLong")
# Close Short positions when the longSignal column is True
add.rule(strategyName,
name = "ruleSignal",
arguments = list(sigcol = "longSignal",
sigval = TRUE,
orderside = "short",
ordertype = "market",
orderqty = "all",
TxnFees = -10,
replace = TRUE),
type = "exit",
label = "ExitShort")
print("summary")
summary(getStrategy(strategyName))
# Summary results are produced below
print("results")
results <- applyStrategy(strategy= strategyName, portfolios = portfolioName,symbols=symbolstring)
# The applyStrategy() outputs all transactions(from the oldest to recent transactions)that the strategy sends. The first few rows of the applyStrategy() output are shown below
getTxns(Portfolio=portfolioName, Symbol=symbolstring)
mktdata
updatePortf(portfolioName)
dateRange <- time(getPortfolio(portfolioName)$summary)[-1]
updateAcct(portfolioName,dateRange)
updateEndEq(accountName)
print(plot(tail(getAccount(portfolioName)$summary$End.Eq,-1), main = "Portfolio Equity"))
#cleanup
for (name in symbolstring) rm(list = name)
#rm(.blotter)
rm(.stoploss)
rm(.txnfees)
#rm(.strategy)
rm(symbols)
}
)
Ma viene generato un errore Errore in get(simbolo, envir = envir): oggetto 'QQQ' non trovato
Nello specifico il problema è FinancialInstrument:::.instrument che punta a un indirizzo di memoria che non viene aggiornato con le mie chiamate variabili incapsulate (stringa di simboli)
Risposte
apply.paramsetin quantstratutilizza già un foreachcostrutto per parallelizzare l'esecuzione di applyStrategy.
apply.paramsetha bisogno di fare una discreta quantità di lavoro per assicurarsi che gli ambienti siano disponibili nei lavoratori per svolgere il lavoro e per raccogliere i risultati appropriati per rimandarli al processo di chiamata.
La cosa più semplice da fare sarebbe probabilmente usare apply.paramset. Crea i tuoi parametri di date e simboli e fai in modo che la funzione venga eseguita normalmente.
In alternativa, ti suggerisco di esaminare i passaggi necessari per utilizzare una foreachcostruzione parallela apply.paramsetper modificarla nel caso suggerito.
Si noti inoltre che la domanda riguarda l'utilizzo di un cluster Beowulf e mclapply. Questo non funzionerà. mclapplyfunziona solo in un singolo spazio di memoria. I cluster Beowulf normalmente non condividono una singola memoria e spazio di elaborazione. In genere distribuiscono i lavori tramite librerie parallele come MPI. apply.paramsetpotrebbe già distribuire su un cluster Beowulf utilizzando un doMPIback-end per foreach. Questo è uno dei motivi che abbiamo utilizzato foreach: la moltitudine di diversi backend paralleli disponibili. Il doMCbackend per foreachutilizza effettivamente mclapplydietro le quinte.
Credo che questo parallelizzi il codice. Ho scambiato gli indicatori così come i simboli, ma la logica dell'uso di simboli e date diversi è lì
Fondamentalmente ho aggiunto
Dates=paste0(startDate,"::",endDate)
rm(list = ls())
library(lubridate)
library(parallel)
autoregressor1 = function(x){
if(NROW(x)<12){ result = NA} else{
y = Vo(x)*Ad(x)
#y = ROC(Ad(x))
y = ROC(y)
y = na.omit(y)
step1 = ar.yw(y)
step2 = predict(step1,newdata=y,n.ahead=1)
step3 = step2$pred[1]+1
step4 = (step3*last(Ad(x))) - last(Ad(x))
result = step4
}
return(result)
}
autoregressor = function(x){
ans = rollapply(x,26,FUN = autoregressor1,by.column=FALSE)
return (ans)}
########################indicators#############################
library(quantstrat)
library(future.apply)
library(scorecard)
reset_quantstrat <- function() {
if (! exists(".strategy")) .strategy <<- new.env(parent = .GlobalEnv)
if (! exists(".blotter")) .blotter <<- new.env(parent = .GlobalEnv)
if (! exists(".audit")) .audit <<- new.env(parent = .GlobalEnv)
suppressWarnings(rm(list = ls(.strategy), pos = .strategy))
suppressWarnings(rm(list = ls(.blotter), pos = .blotter))
suppressWarnings(rm(list = ls(.audit), pos = .audit))
FinancialInstrument::currency("USD")
}
reset_quantstrat()
initDate <- '2010-01-01'
endDate <- as.Date(Sys.Date())
startDate <- endDate %m-% years(3)
symbolstring1 <- c('SSO','GOLD')
getSymbols(symbolstring1,from=startDate,to=endDate,adjust=TRUE,src='yahoo')
#symbolstring1 <- c('SP500TR','GOOG')
.orderqty <- 1
.txnfees <- 0
#random <- sample(1:2, 2, replace=FALSE)
random <- (1:2)
equity <- lapply(random, function(x)
{#x=1
try(rm("account.Snazzy","portfolio.Snazzy",pos=.GlobalEnv$.blotter),silent=TRUE)
rm(.blotter)
rm(.strategy)
portfolioName <- accountName <- strategyName <- paste0("FirstPortfolio",x+2)
#endDate <- as.Date(Sys.Date())
startDate <- endDate %m-% years(1+x)
#Load quantstrat in your R environment.
reset_quantstrat()
# The search command lists all attached packages.
search()
symbolstring=as.character(symbolstring1[x])
print(symbolstring)
try(rm.strat(strategyName),silent=TRUE)
try(rm(envir=FinancialInstrument:::.instrument),silent=TRUE)
for (name in ls(FinancialInstrument:::.instrument)){rm_instruments(name,keep.currencies = FALSE)}
print(symbolstring)
currency('USD')
stock(symbolstring,currency='USD',multiplier=1)
# Currency and trading instrument objects stored in the
# .instrument environment
print("FI")
ls(envir=FinancialInstrument:::.instrument)
# blotter functions used for instrument initialization
# quantstrat creates a private storage area called .strategy
ls(all=T)
init_equity <- 10000
Sys.setenv(TZ="UTC")
print(portfolioName)
print("port")
try(initPortf(name = portfolioName,
symbols = symbolstring,
initDate = initDate))
try(initAcct(name = accountName,
portfolios = portfolioName,
initDate = initDate,
initEq = init_equity))
try(initOrders(portfolio = portfolioName,
symbols = symbolstring,
initDate = initDate))
# name: the string name of the strategy
# assets: optional list of assets to apply the strategy to.
# Normally these are defined in the portfolio object
# contstrains: optional portfolio constraints
# store: can be True or False. If True store the strategy in the environment. Default is False
print("strat")
strategy(strategyName, store = TRUE)
ls(all=T)
# .blotter holds the portfolio and account object
ls(.blotter)
# .strategy holds the orderbook and strategy object
print(ls(.strategy))
print("ind")
#ARIMA
add.indicator(
strategy = strategyName,
name = "autoregressor",
arguments = list(
x = quote(mktdata)),
label = "arspread")
################################################ Signals #############################
add.signal(
strategy = strategyName,
name = "sigThreshold",
arguments = list(
threshold = 0.25,
column = "arspread",
relationship = "gte",
cross = TRUE),
label = "Selltime")
add.signal(
strategy = strategyName,
name = "sigThreshold",
arguments = list(
threshold = 0.1,
column = "arspread",
relationship = "lt",
cross = TRUE),
label = "cashtime")
add.signal(
strategy = strategyName,
name = "sigThreshold",
arguments = list(
threshold = -0.1,
column = "arspread",
relationship = "gt",
cross = TRUE),
label = "cashtime")
add.signal(
strategy = strategyName,
name = "sigThreshold",
arguments = list(
threshold = -0.25,
column = "arspread",
relationship = "lte",
cross = TRUE),
label = "Buytime")
######################################## Rules #################################################
#Entry Rule Long
add.rule(strategyName,
name = "ruleSignal",
arguments = list(
sigcol = "Buytime",
sigval = TRUE,
orderqty = .orderqty,
ordertype = "market",
orderside = "long",
pricemethod = "market",
replace = TRUE,
TxnFees = -.txnfees
#,
#osFUN = osMaxPos
),
type = "enter",
path.dep = TRUE,
label = "Entry")
#Entry Rule Short
add.rule(strategyName,
name = "ruleSignal",
arguments = list(
sigcol = "Selltime",
sigval = TRUE,
orderqty = .orderqty,
ordertype = "market",
orderside = "short",
pricemethod = "market",
replace = TRUE,
TxnFees = -.txnfees
#,
#osFUN = osMaxPos
),
type = "enter",
path.dep = TRUE,
label = "Entry")
#Exit Rules
print("summary")
summary(getStrategy(strategyName))
# Summary results are produced below
print("results")
results <- applyStrategy(strategy= strategyName, portfolios = portfolioName)
# The applyStrategy() outputs all transactions(from the oldest to recent transactions)that the strategy sends. The first few rows of the applyStrategy() output are shown below
getTxns(Portfolio=portfolioName, Symbol=symbolstring)
mktdata
updatePortf(portfolioName,Dates=paste0(startDate,"::",endDate))
dateRange <- time(getPortfolio(portfolioName)$summary)
updateAcct(portfolioName,dateRange[which(dateRange >= startDate & dateRange <= endDate)])
updateEndEq(accountName, Dates=paste0(startDate,"::",endDate))
print(plot(tail(getAccount(portfolioName)$summary$End.Eq,-1), main = symbolstring))
tStats <- tradeStats(Portfolios = portfolioName, use="trades", inclZeroDays=FALSE,Dates=paste0(startDate,"::",endDate))
final_acct <- getAccount(portfolioName)
#final_acct
#View(final_acct)
options(width=70)
print(plot(tail(final_acct$summary$End.Eq,-1), main = symbolstring))
#dev.off()
tail(final_acct$summary$End.Eq)
rets <- PortfReturns(Account = accountName)
#rownames(rets) <- NULL
tab.perf <- table.Arbitrary(rets,
metrics=c(
"Return.cumulative",
"Return.annualized",
"SharpeRatio.annualized",
"CalmarRatio"),
metricsNames=c(
"Cumulative Return",
"Annualized Return",
"Annualized Sharpe Ratio",
"Calmar Ratio"))
tab.perf
tab.risk <- table.Arbitrary(rets,
metrics=c(
"StdDev.annualized",
"maxDrawdown"
),
metricsNames=c(
"Annualized StdDev",
"Max DrawDown"))
tab.risk
return (as.numeric(tail(final_acct$summary$End.Eq,1))-init_equity)
#reset_quantstrat()
}
)
sembra essere paralizzato ma non aggiorna correttamente init_equity