Airflow & Spark & Java ile Job
Herkese merhaba. Bu makalede göz aşinalığı kadar Airflow, ana konu olarak, Java ile yazdığımız Spark joblar hakkında ve bu jobları yazarken karşılaşıp çözdüğümüz ufak sorunlardan bahsedeceğim.
Airflow
Apache Airflow, iş akışlarını programlı olarak yazmak, planlamak ve izlemek için açık kaynaklı bir araçtır. İş akışlarını düzenlemek için veri mühendisleri tarafından kullanılan platformlardan biridir. Veri ardışık düzenlerinizin bağımlılıklarını, ilerlemesini, günlüklerini, kodunu, cron görevlerini ve başarı durumunu kolayca görselleştirebilirsiniz.
— > DAG
Airflow üzerindeki süreçler DAG (Directed Acyclic Graph) olarak ifade edilir. Cron olarak belirleyebildiğiniz zaman aralıklarına ve bir çok ayara göre bir DAG bildirebilirsiniz. DAG da belirtilenlere göre iş yapılır ve geçmiş iş bilgileri tutulur. Bizim DAG da belirttiğimiz iş ise Java ile yazdığımız Spark jobları oluyor.
Yukarıdaki resim de basit bir DAG de geçmişi görüyoruz. 11 kere, belirlediğim crona göre tetiklenmiş iş durum geçmişi görülüyor.
Bu 11 kere tektiklendiğini gördüğümüz iş durumlarının birinin üstüne geldiğimizde üstteki resimde gözüktüğü gibi çalışma zamanı ile ilgili bazı bilgilerin bulunduğu bir kart açılıyor.
Airflowun en beğendiğimiz özelliği ise bu belli zamanlarda tetiklenen işlerin zaman kavramları hepsinin kendine göre ayrı işleniyor oluşudur. Bunu daha da açacak olursak;
Run: 2021–07–05, 05:00:00 tarihli iş özelinde düşünürsek bu tarihi java kodumuz içerisinde main(String[] args) olarak String tipinde args adında tek boyutlu bir dizi içinde alabiliriz. Job 1 saatte bir çalışıyorsa mantıken yapacağınız işe en başta belirtilen tarihte ve 04:00:00–05:00:00 saatleri arasındaki data ile yapacağızdır. Bu tarih kapsamında startDate ve endDate olarak datayı çeker işlemimizi yapar ve bitiririz. Fakat bir hata durumunda job ilerleyen saatlerde çalışmaya devam eder fakat geçmişte hata alan zaman için spesifik bir tarih vermek için bir şeyler yapmak gerekir. Bu noktada zaman kavramları hepsinin kendine göre ayrı işleniyor olmasından dolayı fail eden iş tekrar çalıştırılırsa args içerisinde jobın tekrar çalıştığı anın tarihi değil aslında geçmişte çalışması gerektiği anın tarihi gelecektir. Bu da tabiki spesifik tarih belirterek jobı tekrar çalıştırma işini çok basite indirgemiş olur.
Spark
Spark, big data dünyasında çokça kullanılan ve büyük ölçekli veri işleme için birleşik analiz motoru olarak tanımlanır. Scala, Python veya Java ile yazabiliriz. Bizim ekibimizdeki geliştiriciler Java konusunda daha deneyimli olduğu için biz de Java ile yazmayı tercih ettik.
Java ile yazacağımız Spark da bir main(String[] args) altında bir SparkSession oluşturmak gerekli. Bizim Elasticsearch veya RDBMS den verileri aldığımız durumlar mevcut. Bu iki farklı veri çektiğimiz job içinde 2 farklı SparkSession oluşturmak durumda kaldık.
—> RDBMS
RDBMS ile verileri aldığımız SparkSession oluşturmak görüldüğü gibi nispeten kısa. Çünkü RDBMS bağlantısı adımını temp görüntü oluştururken girmemiz gerekiyor.
Oluşturduğumuz SparkSession ile datayı çekmek için belirli option bilgilerini girmemiz gerekiyor. Önemli ayrıntı, dbtable option bilgisi bizim RDBMS de veriyi almak istediğimiz tablonun adı ve en son createOrReplaceTempView() da belirttiğimiz isim ise temp olarak oluşturduğumuz ve sorgularımızda kullanacağımız isim.
Temp görüntüyü oluşturduktan sonra örnek bir sorgu örneği oluşturdum. Oluşturduğumuz bu sorgu da FROM kısmı için daha önceden belirlediğimiz temp görüntü isimini kullandık. SELECT veya WHERE kısmında ise orjinal tablomuzun kolonlarının adını kullanıyoruz. En başta oluşturduğumuz SparkSession ile sorguyu çalıştırıp List<Row> olarak çekiyoruz. Buradan sonra bu List içerisinde döngü ile gezip row.getAs(“id”) şeklinde seçili kolonun ismi belirtilen field değeri alınıyor olacak. Buradan sonraki kısımda job içerisinde bu veri ile ne yapacağınız sizin iş akışınıza bağlıdır.
— > Elasticsearch
Elasticsearch için SparkSession oluşturmak, RDBMS için olan SparkSession farklı olarak direk Elasticsearch config bilgilerini de girmeyi gerektiriyor.
SparkSession oluşturduktan sonra veriyi almak için bu sefer elastic query oluşturmak ve çalıştırmak gerekli. Yukarıda gördüğümüz şekilde JavaSparkContext oluşturup bunun ile birlikte query + index ismiyle (örnekte:spark_example) çalıştırarak JavaEsSpark.esRDD() ile JavaPairRDD tipinde verileri elde ediyoruz.
JavaPairRDD olarak veriyi aldıktan sonra partition belirleyerek döngü ile veri listenin içerisinde gezebiliriz. Burada önemli nokta RabbitTemplate örneğinde olduğu gibi yeni instance, partition içerisinde oluşturulmalıdır. Partition dışında oluşturulup içerisinde kullanmaya çalışılırsa hata alırsınız. Buna dikkat etmek gereklidir.
Özet
Airflow, ana konu olan Spark için bizim kullandığımız bir teknoloji. En azından hafızalarda ufak bir yer edinmesinin zararının bulunmayacağını düşündüm. https://airflow.apache.org/docs/apache-airflow/stable/index.html linkinden daha ayrıntılı inceleyebilirsiniz.
Spark konusu üzerinde ise ana uğraş noktası veriyi çekmekti. Örneklerde, RDBMS ve Elasticsearch den nasıl veriyi elde edeceğimizi parçalanmış şekilde ayrıntılı olarak ele aldık. https://spark.apache.org/docs/latest/quick-start.html linkinden daha ayrıntılı inceleyebilirsiniz.
Tüm örneklerin java file olarak bulunduğu şekilde Github da bir repo oluşturup gönderdim. https://github.com/MuratSensei/code-examples/tree/master/spark linkinde örnekteki kodları daha rahat okuyabilirsiniz.