Une API asynchrone avec Gearman, Sinatra et mongoID

Même si Ruby on Rails facilite énormément la création d’une API REST il y a un grand intérêt à séparer le site frontal de l’API. Le site peut subir des montées en charge ou une coupure de maintenance qui occasionnerait de fait une interruption de l’API. Or s’il peut être gênant que le frontal soit interrompu, ça l’est encore plus pour l’API qui permet de mettre à jour des données ou de les fournir à une multitude de clients tiers. Dans ce billet je présente sans aller trop loin dans les détails, la création de cette API.

Sinatra est un framework web léger non MVC qui permet via son DSL de répondre à toutes les méthodes HTTP (DELETE, PUT,POST,GET) simplement et en utilisant un seul fichier Ruby. L’introduction démontre la plupart de ses capacités. C’est à mon avis idéal pour servir une API plutôt que de sortir l’artillerie lourde Rails.

MongoID est un driver Ruby pour la base NoSQL mongoDB. Il est en concurrence avec MongoMapper.

Gearman est un serveur qui fait parti des innombrables serveur de job. En effet depuis ces dernières années un nouveau serveur sort tous les 6 mois ou presque avec plus ou moins de succès. Parmi ceux-ci on peut citer Beanstalkd ou RabbitMQ, il existe un “vieux” (1 an) comparatif non exhaustif de quelques serveurs de job Choosing a message queue for Python on Ubuntu on a VPS. Pour ajouter au trouble on peut trouver parmi eux des serveurs NoSQL type clé/valeur comme redis avec le plugin Resque.

Je travaille depuis quelques mois sur mon projet Nodecast et je souhaitais rendre l’API asynchrone. Le principe du projet est qu’un client desktop extrait régulièrement des informations systèmes et les transmet au site web. Un serveur Sinatra expose l’API et enregistre les données dans MongoDB. Le problème est que si de plus en plus clients viennent pusher leurs données sur le serveur Sinatra, celui-ci aura forcément de plus en plus de mal à encaisser la charge. Avec une architecture de ce type entre la requête HTTP POST du client et la réponse du serveur, tout fonctionne de manière synchrone, de fait si le serveur Sinatra atteins son pallier il ne sera plus en mesure de répondre à de nouvelles requêtes. Il est nécessaire que l’échange entre le client et le serveur Sinatra soit le plus rapide possible.

Pour cela il est essentiel de découpler l’interaction entre le serveur et le travail effectué. Pour comprendre cela imaginons que le client ne transmette pas d’informations systèmes mais carrément une vidéo que le serveur devra par exemple ré-encoder. Il n’est pas concevable que lorsque le client (desktop ou browser) a fini de poster la vidéo, l’utilisateur doive encore attendre la fin du réencodage du film avant d’avoir la main. Ce travail de ré-encodage doit s’effectuer dans un 2ème temps.

La première règle à définir est que l’échange entre le client et le serveur doit s’effectuer le plus vite possible, c’est un échange du type “fire and forget”. Pour cela lorsque le client desktop poste les informations, le serveur ne doit renvoyer qu’un acknowledge et aucune autre information permettant au client de savoir si le travail s’est bien effectué. Car si c’était le cas on reviendrait à un fonctionnement synchrone … Cependant rien empêche le serveur de renvoyer au client un identifiant unique afin de lui permettre par la suite de connaitre via une autre requête l’état de du travail envoyé.

Pour rendre cela possible il est nécessaire de séparer le code qui réceptionne la requête du code qui va traiter la demande. Il manque un tampon entre les deux, car le code qui va traiter la demande, le worker, peut être justement occupé à en traiter une. L’intérêt d’un serveur de job (appelé aussi serveur de file de message) est de stocker le job et le transmettre au worker lorsque celui-ci en fait la demande. Dans l’industrie il existe le protocole AMQP que RabbitMQ implémente. Je n’ai malheureusement pas réussi à faire lancer ce dernier sur mon serveur Ubuntu 10.04. J’ai découvert Gearman qui est presque aussi simple d’usage sur Beanstalkd tout en proposant des capacités de montée en charge, un worker pouvant s’abonner à plusieurs serveurs Gearman. De plus cet outil est développé par Danga pour LiveJournal, voir leur présentation sur le backend de LiveJournal : Behind the Scenes at LiveJournal: Scaling Storytime.

