Wie ersetze ich Quantstrat 'for loop' durch mclapply [parallelisiert]?
Ich möchte Quantstrat parallelisieren. Mein Code ist nicht genau so, aber dies zeigt das Problem. Ich glaube, das Problem ist, dass die .blotter env auf eine Zeigerspeicheradresse initialisiert wird und ich kein Array/Matrix von new.env() initialisieren kann.
Ich möchte die for-Schleife durch ein mclapply ersetzen, damit ich mehrere applyStrategy-Läufe mit unterschiedlichen Daten/Symbolen ausführen kann (hier werden nur unterschiedliche Symbole gezeigt). Mein Endziel ist ein Beowulf-Cluster (makeCluster); ich plane, diese Läufe parallel mit bis zu 252 Handelstagen (rollierendes Fenster) und unterschiedlichen Symbolen pro Iteration auszuführen. (Das alles brauche ich aber vorerst nicht: Ich frage lediglich, ob es einen Weg gibt, mit der Zuweisung des Portfolios und dem anschließenden .blotter-Speicherobjekt so zu arbeiten, dass ich mclapply verwenden kann.)
# Session setup for the per-symbol backtest that follows: clear the workspace,
# attach quantstrat and parallel, and define the symbols to iterate over.
#Load quantstrat in your R environment.
# Removes the objects listed by ls() from the current environment; ls() does
# not list names starting with a dot by default, so those are kept.
rm(list = ls())
# NOTE(review): local() is called without an expression; its purpose here is
# unclear -- confirm whether this line is needed at all.
local()
library(quantstrat)
library(parallel)
# The search command lists all attached packages.
search()
# Ticker symbols; the call below runs its function once per element.
symbolstring1 <- c('QQQ','GOOG')
# Earlier sequential variant (single vector, explicit index loop), kept for reference.
#symbolstring <- c('QQQ','GOOG')
#for(i in 1:length(symbolstring1))
# Runs one backtest per symbol. For each element of symbolstring1 the anonymous
# function clears registered instruments, registers USD and the stock, downloads
# prices, initialises a portfolio/account/order book named
# "FirstPortfolio<symbol>", defines a 10/30 EMA crossover strategy, applies it,
# updates the books and plots the account's End.Eq series.
# The function's value is that of its last statement, rm(symbols), i.e. NULL.
# NOTE(review): the function called here is spelled `mlapply`; the surrounding
# text talks about parallel::mclapply -- confirm which function is intended.
mlapply(symbolstring1, function(symbolstring)
{
#local()
#i=2
#symbolstring=as.character(symbolstring1[i])
# Fresh environments bound to local variables of this function call.
# NOTE(review): presumably blotter/quantstrat look up their own .blotter and
# .strategy stores rather than these locals -- confirm.
.blotter <- new.env()
.strategy <- new.env()
# strategyName is only assigned further down in this function, so unless a
# global of that name exists this call fails and try() silences the error.
try(rm.strat(strategyName),silent=TRUE)
# rm() is given no object names here, so it removes nothing.
try(rm(envir=FinancialInstrument:::.instrument),silent=TRUE)
# Remove every instrument currently listed in the .instrument environment
# (currencies included, because keep.currencies = FALSE).
for (name in ls(FinancialInstrument:::.instrument)){rm_instruments(name,keep.currencies = FALSE)}
print(symbolstring)
# Register the currency and the stock for this iteration.
currency('USD')
stock(symbolstring,currency='USD',multiplier=1)
# Currency and trading instrument objects stored in the
# .instrument environment
print("FI")
# Inside a function body the value of a bare ls() call is neither printed nor
# stored, so this line and the ls() calls below have no visible effect.
ls(envir=FinancialInstrument:::.instrument)
# blotter functions used for instrument initialization
# quantstrat creates a private storage area called .strategy
ls(all=T)
# The initDate should be lower than the startDate. The initDate will be used later while initializing the strategy.
initDate <- '2010-01-01'
startDate <- '2011-01-01'
endDate <- '2019-08-10'
init_equity <- 50000
# Set UTC TIME
Sys.setenv(TZ="UTC")
# Download adjusted price data for this symbol from Yahoo.
# NOTE(review): this is called from inside the function; which environment the
# downloaded object lands in depends on getSymbols' `env` default. The error
# quoted after this script ("object 'QQQ' not found" in get(symbol, envir = envir))
# suggests applyStrategy looks the data up somewhere else -- confirm.
getSymbols(symbolstring,from=startDate,to=endDate,adjust=TRUE,src='yahoo')
# Define names for portfolio, account and strategy.
#portfolioName <- accountName <- strategyName <- "FirstPortfolio"
# One shared name per symbol for the portfolio, account and strategy.
portfolioName <- accountName <- strategyName <- paste0("FirstPortfolio",symbolstring)
print(portfolioName)
# The function rm.strat removes any strategy, portfolio, account, or order book object with the given name. This is important
#rm.strat(strategyName)
print("port")
# Initialise portfolio, account (starting equity init_equity) and order book.
initPortf(name = portfolioName,
symbols = symbolstring,
initDate = initDate)
initAcct(name = accountName,
portfolios = portfolioName,
initDate = initDate,
initEq = init_equity)
initOrders(portfolio = portfolioName,
symbols = symbolstring,
initDate = initDate)
# name: the string name of the strategy
# assets: optional list of assets to apply the strategy to.
# Normally these are defined in the portfolio object
# contstrains: optional portfolio constraints
# store: can be True or False. If True store the strategy in the environment. Default is False
print("strat")
strategy(strategyName, store = TRUE)
ls(all=T)
# .blotter holds the portfolio and account object
# Note: `.blotter` / `.strategy` below refer to the local environments created
# at the top of this function.
ls(.blotter)
# .strategy holds the orderbook and strategy object
print(ls(.strategy))
print("ind")
# Two EMA indicators on the close of mktdata: 10-period ("nFast") and
# 30-period ("nSlow").
add.indicator(strategy = strategyName,
name = "EMA",
arguments = list(x = quote(Cl(mktdata)),
n = 10), label = "nFast")
add.indicator(strategy = strategyName,
name = "EMA",
arguments = list(x = quote(Cl(mktdata)),
n = 30),
label = "nSlow")
# Add long signal when the fast EMA crosses over slow EMA.
print("sig")
add.signal(strategy = strategyName,
name="sigCrossover",
arguments = list(columns = c("nFast", "nSlow"),
relationship = "gte"),
label = "longSignal")
# Add short signal when the fast EMA goes below slow EMA.
add.signal(strategy = strategyName,
name = "sigCrossover",
arguments = list(columns = c("nFast", "nSlow"),
relationship = "lt"),
label = "shortSignal")
# go long when 10-period EMA (nFast) >= 30-period EMA (nSlow)
print("rul")
# Every rule below is a market order with TxnFees = -10.
add.rule(strategyName,
name= "ruleSignal",
arguments=list(sigcol="longSignal",
sigval=TRUE,
orderqty=100,
ordertype="market",
orderside="long",
replace = TRUE,
TxnFees = -10),
type="enter",
label="EnterLong")
# go short when 10-period EMA (nFast) < 30-period EMA (nSlow)
add.rule(strategyName,
name = "ruleSignal",
arguments = list(sigcol = "shortSignal",
sigval = TRUE,
orderside = "short",
ordertype = "market",
orderqty = -100,
TxnFees = -10,
replace = TRUE),
type = "enter",
label = "EnterShort")
# Close long positions when the shortSignal column is True
add.rule(strategyName,
name = "ruleSignal",
arguments = list(sigcol = "shortSignal",
sigval = TRUE,
orderside = "long",
ordertype = "market",
orderqty = "all",
TxnFees = -10,
replace = TRUE),
type = "exit",
label = "ExitLong")
# Close Short positions when the longSignal column is True
add.rule(strategyName,
name = "ruleSignal",
arguments = list(sigcol = "longSignal",
sigval = TRUE,
orderside = "short",
ordertype = "market",
orderqty = "all",
TxnFees = -10,
replace = TRUE),
type = "exit",
label = "ExitShort")
print("summary")
# Value is not printed (bare expression inside a function body).
summary(getStrategy(strategyName))
# Summary results are produced below
print("results")
# Run the strategy for this symbol only.
results <- applyStrategy(strategy= strategyName, portfolios = portfolioName,symbols=symbolstring)
# The applyStrategy() outputs all transactions(from the oldest to recent transactions)that the strategy sends. The first few rows of the applyStrategy() output are shown below
getTxns(Portfolio=portfolioName, Symbol=symbolstring)
# NOTE(review): `mktdata` is not assigned anywhere in this function; evaluating
# it here relies on a binding that exists elsewhere -- TODO confirm.
mktdata
# Bring portfolio, account and ending equity up to date. The first timestamp
# of the portfolio summary is dropped from the date range.
updatePortf(portfolioName)
dateRange <- time(getPortfolio(portfolioName)$summary)[-1]
updateAcct(portfolioName,dateRange)
updateEndEq(accountName)
# Plot the account's End.Eq series without its first row.
print(plot(tail(getAccount(portfolioName)$summary$End.Eq,-1), main = "Portfolio Equity"))
#cleanup
# Remove the object named after the symbol from this function's frame.
for (name in symbolstring) rm(list = name)
#rm(.blotter)
# NOTE(review): .stoploss, .txnfees and symbols are not created in this
# function; rm() only looks in the current frame, so these calls presumably
# just warn that the objects were not found -- confirm intent.
rm(.stoploss)
rm(.txnfees)
#rm(.strategy)
rm(symbols)
}
)
Aber es wird ein Fehler ausgegeben: „Fehler in get(symbol, envir = envir) : object 'QQQ' not found“
Das Problem ist insbesondere, dass FinancialInstrument:::.instrument auf eine Speicheradresse zeigt, die nicht mit meinen gekapselten Variablenaufrufen (Symbolstring) aktualisiert wird.
Antworten
apply.paramset in quantstrat verwendet bereits ein foreach-Konstrukt zur Parallelisierung der Ausführung von applyStrategy.
apply.paramset muss ziemlich viel Arbeit leisten, um sicherzustellen, dass die Umgebungen in den Workern verfügbar sind, um die Arbeit zu erledigen, und um die richtigen Ergebnisse zu sammeln und an den aufrufenden Prozess zurückzusenden.
Das Einfachste für Sie wäre wahrscheinlich die Verwendung von apply.paramset. Definieren Sie Ihre Datums- und Symbolangaben als Parameter und lassen Sie die Funktion normal laufen.
Alternativ schlage ich vor, dass Sie sich die Schritte ansehen, die in apply.paramset erforderlich sind, um ein paralleles foreach-Konstrukt zu verwenden, und diese an Ihren vorgeschlagenen Fall anpassen.
Beachten Sie auch, dass Ihre Frage die Verwendung eines Beowulf-Clusters zusammen mit mclapply anspricht. Das wird nicht funktionieren: mclapply funktioniert nur innerhalb eines einzigen Speicherbereichs. Beowulf-Cluster teilen sich normalerweise keinen gemeinsamen Speicher- und Prozessbereich; sie verteilen Jobs typischerweise über parallele Bibliotheken wie MPI. apply.paramset könnte bereits auf einem Beowulf-Cluster verteilt werden, indem ein doMPI-Backend für foreach verwendet wird. Das ist einer der Gründe, warum wir foreach verwendet haben: die Vielzahl verschiedener paralleler Backends, die verfügbar sind. Das doMC-Backend für foreach verwendet hinter den Kulissen tatsächlich mclapply.
Ich glaube, das parallelisiert den Code. Ich habe sowohl die Indikatoren als auch die Symbole ausgetauscht, aber die Logik der Verwendung unterschiedlicher Symbole und Daten ist darin enthalten
Im Grunde habe ich hinzugefügt
Dates=paste0(startDate,"::",endDate)
# Start of the answer's script: clear the workspace, then attach lubridate
# (used below for %m-% and years()) and parallel.
# rm(list = ls()) removes the objects listed by ls(); names starting with a
# dot are not listed by default and therefore survive.
rm(list = ls())
library(lubridate)
library(parallel)
# One-step-ahead price-change forecast for a single window of market data.
#
# x: price/volume data for one window; Vo() and Ad() are applied to it.
#
# Returns NA when x has fewer than 12 rows. Otherwise a Yule-Walker AR model
# (ar.yw) is fitted to the rate of change (ROC) of Vo(x) * Ad(x), the next
# value is predicted, and that predicted rate is applied to the last adjusted
# price: (pred + 1) * last - last.
autoregressor1 <- function(x) {
  # Guard: too few observations in the window.
  if (NROW(x) < 12) {
    return(NA)
  }

  # Rate of change of volume times adjusted price, with NAs dropped.
  #y = ROC(Ad(x))
  dollar_volume <- Vo(x) * Ad(x)
  roc_series <- na.omit(ROC(dollar_volume))

  # Fit the AR model and predict one step ahead.
  ar_fit <- ar.yw(roc_series)
  forecast <- predict(ar_fit, newdata = roc_series, n.ahead = 1)
  growth <- forecast$pred[1] + 1

  # Express the predicted rate as a change relative to the last adjusted price.
  last_adjusted <- last(Ad(x))
  (growth * last_adjusted) - last_adjusted
}
# Rolling version of autoregressor1, used as the strategy's indicator function.
#
# x: market data passed on to rollapply().
#
# Applies autoregressor1 to every 26-row window of x; by.column = FALSE hands
# each window to the function as a whole instead of column by column.
autoregressor <- function(x) {
  rollapply(x, 26, FUN = autoregressor1, by.column = FALSE)
}
########################indicators#############################
library(quantstrat)
library(future.apply)
library(scorecard)
# Put the .strategy, .blotter and .audit stores into a clean state.
#
# Any store that cannot be found is created as a new environment in the global
# environment (with the global environment as its parent). Every object that
# ls() lists in each store is then removed (warnings suppressed); ls() skips
# names starting with a dot, so such entries are left in place. Finally the
# USD currency is registered through FinancialInstrument::currency(), whose
# value is returned.
reset_quantstrat <- function() {
  store_names <- c(".strategy", ".blotter", ".audit")

  # Create whichever stores are missing.
  for (store_name in store_names) {
    if (!exists(store_name)) {
      assign(store_name, new.env(parent = .GlobalEnv), envir = .GlobalEnv)
    }
  }

  # Empty each store.
  for (store_name in store_names) {
    store <- get(store_name)
    suppressWarnings(rm(list = ls(store), pos = store))
  }

  FinancialInstrument::currency("USD")
}
# Global configuration shared by all backtests run below.
# Start from empty quantstrat stores.
reset_quantstrat()
# initDate is passed to initPortf/initAcct/initOrders in the loop below.
initDate <- '2010-01-01'
# Download window: today back to three years ago (%m-% and years() come from
# lubridate).
endDate <- as.Date(Sys.Date())
startDate <- endDate %m-% years(3)
symbolstring1 <- c('SSO','GOLD')
# Prices for all symbols are fetched once, at top level, before the loop.
getSymbols(symbolstring1,from=startDate,to=endDate,adjust=TRUE,src='yahoo')
#symbolstring1 <- c('SP500TR','GOOG')
# Order size and transaction fee read by the entry rules below.
.orderqty <- 1
.txnfees <- 0
#random <- sample(1:2, 2, replace=FALSE)
# Indices into symbolstring1; one backtest is run per index.
random <- (1:2)
# Runs one backtest per index in `random` and collects, for each, the final
# account End.Eq minus init_equity.
# For index x the function resets the quantstrat stores, registers
# symbolstring1[x], sets up a portfolio/account/strategy named
# "FirstPortfolio<x+2>", adds an autoregressor indicator with threshold
# signals and two entry rules, applies the strategy and updates the books over
# startDate::endDate.
# Note: base lapply() evaluates the calls one after another in this R process.
equity <- lapply(random, function(x)
{#x=1
# Remove the named account/portfolio objects from the global .blotter store if
# present; any error is silenced.
try(rm("account.Snazzy","portfolio.Snazzy",pos=.GlobalEnv$.blotter),silent=TRUE)
# NOTE(review): rm() only looks in this function's own frame, where .blotter
# and .strategy are not defined, so these two calls presumably just warn
# without removing anything -- confirm intent.
rm(.blotter)
rm(.strategy)
# One shared name for portfolio, account and strategy, suffixed with x + 2.
portfolioName <- accountName <- strategyName <- paste0("FirstPortfolio",x+2)
#endDate <- as.Date(Sys.Date())
# Local startDate (shadows the global one): 1 + x years before endDate.
startDate <- endDate %m-% years(1+x)
#Load quantstrat in your R environment.
reset_quantstrat()
# The search command lists all attached packages.
# (Its value is not printed inside a function body.)
search()
symbolstring=as.character(symbolstring1[x])
print(symbolstring)
try(rm.strat(strategyName),silent=TRUE)
# rm() is given no object names here, so it removes nothing.
try(rm(envir=FinancialInstrument:::.instrument),silent=TRUE)
# Remove every instrument currently listed in the .instrument environment
# (currencies included, because keep.currencies = FALSE).
for (name in ls(FinancialInstrument:::.instrument)){rm_instruments(name,keep.currencies = FALSE)}
print(symbolstring)
# Register the currency and the stock for this iteration.
currency('USD')
stock(symbolstring,currency='USD',multiplier=1)
# Currency and trading instrument objects stored in the
# .instrument environment
print("FI")
# Bare ls() calls inside a function body are neither printed nor stored.
ls(envir=FinancialInstrument:::.instrument)
# blotter functions used for instrument initialization
# quantstrat creates a private storage area called .strategy
ls(all=T)
# Starting equity; also subtracted from the final equity in the return value.
init_equity <- 10000
Sys.setenv(TZ="UTC")
print(portfolioName)
print("port")
# Initialise portfolio, account and order book. Each call is wrapped in try(),
# so a failure is reported but does not stop the iteration.
try(initPortf(name = portfolioName,
symbols = symbolstring,
initDate = initDate))
try(initAcct(name = accountName,
portfolios = portfolioName,
initDate = initDate,
initEq = init_equity))
try(initOrders(portfolio = portfolioName,
symbols = symbolstring,
initDate = initDate))
# name: the string name of the strategy
# assets: optional list of assets to apply the strategy to.
# Normally these are defined in the portfolio object
# contstrains: optional portfolio constraints
# store: can be True or False. If True store the strategy in the environment. Default is False
print("strat")
strategy(strategyName, store = TRUE)
ls(all=T)
# .blotter holds the portfolio and account object
ls(.blotter)
# .strategy holds the orderbook and strategy object
print(ls(.strategy))
print("ind")
#ARIMA
# Indicator: the rolling autoregressor applied to the whole mktdata object,
# labelled "arspread".
add.indicator(
strategy = strategyName,
name = "autoregressor",
arguments = list(
x = quote(mktdata)),
label = "arspread")
################################################ Signals #############################
# Four threshold-crossing signals on the "arspread" column:
#   >= 0.25  -> "Selltime"
#   <  0.1   -> "cashtime"
#   >  -0.1  -> "cashtime"
#   <= -0.25 -> "Buytime"
# NOTE(review): two signals share the label "cashtime", and no rule in this
# function references "cashtime" -- confirm whether both are kept and needed.
add.signal(
strategy = strategyName,
name = "sigThreshold",
arguments = list(
threshold = 0.25,
column = "arspread",
relationship = "gte",
cross = TRUE),
label = "Selltime")
add.signal(
strategy = strategyName,
name = "sigThreshold",
arguments = list(
threshold = 0.1,
column = "arspread",
relationship = "lt",
cross = TRUE),
label = "cashtime")
add.signal(
strategy = strategyName,
name = "sigThreshold",
arguments = list(
threshold = -0.1,
column = "arspread",
relationship = "gt",
cross = TRUE),
label = "cashtime")
add.signal(
strategy = strategyName,
name = "sigThreshold",
arguments = list(
threshold = -0.25,
column = "arspread",
relationship = "lte",
cross = TRUE),
label = "Buytime")
######################################## Rules #################################################
# Both entry rules below use the global .orderqty and the negated global
# .txnfees, and both carry the label "Entry".
#Entry Rule Long
add.rule(strategyName,
name = "ruleSignal",
arguments = list(
sigcol = "Buytime",
sigval = TRUE,
orderqty = .orderqty,
ordertype = "market",
orderside = "long",
pricemethod = "market",
replace = TRUE,
TxnFees = -.txnfees
#,
#osFUN = osMaxPos
),
type = "enter",
path.dep = TRUE,
label = "Entry")
#Entry Rule Short
add.rule(strategyName,
name = "ruleSignal",
arguments = list(
sigcol = "Selltime",
sigval = TRUE,
orderqty = .orderqty,
ordertype = "market",
orderside = "short",
pricemethod = "market",
replace = TRUE,
TxnFees = -.txnfees
#,
#osFUN = osMaxPos
),
type = "enter",
path.dep = TRUE,
label = "Entry")
#Exit Rules
# (No exit rules are defined after this heading.)
print("summary")
summary(getStrategy(strategyName))
# Summary results are produced below
print("results")
# Run the strategy on the portfolio; no `symbols` argument is passed here.
results <- applyStrategy(strategy= strategyName, portfolios = portfolioName)
# The applyStrategy() outputs all transactions(from the oldest to recent transactions)that the strategy sends. The first few rows of the applyStrategy() output are shown below
getTxns(Portfolio=portfolioName, Symbol=symbolstring)
# NOTE(review): `mktdata` is not assigned anywhere in this function; evaluating
# it here relies on a binding that exists elsewhere -- TODO confirm.
mktdata
# Update portfolio, account and ending equity, restricted to the local
# startDate::endDate window.
updatePortf(portfolioName,Dates=paste0(startDate,"::",endDate))
dateRange <- time(getPortfolio(portfolioName)$summary)
updateAcct(portfolioName,dateRange[which(dateRange >= startDate & dateRange <= endDate)])
updateEndEq(accountName, Dates=paste0(startDate,"::",endDate))
# Plot the account's End.Eq series without its first row.
print(plot(tail(getAccount(portfolioName)$summary$End.Eq,-1), main = symbolstring))
# Trade statistics for the window; tStats is not used afterwards in this function.
tStats <- tradeStats(Portfolios = portfolioName, use="trades", inclZeroDays=FALSE,Dates=paste0(startDate,"::",endDate))
final_acct <- getAccount(portfolioName)
#final_acct
#View(final_acct)
# Sets the console width option for the rest of the session (not restored).
options(width=70)
# Same End.Eq plot again, this time from final_acct.
print(plot(tail(final_acct$summary$End.Eq,-1), main = symbolstring))
#dev.off()
tail(final_acct$summary$End.Eq)
# Portfolio returns and two summary tables; tab.perf and tab.risk are computed
# but neither printed nor returned.
rets <- PortfReturns(Account = accountName)
#rownames(rets) <- NULL
tab.perf <- table.Arbitrary(rets,
metrics=c(
"Return.cumulative",
"Return.annualized",
"SharpeRatio.annualized",
"CalmarRatio"),
metricsNames=c(
"Cumulative Return",
"Annualized Return",
"Annualized Sharpe Ratio",
"Calmar Ratio"))
tab.perf
tab.risk <- table.Arbitrary(rets,
metrics=c(
"StdDev.annualized",
"maxDrawdown"
),
metricsNames=c(
"Annualized StdDev",
"Max DrawDown"))
tab.risk
# Result of this iteration: last End.Eq value minus the starting equity.
return (as.numeric(tail(final_acct$summary$End.Eq,1))-init_equity)
#reset_quantstrat()
}
)
Es scheint parallelisiert zu sein, aber init_equity wird nicht korrekt aktualisiert.