You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

114 lines
3.8 KiB

  1. # frozen_string_literal: true
  2. class BatchedRemoveStatusService < BaseService
  3. include StreamEntryRenderer
  4. # Delete given statuses and reblogs of them
  5. # Dispatch PuSH updates of the deleted statuses, but only local ones
  6. # Dispatch Salmon deletes, unique per domain, of the deleted statuses, but only local ones
  7. # Remove statuses from home feeds
  8. # Push delete events to streaming API for home feeds and public feeds
  9. # @param [Status] statuses A preferably batched array of statuses
  10. def call(statuses)
  11. statuses = Status.where(id: statuses.map(&:id)).includes(:account, :stream_entry).flat_map { |status| [status] + status.reblogs.includes(:account, :stream_entry).to_a }
  12. @mentions = statuses.map { |s| [s.id, s.mentions.includes(:account).to_a] }.to_h
  13. @tags = statuses.map { |s| [s.id, s.tags.pluck(:name)] }.to_h
  14. @stream_entry_batches = []
  15. @salmon_batches = []
  16. @json_payloads = statuses.map { |s| [s.id, Oj.dump(event: :delete, payload: s.id)] }.to_h
  17. # Ensure that rendered XML reflects destroyed state
  18. Status.where(id: statuses.map(&:id)).in_batches.destroy_all
  19. # Batch by source account
  20. statuses.group_by(&:account_id).each do |_, account_statuses|
  21. account = account_statuses.first.account
  22. unpush_from_home_timelines(account_statuses)
  23. batch_stream_entries(account_statuses) if account.local?
  24. end
  25. # Cannot be batched
  26. statuses.each do |status|
  27. unpush_from_public_timelines(status)
  28. batch_salmon_slaps(status) if status.local?
  29. end
  30. Pubsubhubbub::DistributionWorker.push_bulk(@stream_entry_batches) { |batch| batch }
  31. NotificationWorker.push_bulk(@salmon_batches) { |batch| batch }
  32. end
  33. private
  34. def batch_stream_entries(statuses)
  35. stream_entry_ids = statuses.map { |s| s.stream_entry.id }
  36. stream_entry_ids.each_slice(100) do |batch_of_stream_entry_ids|
  37. @stream_entry_batches << [batch_of_stream_entry_ids]
  38. end
  39. end
  40. def unpush_from_home_timelines(statuses)
  41. account = statuses.first.account
  42. recipients = account.followers.local.pluck(:id)
  43. recipients << account.id if account.local?
  44. recipients.each do |follower_id|
  45. unpush(follower_id, statuses)
  46. end
  47. end
  48. def unpush_from_public_timelines(status)
  49. payload = @json_payloads[status.id]
  50. redis.pipelined do
  51. redis.publish('timeline:public', payload)
  52. redis.publish('timeline:public:local', payload) if status.local?
  53. @tags[status.id].each do |hashtag|
  54. redis.publish("timeline:hashtag:#{hashtag}", payload)
  55. redis.publish("timeline:hashtag:#{hashtag}:local", payload) if status.local?
  56. end
  57. end
  58. end
  59. def batch_salmon_slaps(status)
  60. return if @mentions[status.id].empty?
  61. payload = stream_entry_to_xml(status.stream_entry.reload)
  62. recipients = @mentions[status.id].map(&:account).reject(&:local?).uniq(&:domain).map(&:id)
  63. recipients.each do |recipient_id|
  64. @salmon_batches << [payload, status.account_id, recipient_id]
  65. end
  66. end
  67. def unpush(follower_id, statuses)
  68. key = FeedManager.instance.key(:home, follower_id)
  69. originals = statuses.reject(&:reblog?)
  70. reblogs = statuses.reject { |s| !s.reblog? }
  71. # Quickly remove all originals
  72. redis.pipelined do
  73. originals.each do |status|
  74. redis.zremrangebyscore(key, status.id, status.id)
  75. redis.publish("timeline:#{follower_id}", @json_payloads[status.id])
  76. end
  77. end
  78. # For reblogs, re-add original status to feed, unless the reblog
  79. # was not in the feed in the first place
  80. reblogs.each do |status|
  81. redis.zadd(key, status.reblog_of_id, status.reblog_of_id) unless redis.zscore(key, status.reblog_of_id).nil?
  82. redis.publish("timeline:#{follower_id}", @json_payloads[status.id])
  83. end
  84. end
  85. def redis
  86. Redis.current
  87. end
  88. end