Apache Storm di Yahoo! Keuangan

Yahoo! Keuangan adalah situs web berita bisnis dan data keuangan terkemuka di Internet. Ini adalah bagian dari Yahoo! dan memberikan informasi tentang berita keuangan, statistik pasar, data pasar internasional, dan informasi lain tentang sumber daya keuangan yang dapat diakses siapa saja.

Jika Anda adalah Yahoo! pengguna, lalu Anda dapat menyesuaikan Yahoo! Keuangan untuk memanfaatkan penawaran tertentu. Yahoo! Finance API digunakan untuk menanyakan data keuangan dari Yahoo!

API ini menampilkan data yang tertunda selama 15 menit dari waktu nyata, dan memperbarui database-nya setiap 1 menit, untuk mengakses informasi terkait saham saat ini. Sekarang mari kita ambil skenario waktu nyata dari sebuah perusahaan dan lihat bagaimana cara meningkatkan peringatan ketika nilai sahamnya turun di bawah 100.

Pembuatan Cerat

Tujuan cerat adalah untuk mendapatkan detail perusahaan dan mengirimkan harga ke baut. Anda dapat menggunakan kode program berikut untuk membuat cerat.

Coding: YahooFinanceSpout.java

import java.util.*;
import java.io.*;
import java.math.BigDecimal;

//import yahoofinace packages
import yahoofinance.YahooFinance;
import yahoofinance.Stock;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

public class YahooFinanceSpout implements IRichSpout {
   private SpoutOutputCollector collector;
   private boolean completed = false;
   private TopologyContext context;
	
   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector){
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      try {
         Stock stock = YahooFinance.get("INTC");
         BigDecimal price = stock.getQuote().getPrice();

         this.collector.emit(new Values("INTC", price.doubleValue()));
         stock = YahooFinance.get("GOOGL");
         price = stock.getQuote().getPrice();

         this.collector.emit(new Values("GOOGL", price.doubleValue()));
         stock = YahooFinance.get("AAPL");
         price = stock.getQuote().getPrice();

         this.collector.emit(new Values("AAPL", price.doubleValue()));
      } catch(Exception e) {}
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("company", "price"));
   }

   @Override
   public void close() {}
	
   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Pembuatan Baut

Di sini tujuan baut adalah untuk memproses harga perusahaan tertentu ketika harga jatuh di bawah 100. Ini menggunakan objek Peta Jawa untuk mengatur peringatan batas harga cutoff sebagai truesaat harga saham turun di bawah 100; jika tidak salah. Kode program lengkapnya adalah sebagai berikut -

Coding: PriceCutOffBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.tuple.Tuple;

public class PriceCutOffBolt implements IRichBolt {
   Map<String, Integer> cutOffMap;
   Map<String, Boolean> resultMap;
	
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.cutOffMap = new HashMap <String, Integer>();
      this.cutOffMap.put("INTC", 100);
      this.cutOffMap.put("AAPL", 100);
      this.cutOffMap.put("GOOGL", 100);

      this.resultMap = new HashMap<String, Boolean>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String company = tuple.getString(0);
      Double price = tuple.getDouble(1);

      if(this.cutOffMap.containsKey(company)){
         Integer cutOffPrice = this.cutOffMap.get(company);

         if(price < cutOffPrice) {
            this.resultMap.put(company, true);
         } else {
            this.resultMap.put(company, false);
         }
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Boolean> entry:resultMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("cut_off_price"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Mengirimkan Topologi

Ini adalah aplikasi utama tempat YahooFinanceSpout.java dan PriceCutOffBolt.java terhubung bersama dan menghasilkan topologi. Kode program berikut menunjukkan bagaimana Anda dapat mengirimkan topologi.

Pengkodean: YahooFinanceStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class YahooFinanceStorm {
   public static void main(String[] args) throws Exception{
      Config config = new Config();
      config.setDebug(true);
		
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("yahoo-finance-spout", new YahooFinanceSpout());

      builder.setBolt("price-cutoff-bolt", new PriceCutOffBolt())
         .fieldsGrouping("yahoo-finance-spout", new Fields("company"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("YahooFinanceStorm", config, builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}

Membangun dan Menjalankan Aplikasi

Aplikasi lengkap memiliki tiga kode Java. Mereka adalah sebagai berikut -

  • YahooFinanceSpout.java
  • PriceCutOffBolt.java
  • YahooFinanceStorm.java

Aplikasi dapat dibangun menggunakan perintah berikut -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*” *.java

Aplikasi dapat dijalankan menggunakan perintah berikut -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*”:.
YahooFinanceStorm

Keluaran

Outputnya akan mirip dengan berikut -

GOOGL : false
AAPL : false
INTC : true