Apache Flume - คู่มือฉบับย่อ

Flume คืออะไร?

Apache Flume เป็นเครื่องมือ / บริการ / กลไกการนำเข้าข้อมูลสำหรับการรวบรวมการรวมและการขนส่งข้อมูลสตรีมมิ่งจำนวนมากเช่นไฟล์บันทึกเหตุการณ์ (ฯลฯ ... ) จากแหล่งต่างๆไปยังที่เก็บข้อมูลส่วนกลาง

Flume เป็นเครื่องมือที่เชื่อถือได้กระจายและกำหนดค่าได้สูง ได้รับการออกแบบมาโดยเฉพาะเพื่อคัดลอกข้อมูลสตรีมมิ่ง (ข้อมูลบันทึก) จากเว็บเซิร์ฟเวอร์ต่างๆไปยัง HDFS

การใช้งาน Flume

สมมติว่าเว็บแอปพลิเคชันอีคอมเมิร์ซต้องการวิเคราะห์พฤติกรรมของลูกค้าจากภูมิภาคใดภูมิภาคหนึ่ง ในการทำเช่นนั้นพวกเขาจะต้องย้ายข้อมูลบันทึกที่มีอยู่ไปยัง Hadoop เพื่อทำการวิเคราะห์ ที่นี่ Apache Flume มาช่วยเรา

Flume ใช้เพื่อย้ายข้อมูลบันทึกที่สร้างโดยแอ็พพลิเคชันเซิร์ฟเวอร์ไปยัง HDFS ด้วยความเร็วที่สูงขึ้น

ข้อดีของ Flume

นี่คือข้อดีของการใช้ Flume -

  • การใช้ Apache Flume เราสามารถจัดเก็บข้อมูลไปยังร้านค้าส่วนกลาง (HBase, HDFS)

  • เมื่ออัตราของข้อมูลขาเข้าเกินอัตราที่ข้อมูลสามารถเขียนไปยังปลายทางได้ Flume จะทำหน้าที่เป็นสื่อกลางระหว่างผู้ผลิตข้อมูลและร้านค้าส่วนกลางและให้ข้อมูลที่ไหลสม่ำเสมอระหว่างกัน

  • Flume มีคุณสมบัติของ contextual routing.

  • ธุรกรรมใน Flume เป็นแบบตามช่องทางที่มีการดูแลธุรกรรมสองรายการ (ผู้ส่งหนึ่งรายและผู้รับหนึ่งราย) สำหรับแต่ละข้อความ รับประกันการส่งข้อความที่เชื่อถือได้

  • Flume มีความน่าเชื่อถือทนต่อความผิดพลาดปรับขนาดได้จัดการและปรับแต่งได้

คุณสมบัติของ Flume

คุณสมบัติเด่นบางประการของ Flume มีดังนี้ -

  • Flume นำเข้าข้อมูลบันทึกจากเว็บเซิร์ฟเวอร์หลายเครื่องไปยังร้านค้าส่วนกลาง (HDFS, HBase) อย่างมีประสิทธิภาพ

  • เมื่อใช้ Flume เราสามารถรับข้อมูลจากเซิร์ฟเวอร์หลายเครื่องเข้าสู่ Hadoop ได้ทันที

  • นอกจากไฟล์บันทึกแล้ว Flume ยังใช้เพื่อนำเข้าข้อมูลเหตุการณ์จำนวนมากที่ผลิตโดยเว็บไซต์เครือข่ายสังคมเช่น Facebook และ Twitter และเว็บไซต์อีคอมเมิร์ซเช่น Amazon และ Flipkart

  • Flume รองรับแหล่งที่มาและประเภทปลายทางจำนวนมาก

  • Flume รองรับกระแส multi-hop กระแส fan-in fan-out การกำหนดเส้นทางตามบริบท ฯลฯ

  • Flume สามารถปรับขนาดได้ในแนวนอน

Big Data,อย่างที่เราทราบกันดีว่าเป็นชุดข้อมูลขนาดใหญ่ที่ไม่สามารถประมวลผลโดยใช้เทคนิคการคำนวณแบบเดิมได้ เมื่อวิเคราะห์ข้อมูลขนาดใหญ่จะให้ผลลัพธ์ที่มีคุณค่าHadoop เป็นกรอบงานโอเพ่นซอร์สที่อนุญาตให้จัดเก็บและประมวลผลข้อมูลขนาดใหญ่ในสภาพแวดล้อมแบบกระจายทั่วกลุ่มของคอมพิวเตอร์โดยใช้แบบจำลองการเขียนโปรแกรมอย่างง่าย

ข้อมูลการสตรีม / บันทึก

โดยทั่วไปข้อมูลส่วนใหญ่ที่จะวิเคราะห์จะสร้างโดยแหล่งข้อมูลต่างๆเช่นเซิร์ฟเวอร์แอปพลิเคชันไซต์เครือข่ายสังคมเซิร์ฟเวอร์คลาวด์และเซิร์ฟเวอร์ขององค์กร ข้อมูลนี้จะอยู่ในรูปของlog files และ events.

Log file - โดยทั่วไปไฟล์บันทึกคือไฟล์ fileที่แสดงรายการเหตุการณ์ / การกระทำที่เกิดขึ้นในระบบปฏิบัติการ ตัวอย่างเช่นเว็บเซิร์ฟเวอร์จะแสดงรายการคำขอทั้งหมดที่ส่งไปยังเซิร์ฟเวอร์ในล็อกไฟล์

ในการรวบรวมข้อมูลบันทึกดังกล่าวเราสามารถรับข้อมูลเกี่ยวกับ -

  • ประสิทธิภาพของแอปพลิเคชันและค้นหาความล้มเหลวของซอฟต์แวร์และฮาร์ดแวร์ต่างๆ
  • พฤติกรรมของผู้ใช้และได้รับข้อมูลเชิงลึกทางธุรกิจที่ดีขึ้น

วิธีการดั้งเดิมในการถ่ายโอนข้อมูลไปยังระบบ HDFS คือการใช้ไฟล์ putคำสั่ง ให้เราดูวิธีการใช้put คำสั่ง

HDFS ใส่คำสั่ง

ความท้าทายหลักในการจัดการข้อมูลบันทึกคือการย้ายบันทึกเหล่านี้ที่สร้างโดยเซิร์ฟเวอร์หลายเครื่องไปยังสภาพแวดล้อม Hadoop

Hadoop File System Shellให้คำสั่งเพื่อแทรกข้อมูลลงใน Hadoop และอ่านจากข้อมูลนั้น คุณสามารถแทรกข้อมูลลงใน Hadoop โดยใช้ไฟล์put คำสั่งดังที่แสดงด้านล่าง

$ Hadoop fs –put /path of the required file  /path in HDFS where to save the file

ปัญหาเกี่ยวกับการใส่คำสั่ง

เราสามารถใช้ไฟล์ putคำสั่ง Hadoop เพื่อถ่ายโอนข้อมูลจากแหล่งเหล่านี้ไปยัง HDFS แต่ต้องทนทุกข์ทรมานจากข้อเสียดังต่อไปนี้ -

  • การใช้ put คำสั่งเราสามารถถ่ายโอน only one file at a timeในขณะที่ตัวสร้างข้อมูลสร้างข้อมูลในอัตราที่สูงกว่ามาก เนื่องจากการวิเคราะห์ข้อมูลเก่ามีความแม่นยำน้อยกว่าเราจึงจำเป็นต้องมีโซลูชันในการถ่ายโอนข้อมูลแบบเรียลไทม์

  • ถ้าเราใช้ putคำสั่งข้อมูลเป็นสิ่งจำเป็นในการบรรจุและควรพร้อมสำหรับการอัปโหลด เนื่องจากเว็บเซิร์ฟเวอร์สร้างข้อมูลอย่างต่อเนื่องจึงเป็นงานที่ยากมาก

