Third-party Content Synchronization with Drupal Migrate

Sometimes you need to pull in content or data on an ongoing basis from a third-party product or website. Maybe you want to pull in a list of books from Amazon, or show some products from your Shopify store. You may need all the flexibility of nodes in Drupal, but you don’t want to copy content manually, and you don’t want to be forced to move away from those other systems that are already serving your needs.

Here’s a recipe for synchronizing content from outside websites or products – in our case, Eventbrite – using the Migrate module.

But First, A Few Alternatives

In our specific project, we considered a few alternatives before landing on Migrate. We could have reimplemented Eventbrite's functionality in Drupal. However, we didn’t want to abandon the product (Eventbrite) that was already meeting our client’s needs perfectly. We just needed to pull in the content itself, without having to manage it in multiple places. We also considered a client-side application like Vue.js or React to simply re-present their Eventbrite content on the site in a seamless manner. But with that approach, we would lose the flexibility of storing events as nodes and would need to reinvent many of the features which Drupal gives us for free, like clean URLs, Views, Search, fine-grained caching, and more.

What we really needed was a continuous content synchronization between Eventbrite and Drupal that would leverage Eventbrite for content entry and event management and Drupal for a seamless integration with the rest of their site. But, how to do it?

Enter the Migrate Module

But Migrate is just for moving old sites into new ones, right? The reality is Migrate comes with a plethora of excellent, built-in, plugins which makes importing content a breeze. Moreover, it has all the necessary concepts to run migrations on a schedule without importing anything that’s not new or updated. While it’s often overlooked, Migrate is a perfect tool for synchronization of content as much as it is a perfect tool for one-time migration of content.

In Drupal 7, the feeds module was often used for these kinds of tasks. Feeds isn’t as far along in Drupal 8 and Migrate is now a much more flexible platform on which to build these kinds of integrations.

Important Concepts

In order to understand how to use Migrate as a content synchronization tool, you’ll first need to understand a few important concepts about how Migrate is built. Migrate module makes liberal use of Drupal 8 plugins, making it incredibly flexible, but also a little hard to understand at first. Especially when coming directly from Drupal 7.

Migrations are about taking arbitrary data from one bucket of content and funneling it into a new Drupal-based bucket of content. In Migrate-speak, the first bucket of data is your data "source." Your Drupal site is then the "destination."

Between those two buckets – your source and your destination – you may need to manipulate or alter the data to make it compatible with your Drupal content types and fields. In Migrate, this is called a "processor." For example, you may need to transform a Unix timestamp from your source data into a formatted date, or make taxonomy terms out of a list of strings. Migrate lets you describe one or more "processing pipelines" for each field of the data you'll be importing.

These are the three key components we'll be working with:

  1. "source" plugins (to fetch the data to import)
  2. "process" plugins (to transform that data into something easier to use)
  3. "destination" plugins (to create our Drupal nodes).

The "Source" Plugin

Migrate already comes with a few source plugins out-of-the-box. They can plug into generic data sources like a legacy SQL database, CSV, XML, or JSON files. However, what we needed for our client was to integrate with a somewhat non-standard JSON-based API. For that, you’ll need to write a custom source plugin.

Q: How? A: SourcePluginBase and MigrateSourceInterface.

When implementing a source plugin, you’ll need to extend from SourcePluginBase and implement all the methods required by MigrateSourceInterface.

SourcePluginBase does much of the heavy lifting for you, but there remains one essential method you must write yourself, and it is by far the most complicated step of this entire effort. You’ll need to implement the initializeIterator() method. This method must return something that implements PHP’s built-in \Iterator interface. In a custom migration, connecting to a custom API, you’ll need to write your own custom class which implements this interface. An iterator is an object which can be used in a foreach in place of a PHP array. In that respect, they’re very much the same. You can write:

foreach ($my_iterator as $key => $value) {
  // $my_iterator might as well be an array, because it behaves the same here.
}

That’s where the similarity ends. You can’t assign values to an iterator, and you can’t arbitrarily look up a key. You can only loop over the iterator from beginning to end.

In the context of the Migrate module, the iterator is what provides each result row, or each piece of content, to be imported into your Drupal site. In the context of our Eventbrite implementation, our iterator is what made requests to the Eventbrite API.

