Une API asynchrone avec Gearman, Sinatra et mongoID
Même si Ruby on Rails facilite énormément la création d’une API REST il y a un grand intérêt à séparer le site frontal de l’API. Le site peut subir des montées en charge ou une coupure de maintenance qui occasionnerait de fait une interruption de l’API. Or s’il peut être gênant que le frontal soit interrompu, ça l’est encore plus pour l’API qui permet de mettre à jour des données ou de les fournir à une multitude de clients tiers. Dans ce billet je présente sans aller trop loin dans les détails, la création de cette API.
Sinatra est un framework web léger non MVC qui permet via son DSL de répondre à toutes les méthodes HTTP (DELETE, PUT,POST,GET) simplement et en utilisant un seul fichier Ruby. L’introduction démontre la plupart de ses capacités. C’est à mon avis idéal pour servir une API plutôt que de sortir l’artillerie lourde Rails.
MongoID est un driver Ruby pour la base NoSQL mongoDB. Il est en concurrence avec MongoMapper.
Gearman est un serveur qui fait parti des innombrables serveur de job. En effet depuis ces dernières années un nouveau serveur sort tous les 6 mois ou presque avec plus ou moins de succès. Parmi ceux-ci on peut citer Beanstalkd ou RabbitMQ, il existe un « vieux » (1 an) comparatif non exhaustif de quelques serveurs de job Choosing a message queue for Python on Ubuntu on a VPS. Pour ajouter au trouble on peut trouver parmi eux des serveurs NoSQL type clé/valeur comme redis avec le plugin Resque.
Je travaille depuis quelques mois sur mon projet Nodecast et je souhaitais rendre l’API asynchrone. Le principe du projet est qu’un client desktop extrait régulièrement des informations systèmes et les transmet au site web. Un serveur Sinatra expose l’API et enregistre les données dans MongoDB. Le problème est que si de plus en plus clients viennent pusher leurs données sur le serveur Sinatra, celui-ci aura forcément de plus en plus de mal à encaisser la charge. Avec une architecture de ce type entre la requête HTTP POST du client et la réponse du serveur, tout fonctionne de manière synchrone, de fait si le serveur Sinatra atteins son pallier il ne sera plus en mesure de répondre à de nouvelles requêtes. Il est nécessaire que l’échange entre le client et le serveur Sinatra soit le plus rapide possible.
Pour cela il est essentiel de découpler l’interaction entre le serveur et le travail effectué. Pour comprendre cela imaginons que le client ne transmette pas d’informations systèmes mais carrément une vidéo que le serveur devra par exemple ré-encoder. Il n’est pas concevable que lorsque le client (desktop ou browser) a fini de poster la vidéo, l’utilisateur doive encore attendre la fin du réencodage du film avant d’avoir la main. Ce travail de ré-encodage doit s’effectuer dans un 2ème temps.
La première règle à définir est que l’échange entre le client et le serveur doit s’effectuer le plus vite possible, c’est un échange du type « fire and forget ». Pour cela lorsque le client desktop poste les informations, le serveur ne doit renvoyer qu’un acknowledge et aucune autre information permettant au client de savoir si le travail s’est bien effectué. Car si c’était le cas on reviendrait à un fonctionnement synchrone … Cependant rien empêche le serveur de renvoyer au client un identifiant unique afin de lui permettre par la suite de connaitre via une autre requête l’état de du travail envoyé.
Pour rendre cela possible il est nécessaire de séparer le code qui réceptionne la requête du code qui va traiter la demande. Il manque un tampon entre les deux, car le code qui va traiter la demande, le worker, peut être justement occupé à en traiter une. L’intérêt d’un serveur de job (appelé aussi serveur de file de message) est de stocker le job et le transmettre au worker lorsque celui-ci en fait la demande. Dans l’industrie il existe le protocole AMQP que RabbitMQ implémente. Je n’ai malheureusement pas réussi à faire lancer ce dernier sur mon serveur Ubuntu 10.04. J’ai découvert Gearman qui est presque aussi simple d’usage sur Beanstalkd tout en proposant des capacités de montée en charge, un worker pouvant s’abonner à plusieurs serveurs Gearman. De plus cet outil est développé par Danga pour LiveJournal, voir leur présentation sur le backend de LiveJournal : Behind the Scenes at LiveJournal: Scaling Storytime.
Sinatra expose l’API, réceptionne les demandes des clients, les transmet aussitôt à Gearman dans une file nommée en fonction du job (update,insert,…) puis renvoie aussitôt un ack au client desktop pour lui confirmer que sa requête est prise en compte. Sinatra est donc rapidement disponible à réceptionner une nouvelle requête. Le worker est abonné aux files d’attente dont il a la charge, il supprime le job du serveur afin de le traiter. Enfin Gearman offre la possibilité au worker de retourner au client l’état du travail effectué. Le client, ici Sinatra, dispose de ces 3 méthodes :
Il peut ainsi stocker dans un fichier de log le statut du worker.
Voici le code du client Sinatra lancé par Thin :
#!/usr/bin/env ruby
require 'rubygems'
require 'gearman'
require "bundler"
Bundler.setup
Bundler.require(:default)
RAILS_ENV="development"
Gearman::Util.debug = true
SERVERS = ['localhost:4730']
logger = Logger.new('log/server.log', 'daily')
logger.debug("Created logger")
File.open(File.join('../config/database.mongo.yml'), 'r') do |f|
@settings = YAML.load(f)[RAILS_ENV]
end
Mongoid.configure do |config|
name = @settings["database"]
host = @settings["host"]
config.use_object_ids = @settings["use_object_ids"]
logger.info "database : #{name}"
logger.info "host : #{host}"
config.master = Mongo::Connection.new.db(name)
end
require 'models_mongoid/user.rb'
require 'models_mongoid/profil.rb'
require 'models_mongoid/host.rb'
set :logging, true
helpers do
def protected!
unless authorized?
response['WWW-Authenticate'] = %(Basic realm="Nodecast HTTP Auth")
throw(:halt, [401, "Not authorized\n"])
end
end
def authorized?
@auth ||= Rack::Auth::Basic::Request.new(request.env)
# logger.info "AUTH = #{@auth.inspect}"
@current_user = User.where(:email => @auth.credentials.first, :authentication_token => @auth.credentials.last).first
@auth.provided? && @auth.basic? && @current_user
end
end
post '/hosts.xml' do
protected!
xml = Crack::XML.parse(request.body.read)
logger.info("POST")
uuid = UUIDTools::UUID.timestamp_create.to_s
host = {
:user => @current_user.email,
:uuid => uuid,
:datas => xml
}
client = Gearman::Client.new(SERVERS)
taskset = Gearman::Taskset.new
# task = Gearman::Task.new('add', Marshal.dump(xml), :background => true, :poll_status_interval => 1)
task = Gearman::Task.new('add', Marshal.dump(host))
task.on_complete {|d| logger.info "complete : #{d}" }
task.on_warning {|w| logger.info "[client] warn: #{w}" }
task.on_fail {|f| logger.info "[client] calculation failed : #{f}" }
taskset << task
client.run(taskset)
builder do |xml|
xml.instruct!
xml.host do
xml.uuid host[:uuid]
end
end
end
put '/host/update/:id' do
protected!
xml = Crack::XML.parse(request.body.read)
host = {
:user => @current_user.email,
:uuid => params[:id],
:datas => xml
}
logger.info("UPDATE")
client = Gearman::Client.new(SERVERS)
taskset = Gearman::Taskset.new
#task = Gearman::Task.new('update', Marshal.dump(host), :background => true, :poll_status_interval => 1)
task = Gearman::Task.new('update', Marshal.dump(host))
task.on_complete {|d| logger.info "complete : #{d}" }
task.on_warning {|w| logger.info "[client] warn: #{w}" }
task.on_fail {|f| logger.info "[client] calculation failed : #{f}" }
taskset << task
client.run(taskset)
builder do |xml|
xml.instruct!
xml.host do
xml.status "proceed"
end
end
end
et le code du worker lancé par un script daemons
#!/usr/bin/env ruby
require 'rubygems'
gem 'mongoid', '1.9.0'
require 'mongoid'
gem 'gearman-ruby', '2.0.0'
require 'gearman'
require 'uuidtools'
require 'logger'
require 'yaml'
require 'optparse'
require "pp"
options = {}
optparse = OptionParser.new do |opts|
opts.on('-w', '--work WORK', 'path to the work directory') do |work|
options[:work] = work
end
opts.on('-m', '--mongo MONGO', 'path to the mongo file') do |mongo|
options[:mongo] = mongo
end
opts.on('-e', '--env ENV', 'rails environment') do |env|
options[:env] = env
end
end
begin
optparse.parse!
mandatory = [:work, :mongo, :env]
missing = mandatory.select{ |param| options[param].nil? }
if not missing.empty?
puts "Missing options: #{missing.join(', ')}"
puts optparse
exit
end
rescue OptionParser::InvalidOption, OptionParser::MissingArgument
puts $!.to_s
puts optparse
exit
end
puts "Performing task with options: #{options.inspect}"
Gearman::Util.debug = true if options[:env] == "development"
servers = ['localhost:4730']
@@worker = Gearman::Worker.new(servers)
logger = Logger.new("#{options[:work]}/log/worker.log", 'daily')
logger.debug("Created logger")
File.open(File.join("#{options[:mongo]}/database.mongo.yml"), 'r') do |f|
@settings = YAML.load(f)[options[:env]]
end
Mongoid.configure do |config|
name = @settings["database"]
host = @settings["host"]
config.use_object_ids = @settings["use_object_ids"]
logger.info "database : #{name}"
logger.info "host : #{host}"
config.master = Mongo::Connection.new.db(name)
end
require "#{options[:work]}/models_mongoid/user.rb"
require "#{options[:work]}/models_mongoid/profil.rb"
require "#{options[:work]}/models_mongoid/host.rb"
require "#{options[:work]}/models_mongoid/osystem.rb"
require "#{options[:work]}/models_mongoid/load_statistic.rb"
require "#{options[:work]}/models_mongoid/memory_statistic.rb"
require "#{options[:work]}/models_mongoid/uptime_statistic.rb"
require "#{options[:work]}/models_mongoid/network_statistic.rb"
require "#{options[:work]}/models_mongoid/cpu_statistic.rb"
########## JOB ADD HOST ############
@@worker.add_ability('add') do |data,job|
dump = Marshal.load(data)
xml = dump[:datas]
@current_user = User.where(:email => dump[:user]).first
p "user = #{@current_user.inspect}"
push_host = {
:uuid => dump[:uuid],
:public => xml['host']['public'],
s_version => xml['host']['version'],
:patch_level => xml['host']['patch_level'].nil? ? "" : xml['host']['patch_level'].downcase,
:cpu_vendor => xml['host']['cpu_vendor'],
:architecture => xml['host']['architecture'],
:cpu_model => xml['host']['cpu_model'],
:cpu_mhz => xml['host']['cpu_mhz'],
:cpu_cache_size => xml['host']['cpu_cache_size'],
:cpu_number => xml['host']['cpu_number'],
:cpu_total_cores => xml['host']['cpu_total_cores'],
:cpu_total_sockets => xml['host']['cpu_total_sockets'],
:cpu_cores_per_socket => xml['host']['cpu_cores_per_socket'],
:mem_ram => xml['host']['mem_ram'],
:mem_total => xml['host']['mem_total'],
:hostname => xml['host']['hostname'],
:domain_name => xml['host']['domain_name'],
:default_gateway => xml['host']['default_gateway'],
:primary_dns => xml['host']['primary_dns'],
:secondary_dns => xml['host']['secondary_dns'],
:primary_interface => xml['host']['primary_interface'],
:primary_addr => xml['host']['primary_addr'],
:user_id => @current_user.id
}
host_uptime = {
:time => xml['host']['uptime_time'],
:days => xml['host']['uptime_days']
}
host_load = {
:loadavg0 => xml['host']['loadavg0'],
:loadavg1 => xml['host']['loadavg1'],
:loadavg2 => xml['host']['loadavg2']
}
host_mem = {
:mem_used => xml['host']['mem_used'],
:mem_free => xml['host']['mem_free'],
:mem_actual_free => xml['host']['mem_actual_free'],
:mem_actual_used => xml['host']['mem_actual_used'],
:mem_actual_free_percent => xml['host']['mem_actual_free_percent'],
:mem_actual_used_percent => xml['host']['mem_actual_used_percent'],
:swap_total => xml['host']['swap_total'],
:swap_used => xml['host']['swap_used'],
:swap_free => xml['host']['swap_free'],
:swap_page_in => xml['host']['swap_page_in'],
:swap_page_out => xml['host']['swap_page_out']
}
host_network = {
:rx_rate => xml['host']['rx_rate'],
:tx_rate => xml['host']['tx_rate']
}
host_cpu = {
:user => xml['host']['cpu_user'],
:sys => xml['host']['cpu_sys'],
:nice => xml['host']['cpu_nice'],
:idle => xml['host']['cpu_idle'],
:wait => xml['host']['cpu_wait'],
:irq => xml['host']['cpu_irq'],
:soft_irq => xml['host']['cpu_soft_irq'],
:stolen => xml['host']['cpu_stolen'],
:combined => xml['host']['cpu_combined'],
:total => xml['host']['cpu_total']
}
begin
profil = @current_user.profils.where(:context => xml['host']['profil']).first
if !profil
profil = @current_user.profils.create(:context => xml['host']['profil'])
end
@host = profil.hosts.create(push_host)
osystem = Osystem.where(:vendor => xml['host']['vendor'].downcase, :vendor_version => xml['host']['vendor_version'].downcase).first
if !osystem
osystem = Osystem.create(
:name => xml['host']['name'].downcase,
:vendor => xml['host']['vendor'].downcase,
:vendor_version => xml['host']['vendor_version'].downcase,
:vendor_code_name => xml['host']['vendor_code_name'].nil? ? "" : xml['host']['vendor_code_name'].downcase,
:description => xml['host']['description'],
s_base => xml['host']['os_base'],
s_type => xml['host']['os_type']
)
end
@host.osystem_id = osystem.id
osystem.hosts_number += 1
osystem.save
@host.save
if xml['host']['activated_memory'] == "true"
@host.memory_statistics.create(host_mem)
end
if xml['host']['activated_load'] == "true"
@host.load_statistics.create(host_load)
end
if xml['host']['activated_uptime'] == "true"
@host.uptime_statistics.create(host_uptime)
end
if xml['host']['activated_network'] == "true"
@host.network_statistics.create(host_network)
end
if xml['host']['activated_cpu'] == "true"
@host.cpu_statistics.create(host_cpu)
end
rescue => e
logger.info "error on create host : #{e}"
end
@current_user.hosts_number += 1
@current_user.save
logger.info "created"
end
########## JOB UPDATE HOST ############
@@worker.add_ability('update') do |data,job|
dump = Marshal.load(data)
xml = dump[:datas]
@current_user = User.where(:email => dump[:user]).first
@host = Host.where(:uuid => dump[:uuid], :blocked => false).first
if !@host
logger.info "host unknown"
elsif @current_user.hosts.include?(@host)
# os = Osystem.find(:all, :conditions => "lower(name) = '#{xml['host'][:os_name].downcase}' AND lower(codename) LIKE '%#{xml['host'][:os_codename].downcase}%' AND version = '#{xml['host'][:os_release]}' AND computer_architecture = '#{xml['host'][:architecture]}'")
host_uptime = {
:time => xml['host']['uptime_time'],
:days => xml['host']['uptime_days']
}
host_load = {
:loadavg0 => xml['host']['loadavg0'],
:loadavg1 => xml['host']['loadavg1'],
:loadavg2 => xml['host']['loadavg2']
}
host_mem = {
:mem_used => xml['host']['mem_used'],
:mem_free => xml['host']['mem_free'],
:mem_actual_free => xml['host']['mem_actual_free'],
:mem_actual_used => xml['host']['mem_actual_used'],
:mem_actual_free_percent => xml['host']['mem_actual_free_percent'],
:mem_actual_used_percent => xml['host']['mem_actual_used_percent'],
:swap_total => xml['host']['swap_total'],
:swap_used => xml['host']['swap_used'],
:swap_free => xml['host']['swap_free'],
:swap_page_in => xml['host']['swap_page_in'],
:swap_page_out => xml['host']['swap_page_out']
}
host_network = {
:rx_rate => xml['host']['rx_rate'],
:tx_rate => xml['host']['tx_rate']
}
host_cpu = {
:user => xml['host']['cpu_user'],
:sys => xml['host']['cpu_sys'],
:nice => xml['host']['cpu_nice'],
:idle => xml['host']['cpu_idle'],
:wait => xml['host']['cpu_wait'],
:irq => xml['host']['cpu_irq'],
:soft_irq => xml['host']['cpu_soft_irq'],
:stolen => xml['host']['cpu_stolen'],
:combined => xml['host']['cpu_combined'],
:total => xml['host']['cpu_total']
}
begin
if xml['host']['profil'] != @host.profil.context
profil = @current_user.profils.where(:context => xml['host']['profil']).first
if !profil
profil = @current_user.profils.create(:context => xml['host']['profil'])
end
@host.profil = profil
@host.save
end
if xml['host']['activated_memory'] == "true"
mem = @host.memory_statistics.create(host_mem)
end
if xml['host']['activated_load'] == "true"
@host.load_statistics.create(host_load)
end
if xml['host']['activated_uptime'] == "true"
@host.uptime_statistics.create(host_uptime)
end
if xml['host']['activated_network'] == "true"
@host.network_statistics.create(host_network)
end
if xml['host']['activated_cpu'] == "true"
@host.cpu_statistics.create(host_cpu)
end
logger.info "updated"
rescue => e
logger.info "failed on update : #{e}"
raise Exception.new("failed on update : #{e}")
end
else
logger.info "unauthorized update"
raise Exception.new("unauthorized update")
end
end
@@worker.work
