สิ่งที่เราต้องการต่อไปนี้คือวิธีแก้ปัญหาที่สามารถเอาชนะข้อบกพร่องได้ put สั่งและโอน "ข้อมูลสตรีม" จากเครื่องสร้างข้อมูลไปยังร้านค้าส่วนกลาง (โดยเฉพาะ HDFS) โดยมีความล่าช้าน้อยกว่า

ปัญหากับ HDFS

ใน HDFS ไฟล์จะอยู่ในรายการไดเร็กทอรีและความยาวของไฟล์จะถูกพิจารณาว่าเป็นศูนย์จนกว่าจะปิด ตัวอย่างเช่นหากแหล่งข้อมูลกำลังเขียนข้อมูลลงใน HDFS และเครือข่ายถูกขัดจังหวะระหว่างการดำเนินการ (โดยไม่ต้องปิดไฟล์) ข้อมูลที่เขียนในไฟล์จะสูญหายไป

ดังนั้นเราจึงจำเป็นต้องมีระบบที่เชื่อถือได้กำหนดค่าได้และบำรุงรักษาได้เพื่อถ่ายโอนข้อมูลบันทึกไปยัง HDFS

Note- ในระบบไฟล์ POSIX เมื่อใดก็ตามที่เราเข้าถึงไฟล์ (พูดว่ากำลังดำเนินการเขียน) โปรแกรมอื่น ๆ ยังคงสามารถอ่านไฟล์นี้ได้ (อย่างน้อยส่วนที่บันทึกไว้ของไฟล์) เนื่องจากมีไฟล์อยู่ในแผ่นดิสก์ก่อนที่จะปิด

โซลูชั่นที่มีอยู่

ในการส่งข้อมูลสตรีมมิ่ง (ไฟล์บันทึกเหตุการณ์ ฯลฯ .. ,) จากแหล่งต่างๆไปยัง HDFS เรามีเครื่องมือต่อไปนี้ให้ใช้งาน -

Scribe ของ Facebook

Scribe เป็นเครื่องมือที่ได้รับความนิยมอย่างมากซึ่งใช้ในการรวบรวมและสตรีมข้อมูลบันทึก ได้รับการออกแบบมาเพื่อปรับขนาดเป็นโหนดจำนวนมากและมีประสิทธิภาพต่อความล้มเหลวของเครือข่ายและโหนด

อาปาเช่คาฟคา

Kafka ได้รับการพัฒนาโดย Apache Software Foundation เป็นโบรกเกอร์ข้อความโอเพ่นซอร์ส การใช้ Kafka เราสามารถจัดการฟีดที่มีปริมาณงานสูงและเวลาแฝงต่ำ

Apache Flume

Apache Flume เป็นเครื่องมือ / บริการ / กลไกการนำเข้าข้อมูลสำหรับการรวบรวมการรวมและการขนส่งข้อมูลสตรีมมิ่งจำนวนมากเช่นข้อมูลบันทึกเหตุการณ์ (ฯลฯ ... ) จากเว็บสำรองต่างๆไปยังที่เก็บข้อมูลส่วนกลาง

เป็นเครื่องมือที่มีความน่าเชื่อถือสูงกระจายและกำหนดค่าได้ซึ่งออกแบบมาโดยเฉพาะเพื่อถ่ายโอนข้อมูลสตรีมมิ่งจากแหล่งต่างๆไปยัง HDFS

ในบทช่วยสอนนี้เราจะพูดถึงรายละเอียดวิธีการใช้ Flume พร้อมตัวอย่างบางส่วน

ภาพประกอบต่อไปนี้แสดงให้เห็นถึงสถาปัตยกรรมพื้นฐานของ Flume ดังที่แสดงในภาพประกอบdata generators (เช่น Facebook, Twitter) สร้างข้อมูลที่รวบรวมโดย Flume แต่ละตัว agentsวิ่งบนพวกเขา หลังจากนั้นกdata collector (ซึ่งเป็นตัวแทนเช่นกัน) รวบรวมข้อมูลจากเอเจนต์ซึ่งรวมและส่งไปยังที่เก็บส่วนกลางเช่น HDFS หรือ HBase

เหตุการณ์ Flume

อัน event เป็นหน่วยพื้นฐานของข้อมูลที่ขนส่งภายใน Flume. ประกอบด้วยเพย์โหลดของอาร์เรย์ไบต์ที่จะถูกขนส่งจากต้นทางไปยังปลายทางพร้อมกับส่วนหัวที่เป็นทางเลือก เหตุการณ์ Flume ทั่วไปจะมีโครงสร้างดังต่อไปนี้ -

ตัวแทน Flume

อัน agentเป็นกระบวนการ daemon อิสระ (JVM) ใน Flume รับข้อมูล (เหตุการณ์) จากไคลเอนต์หรือเอเจนต์อื่น ๆ และส่งต่อไปยังปลายทางถัดไป (ซิงก์หรือเอเจนต์) Flume อาจมีตัวแทนมากกว่าหนึ่งคน แผนภาพต่อไปนี้แสดงถึงไฟล์Flume Agent

ดังที่แสดงในแผนภาพ Flume Agent ประกอบด้วยองค์ประกอบหลักสามส่วน ได้แก่ source, channelและ sink.

ที่มา

source เป็นส่วนประกอบของตัวแทนที่รับข้อมูลจากตัวสร้างข้อมูลและถ่ายโอนไปยังหนึ่งช่องทางขึ้นไปในรูปแบบของเหตุการณ์ Flume

Apache Flume รองรับแหล่งที่มาหลายประเภทและแต่ละแหล่งรับเหตุการณ์จากตัวสร้างข้อมูลที่ระบุ

Example - แหล่ง Avro, แหล่ง Thrift, แหล่ง twitter 1% เป็นต้น

ช่อง

channelเป็นร้านค้าชั่วคราวที่รับเหตุการณ์จากแหล่งที่มาและบัฟเฟอร์จนกว่าพวกเขาจะถูกใช้โดยซิงก์ ทำหน้าที่เป็นสะพานเชื่อมระหว่างแหล่งที่มาและอ่างล้างมือ

ช่องเหล่านี้ทำธุรกรรมได้อย่างสมบูรณ์และสามารถทำงานกับแหล่งที่มาและอ่างล้างมือจำนวนเท่าใดก็ได้

Example - ช่อง JDBC, ช่องระบบไฟล์, ช่องหน่วยความจำ ฯลฯ

จม

sinkเก็บข้อมูลไว้ในร้านค้าส่วนกลางเช่น HBase และ HDFS มันใช้ข้อมูล (เหตุการณ์) จากช่องและส่งไปยังปลายทาง ปลายทางของอ่างล้างจานอาจเป็นตัวแทนรายอื่นหรือร้านค้าส่วนกลาง

Example - อ่างล้างจาน HDFS

Note- ตัวแทนฟลูมสามารถมีแหล่งที่มาอ่างล้างมือและช่องได้หลายช่องทาง เราได้แสดงรายการแหล่งที่มาซิงก์ช่องสัญญาณที่รองรับทั้งหมดในบทการกำหนดค่า Flume ของบทช่วยสอนนี้

ส่วนประกอบเพิ่มเติมของ Flume Agent

สิ่งที่เราได้กล่าวถึงข้างต้นคือส่วนประกอบดั้งเดิมของตัวแทน นอกจากนี้เรายังมีส่วนประกอบอีกสองสามอย่างที่มีบทบาทสำคัญในการถ่ายโอนเหตุการณ์จากตัวสร้างข้อมูลไปยังร้านค้าส่วนกลาง

