rabbitmq

Neden gerekli?
Ölçeklenebilir bir web ortamı oluşturmak için anlık yapılmayan (asenkron) işlemler çok önemlidir. Bir mesaj kuyruğu imlementasyonu ile, işlemleri sıraya sokup, sırası geldiği zaman işlenmesini sağlayabilirsiniz.Mesajları bir sıraya sokup işlem yaptırmaya aslında yabancı değiliz; bir çoğumuz kendi “mesaj kuyruğu” implementasyonunu biraz kötü bir şekilde yapmıştır. Örneğin, kullanıcılara gönderilecek e-postaları bir veritabanına(MySQL?) yazıp, , cron job’lar ile belli zaman aralıklarında bu veritabanından bir veriyi çekip, e-postayı gönderdikten sonra bu ilgili kaydı veritabanından silme işlemini yapmışızdır.Ancak burada bazı sorunlarımız var:1) Veritabanları bu iş için değil (yaz, belli aralıklarla oku ‘select * from queue where completed=0 limit 1’, sil)2) Anlık istek sayınız (concurrency) arttıkça, “işin mesajını”i veritabanına “yazmak”, işi tamamlamaktan daha uzun zaman alabiliyor3) “Ölçekleme”den bashedince, veritabanına “yazma” işlemini ölçeklemeniz, gerçekten çok zor.4) Üstelik, bu mesaj kuyruğunu implemente etmek, test etmek ve performansını düzeltmek için uzunca bir zaman harcamanız gerekebiliyor.RabbitMQ bunların hepsini sizin için yapıyor.Mesaj Kuyruğu
Mesaj kuyruğu (veya “enterprise service bus” ) çözümleri, kurumsal pazarda sıkça kullanılan ve pastanın büyük bir payının IBM,Tibco gibi firmaların elinde bulunduğu bir pazardı. 2004 yılında başlayan ve 2006’da olgunlaşan açık standartlı AMQP çözümü ile “mesaj kuyruğu” çözümleri bir standart ile açık hale geldi.OpenAMQ, Apache QPid ve Red Hat Enterprise MRG gibi kimi kurumsal kimi ise açık çözümler piyasaya çıktı. Bunların arasından RabbitMQ ise, ölçeklenebilirliği, kararlılığı ve basitliği ile ön plana çıktı.Erlang ile geliştirilen bu uygulama, çoğu Erlang uygılamasında olduğu gibi threade-safe bir yapıya sahip ve kolayca ölçeklenebiliyor.

rabbitmq yapisi

Bilmeniz gereken bir kaç terim
consumer : Kuyruğu dinleyen uygulama
publisher : Kuyruğa mesaj gönderen uygulama
VirtualHost : VirtualHost’lar, genelde yetki yönetimi için kullanılır, Exchange ve Queue’lar virtualhost’lar içinde tanımlanır
Exchange : Mesajı ilgili “routing key”e göre ilgili queue’ya yönlendiren bölüm
Queue : Mesajların son olarak düştüğü kuyruk
Exchange type : Gelen mesajın, “routing key”e göre hangi queue’ya “nasıl” gönderileceğini belirtir
Kurulum
RabbitMQ kurulumu
RabbitMQ’nun sitesinde Linux ve Windows için hazırlanmış paketler bulunmaktaDebian testing kullanıcıları

apt-get install rabbitmq-serve

komutu ile uygulamayı kurabilirler. Sunucunun 5672 portundan başladığından emin olun 🙂İstemci kütüphaneleri kurulumu
AMQP protokolü için hazırlanmış bir çok istemci bulunmakta. Biz bu örnekte PHP ve Python istemcilerini kullanacağız.- Python kütüphanesiLinux kullanıcıları aşağıdaki komutlarla py-amqplib kütüphanesini kurabilir

wget -c http://barryp.org/static/software/download/py-amqplib/0.6/amqplib-0.6.tgztar -zxvf amqplib-0.6.tgzcd amqplib-0.6python setup.py instal

PHP KütüphanesiSVN deposundaki kodları çekerek, oluşturacağınız PHP dosyanızı koyacağınız dizinden erişilebilecek bir dizine kopyalayın

svn checkout http://php-amqplib.googlecode.com/svn/trunk/ php-amqplib-read-only

