mirror of https://github.com/mastodon/mastodon
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
80 lines
2.0 KiB
Ruby
80 lines
2.0 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
class WorkerBatch
|
|
include Redisable
|
|
|
|
TTL = 3600
|
|
|
|
def initialize(id = nil)
|
|
@id = id || SecureRandom.hex(12)
|
|
end
|
|
|
|
attr_reader :id
|
|
|
|
# Connect the batch with an async refresh. When the number of processed jobs
|
|
# passes the given threshold, the async refresh will be marked as finished.
|
|
# @param [String] async_refresh_key
|
|
# @param [Float] threshold
|
|
def connect(async_refresh_key, threshold: 1.0)
|
|
redis.hset(key, { 'async_refresh_key' => async_refresh_key, 'threshold' => threshold })
|
|
end
|
|
|
|
# Add jobs to the batch. Usually when the batch is created.
|
|
# @param [Array<String>] jids
|
|
def add_jobs(jids)
|
|
if jids.blank?
|
|
async_refresh_key = redis.hget(key, 'async_refresh_key')
|
|
|
|
if async_refresh_key.present?
|
|
async_refresh = AsyncRefresh.new(async_refresh_key)
|
|
async_refresh.finish!
|
|
end
|
|
|
|
return
|
|
end
|
|
|
|
redis.multi do |pipeline|
|
|
pipeline.sadd(key('jobs'), jids)
|
|
pipeline.expire(key('jobs'), TTL)
|
|
pipeline.hincrby(key, 'pending', jids.size)
|
|
pipeline.expire(key, TTL)
|
|
end
|
|
end
|
|
|
|
# Remove a job from the batch, such as when it's been processed or it has failed.
|
|
# @param [String] jid
|
|
def remove_job(jid)
|
|
_, pending, processed, async_refresh_key, threshold = redis.multi do |pipeline|
|
|
pipeline.srem(key('jobs'), jid)
|
|
pipeline.hincrby(key, 'pending', -1)
|
|
pipeline.hincrby(key, 'processed', 1)
|
|
pipeline.hget(key, 'async_refresh_key')
|
|
pipeline.hget(key, 'threshold')
|
|
end
|
|
|
|
if async_refresh_key.present?
|
|
async_refresh = AsyncRefresh.new(async_refresh_key)
|
|
async_refresh.increment_result_count(by: 1)
|
|
async_refresh.finish! if pending.zero? || processed >= threshold.to_f * (processed + pending)
|
|
end
|
|
end
|
|
|
|
# Get pending jobs.
|
|
# @returns [Array<String>]
|
|
def jobs
|
|
redis.smembers(key('jobs'))
|
|
end
|
|
|
|
# Inspect the batch.
|
|
# @returns [Hash]
|
|
def info
|
|
redis.hgetall(key)
|
|
end
|
|
|
|
private
|
|
|
|
def key(suffix = nil)
|
|
"worker_batch:#{@id}#{":#{suffix}" if suffix}"
|
|
end
|
|
end
|