อินเตอร์เซปเตอร์

Interceptors ใช้เพื่อปรับเปลี่ยน / ตรวจสอบเหตุการณ์ flume ที่ถ่ายโอนระหว่างแหล่งที่มาและช่องสัญญาณ

ตัวเลือกช่อง

สิ่งเหล่านี้ใช้เพื่อกำหนดช่องทางที่จะเลือกถ่ายโอนข้อมูลในกรณีที่มีหลายช่องสัญญาณ ตัวเลือกช่องมีสองประเภท -

  • Default channel selectors - สิ่งเหล่านี้เรียกอีกอย่างว่าการจำลองตัวเลือกช่องซึ่งจำลองเหตุการณ์ทั้งหมดในแต่ละช่อง

  • Multiplexing channel selectors - สิ่งเหล่านี้จะตัดสินใจให้ช่องส่งกิจกรรมตามที่อยู่ในส่วนหัวของเหตุการณ์นั้น

โปรเซสเซอร์ Sink

สิ่งเหล่านี้ใช้เพื่อเรียกซิงก์เฉพาะจากกลุ่มซิงก์ที่เลือก สิ่งเหล่านี้ใช้เพื่อสร้างเส้นทางเฟลโอเวอร์สำหรับซิงก์ของคุณหรือเหตุการณ์โหลดบาลานซ์ในซิงก์หลายรายการจากแชนเนล

Flume เป็นเฟรมเวิร์กที่ใช้ในการย้ายข้อมูลบันทึกไปยัง HDFS โดยทั่วไปเหตุการณ์และข้อมูลบันทึกจะถูกสร้างขึ้นโดยเซิร์ฟเวอร์บันทึกและเซิร์ฟเวอร์เหล่านี้มีเอเจนต์ Flume ที่รันอยู่ ตัวแทนเหล่านี้รับข้อมูลจากตัวสร้างข้อมูล

ข้อมูลในเอเจนต์เหล่านี้จะถูกรวบรวมโดยโหนดกลางที่เรียกว่า Collector. เช่นเดียวกับตัวแทนสามารถมีนักสะสมหลายคนใน Flume

สุดท้ายข้อมูลจากตัวรวบรวมทั้งหมดเหล่านี้จะถูกรวบรวมและผลักดันไปยังร้านค้าส่วนกลางเช่น HBase หรือ HDFS แผนภาพต่อไปนี้อธิบายการไหลของข้อมูลใน Flume

การไหลแบบหลายจุด

ภายใน Flume อาจมีตัวแทนหลายคนและก่อนที่จะไปถึงจุดหมายปลายทางสุดท้ายเหตุการณ์อาจเดินทางผ่านตัวแทนมากกว่าหนึ่งคน นี้เรียกว่าmulti-hop flow.

กระแสพัดลมออก

กระแสข้อมูลจากแหล่งหนึ่งไปยังหลายช่องทางเรียกว่า fan-out flow. มีสองประเภท -

  • Replicating - กระแสข้อมูลที่ข้อมูลจะถูกจำลองแบบในช่องที่กำหนดค่าไว้ทั้งหมด

  • Multiplexing - กระแสข้อมูลที่ข้อมูลจะถูกส่งไปยังช่องทางที่เลือกซึ่งระบุไว้ในส่วนหัวของเหตุการณ์

การไหลของพัดลม

กระแสข้อมูลที่ข้อมูลจะถูกถ่ายโอนจากหลายแหล่งไปยังช่องทางเดียวเรียกว่า fan-in flow.

การจัดการความล้มเหลว

ใน Flume สำหรับแต่ละเหตุการณ์จะมีการทำธุรกรรม 2 รายการ: รายการหนึ่งที่ผู้ส่งและอีกรายการที่ผู้รับ ผู้ส่งส่งเหตุการณ์ไปยังผู้รับ ไม่นานหลังจากได้รับข้อมูลผู้รับจะทำธุรกรรมของตนเองและส่งสัญญาณ“ รับ” ไปยังผู้ส่ง หลังจากได้รับสัญญาณผู้ส่งจะทำธุรกรรม (ผู้ส่งจะไม่ทำธุรกรรมจนกว่าจะได้รับสัญญาณจากผู้รับ)

เราได้กล่าวถึงสถาปัตยกรรมของ Flume ไปแล้วในบทที่แล้ว ในบทนี้ให้เราดูวิธีดาวน์โหลดและตั้งค่า Apache Flume

ก่อนดำเนินการต่อคุณต้องมีสภาพแวดล้อม Java ในระบบของคุณ ก่อนอื่นตรวจสอบให้แน่ใจว่าคุณได้ติดตั้ง Java ในระบบของคุณแล้ว สำหรับตัวอย่างบางส่วนในบทช่วยสอนนี้เราได้ใช้ Hadoop HDFS (เป็นอ่างล้างจาน) ดังนั้นเราขอแนะนำให้คุณติดตั้ง Hadoop พร้อมกับ Java เพื่อรวบรวมข้อมูลเพิ่มเติมตามลิงค์ -http://www.tutorialspoint.com/hadoop/hadoop_enviornment_setup.htm

การติดตั้ง Flume

ก่อนอื่นให้ดาวน์โหลดซอฟต์แวร์ Apache Flume เวอร์ชันล่าสุดจากเว็บไซต์ https://flume.apache.org/.

ขั้นตอนที่ 1

เปิดเว็บไซต์ คลิกที่downloadทางด้านซ้ายมือของหน้าแรก จะนำคุณไปยังหน้าดาวน์โหลดของ Apache Flume

ขั้นตอนที่ 2

ในหน้าดาวน์โหลดคุณจะเห็นลิงก์สำหรับไฟล์ไบนารีและซอร์สไฟล์ของ Apache Flume คลิกที่ลิงค์apache-flume-1.6.0-bin.tar.gz

คุณจะถูกเปลี่ยนเส้นทางไปยังรายการมิเรอร์ที่คุณสามารถเริ่มดาวน์โหลดได้โดยคลิกที่มิเรอร์เหล่านี้ ในทำนองเดียวกันคุณสามารถดาวน์โหลดซอร์สโค้ดของ Apache Flume โดยคลิกที่Apache-ฟลูม-1.6.0-src.tar.gz

ขั้นตอนที่ 3

สร้างไดเร็กทอรีที่มีชื่อ Flume ในไดเร็กทอรีเดียวกับที่ไดเร็กทอรีการติดตั้งของ Hadoop, HBaseและซอฟต์แวร์อื่น ๆ ได้รับการติดตั้งแล้ว (หากคุณได้ติดตั้งไว้แล้ว) ดังที่แสดงด้านล่าง

$ mkdir Flume

ขั้นตอนที่ 4

แตกไฟล์ tar ที่ดาวน์โหลดมาดังแสดงด้านล่าง

$ cd Downloads/ 
$ tar zxvf apache-flume-1.6.0-bin.tar.gz  
$ tar zxvf apache-flume-1.6.0-src.tar.gz

ขั้นตอนที่ 5

ย้ายเนื้อหาของ apache-flume-1.6.0-bin.tar ไฟล์ไปยังไฟล์ Flumeไดเร็กทอรีที่สร้างขึ้นก่อนหน้านี้ดังที่แสดงด้านล่าง (สมมติว่าเราได้สร้างไดเร็กทอรี Flume ในผู้ใช้โลคัลชื่อ Hadoop)