Örnek işlem
Sisteme bir kullanıcı kaydolduğu zaman, onun bulunduğu yere yakın kullanıcılara e-posta gönderen bir uygulama yazacak olalım. Yapmamız gereken işlemler şunlar:* Veritabanından kaydolan kullanıcının bulunduğu yeri al* Veritabanından, bu bölgeye yakın diğer kullanıcıları bul* Bulunan kullanıcılara e-posta gönderVe bu işlemin 10 saniye civarında sürdüğünü düşünün. Normalde bu işlemi “anlık” yapmaya kalkarsak, her kayıt olan kullanıcı kayıt sırasında, aslında onu çok da ilgilendirmeyen bir işlem için 10 saniye beklemek zorunda kalacak. Aslında bu tam “kuyruğa gönderilecek” bir iş.Yazacağımız bir “publisher” ile, çalışmakta olan RabbitMQ sunucumuza “X kullanıcısı kayıt oldu, gerekli işlemler yapılmalı” tarzında bir mesaj göndermemiz ve işlemleri yapacak sorumlu uygulamanın (consumer), sırası gelince bu işi yapmasını sağlayabiliriz. Ve biz mesajı gönderdiğikten sonra kullanıcının bu işlemi beklemesine gerek kalmayacaktır.
Aşağıda PHP ile yazılmış bir “publisher” ve “Python” ile yazılmış bir consumer bu işlemleri yapıyor.consumer.py “””Kullanici kayit islemlerinden gelen mesajlari dinleyen,mesaj geldiginde ilgili kullanicinin bulundugu yeri veritabanindan cekme,kullanicinin bulundugu bu yere yakin diger kullanicilari cekme,ve bulunan diger kullanicilara e-posta gonderme islemlerinisimule eden kodhttp://barryp.org/software/py-amqplib/ adresinden py-amqplib’i indirip kurmalisiniz”””from amqplib import client_0_8 as amqpimport time # islemlerin surdugu zamanlari simule etmek icinclass rabbitMQMailQueue:# yapilandirma degiskenleridef __init__(self):# rabbitMQ ayarlariself.rabbitMQConf = {“hostPort” : “localhost:5672″,”user” : “guest”,”pass” : “guest”,”virtualHost” : “/”}# kuyrugun ayarlariself.queueConf = {“name” : “eposta”,”exchange” : “kayit”}# kuyrugu dinle ve gelen her mesajda doJobs methodunu calistirdef listen(self):try:# amqp’ye baglanconn = amqp.Connection(self.rabbitMQConf[‘hostPort’],self.rabbitMQConf[‘user’],self.rabbitMQConf[‘pass’],virtualHost = self.rabbitMQConf[‘virtualHost’],insist = False)chan = conn.channel()chan.queue_declare( self.queueConf[‘name’],durable=True, # kuyruk yeniden basladiginda yeniden yaratilsinexclusive=False,auto_delete=False) # son islemden sonra kuyrugu silmechan.exchange_declare(self.queueConf[‘exchange’],type=”direct”, # direkt olarak ilgili kuyruga gonderdurable=True,auto_delete=False,)# queue ile exchange’i baglachan.queue_bind(self.queueConf[‘name’], self.queueConf[‘exchange’])# islemden sonra cagirilacak methodu ayarlachan.basic_consume(self.queueConf[‘name’],no_ack=True,callback=self.doJobs) # ilgili methodu calistir# kuyrugu dinlemeye baslawhile True:chan.wait()chan.close()conn.close()except Exception, error :print “Bir hata olustu : “,error# kuyruktan her mesaj geldiginde ilgili kullaniciya ait asagidaki islemleri yapdef doJobs(self,msg):print “Kuyruktan bir mesaj alindi”userId = msg.bodyprint userId, ” id’li kullanicinin bolgesi bulunuyor”time.sleep(1) # islemin 1 saniye surdugunu varsayalimprint userId, ” id’li kullanicinin bulundugu yere yakin kullanicilar bulunuyor”time.sleep(6) # islemin 6 saniye surdugunu varsayalimprint “Bulunan kullanicilara e-posta gonderiliyor”time.sleep(3) # e-posta gonderme isleminin 3 saniye surdugunu varsayalim# toplamda islemler 10 saniye suruyor# bu islem bittikten sonra kuyruktan yeni mesaj aliniyor# uygulamayi baslatif __name__ == ‘__main__’:mailQueue = rabbitMQMailQueue()mailQueue.listen() publisher.php rabbitMQConf = array(“host”=>”localhost”,”port”=>”5672″,”user”=>”guest”,”pass”=>”guest”,”virtualHost”=>”/”);// kuyruk ayarlari$this->queueConf = array(“exchange”=>”kayit”);}// kullanici id’sini mesaj olarak gonderfunction send($userId) {try{// amqp’ye baglan$conn = new AMQPConnection($this->rabbitMQConf[‘host’],$this->rabbitMQConf[‘port’],$this->rabbitMQConf[‘user’],$this->rabbitMQConf[‘pass’]);$channel = $conn->channel();$channel->access_request($this->rabbitMQConf[‘virtualHost’], false, false, true, true);// mesajı gönder$msg = new AMQPMessage($userId, array(‘content_type’ => ‘text/plain’));$channel->basic_publish($msg, $this->queueConf[‘exchange’]);$channel->close();$conn->close();return true;}catch(Exception $e) {echo “Bir hata oluştu “.$e->getMessage();}}}$messageSender = new sendRegisterMessageToQueue();$userId = 777;if ( $messageSender->send($userId ) ) {echo “Kullanicinin yakinindaki insanlara e-posta gonderme mesaji iletildi”;} Kaynaklar:http://www.rabbitmq.com/http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/