Browse Source

Split SalmonWorker into smaller parts, move profile updating into another job

closed-social-glitch-2
Eugen Rochko 7 years ago
parent
commit
5442083b3c
9 changed files with 39 additions and 25 deletions
  1. +6
    -10
      app/services/follow_remote_account_service.rb
  2. +3
    -3
      app/services/process_feed_service.rb
  3. +3
    -7
      app/services/process_interaction_service.rb
  4. +2
    -0
      app/workers/admin/suspension_worker.rb
  5. +2
    -0
      app/workers/application_worker.rb
  6. +1
    -4
      app/workers/distribution_worker.rb
  7. +20
    -0
      app/workers/remote_profile_update_worker.rb
  8. +1
    -1
      app/workers/salmon_worker.rb
  9. +1
    -0
      spec/services/process_feed_service_spec.rb

+ 6
- 10
app/services/follow_remote_account_service.rb View File

@ -45,13 +45,13 @@ class FollowRemoteAccountService < BaseService
account.suspended = true if domain_block && domain_block.suspend? account.suspended = true if domain_block && domain_block.suspend?
account.silenced = true if domain_block && domain_block.silence? account.silenced = true if domain_block && domain_block.silence?
xml = get_feed(account.remote_url)
hubs = get_hubs(xml)
body, xml = get_feed(account.remote_url)
hubs = get_hubs(xml)
account.uri = get_account_uri(xml) account.uri = get_account_uri(xml)
account.hub_url = hubs.first.attribute('href').value account.hub_url = hubs.first.attribute('href').value
get_profile(xml, account)
get_profile(body, account)
account.save! account.save!
account account
@ -61,7 +61,7 @@ class FollowRemoteAccountService < BaseService
def get_feed(url) def get_feed(url)
response = http_client.get(Addressable::URI.parse(url)) response = http_client.get(Addressable::URI.parse(url))
Nokogiri::XML(response)
[response.to_s, Nokogiri::XML(response)]
end end
def get_hubs(xml) def get_hubs(xml)
@ -82,12 +82,8 @@ class FollowRemoteAccountService < BaseService
author_uri.content author_uri.content
end end
def get_profile(xml, account)
update_remote_profile_service.call(xml.at_xpath('/xmlns:feed'), account)
end
def update_remote_profile_service
@update_remote_profile_service ||= UpdateRemoteProfileService.new
def get_profile(body, account)
RemoteProfileUpdateWorker.perform_async(account.id, body.force_encoding('UTF-8'), false)
end end
def http_client def http_client

+ 3
- 3
app/services/process_feed_service.rb View File

@ -5,15 +5,15 @@ class ProcessFeedService < BaseService
xml = Nokogiri::XML(body) xml = Nokogiri::XML(body)
xml.encoding = 'utf-8' xml.encoding = 'utf-8'
update_author(xml, account)
update_author(body, xml, account)
process_entries(xml, account) process_entries(xml, account)
end end
private private
def update_author(xml, account)
def update_author(body, xml, account)
return if xml.at_xpath('/xmlns:feed', xmlns: TagManager::XMLNS).nil? return if xml.at_xpath('/xmlns:feed', xmlns: TagManager::XMLNS).nil?
UpdateRemoteProfileService.new.call(xml.at_xpath('/xmlns:feed', xmlns: TagManager::XMLNS), account, true)
RemoteProfileUpdateWorker.perform_async(account.id, body.force_encoding('UTF-8'), true)
end end
def process_entries(xml, account) def process_entries(xml, account)

+ 3
- 7
app/services/process_interaction_service.rb View File