There are five methods which every class that implements \Iterator must have:

  1. current() - This returns the current row of data. Migrate expects this to return an array representing the data you’ll be importing. It should be raw and unprocessed. We can clean it up later.
  2. key() - This returns the current ID of the data. Migrate expects this to be the source ID of the row to be imported.
  3. next() - This advances the iterator one place. This will be called before current() is called. You should prepare your class to return the next row of data the next time current() is called. In the context of the Eventbrite API, this could mean advancing one item in the returned JSON response from the Eventbrite API. However, Eventbrite’s API is paginated, it was in this method that, when we had no more rows in the current page, we would make a new HTTP request for the next page of JSON data and set up our class to return the next row of data.
  4. rewind() - This resets the Iterator so that it can be looped over anew. This clears out current data and sets up the next call to the current() method to return the first result row.
  5. valid() - This indicates when the iteration is complete, i.e. when there are no more rows to return. This method returns TRUE until you’ve returned every result. When you have nothing left to return after a call to next(), you should return FALSE to tell Migrate that there is nothing left to import.

I’m not going to go into the specifics of each method here; it is highly variable and entirely dependent on the source of your migration. Is your third-party API JSON-based or XML-based, etc.? Plus, if you’re here for Eventbrite, I’ve already done all the hard work for you! I’ve made all the Eventbrite code I wrote public on Github.

Once you’ve built your iterator, the rest of it should be smooth sailing. You’ll still need to implement the remaining methods for MigrateSourceInterface, each of which is more extensively documented on Drupal.org.

  • fields() - A list of fields available for your source rows. This is usually the top-level keys of the array returned by your Iterator’s current() method
  • getIds() - This returns the uniquely identifying fields and some schema information for your source data. E.g. user and user_id from some arbitrary data source
  • __toString() - This is usually just a human-readable name for your Migration, something like, “My Custom Migration Source”

Once you have all this done, you’re ready to set up a migration YAML file and almost all your custom PHP is already written.

Much of the documentation about migrations that exist today tells you to install the Migrate Plus module at this point. Migrate Plus gives you some nice Drush commands and specifies how you should place and define your migrations. Honestly, I found it completely confusing and, for our use-case, entirely unnecessary. It’s a rabbit hole I wouldn’t go down. Migrate itself comes with everything we need.

To define a migration, i.e. the YAML which tells the Migrate module which plugins to use and how to map your source data into your destination content types, you’ll need to place a file in a custom module under a directory named migration_templates. For me, I named this file eventbrite.yml, but you may name it how you want. Just make sure that the id that you define in YAML matches the filename.

The five top-level keys you must define in this file are:

  1. id: The machine ID for the migration, matching your filename
  2. label: The human-readable name of the Migration, in my case, “Eventbrite”
  3. source: This is where we tell the Migrate module to use our custom source plugin, more on that below
  4. destination: This is the plugin that tells migrate which plugin to map your content into. Usually, this will be entity:node
  5. process: This tells migrate how to map source data values into fields in your destination content. We’ll discuss that below as well

The source key tells the Migrate module which plugin will provide the source data that it needs to migrate or import. In our case, it looked like this:

source:
  plugin: eventbrite

Where the eventbrite string must match the plugin id defined by an annotation on our custom MigrateSourcePlugin. Ours looked like this:

/**
 * @MigrateSource(
 *   id = "eventbrite"
 * )
 */
class EventbriteSource extends SourcePluginBase … omitted ...

The process key is the third and last component of our custom migration. Briefly, you use this section to map your source fields into your destination fields. As a simple example, if your source data has a key like “name,” you might map that to “title” for a node. Of all the Migrate documentation, this section on process plugins is by far the most well-documented and I referenced it extensively.

The biggest misunderstanding I’ve seen about the process section is how powerful “pipelines” and ProcessPlugins can be. Do not worry about cleaning up and processing your data in your custom iterator. Instead, do it with ProcessPlugins and pipelines.

The documentation page for how to write a ProcessPlugin is incredibly short. That said, ProcessPlugins are incredibly easy to write. First, create a new class with a file named like: /src/Plugin/migrate/process/YourClassName.php. Your class should extend the ProcessPluginBase class. You only need to implement one method: transform().

The transform() method operates on each value, of each field, on a single result row. Thus, if your source data returns an array of strings for a field named “favorite_flavors” on a chef’s user profile, the transform method will be called once for each string in that array.

The idea is simple, the transform method takes $value as its first argument, does whatever changes it needs to, then returns the processed value. E.g., if you wanted to translate every occurrence of the word “umami” to a less pretentious word like “savory,” you would return the string “savory” every time $value was equal to “umami”.

By composing different processors and understanding what already comes free with the Migrate module (see the list of built-in processors), complicated migrations become much simpler to reason about as complexity grows.

Continuity

The single biggest differentiating factor between using Migrate for a legacy site migration and using Migrate for content synchronization is that you’ll run your migrations continuously on a regular interval. Usually, something like every 30 minutes or every hour. In order to run your migration continuously, it’s important for your migration to know a few things:

  • What has already been migrated
  • What, if anything, has been updated since the last run
  • What is new

