Apache Storm trong Yahoo! Tài chính

Yahoo! Finance là trang web dữ liệu tài chính và tin tức kinh doanh hàng đầu trên Internet. Nó là một phần của Yahoo! và cung cấp thông tin về tin tức tài chính, thống kê thị trường, dữ liệu thị trường quốc tế và các thông tin khác về nguồn tài chính mà bất kỳ ai cũng có thể truy cập.

Nếu bạn là người đăng ký Yahoo! người dùng, sau đó bạn có thể tùy chỉnh Yahoo! Tài chính để tận dụng các dịch vụ nhất định của nó. Yahoo! API tài chính được sử dụng để truy vấn dữ liệu tài chính từ Yahoo!

API này hiển thị dữ liệu bị trễ 15 phút so với thời gian thực và cập nhật cơ sở dữ liệu của nó sau mỗi 1 phút, để truy cập thông tin liên quan đến chứng khoán hiện tại. Bây giờ chúng ta hãy xem một kịch bản thời gian thực của một công ty và xem cách tăng cảnh báo khi giá trị cổ phiếu của công ty đó xuống dưới 100.

Tạo vòi

Mục đích của vòi là để lấy các chi tiết của ty và phát ra các giá vào bu lông. Bạn có thể sử dụng mã chương trình sau để tạo vòi.

Mã hóa: YahooFinanceSpout.java

import java.util.*;
import java.io.*;
import java.math.BigDecimal;

//import yahoofinace packages
import yahoofinance.YahooFinance;
import yahoofinance.Stock;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

public class YahooFinanceSpout implements IRichSpout {
   private SpoutOutputCollector collector;
   private boolean completed = false;
   private TopologyContext context;
	
   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector){
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      try {
         Stock stock = YahooFinance.get("INTC");
         BigDecimal price = stock.getQuote().getPrice();

         this.collector.emit(new Values("INTC", price.doubleValue()));
         stock = YahooFinance.get("GOOGL");
         price = stock.getQuote().getPrice();

         this.collector.emit(new Values("GOOGL", price.doubleValue()));
         stock = YahooFinance.get("AAPL");
         price = stock.getQuote().getPrice();

         this.collector.emit(new Values("AAPL", price.doubleValue()));
      } catch(Exception e) {}
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("company", "price"));
   }

   @Override
   public void close() {}
	
   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Tạo bu lông

Ở đây, mục đích của bolt là xử lý giá của công ty nhất định khi giá giảm xuống dưới 100. Nó sử dụng đối tượng Bản đồ Java để đặt cảnh báo giới hạn giá cắt như truekhi giá cổ phiếu giảm xuống dưới 100; nếu không thì sai. Mã chương trình hoàn chỉnh như sau:

Mã hóa: PriceCutOffBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.tuple.Tuple;

public class PriceCutOffBolt implements IRichBolt {
   Map<String, Integer> cutOffMap;
   Map<String, Boolean> resultMap;
	
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.cutOffMap = new HashMap <String, Integer>();
      this.cutOffMap.put("INTC", 100);
      this.cutOffMap.put("AAPL", 100);
      this.cutOffMap.put("GOOGL", 100);

      this.resultMap = new HashMap<String, Boolean>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String company = tuple.getString(0);
      Double price = tuple.getDouble(1);

      if(this.cutOffMap.containsKey(company)){
         Integer cutOffPrice = this.cutOffMap.get(company);

         if(price < cutOffPrice) {
            this.resultMap.put(company, true);
         } else {
            this.resultMap.put(company, false);
         }
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Boolean> entry:resultMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("cut_off_price"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Gửi một cấu trúc liên kết

Đây là ứng dụng chính nơi YahooFinanceSpout.java và PriceCutOffBolt.java được kết nối với nhau và tạo ra cấu trúc liên kết. Mã chương trình sau đây cho thấy cách bạn có thể gửi một cấu trúc liên kết.

Mã hóa: YahooFinanceStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class YahooFinanceStorm {
   public static void main(String[] args) throws Exception{
      Config config = new Config();
      config.setDebug(true);
		
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("yahoo-finance-spout", new YahooFinanceSpout());

      builder.setBolt("price-cutoff-bolt", new PriceCutOffBolt())
         .fieldsGrouping("yahoo-finance-spout", new Fields("company"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("YahooFinanceStorm", config, builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}

Xây dựng và chạy ứng dụng

Ứng dụng hoàn chỉnh có ba mã Java. Chúng như sau:

  • YahooFinanceSpout.java
  • PriceCutOffBolt.java
  • YahooFinanceStorm.java

Ứng dụng có thể được tạo bằng lệnh sau:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*” *.java

Ứng dụng có thể được chạy bằng lệnh sau:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*”:.
YahooFinanceStorm

Đầu ra

Đầu ra sẽ tương tự như sau:

GOOGL : false
AAPL : false
INTC : true