You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

151 lines
5.6 KiB

  1. # frozen_string_literal: true
  2. require_relative '../../config/boot'
  3. require_relative '../../config/environment'
  4. require_relative 'cli_helper'
  5. module Mastodon
  6. class SearchCLI < Thor
  7. include CLIHelper
  8. # Indices are sorted by amount of data to be expected in each, so that
  9. # smaller indices can go online sooner
  10. INDICES = [
  11. AccountsIndex,
  12. TagsIndex,
  13. StatusesIndex,
  14. ].freeze
  15. option :concurrency, type: :numeric, default: 2, aliases: [:c], desc: 'Workload will be split between this number of threads'
  16. option :only, type: :array, enum: %w(accounts tags statuses), desc: 'Only process these indices'
  17. desc 'deploy', 'Create or upgrade ElasticSearch indices and populate them'
  18. long_desc <<~LONG_DESC
  19. If ElasticSearch is empty, this command will create the necessary indices
  20. and then import data from the database into those indices.
  21. This command will also upgrade indices if the underlying schema has been
  22. changed since the last run.
  23. Even if creating or upgrading indices is not necessary, data from the
  24. database will be imported into the indices.
  25. LONG_DESC
  26. def deploy
  27. if options[:concurrency] < 1
  28. say('Cannot run with this concurrency setting, must be at least 1', :red)
  29. exit(1)
  30. end
  31. indices = begin
  32. if options[:only]
  33. options[:only].map { |str| "#{str.camelize}Index".constantize }
  34. else
  35. INDICES
  36. end
  37. end
  38. progress = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false)
  39. # First, ensure all indices are created and have the correct
  40. # structure, so that live data can already be written
  41. indices.select { |index| index.specification.changed? }.each do |index|
  42. progress.title = "Upgrading #{index} "
  43. index.purge
  44. index.specification.lock!
  45. end
  46. ActiveRecord::Base.configurations[Rails.env]['pool'] = options[:concurrency] + 1
  47. pool = Concurrent::FixedThreadPool.new(options[:concurrency])
  48. added = Concurrent::AtomicFixnum.new(0)
  49. removed = Concurrent::AtomicFixnum.new(0)
  50. progress.title = 'Estimating workload '
  51. # Estimate the amount of data that has to be imported first
  52. indices.each do |index|
  53. index.types.each do |type|
  54. progress.total = (progress.total || 0) + type.adapter.default_scope.count
  55. end
  56. end
  57. # Now import all the actual data. Mind that unlike chewy:sync, we don't
  58. # fetch and compare all record IDs from the database and the index to
  59. # find out which to add and which to remove from the index. Because with
  60. # potentially millions of rows, the memory footprint of such a calculation
  61. # is uneconomical. So we only ever add.
  62. indices.each do |index|
  63. progress.title = "Importing #{index} "
  64. batch_size = 1_000
  65. slice_size = (batch_size / options[:concurrency]).ceil
  66. index.types.each do |type|
  67. type.adapter.default_scope.reorder(nil).find_in_batches(batch_size: batch_size) do |batch|
  68. futures = []
  69. batch.each_slice(slice_size) do |records|
  70. futures << Concurrent::Future.execute(executor: pool) do
  71. begin
  72. if !progress.total.nil? && progress.progress + records.size > progress.total
  73. # The number of items has changed between start and now,
  74. # since there is no good way to predict the final count from
  75. # here, just change the progress bar to an indeterminate one
  76. progress.total = nil
  77. end
  78. grouped_records = nil
  79. bulk_body = nil
  80. index_count = 0
  81. delete_count = 0
  82. ActiveRecord::Base.connection_pool.with_connection do
  83. grouped_records = type.adapter.send(:grouped_objects, records)
  84. bulk_body = Chewy::Type::Import::BulkBuilder.new(type, grouped_records).bulk_body
  85. end
  86. index_count = grouped_records[:index].size if grouped_records.key?(:index)
  87. delete_count = grouped_records[:delete].size if grouped_records.key?(:delete)
  88. # The following is an optimization for statuses specifically, since
  89. # we want to de-index statuses that cannot be searched by anybody,
  90. # but can't use Chewy's delete_if logic because it doesn't use
  91. # crutches and our searchable_by logic depends on them
  92. if type == StatusesIndex::Status
  93. bulk_body.map! do |entry|
  94. if entry[:index] && entry.dig(:index, :data, 'searchable_by').blank?
  95. index_count -= 1
  96. delete_count += 1
  97. { delete: entry[:index].except(:data) }
  98. else
  99. entry
  100. end
  101. end
  102. end
  103. Chewy::Type::Import::BulkRequest.new(type).perform(bulk_body)
  104. progress.progress += records.size
  105. added.increment(index_count)
  106. removed.increment(delete_count)
  107. sleep 1
  108. rescue => e
  109. progress.log pastel.red("Error importing #{index}: #{e}")
  110. end
  111. end
  112. end
  113. futures.map(&:value)
  114. end
  115. end
  116. end
  117. progress.title = ''
  118. progress.stop
  119. say("Indexed #{added.value} records, de-indexed #{removed.value}", :green, true)
  120. end
  121. end
  122. end