@ -24,7 +24,7 @@ class ProcessInteractionService < BaseService
return if account.suspended? return if account.suspended?
if salmon.verify(envelope, account.keypair) if salmon.verify(envelope, account.keypair)
update_remote_profile_service.call(xml.at_xpath('/xmlns:entry', xmlns: TagManager::XMLNS), account, true)
RemoteProfileUpdateWorker.perform_async(account.id, body.force_encoding('UTF-8'), true)
case verb(xml) case verb(xml)
when :follow when :follow
@ -114,7 +114,7 @@ class ProcessInteractionService < BaseService
return if status.nil? return if status.nil?
remove_status_service.call(status) if account.id == status.account_id
RemovalWorker.perform_async(status.id) if account.id == status.account_id
end end
def favourite!(xml, from_account) def favourite!(xml, from_account)
@ -130,7 +130,7 @@ class ProcessInteractionService < BaseService
end end
def add_post!(body, account) def add_post!(body, account)
process_feed_service.call(body, account)
ProcessingWorker.perform_async(account.id, body.force_encoding('UTF-8'))
end end
def status(xml) def status(xml)
@ -153,10 +153,6 @@ class ProcessInteractionService < BaseService
@process_feed_service ||= ProcessFeedService.new @process_feed_service ||= ProcessFeedService.new
end end
def update_remote_profile_service
@update_remote_profile_service ||= UpdateRemoteProfileService.new
end
def remove_status_service def remove_status_service
@remove_status_service ||= RemoveStatusService.new @remove_status_service ||= RemoveStatusService.new
end end

+ 2
- 0
app/workers/admin/suspension_worker.rb View File

@ -3,6 +3,8 @@
class Admin::SuspensionWorker class Admin::SuspensionWorker
include Sidekiq::Worker include Sidekiq::Worker
sidekiq_options queue: 'pull'
def perform(account_id) def perform(account_id)
SuspendAccountService.new.call(Account.find(account_id)) SuspendAccountService.new.call(Account.find(account_id))
end end

+ 2
- 0
app/workers/application_worker.rb View File

@ -1,3 +1,5 @@
# frozen_string_literal: true
class ApplicationWorker class ApplicationWorker
def info(message) def info(message)
Rails.logger.info("#{self.class.name} - #{message}") Rails.logger.info("#{self.class.name} - #{message}")

+ 1
- 4
app/workers/distribution_worker.rb View File

@ -4,10 +4,7 @@ class DistributionWorker < ApplicationWorker
include Sidekiq::Worker include Sidekiq::Worker
def perform(status_id) def perform(status_id)
status = Status.find(status_id)
FanOutOnWriteService.new.call(status)
WarmCacheService.new.call(status)
FanOutOnWriteService.new.call(Status.find(status_id))
rescue ActiveRecord::RecordNotFound rescue ActiveRecord::RecordNotFound
info("Couldn't find the status") info("Couldn't find the status")
end end

+ 20
- 0
app/workers/remote_profile_update_worker.rb View File

@ -0,0 +1,20 @@
# frozen_string_literal: true
class RemoteProfileUpdateWorker
include Sidekiq::Worker
sidekiq_options queue: 'pull'
def perform(account_id, body, resubscribe)
account = Account.find(account_id)
xml = Nokogiri::XML(body)
xml.encoding = 'utf-8'
author_container = xml.at_xpath('/xmlns:feed', xmlns: TagManager::XMLNS) || xml.at_xpath('/xmlns:entry', xmlns: TagManager::XMLNS)
UpdateRemoteProfileService.new.call(author_container, account, resubscribe)
rescue ActiveRecord::RecordNotFound
true
end
end

+ 1
- 1
app/workers/salmon_worker.rb View File

@ -7,7 +7,7 @@ class SalmonWorker
def perform(account_id, body) def perform(account_id, body)
ProcessInteractionService.new.call(body, Account.find(account_id)) ProcessInteractionService.new.call(body, Account.find(account_id))
rescue ActiveRecord::RecordNotFound
rescue Nokogiri::XML::XPath::SyntaxError, ActiveRecord::RecordNotFound
true true
end end
end end

+ 1
- 0
spec/services/process_feed_service_spec.rb View File

@ -16,6 +16,7 @@ RSpec.describe ProcessFeedService do
end end
it 'updates remote user\'s account information' do it 'updates remote user\'s account information' do
account.reload
expect(account.display_name).to eq '::1' expect(account.display_name).to eq '::1'
expect(account).to have_attached_file(:avatar) expect(account).to have_attached_file(:avatar)
end end

Loading…
Cancel
Save