Sinatra expose l’API, réceptionne les demandes des clients, les transmet aussitôt à Gearman dans une file nommée en fonction du job (update,insert,…) puis renvoie aussitôt un ack au client desktop pour lui confirmer que sa requête est prise en compte. Sinatra est donc rapidement disponible à réceptionner une nouvelle requête. Le worker est abonné aux files d’attente dont il a la charge, il supprime le job du serveur afin de le traiter. Enfin Gearman offre la possibilité au worker de retourner au client l’état du travail effectué. Le client, ici Sinatra, dispose de ces 3 méthodes :

task.on_complete {|d| logger.info “complete : #{d}” }
task.on_warning {|w| logger.info “[client] warn: #{w}” }
task.on_fail {|f| logger.info “[client] failed : #{f}” }

Il peut ainsi stocker dans un fichier de log le statut du worker.

Voici le code du client Sinatra lancé par Thin :

#!/usr/bin/env ruby
require 'rubygems'

require 'gearman'

require "bundler"

Bundler.setup

Bundler.require(:default)

RAILS_ENV="development"

Gearman::Util.debug = true

SERVERS = ['localhost:4730']

logger = Logger.new('log/server.log', 'daily')

logger.debug("Created logger")

File.open(File.join('../config/database.mongo.yml'), 'r') do |f|

@settings = YAML.load(f)[RAILS_ENV]

end

Mongoid.configure do |config|

name = @settings["database"]

host = @settings["host"]

config.use_object_ids = @settings["use_object_ids"]

logger.info "database : #{name}"

logger.info "host : #{host}"

config.master = Mongo::Connection.new.db(name)

end

require 'models_mongoid/user.rb'

require 'models_mongoid/profil.rb'

require 'models_mongoid/host.rb'

set :logging, true

helpers do

def protected!

unless authorized?

response['WWW-Authenticate'] = %(Basic realm="Nodecast HTTP Auth")

throw(:halt, [401, "Not authorized\n"])

end

end

def authorized?

@auth ||=  Rack::Auth::Basic::Request.new(request.env)

# logger.info "AUTH = #{@auth.inspect}"

@current_user = User.where(:email => @auth.credentials.first, :authentication_token => @auth.credentials.last).first

@auth.provided? && @auth.basic? && @current_user

end

end

post '/hosts.xml' do

protected!

xml = Crack::XML.parse(request.body.read)

logger.info("POST")

uuid = UUIDTools::UUID.timestamp_create.to_s

host = {

:user => @current_user.email,

:uuid => uuid,

:datas => xml

}

client = Gearman::Client.new(SERVERS)

taskset = Gearman::Taskset.new

# task = Gearman::Task.new('add', Marshal.dump(xml), :background => true, :poll_status_interval => 1)

task = Gearman::Task.new('add', Marshal.dump(host))

task.on_complete {|d| logger.info "complete : #{d}" }

task.on_warning {|w| logger.info "[client] warn: #{w}" }

task.on_fail {|f| logger.info "[client] calculation failed : #{f}" }

taskset << task

client.run(taskset)

builder do |xml|

xml.instruct!

xml.host do

xml.uuid host[:uuid]

end

end

end

put '/host/update/:id' do

protected!

xml = Crack::XML.parse(request.body.read)

host = {

:user => @current_user.email,

:uuid => params[:id],

:datas => xml

}

logger.info("UPDATE")

client = Gearman::Client.new(SERVERS)

taskset = Gearman::Taskset.new

#task = Gearman::Task.new('update', Marshal.dump(host), :background => true, :poll_status_interval => 1)

task = Gearman::Task.new('update', Marshal.dump(host))

task.on_complete {|d| logger.info "complete : #{d}" }

task.on_warning {|w| logger.info "[client] warn: #{w}" }

task.on_fail {|f| logger.info "[client] calculation failed : #{f}" }

taskset << task

client.run(taskset)

builder do |xml|

xml.instruct!

xml.host do

xml.status "proceed"

end

end

end

et le code du worker lancé par un script daemons

#!/usr/bin/env ruby
require 'rubygems'

gem 'mongoid', '1.9.0'
require 'mongoid'
gem 'gearman-ruby', '2.0.0'
require 'gearman'
require 'uuidtools'
require 'logger'
require 'yaml'
require 'optparse'
require "pp"

options = {}

optparse = OptionParser.new do |opts|
  opts.on('-w', '--work WORK', 'path to the work directory') do |work|
    options[:work] = work
  end
  opts.on('-m', '--mongo MONGO', 'path to the mongo file') do |mongo|
    options[:mongo] = mongo
  end
  opts.on('-e', '--env ENV', 'rails environment') do |env|
    options[:env] = env
  end
end

begin
  optparse.parse!
  mandatory = [:work, :mongo, :env]
  missing = mandatory.select{ |param| options[param].nil? }
  if not missing.empty?
    puts "Missing options: #{missing.join(', ')}"
    puts optparse
    exit
  end
rescue OptionParser::InvalidOption, OptionParser::MissingArgument
  puts $!.to_s
  puts optparse
  exit
end

puts "Performing task with options: #{options.inspect}"            

Gearman::Util.debug = true if options[:env] == "development"

servers = ['localhost:4730']
@@worker = Gearman::Worker.new(servers)

logger = Logger.new("#{options[:work]}/log/worker.log", 'daily')
logger.debug("Created logger")

File.open(File.join("#{options[:mongo]}/database.mongo.yml"), 'r') do |f|
  @settings = YAML.load(f)[options[:env]]
end

Mongoid.configure do |config|
  name = @settings["database"]
  host = @settings["host"]
  config.use_object_ids = @settings["use_object_ids"]
  logger.info "database : #{name}"
  logger.info "host : #{host}"
  config.master = Mongo::Connection.new.db(name)
end

require "#{options[:work]}/models_mongoid/user.rb"
require "#{options[:work]}/models_mongoid/profil.rb"
require "#{options[:work]}/models_mongoid/host.rb"
require "#{options[:work]}/models_mongoid/osystem.rb"
require "#{options[:work]}/models_mongoid/load_statistic.rb"
require "#{options[:work]}/models_mongoid/memory_statistic.rb"
require "#{options[:work]}/models_mongoid/uptime_statistic.rb"
require "#{options[:work]}/models_mongoid/network_statistic.rb"
require "#{options[:work]}/models_mongoid/cpu_statistic.rb" 

########## JOB ADD HOST ############
@@worker.add_ability('add') do |data,job|

  dump = Marshal.load(data)
  xml = dump[:datas]

  @current_user = User.where(:email => dump[:user]).first

  p "user = #{@current_user.inspect}"

  push_host = {
    :uuid => dump[:uuid],
    :public => xml['host']['public'],
    :o s_version => xml['host']['version'],
    :patch_level => xml['host']['patch_level'].nil? ? "" : xml['host']['patch_level'].downcase,
    :cpu_vendor => xml['host']['cpu_vendor'],
    :architecture => xml['host']['architecture'],
    :cpu_model => xml['host']['cpu_model'],
    :cpu_mhz => xml['host']['cpu_mhz'],
    :cpu_cache_size => xml['host']['cpu_cache_size'],
    :cpu_number => xml['host']['cpu_number'],
    :cpu_total_cores => xml['host']['cpu_total_cores'],
    :cpu_total_sockets => xml['host']['cpu_total_sockets'],
    :cpu_cores_per_socket => xml['host']['cpu_cores_per_socket'],
    :mem_ram => xml['host']['mem_ram'],
    :mem_total => xml['host']['mem_total'],
    :hostname => xml['host']['hostname'],
    :domain_name => xml['host']['domain_name'],
    :default_gateway => xml['host']['default_gateway'],
    :primary_dns => xml['host']['primary_dns'],
    :secondary_dns => xml['host']['secondary_dns'],
    :primary_interface => xml['host']['primary_interface'],
    :primary_addr => xml['host']['primary_addr'],
    :user_id => @current_user.id
  }

  host_uptime = {
    :time => xml['host']['uptime_time'],
    :days => xml['host']['uptime_days']
  }

  host_load = {
    :loadavg0 => xml['host']['loadavg0'],
    :loadavg1 => xml['host']['loadavg1'],
    :loadavg2 => xml['host']['loadavg2']
  }

  host_mem = {
    :mem_used => xml['host']['mem_used'],
    :mem_free => xml['host']['mem_free'],
    :mem_actual_free => xml['host']['mem_actual_free'],
    :mem_actual_used => xml['host']['mem_actual_used'],
    :mem_actual_free_percent => xml['host']['mem_actual_free_percent'],
    :mem_actual_used_percent => xml['host']['mem_actual_used_percent'],
    :swap_total => xml['host']['swap_total'],
    :swap_used => xml['host']['swap_used'],
    :swap_free => xml['host']['swap_free'],
    :swap_page_in => xml['host']['swap_page_in'],
    :swap_page_out => xml['host']['swap_page_out']
    }

  host_network = {
    :rx_rate => xml['host']['rx_rate'],
    :tx_rate => xml['host']['tx_rate']
  }

  host_cpu = {
    :user => xml['host']['cpu_user'],
    :sys => xml['host']['cpu_sys'],
    :nice => xml['host']['cpu_nice'],
    :idle => xml['host']['cpu_idle'],
    :wait => xml['host']['cpu_wait'],
    :irq => xml['host']['cpu_irq'],
    :soft_irq => xml['host']['cpu_soft_irq'],
    :stolen => xml['host']['cpu_stolen'],
    :combined => xml['host']['cpu_combined'],
    :total => xml['host']['cpu_total']
  }

  begin
    profil = @current_user.profils.where(:context => xml['host']['profil']).first

    if !profil
      profil = @current_user.profils.create(:context  => xml['host']['profil'])
    end

    @host = profil.hosts.create(push_host)

    osystem = Osystem.where(:vendor => xml['host']['vendor'].downcase, :vendor_version => xml['host']['vendor_version'].downcase).first

    if !osystem
      osystem = Osystem.create(
                               :name => xml['host']['name'].downcase,
                               :vendor => xml['host']['vendor'].downcase,
                               :vendor_version => xml['host']['vendor_version'].downcase,
                               :vendor_code_name => xml['host']['vendor_code_name'].nil? ? "" : xml['host']['vendor_code_name'].downcase,
                               :description => xml['host']['description'],
                               :o s_base => xml['host']['os_base'],
                               :o s_type => xml['host']['os_type']
                               )
    end

    @host.osystem_id = osystem.id
    osystem.hosts_number += 1
    osystem.save
    @host.save

    if xml['host']['activated_memory'] == "true"
      @host.memory_statistics.create(host_mem)
    end

    if xml['host']['activated_load'] == "true"
      @host.load_statistics.create(host_load)
    end

    if xml['host']['activated_uptime'] == "true"
      @host.uptime_statistics.create(host_uptime)
    end

    if xml['host']['activated_network'] == "true"
      @host.network_statistics.create(host_network)
    end

    if xml['host']['activated_cpu'] == "true"
      @host.cpu_statistics.create(host_cpu)
    end

  rescue => e
    logger.info "error on create host : #{e}"
  end

  @current_user.hosts_number += 1
  @current_user.save

logger.info "created"
end

########## JOB UPDATE HOST ############
@@worker.add_ability('update') do |data,job|

  dump = Marshal.load(data)
  xml = dump[:datas]

  @current_user = User.where(:email => dump[:user]).first
  @host = Host.where(:uuid => dump[:uuid], :blocked => false).first

  if !@host
logger.info "host unknown"

  elsif @current_user.hosts.include?(@host)
    # os = Osystem.find(:all, :conditions => "lower(name) = '#{xml['host'][:os_name].downcase}' AND lower(codename) LIKE '%#{xml['host'][:os_codename].downcase}%' AND version = '#{xml['host'][:os_release]}' AND computer_architecture = '#{xml['host'][:architecture]}'")

    host_uptime = {
      :time => xml['host']['uptime_time'],
      :days => xml['host']['uptime_days']
    }

    host_load = {
      :loadavg0 => xml['host']['loadavg0'],
      :loadavg1 => xml['host']['loadavg1'],
      :loadavg2 => xml['host']['loadavg2']
    }

    host_mem = {
      :mem_used => xml['host']['mem_used'],
      :mem_free => xml['host']['mem_free'],
      :mem_actual_free => xml['host']['mem_actual_free'],
      :mem_actual_used => xml['host']['mem_actual_used'],
      :mem_actual_free_percent => xml['host']['mem_actual_free_percent'],
      :mem_actual_used_percent => xml['host']['mem_actual_used_percent'],
      :swap_total => xml['host']['swap_total'],
      :swap_used => xml['host']['swap_used'],
      :swap_free => xml['host']['swap_free'],
      :swap_page_in => xml['host']['swap_page_in'],
      :swap_page_out => xml['host']['swap_page_out']
    }

    host_network = {
      :rx_rate => xml['host']['rx_rate'],
      :tx_rate => xml['host']['tx_rate']
    }

    host_cpu = {
      :user => xml['host']['cpu_user'],
      :sys => xml['host']['cpu_sys'],
      :nice => xml['host']['cpu_nice'],
      :idle => xml['host']['cpu_idle'],
      :wait => xml['host']['cpu_wait'],
      :irq => xml['host']['cpu_irq'],
      :soft_irq => xml['host']['cpu_soft_irq'],
      :stolen => xml['host']['cpu_stolen'],
      :combined => xml['host']['cpu_combined'],
      :total => xml['host']['cpu_total']
    }

    begin

      if xml['host']['profil'] != @host.profil.context
        profil = @current_user.profils.where(:context => xml['host']['profil']).first
        if !profil
          profil = @current_user.profils.create(:context  => xml['host']['profil'])
        end
        @host.profil = profil
        @host.save
      end

         if xml['host']['activated_memory'] == "true"
           mem = @host.memory_statistics.create(host_mem)
         end

         if xml['host']['activated_load'] == "true"
           @host.load_statistics.create(host_load)
         end

         if xml['host']['activated_uptime'] == "true"
           @host.uptime_statistics.create(host_uptime)
         end

         if xml['host']['activated_network'] == "true"
           @host.network_statistics.create(host_network)
         end

         if xml['host']['activated_cpu'] == "true"
           @host.cpu_statistics.create(host_cpu)
         end

         logger.info "updated"

    rescue => e
      logger.info "failed on update : #{e}"
      raise Exception.new("failed on update : #{e}")
    end

  else
    logger.info "unauthorized update"
    raise Exception.new("unauthorized update")
  end
end

@@worker.work

, ,

  1. Laisser un commentaire

Répondre

Entrez vos coordonnées ci-dessous ou cliquez sur une icône pour vous connecter:

Gravatar
Logo WordPress.com

Vous commentez à l'aide de votre compte WordPress.com. Déconnexion / Changer )

Twitter picture

Vous commentez à l'aide de votre compte Twitter. Déconnexion / Changer )

Photo Facebook

Vous commentez à l'aide de votre compte Facebook. Déconnexion / Changer )

Connexion à %s

Suivre

Get every new post delivered to your Inbox.

Joignez-vous à 189 followers