When the Migrate module can answer these questions, it can optimize the migration so it only imports or updates what needs to be changed, i.e., it doesn’t import the same content over and over.

To do this, you need to specify one of two methods for answering these questions. You can either specify track_changes:TRUE under the source in your migration YAML, or you can specify a high_water_property. The former will hash each result row and compare it to a previously computed hash. If they match, Migrate will skip that row. The latter, will examine the property you specify and compare it to the same property from the previous migration. If the incoming high water property is higher, then Migrate knows it should import the row. Typically, you might use something like a “changed” or “updated” timestamp on the incoming content as your high water property.

Both methods work fine, sometimes you just might be unable to use one or the other. For example, if there are no available properties on your source data to act as a high water mark, then the track_changes method is your only option. You may be unable to use the high_water_property if there are fields on your source data that might change over time (thereby changing the hash of the content) but you do not want to trigger an update when those fields change.

Cron

The final piece of the puzzle is actually ensuring that your migration runs on a regular basis. To do that, you’ll want to write a little bit of code to run your migration on cron.

I found this tutorial on cron and the Drupal 8 Queue API to be very informative and helpful. I would recommend it if you’d like to learn more about Drupal’s Queue API. Here, we’re just going to go over the minimum required effort to get a migration importing regularly on cron.

First, you’ll need to implement hook_cron in a custom module. Put the following in that function:

/**
 * Implements hook_cron().
 *
 * Schedules a synchronization of my migration.
 */
function mymodule_importer_cron() {
  $queue = \Drupal::queue('mymodule_importer');
 
  // We only ever need to be sure that we get the latest content once. Lining
  // up multiple sync's in a row would be unnecessary and would just be a
  // resource hog. We check the queue depth to prevent that.
  $queue_depth = (integer) $queue->numberOfItems();
  if ($queue_depth === 0) {
    $queue->createItem(TRUE);
  }
}

In the above, we’re loading a queue and adding an item to that queue. Below, we’ll implement a QueueWorker that will run your migration when there is an item in the queue. It’s possible that the migration might take longer than the amount of time you have between cron runs. In that case, items would start piling up and you would never empty the queue. Here, we just make sure we have one item in the queue. There’s no reason to let them pile up.

Next, in a file named like src/Plugin/QueueWorker/MyModuleImporterQueueWorker.php, we’ll write a class that extends QueueWorkerBase:

namespace Drupal\mymodule_importer\Plugin\QueueWorker;
 
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueWorkerBase;
use Drupal\Component\Plugin\PluginManagerInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Drupal\migrate\MigrateExecutable;
use Drupal\migrate\MigrateExecutableInterface;
use Drupal\migrate\MigrateMessage;
 
/**
 * @QueueWorker(
 *   id = "mymodule_importer",
 *   title = @Translation("My Module Cron Importer"),
 *   cron = {
 *     "time" = 30,
 *   },
 * )
 */
class MyModuleImporterQueueWorker extends QueueWorkerBase implements ContainerFactoryPluginInterface {
   … omitted ...
}

Notice that the id value in the annotation matches the name we put in our implementation of hook_cron. That is important. The “time” value is the maximum time that this run of your worker is allowed to take. If it does not complete in time, it will be killed and the item will remain in the queue and will be executed on the next cron run.

Within the class, we’ll inject the the migration plugin manager…

  public function __construct(PluginManagerInterface $migration_manager) {
    $this->migrationManager = $migration_manager;
  }
 
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
    return new static($container->get(‘plugin.manager.migration’));
  }
 
  public function processItem($item) {
    $migration = $this->migrateManager->createInstance('your_migration');
    $message = new MigrateMessage('Content Imported');
    $executable = new MigrateExecutable($migration, $message);
    $executable->import();
  }

I’ve left out a lot of code standards for clarity above (don’t judge). The key things to notice are that ‘your_migration’ must match the id of the migration in your migration YAML file. The rest of the processItem() method is just a little limbo to get your migration to a point where you can call import() without an error.

With all this written, your migration will be run every time cron is executed.

Conclusion

It took a lot of research to get this working the first time, but we still saved a lot of time by using as much as we could from the Migrate module to implement third-party content synchronization. Once I wrote this initial implementation, I’ve been able to simply tweak the Iterator and machine names in order to implement synchronization with another API on another project. Getting everything set up and working took about a day and will probably take less time in the future.

You can see the work in action at Museum of Contemporary Art Denver – just check out their events page.

I hope you’ll let me know if you give this a try, and what you did and didn’t find helpful!

Code Drupal Drupal 8 Drupal Planet

Read This Next