Apache Storm trong Yahoo! Tài chính
Yahoo! Finance là trang web dữ liệu tài chính và tin tức kinh doanh hàng đầu trên Internet. Nó là một phần của Yahoo! và cung cấp thông tin về tin tức tài chính, thống kê thị trường, dữ liệu thị trường quốc tế và các thông tin khác về nguồn tài chính mà bất kỳ ai cũng có thể truy cập.
Nếu bạn là người đăng ký Yahoo! người dùng, sau đó bạn có thể tùy chỉnh Yahoo! Tài chính để tận dụng các dịch vụ nhất định của nó. Yahoo! API tài chính được sử dụng để truy vấn dữ liệu tài chính từ Yahoo!
API này hiển thị dữ liệu bị trễ 15 phút so với thời gian thực và cập nhật cơ sở dữ liệu của nó sau mỗi 1 phút, để truy cập thông tin liên quan đến chứng khoán hiện tại. Bây giờ chúng ta hãy xem một kịch bản thời gian thực của một công ty và xem cách tăng cảnh báo khi giá trị cổ phiếu của công ty đó xuống dưới 100.
Tạo vòi
Mục đích của vòi là để lấy các chi tiết của ty và phát ra các giá vào bu lông. Bạn có thể sử dụng mã chương trình sau để tạo vòi.
Mã hóa: YahooFinanceSpout.java
import java.util.*;
import java.io.*;
import java.math.BigDecimal;
//import yahoofinace packages
import yahoofinance.YahooFinance;
import yahoofinance.Stock;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
public class YahooFinanceSpout implements IRichSpout {
private SpoutOutputCollector collector;
private boolean completed = false;
private TopologyContext context;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector){
this.context = context;
this.collector = collector;
}
@Override
public void nextTuple() {
try {
Stock stock = YahooFinance.get("INTC");
BigDecimal price = stock.getQuote().getPrice();
this.collector.emit(new Values("INTC", price.doubleValue()));
stock = YahooFinance.get("GOOGL");
price = stock.getQuote().getPrice();
this.collector.emit(new Values("GOOGL", price.doubleValue()));
stock = YahooFinance.get("AAPL");
price = stock.getQuote().getPrice();
this.collector.emit(new Values("AAPL", price.doubleValue()));
} catch(Exception e) {}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("company", "price"));
}
@Override
public void close() {}
public boolean isDistributed() {
return false;
}
@Override
public void activate() {}
@Override
public void deactivate() {}
@Override
public void ack(Object msgId) {}
@Override
public void fail(Object msgId) {}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Tạo bu lông
Ở đây, mục đích của bolt là xử lý giá của công ty nhất định khi giá giảm xuống dưới 100. Nó sử dụng đối tượng Bản đồ Java để đặt cảnh báo giới hạn giá cắt như truekhi giá cổ phiếu giảm xuống dưới 100; nếu không thì sai. Mã chương trình hoàn chỉnh như sau:
Mã hóa: PriceCutOffBolt.java
import java.util.HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
public class PriceCutOffBolt implements IRichBolt {
Map<String, Integer> cutOffMap;
Map<String, Boolean> resultMap;
private OutputCollector collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.cutOffMap = new HashMap <String, Integer>();
this.cutOffMap.put("INTC", 100);
this.cutOffMap.put("AAPL", 100);
this.cutOffMap.put("GOOGL", 100);
this.resultMap = new HashMap<String, Boolean>();
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String company = tuple.getString(0);
Double price = tuple.getDouble(1);
if(this.cutOffMap.containsKey(company)){
Integer cutOffPrice = this.cutOffMap.get(company);
if(price < cutOffPrice) {
this.resultMap.put(company, true);
} else {
this.resultMap.put(company, false);
}
}
collector.ack(tuple);
}
@Override
public void cleanup() {
for(Map.Entry<String, Boolean> entry:resultMap.entrySet()){
System.out.println(entry.getKey()+" : " + entry.getValue());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("cut_off_price"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Gửi một cấu trúc liên kết
Đây là ứng dụng chính nơi YahooFinanceSpout.java và PriceCutOffBolt.java được kết nối với nhau và tạo ra cấu trúc liên kết. Mã chương trình sau đây cho thấy cách bạn có thể gửi một cấu trúc liên kết.
Mã hóa: YahooFinanceStorm.java
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
public class YahooFinanceStorm {
public static void main(String[] args) throws Exception{
Config config = new Config();
config.setDebug(true);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("yahoo-finance-spout", new YahooFinanceSpout());
builder.setBolt("price-cutoff-bolt", new PriceCutOffBolt())
.fieldsGrouping("yahoo-finance-spout", new Fields("company"));
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("YahooFinanceStorm", config, builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
}
Xây dựng và chạy ứng dụng
Ứng dụng hoàn chỉnh có ba mã Java. Chúng như sau:
- YahooFinanceSpout.java
- PriceCutOffBolt.java
- YahooFinanceStorm.java
Ứng dụng có thể được tạo bằng lệnh sau:
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*” *.java
Ứng dụng có thể được chạy bằng lệnh sau:
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*”:.
YahooFinanceStorm
Đầu ra
Đầu ra sẽ tương tự như sau:
GOOGL : false
AAPL : false
INTC : true