$ mv apache-flume-1.6.0-bin.tar/* /home/Hadoop/Flume/

การกำหนดค่า Flume

ในการกำหนดค่า Flume เราต้องแก้ไขไฟล์สามไฟล์คือ flume-env.sh, flumeconf.properties, และ bash.rc.

การตั้งค่า Path / Classpath

ใน .bashrc ตั้งค่าโฮมโฟลเดอร์เส้นทางและคลาสพา ธ สำหรับ Flume ดังที่แสดงด้านล่าง

โฟลเดอร์ conf

หากคุณเปิดไฟล์ conf โฟลเดอร์ของ Apache Flume คุณจะมีสี่ไฟล์ต่อไปนี้ -

  • flume-conf.properties.template,
  • flume-env.sh.template,
  • flume-env.ps1.template และ
  • log4j.properties.

ตอนนี้เปลี่ยนชื่อ

  • flume-conf.properties.template ไฟล์เป็น flume-conf.properties และ

  • flume-env.sh.template เช่น flume-env.sh

flume-env.sh

เปิด flume-env.sh ไฟล์และตั้งค่าไฟล์ JAVA_Home ไปยังโฟลเดอร์ที่ติดตั้ง Java ในระบบของคุณ

ตรวจสอบการติดตั้ง

ตรวจสอบการติดตั้ง Apache Flume โดยเรียกดูไฟล์ bin โฟลเดอร์และพิมพ์คำสั่งต่อไปนี้

$ ./flume-ng

หากคุณติดตั้ง Flume สำเร็จคุณจะได้รับข้อความช่วยเหลือของ Flume ดังที่แสดงด้านล่าง

หลังจากติดตั้ง Flume เราต้องกำหนดค่าโดยใช้ไฟล์กำหนดค่าซึ่งเป็นไฟล์คุณสมบัติ Java ที่มีไฟล์ key-value pairs. เราจำเป็นต้องส่งค่าไปยังคีย์ในไฟล์

ในไฟล์การกำหนดค่า Flume เราจำเป็นต้อง -

  • ตั้งชื่อคอมโพเนนต์ของเอเจนต์ปัจจุบัน
  • อธิบาย / กำหนดค่าแหล่งที่มา
  • อธิบาย / กำหนดค่าอ่างล้างจาน
  • อธิบาย / กำหนดค่าช่อง
  • ผูกแหล่งที่มาและซิงก์กับช่อง

โดยปกติเราสามารถมีตัวแทนได้หลายคนใน Flume เราสามารถแยกความแตกต่างของตัวแทนแต่ละคนได้โดยใช้ชื่อเฉพาะ และการใช้ชื่อนี้เราต้องกำหนดคอนฟิกแต่ละเอเจนต์

การตั้งชื่อส่วนประกอบ

ก่อนอื่นคุณต้องตั้งชื่อ / รายการส่วนประกอบเช่นแหล่งที่มาซิงก์และช่องของตัวแทนดังที่แสดงด้านล่าง

agent_name.sources = source_name 
agent_name.sinks = sink_name 
agent_name.channels = channel_name

Flume รองรับแหล่งที่มาอ่างล้างมือและช่องต่างๆ มีรายชื่ออยู่ในตารางด้านล่าง

แหล่งที่มา ช่อง อ่างล้างมือ
  • แหล่งที่มาของ Avro
  • แหล่งที่มาของ Thrift
  • แหล่งที่มาของ Exec
  • ที่มา JMS
  • แหล่งที่มาของ Spooling Directory
  • ที่มาของ Twitter 1% firehose
  • ที่มา Kafka
  • ที่มา NetCat
  • แหล่งกำเนิดลำดับ
  • แหล่งที่มาของ Syslog
  • ที่มา Syslog TCP
  • ที่มา Multiport Syslog TCP
  • ที่มา Syslog UDP
  • ที่มา HTTP
  • แหล่งที่มาของความเครียด
  • แหล่งที่มาเดิม
  • แหล่งที่มาของ Thrift Legacy
  • แหล่งที่มาที่กำหนดเอง
  • ที่มาอาลักษณ์
  • ช่องหน่วยความจำ
  • ช่อง JDBC
  • คาฟคาช่อง
  • ช่องไฟล์
  • ช่องหน่วยความจำที่รั่วไหลได้
  • ช่องทางการทำธุรกรรมหลอก
  • อ่างล้างจาน HDFS
  • Hive Sink
  • อ่างล้างจาน
  • อ่าง Avro
  • Thrift Sink
  • IRC Sink
  • ไฟล์ม้วนจม
  • Null Sink
  • HBaseSink
  • AsyncHBaseSink
  • MorphlineSolrSink
  • ElasticSearchSink
  • Kite Dataset Sink
  • คาฟคาซิงก์

คุณสามารถใช้สิ่งเหล่านี้ได้ ตัวอย่างเช่นหากคุณกำลังถ่ายโอนข้อมูล Twitter โดยใช้แหล่งที่มาของ Twitter ผ่านช่องหน่วยความจำไปยังอ่างล้างจาน HDFS และรหัสชื่อตัวแทนTwitterAgentแล้ว

TwitterAgent.sources = Twitter 
TwitterAgent.channels = MemChannel 
TwitterAgent.sinks = HDFS

หลังจากแสดงรายการส่วนประกอบของเอเจนต์แล้วคุณต้องอธิบายซอร์สซิงก์และแชนเนลโดยระบุค่าให้กับคุณสมบัติของเอเจนต์

การอธิบายแหล่งที่มา

แต่ละแหล่งจะมีรายการคุณสมบัติแยกกัน คุณสมบัติที่ชื่อว่า“ type” เป็นเรื่องปกติสำหรับทุกแหล่งที่มาและใช้เพื่อระบุประเภทของแหล่งที่มาที่เราใช้

นอกจากคุณสมบัติ "type" แล้วจำเป็นต้องระบุค่าของ required คุณสมบัติของแหล่งเฉพาะเพื่อกำหนดค่าดังที่แสดงด้านล่าง

agent_name.sources. source_name.type = value 
agent_name.sources. source_name.property2 = value 
agent_name.sources. source_name.property3 = value

ตัวอย่างเช่นหากเราพิจารณาไฟล์ twitter sourceต่อไปนี้เป็นคุณสมบัติที่เราต้องระบุค่าเพื่อกำหนดค่า

TwitterAgent.sources.Twitter.type = Twitter (type name) 
TwitterAgent.sources.Twitter.consumerKey =  
TwitterAgent.sources.Twitter.consumerSecret = 
TwitterAgent.sources.Twitter.accessToken =   
TwitterAgent.sources.Twitter.accessTokenSecret =

อธิบายอ่างล้างจาน

เช่นเดียวกับแหล่งที่มาแต่ละอ่างจะมีรายการคุณสมบัติแยกกัน คุณสมบัติที่ชื่อว่า“ type” เป็นสิ่งที่พบได้ทั่วไปในอ่างล้างจานทุกอ่างและใช้เพื่อระบุประเภทของอ่างล้างจานที่เราใช้ นอกจากคุณสมบัติ "type" แล้วจำเป็นต้องระบุค่าให้กับไฟล์required คุณสมบัติของซิงก์เฉพาะเพื่อกำหนดค่าดังที่แสดงด้านล่าง

agent_name.sinks. sink_name.type = value 
agent_name.sinks. sink_name.property2 = value 
agent_name.sinks. sink_name.property3 = value

ตัวอย่างเช่นหากเราพิจารณา HDFS sinkต่อไปนี้เป็นคุณสมบัติที่เราต้องระบุค่าเพื่อกำหนดค่า

TwitterAgent.sinks.HDFS.type = hdfs (type name)  
TwitterAgent.sinks.HDFS.hdfs.path = HDFS directory’s Path to store the data

การอธิบายช่อง

Flume มีช่องทางต่างๆในการถ่ายโอนข้อมูลระหว่างแหล่งที่มาและซิงก์ ดังนั้นพร้อมกับแหล่งที่มาและช่องทางจึงจำเป็นต้องอธิบายช่องที่ใช้ในตัวแทน

ในการอธิบายแต่ละช่องคุณต้องตั้งค่าคุณสมบัติที่จำเป็นดังที่แสดงด้านล่าง

agent_name.channels.channel_name.type = value 
agent_name.channels.channel_name. property2 = value 
agent_name.channels.channel_name. property3 = value

ตัวอย่างเช่นหากเราพิจารณา memory channelต่อไปนี้เป็นคุณสมบัติที่เราต้องระบุค่าเพื่อกำหนดค่า

TwitterAgent.channels.MemChannel.type = memory (type name)

การผูกซอร์สและซิงก์กับแชนเนล

เนื่องจากแชนเนลเชื่อมต่อแหล่งที่มาและซิงก์จึงจำเป็นต้องเชื่อมโยงทั้งสองเข้ากับแชนเนลดังที่แสดงด้านล่าง

agent_name.sources.source_name.channels = channel_name 
agent_name.sinks.sink_name.channels = channel_name

ตัวอย่างต่อไปนี้แสดงวิธีผูกซอร์สและซิงก์กับแชนเนล ที่นี่เราพิจารณาtwitter source, memory channel, และ HDFS sink.

TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channels = MemChannel

การเริ่มต้น Flume Agent

หลังจากกำหนดค่าเราต้องเริ่มตัวแทน Flume ทำได้ดังนี้ -

$ bin/flume-ng agent --conf ./conf/ -f conf/twitter.conf 
Dflume.root.logger=DEBUG,console -n TwitterAgent

ที่ไหน -

  • agent - คำสั่งเพื่อเริ่มตัวแทน Flume

  • --conf ,-c<conf> - ใช้ไฟล์กำหนดค่าในไดเรกทอรี conf

  • -f<file> - ระบุพา ธ ไฟล์กำหนดค่าหากขาดหายไป

  • --name, -n <name> - ชื่อตัวแทน twitter

  • -D property =value - ตั้งค่าคุณสมบัติระบบ Java

เมื่อใช้ Flume เราสามารถดึงข้อมูลจากบริการต่างๆและส่งไปยังร้านค้าส่วนกลาง (HDFS และ HBase) บทนี้อธิบายถึงวิธีการดึงข้อมูลจากบริการ Twitter และจัดเก็บใน HDFS โดยใช้ Apache Flume

ตามที่กล่าวไว้ใน Flume Architecture เว็บเซิร์ฟเวอร์จะสร้างข้อมูลบันทึกและข้อมูลนี้ถูกรวบรวมโดยตัวแทนใน Flume ช่องจะบัฟเฟอร์ข้อมูลนี้ไปยังซิงก์ซึ่งสุดท้ายจะส่งไปยังร้านค้าส่วนกลาง

ในตัวอย่างที่ให้ไว้ในบทนี้เราจะสร้างแอปพลิเคชันและรับทวีตจากแอปพลิเคชันโดยใช้แหล่งข้อมูลทวิตเตอร์ทดลองที่จัดทำโดย Apache Flume เราจะใช้ช่องหน่วยความจำเพื่อบัฟเฟอร์ทวีตเหล่านี้และ HDFS sink เพื่อดันทวีตเหล่านี้ไปยัง HDFS

ในการดึงข้อมูล Twitter เราจะต้องทำตามขั้นตอนด้านล่าง -

  • สร้างแอปพลิเคชัน twitter
  • ติดตั้ง / เริ่ม HDFS
  • กำหนดค่า Flume

การสร้างแอปพลิเคชัน Twitter

ในการรับทวีตจาก Twitter จำเป็นต้องสร้างแอปพลิเคชัน Twitter ทำตามขั้นตอนด้านล่างเพื่อสร้างแอปพลิเคชัน Twitter

ขั้นตอนที่ 1

ในการสร้างแอปพลิเคชัน Twitter คลิกที่ลิงค์ต่อไปนี้ https://apps.twitter.com/. ลงชื่อเข้าใช้บัญชี Twitter ของคุณ คุณจะมีหน้าต่างการจัดการแอปพลิเคชัน Twitter ซึ่งคุณสามารถสร้างลบและจัดการแอป Twitter ได้

ขั้นตอนที่ 2

คลิกที่ Create New Appปุ่ม. คุณจะถูกนำไปยังหน้าต่างที่คุณจะได้รับแบบฟอร์มใบสมัครที่คุณต้องกรอกรายละเอียดของคุณเพื่อสร้างแอพ ขณะกรอกที่อยู่เว็บไซต์ให้ระบุรูปแบบ URL ที่สมบูรณ์ตัวอย่างเช่นhttp://example.com.

ขั้นตอนที่ 3

กรอกรายละเอียดยอมรับ Developer Agreement เมื่อเสร็จแล้วให้คลิกที่ไฟล์ Create your Twitter application buttonซึ่งอยู่ด้านล่างสุดของหน้า หากทุกอย่างเรียบร้อยดีแอพจะถูกสร้างขึ้นพร้อมรายละเอียดดังที่แสดงด้านล่าง

ขั้นตอนที่ 4

ภายใต้ keys and Access Tokens ที่ด้านล่างของหน้าคุณจะสังเกตเห็นปุ่มชื่อ Create my access token. คลิกเพื่อสร้างโทเค็นการเข้าถึง

ขั้นตอนที่ 5

สุดท้ายคลิกที่ไฟล์ Test OAuthซึ่งอยู่ทางด้านขวาบนของหน้า สิ่งนี้จะนำไปสู่หน้าที่แสดงไฟล์Consumer key, Consumer secret, Access token, และ Access token secret. คัดลอกรายละเอียดเหล่านี้ สิ่งเหล่านี้มีประโยชน์ในการกำหนดค่าเอเจนต์ใน Flume

การเริ่ม HDFS

เนื่องจากเรากำลังจัดเก็บข้อมูลใน HDFS เราจึงต้องติดตั้ง / ตรวจสอบ Hadoop เริ่ม Hadoop และสร้างโฟลเดอร์ในโฟลเดอร์เพื่อจัดเก็บข้อมูล Flume ทำตามขั้นตอนด้านล่างก่อนกำหนดค่า Flume

ขั้นตอนที่ 1: ติดตั้ง / ตรวจสอบ Hadoop

ติดตั้งHadoop หากมีการติดตั้ง Hadoop ในระบบของคุณแล้วให้ตรวจสอบการติดตั้งโดยใช้คำสั่ง Hadoop version ดังที่แสดงด้านล่าง

$ hadoop version

หากระบบของคุณมี Hadoop และหากคุณตั้งค่าตัวแปรพา ธ คุณจะได้ผลลัพธ์ต่อไปนี้ -

Hadoop 2.6.0 
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 
e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1 
Compiled by jenkins on 2014-11-13T21:10Z 
Compiled with protoc 2.5.0 
From source with checksum 18e43357c8f927c0695f1e9522859d6a 
This command was run using /home/Hadoop/hadoop/share/hadoop/common/hadoop-common-2.6.0.jar

ขั้นตอนที่ 2: เริ่ม Hadoop

เรียกดูไฟล์ sbin ไดเร็กทอรีของ Hadoop และ start yarn และ Hadoop dfs (ระบบไฟล์แบบกระจาย) ดังที่แสดงด้านล่าง

cd /$Hadoop_Home/sbin/ 
$ start-dfs.sh 
localhost: starting namenode, logging to
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-namenode-localhost.localdomain.out 
localhost: starting datanode, logging to 
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-datanode-localhost.localdomain.out 
Starting secondary namenodes [0.0.0.0] 
starting secondarynamenode, logging to 
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-secondarynamenode-localhost.localdomain.out
  
$ start-yarn.sh 
starting yarn daemons 
starting resourcemanager, logging to 
   /home/Hadoop/hadoop/logs/yarn-Hadoop-resourcemanager-localhost.localdomain.out 
localhost: starting nodemanager, logging to 
   /home/Hadoop/hadoop/logs/yarn-Hadoop-nodemanager-localhost.localdomain.out

ขั้นตอนที่ 3: สร้างไดเรกทอรีใน HDFS

ใน Hadoop DFS คุณสามารถสร้างไดเร็กทอรีโดยใช้คำสั่ง mkdir. เรียกดูและสร้างไดเร็กทอรีที่มีชื่อtwitter_data ในเส้นทางที่ต้องการดังแสดงด้านล่าง

$cd /$Hadoop_Home/bin/ 
$ hdfs dfs -mkdir hdfs://localhost:9000/user/Hadoop/twitter_data

การกำหนดค่า Flume

เราต้องกำหนดค่าซอร์สช่องและซิงก์โดยใช้ไฟล์กำหนดค่าในไฟล์ confโฟลเดอร์ ตัวอย่างที่ให้ไว้ในบทนี้ใช้แหล่งข้อมูลการทดลองที่จัดทำโดย Apache Flume ที่ชื่อTwitter 1% Firehose ช่องหน่วยความจำและอ่างล้างจาน HDFS

แหล่งที่มาของ Firehose Twitter 1%

แหล่งข้อมูลนี้มีการทดลองสูง มันเชื่อมต่อกับ Twitter Firehose ตัวอย่าง 1% โดยใช้ API การสตรีมและดาวน์โหลดทวีตอย่างต่อเนื่องแปลงเป็นรูปแบบ Avro และส่งเหตุการณ์ Avro ไปยัง Flume sink แบบดาวน์สตรีม

เราจะได้รับแหล่งข้อมูลนี้ตามค่าเริ่มต้นพร้อมกับการติดตั้ง Flume jar ไฟล์ที่เกี่ยวข้องกับแหล่งข้อมูลนี้สามารถอยู่ในไฟล์ lib ตามที่แสดงด้านล่าง

การตั้งค่า classpath

ตั้งค่า classpath ตัวแปรเป็น lib โฟลเดอร์ของ Flume in Flume-env.sh ไฟล์ดังที่แสดงด้านล่าง

export CLASSPATH=$CLASSPATH:/FLUME_HOME/lib/*

แหล่งข้อมูลนี้ต้องการรายละเอียดเช่น Consumer key, Consumer secret, Access token, และ Access token secretของแอปพลิเคชัน Twitter ขณะกำหนดค่าแหล่งที่มานี้คุณต้องระบุค่าให้กับคุณสมบัติต่อไปนี้ -

  • Channels

  • Source type : org.apache.flume.source.twitter.TwitterSource

  • consumerKey - รหัสผู้ใช้ OAuth

  • consumerSecret - ความลับของผู้บริโภค OAuth

  • accessToken - โทเค็นการเข้าถึง OAuth

  • accessTokenSecret - ความลับโทเค็น OAuth

  • maxBatchSize- จำนวนข้อความทวิตเตอร์สูงสุดที่ควรอยู่ในชุดทวิตเตอร์ ค่าเริ่มต้นคือ 1000 (ไม่บังคับ)

  • maxBatchDurationMillis- จำนวนมิลลิวินาทีสูงสุดที่ต้องรอก่อนปิดชุดงาน ค่าเริ่มต้นคือ 1000 (ไม่บังคับ)

ช่อง

เรากำลังใช้ช่องหน่วยความจำ ในการกำหนดค่าช่องหน่วยความจำคุณต้องระบุค่าให้กับประเภทของช่องสัญญาณ

  • type- ถือประเภทของช่อง ในตัวอย่างของเราประเภทคือMemChannel.

  • Capacity- เป็นจำนวนเหตุการณ์สูงสุดที่จัดเก็บไว้ในช่อง ค่าเริ่มต้นคือ 100 (ไม่บังคับ)

  • TransactionCapacity- เป็นจำนวนเหตุการณ์สูงสุดที่ช่องยอมรับหรือส่ง ค่าเริ่มต้นคือ 100 (ไม่บังคับ)

อ่างล้างจาน HDFS

ซิงก์นี้เขียนข้อมูลลงใน HDFS ในการกำหนดค่าซิงก์นี้คุณต้องระบุรายละเอียดต่อไปนี้

  • Channel

  • type - hdfs

  • hdfs.path - เส้นทางของไดเร็กทอรีใน HDFS ที่จะจัดเก็บข้อมูล

และเราสามารถระบุค่าทางเลือกบางอย่างตามสถานการณ์ได้ ด้านล่างนี้เป็นคุณสมบัติเสริมของซิงก์ HDFS ที่เรากำลังกำหนดค่าในแอปพลิเคชันของเรา

  • fileType - นี่คือรูปแบบไฟล์ที่จำเป็นสำหรับไฟล์ HDFS ของเรา SequenceFile, DataStream และ CompressedStreamสตรีมนี้มีสามประเภท ในตัวอย่างของเราเรากำลังใช้ไฟล์DataStream.

  • writeFormat - อาจเป็นข้อความหรือเขียนได้

  • batchSize- เป็นจำนวนเหตุการณ์ที่เขียนลงในไฟล์ก่อนที่จะถูกล้างเข้าสู่ HDFS ค่าเริ่มต้นคือ 100

  • rollsize- เป็นขนาดไฟล์ที่จะทำให้เกิดการม้วน ค่าเริ่มต้นคือ 100

  • rollCount- เป็นจำนวนเหตุการณ์ที่เขียนลงในไฟล์ก่อนที่จะถูกรีด ค่าเริ่มต้นคือ 10

ตัวอย่าง - ไฟล์กำหนดค่า

ด้านล่างเป็นตัวอย่างของไฟล์กำหนดค่า คัดลอกเนื้อหานี้และบันทึกเป็นtwitter.conf ในโฟลเดอร์ conf ของ Flume

# Naming the components on the current agent. 
TwitterAgent.sources = Twitter 
TwitterAgent.channels = MemChannel 
TwitterAgent.sinks = HDFS
  
# Describing/Configuring the source 
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = Your OAuth consumer key
TwitterAgent.sources.Twitter.consumerSecret = Your OAuth consumer secret 
TwitterAgent.sources.Twitter.accessToken = Your OAuth consumer key access token 
TwitterAgent.sources.Twitter.accessTokenSecret = Your OAuth consumer key access token secret 
TwitterAgent.sources.Twitter.keywords = tutorials point,java, bigdata, mapreduce, mahout, hbase, nosql
  
# Describing/Configuring the sink 

TwitterAgent.sinks.HDFS.type = hdfs 
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/user/Hadoop/twitter_data/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream 
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text 
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0 
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000 
 
# Describing/Configuring the channel 
TwitterAgent.channels.MemChannel.type = memory 
TwitterAgent.channels.MemChannel.capacity = 10000 
TwitterAgent.channels.MemChannel.transactionCapacity = 100
  
# Binding the source and sink to the channel 
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channel = MemChannel

การดำเนินการ

เรียกดูโฮมไดเร็กทอรี Flume และเรียกใช้แอพพลิเคชั่นดังที่แสดงด้านล่าง

$ cd $FLUME_HOME 
$ bin/flume-ng agent --conf ./conf/ -f conf/twitter.conf 
Dflume.root.logger=DEBUG,console -n TwitterAgent

หากทุกอย่างเรียบร้อยดีการสตรีมทวีตไปยัง HDFS จะเริ่มขึ้น ด้านล่างนี้คือภาพรวมของหน้าต่างพรอมต์คำสั่งขณะเรียกทวีต

กำลังตรวจสอบ HDFS

คุณสามารถเข้าถึง Hadoop Administration Web UI โดยใช้ URL ที่ระบุด้านล่าง

http://localhost:50070/

คลิกที่ดรอปดาวน์ชื่อ Utilitiesทางด้านขวามือของหน้า คุณสามารถเห็นสองตัวเลือกดังที่แสดงในภาพรวมด้านล่าง

คลิกที่ Browse the file systemและป้อนเส้นทางของไดเร็กทอรี HDFS ที่คุณเก็บทวีตไว้ ในตัวอย่างของเราเส้นทางจะเป็น/user/Hadoop/twitter_data/. จากนั้นคุณสามารถดูรายการไฟล์บันทึกของ twitter ที่เก็บไว้ใน HDFS ตามที่ระบุด้านล่าง

ในบทที่แล้วเราได้เห็นวิธีการดึงข้อมูลจากแหล่งที่มาของ twitter ไปยัง HDFS บทนี้จะอธิบายถึงวิธีการดึงข้อมูลจากSequence generator.

ข้อกำหนดเบื้องต้น

ในการเรียกใช้ตัวอย่างที่ให้ไว้ในบทนี้คุณต้องติดตั้ง HDFS พร้อมด้วย Flume. ดังนั้นให้ตรวจสอบการติดตั้ง Hadoop และเริ่ม HDFS ก่อนดำเนินการต่อ (ดูบทก่อนหน้าเพื่อเรียนรู้วิธีการเริ่ม HDFS)

การกำหนดค่า Flume

เราต้องกำหนดค่าซอร์สช่องและซิงก์โดยใช้ไฟล์กำหนดค่าในไฟล์ confโฟลเดอร์ ตัวอย่างที่ให้ไว้ในบทนี้ใช้ไฟล์sequence generator source, ก memory channelและ HDFS sink.

แหล่งกำเนิดลำดับ

เป็นแหล่งที่ทำให้เกิดเหตุการณ์อย่างต่อเนื่อง รักษาตัวนับที่เริ่มจาก 0 และเพิ่มทีละ 1 ใช้เพื่อการทดสอบ ขณะกำหนดค่าแหล่งที่มานี้คุณต้องระบุค่าให้กับคุณสมบัติต่อไปนี้ -

  • Channels

  • Source type - seq

ช่อง

เรากำลังใช้ไฟล์ memoryช่อง ในการกำหนดค่าช่องหน่วยความจำคุณต้องระบุค่าให้กับประเภทของช่องสัญญาณ ด้านล่างนี้เป็นรายการคุณสมบัติที่คุณต้องจัดหาในขณะกำหนดค่าช่องหน่วยความจำ -

  • type- ถือประเภทของช่อง ในตัวอย่างของเราประเภทคือ MemChannel

  • Capacity- เป็นจำนวนเหตุการณ์สูงสุดที่จัดเก็บไว้ในช่อง ค่าเริ่มต้นคือ 100 (ไม่บังคับ)

  • TransactionCapacity- เป็นจำนวนเหตุการณ์สูงสุดที่ช่องยอมรับหรือส่ง ค่าเริ่มต้นคือ 100 (ไม่บังคับ)

อ่างล้างจาน HDFS

ซิงก์นี้เขียนข้อมูลลงใน HDFS ในการกำหนดค่าซิงก์นี้คุณต้องระบุรายละเอียดต่อไปนี้

  • Channel

  • type - hdfs

  • hdfs.path - เส้นทางของไดเร็กทอรีใน HDFS ที่จะจัดเก็บข้อมูล

และเราสามารถระบุค่าทางเลือกบางอย่างตามสถานการณ์ได้ ด้านล่างนี้เป็นคุณสมบัติเสริมของซิงก์ HDFS ที่เรากำลังกำหนดค่าในแอปพลิเคชันของเรา

  • fileType - นี่คือรูปแบบไฟล์ที่จำเป็นสำหรับไฟล์ HDFS ของเรา SequenceFile, DataStream และ CompressedStreamสตรีมนี้มีสามประเภท ในตัวอย่างของเราเรากำลังใช้ไฟล์DataStream.

  • writeFormat - อาจเป็นข้อความหรือเขียนได้

  • batchSize- เป็นจำนวนเหตุการณ์ที่เขียนลงในไฟล์ก่อนที่จะถูกล้างเข้าสู่ HDFS ค่าเริ่มต้นคือ 100

  • rollsize- เป็นขนาดไฟล์ที่จะทำให้เกิดการม้วน ค่าเริ่มต้นคือ 100

  • rollCount- เป็นจำนวนเหตุการณ์ที่เขียนลงในไฟล์ก่อนที่จะถูกรีด ค่าเริ่มต้นคือ 10

ตัวอย่าง - ไฟล์กำหนดค่า

ด้านล่างเป็นตัวอย่างของไฟล์กำหนดค่า คัดลอกเนื้อหานี้และบันทึกเป็นseq_gen .conf ในโฟลเดอร์ conf ของ Flume

# Naming the components on the current agent 

SeqGenAgent.sources = SeqSource   
SeqGenAgent.channels = MemChannel 
SeqGenAgent.sinks = HDFS 
 
# Describing/Configuring the source 
SeqGenAgent.sources.SeqSource.type = seq
  
# Describing/Configuring the sink
SeqGenAgent.sinks.HDFS.type = hdfs 
SeqGenAgent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/user/Hadoop/seqgen_data/
SeqGenAgent.sinks.HDFS.hdfs.filePrefix = log 
SeqGenAgent.sinks.HDFS.hdfs.rollInterval = 0
SeqGenAgent.sinks.HDFS.hdfs.rollCount = 10000
SeqGenAgent.sinks.HDFS.hdfs.fileType = DataStream 
 
# Describing/Configuring the channel 
SeqGenAgent.channels.MemChannel.type = memory 
SeqGenAgent.channels.MemChannel.capacity = 1000 
SeqGenAgent.channels.MemChannel.transactionCapacity = 100 
 
# Binding the source and sink to the channel 
SeqGenAgent.sources.SeqSource.channels = MemChannel
SeqGenAgent.sinks.HDFS.channel = MemChannel

การดำเนินการ

เรียกดูโฮมไดเร็กทอรี Flume และเรียกใช้แอพพลิเคชั่นดังที่แสดงด้านล่าง

$ cd $FLUME_HOME 
$./bin/flume-ng agent --conf $FLUME_CONF --conf-file $FLUME_CONF/seq_gen.conf 
   --name SeqGenAgent

หากทุกอย่างเรียบร้อยดีแหล่งที่มาจะเริ่มสร้างหมายเลขลำดับซึ่งจะถูกผลักเข้าไปใน HDFS ในรูปแบบของไฟล์บันทึก

ด้านล่างนี้เป็นภาพรวมของหน้าต่างพรอมต์คำสั่งที่ดึงข้อมูลที่สร้างโดยตัวสร้างลำดับลงใน HDFS

การตรวจสอบ HDFS

คุณสามารถเข้าถึง Hadoop Administration Web UI โดยใช้ URL ต่อไปนี้ -

http://localhost:50070/

คลิกที่ดรอปดาวน์ชื่อ Utilitiesทางด้านขวามือของหน้า คุณสามารถดูสองตัวเลือกดังแสดงในแผนภาพด้านล่าง

คลิกที่ Browse the file system และป้อนเส้นทางของไดเร็กทอรี HDFS ที่คุณเก็บข้อมูลที่สร้างโดยตัวสร้างลำดับ

ในตัวอย่างของเราเส้นทางจะเป็น /user/Hadoop/ seqgen_data /. จากนั้นคุณสามารถดูรายการไฟล์บันทึกที่สร้างโดยตัวสร้างลำดับซึ่งเก็บไว้ใน HDFS ตามที่ระบุด้านล่าง

การตรวจสอบเนื้อหาของไฟล์

ไฟล์บันทึกทั้งหมดนี้มีตัวเลขในรูปแบบลำดับ คุณสามารถตรวจสอบเนื้อหาของไฟล์เหล่านี้ในระบบไฟล์โดยใช้cat คำสั่งดังที่แสดงด้านล่าง

บทนี้เป็นตัวอย่างเพื่ออธิบายว่าคุณสามารถสร้างเหตุการณ์ได้อย่างไรจากนั้นจึงบันทึกเหตุการณ์เหล่านั้นลงในคอนโซล สำหรับสิ่งนี้เรากำลังใช้ไฟล์NetCat แหล่งที่มาและ logger จม.

ข้อกำหนดเบื้องต้น

ในการเรียกใช้ตัวอย่างที่ให้ไว้ในบทนี้คุณต้องติดตั้ง Flume.

การกำหนดค่า Flume

เราต้องกำหนดค่าซอร์สช่องและซิงก์โดยใช้ไฟล์กำหนดค่าในไฟล์ confโฟลเดอร์ ตัวอย่างที่ให้ไว้ในบทนี้ใช้ไฟล์NetCat Source, Memory channelและก logger sink.

ที่มา NetCat

ในขณะกำหนดคอนฟิกซอร์ส NetCat เราต้องระบุพอร์ตขณะกำหนดคอนฟิกซอร์ส ตอนนี้แหล่งที่มา (แหล่ง NetCat) จะรับฟังพอร์ตที่กำหนดและรับแต่ละบรรทัดที่เราป้อนในพอร์ตนั้นเป็นแต่ละเหตุการณ์และโอนไปยังซิงก์ผ่านช่องทางที่ระบุ

ขณะกำหนดค่าแหล่งที่มานี้คุณต้องระบุค่าให้กับคุณสมบัติต่อไปนี้ -

  • channels

  • Source type - netcat

  • bind - ชื่อโฮสต์หรือที่อยู่ IP ที่จะผูก

  • port - หมายเลขพอร์ตที่เราต้องการให้แหล่งรับฟัง

ช่อง

เรากำลังใช้ไฟล์ memoryช่อง ในการกำหนดค่าช่องหน่วยความจำคุณต้องระบุค่าให้กับประเภทของช่องสัญญาณ ด้านล่างนี้เป็นรายการคุณสมบัติที่คุณต้องจัดหาในขณะกำหนดค่าช่องหน่วยความจำ -

  • type- ถือประเภทของช่อง ในตัวอย่างของเราประเภทคือMemChannel.

  • Capacity- เป็นจำนวนเหตุการณ์สูงสุดที่จัดเก็บไว้ในช่อง ค่าเริ่มต้นคือ 100 (ไม่บังคับ)

  • TransactionCapacity- เป็นจำนวนเหตุการณ์สูงสุดที่ช่องยอมรับหรือส่ง ค่าเริ่มต้นคือ 100 (ไม่บังคับ)

อ่างล้างจาน

ซิงก์นี้บันทึกเหตุการณ์ทั้งหมดที่ส่งผ่านไป โดยทั่วไปจะใช้เพื่อการทดสอบหรือการดีบัก ในการกำหนดค่าซิงก์นี้คุณต้องระบุรายละเอียดต่อไปนี้

  • Channel

  • type - คนตัดไม้

ตัวอย่างไฟล์คอนฟิกูเรชัน

ด้านล่างเป็นตัวอย่างของไฟล์กำหนดค่า คัดลอกเนื้อหานี้และบันทึกเป็นnetcat.conf ในโฟลเดอร์ conf ของ Flume

# Naming the components on the current agent
NetcatAgent.sources = Netcat   
NetcatAgent.channels = MemChannel 
NetcatAgent.sinks = LoggerSink  

# Describing/Configuring the source 
NetcatAgent.sources.Netcat.type = netcat 
NetcatAgent.sources.Netcat.bind = localhost
NetcatAgent.sources.Netcat.port = 56565  

# Describing/Configuring the sink 
NetcatAgent.sinks.LoggerSink.type = logger  

# Describing/Configuring the channel 
NetcatAgent.channels.MemChannel.type = memory 
NetcatAgent.channels.MemChannel.capacity = 1000 
NetcatAgent.channels.MemChannel.transactionCapacity = 100 
 
# Bind the source and sink to the channel 
NetcatAgent.sources.Netcat.channels = MemChannel
NetcatAgent.sinks. LoggerSink.channel = MemChannel

การดำเนินการ

เรียกดูโฮมไดเร็กทอรี Flume และเรียกใช้แอพพลิเคชั่นดังที่แสดงด้านล่าง

$ cd $FLUME_HOME
$ ./bin/flume-ng agent --conf $FLUME_CONF --conf-file $FLUME_CONF/netcat.conf 
   --name NetcatAgent -Dflume.root.logger=INFO,console

หากทุกอย่างเรียบร้อยดีแหล่งที่มาจะเริ่มฟังพอร์ตที่ระบุ ในกรณีนี้ก็คือ56565. ด้านล่างนี้เป็นภาพรวมของหน้าต่างพรอมต์คำสั่งของแหล่งที่มา NetCat ซึ่งเริ่มต้นและรับฟังพอร์ต 56565

การส่งผ่านข้อมูลไปยังแหล่งที่มา

ในการส่งข้อมูลไปยังแหล่งที่มาของ NetCat คุณต้องเปิดพอร์ตที่ระบุในไฟล์คอนฟิกูเรชัน เปิดเทอร์มินัลแยกต่างหากและเชื่อมต่อกับแหล่งสัญญาณ (56565) โดยใช้ไฟล์curlคำสั่ง เมื่อการเชื่อมต่อสำเร็จคุณจะได้รับข้อความ“connected” ตามที่แสดงด้านล่าง

$ curl telnet://localhost:56565 
connected

ตอนนี้คุณสามารถป้อนข้อมูลทีละบรรทัด (หลังจากแต่ละบรรทัดคุณต้องกด Enter) แหล่งที่มา NetCat ได้รับแต่ละบรรทัดเป็นแต่ละเหตุการณ์และคุณจะได้รับข้อความที่ได้รับ "OK”.

เมื่อใดก็ตามที่คุณส่งข้อมูลเสร็จแล้วคุณสามารถออกจากคอนโซลได้โดยกด (Ctrl+C). ด้านล่างนี้เป็นภาพรวมของคอนโซลที่เราเชื่อมต่อกับซอร์สโดยใช้ไฟล์curl คำสั่ง

แต่ละบรรทัดที่ป้อนในคอนโซลด้านบนจะได้รับเป็นเหตุการณ์แต่ละรายการโดยต้นทาง เนื่องจากเราได้ใช้ไฟล์Logger sink เหตุการณ์เหล่านี้จะถูกล็อกออนเข้าสู่คอนโซล (คอนโซลต้นทาง) ผ่านช่องทางที่ระบุ (ช่องหน่วยความจำในกรณีนี้)

สแน็ปช็อตต่อไปนี้แสดงคอนโซล NetCat ที่บันทึกเหตุการณ์