Posts Tagged Guava

Trickle – asynchronous Java made easier

At Spotify, we’re doing more and more Java (Spotify started out as a Python-mostly shop, but performance requirements are changing that), and we’re doing more and more complex, asynchronous Java stuff. By ‘complex, asynchronous’, I mean things along the lines of:

  1. Call a search engine for a list of albums matching a certain query string.
  2. Call a search engine for a list of tracks matching the same query string.
  3. When the tracks list is available, call a service to find out how many times they were played.
  4. Combine the results of the three service calls into some data structure and return it.

This is easy to do synchronously, but if you want performance, you don’t want to waste threads blocking on service calls that take several tens of milliseconds. And if you do it asynchronously, you end up with code like this:

  ListenableFuture<List<Track>> tracks = search.searchForTracks(query);
  final ListenableFuture<List<DecoratedTrack>> decorated =
    Futures.transform(tracks,
      new AsyncFunction<List<Track>, List<DecoratedTrack>>() {
        @Override
        public ListenableFuture<List<DecoratedTrack>> apply(List<Track> tracks) {
          return decorationService.decorate(tracks);
        }
      });
  final ListenableFuture<List<Album>> albums = search.searchForAlbums(query);

  ListenableFuture<List<Object>> allDoneSignal =
    Futures.<Object>allAsList(decorated, albums);
 
  return Futures.transform(allDoneSignal,
    new Function<List<Object>, SomeResult>() {
      @Override
      public SomeResult apply(List<Object> dummy) {         
        return new SomeResult(
          Futures.getUnchecked(decorated),
          Futures.getUnchecked(albums));
      }
  });

It’s not exactly clear what’s going on. To me, some of the problems with the above code are:

  1. There’s a lot of noise due to Java syntax; it’s really hard to see which bits of the code do something useful.
  2. There’s a lot of concurrency management trivia in the way of understanding which service calls relate to which. The overall flow of data is very hard to understand.
  3. The pattern of using Futures.allAsList() to get a future that is a signal that ‘all the underlying futures are done’ is non-obvious, adding conceptual weight.
  4. The final transform doesn’t actually transform its inputs. To preserve type safety, we have to reach out to the ‘final’ decorated and albums futures.
  5. It’s easy for application code to accidentally make the code blocking by doing a ‘get’ on some future that isn’t yet completed, or by forgetting to add the future for a newly added service call to the ‘allDoneSignal’ list.

We’ve created a project called Trickle to reduce the level of ‘async pain’. With Trickle, the code above could be written like so:

  // constant definitions
  private static final Input<String> QUERY = Input.named("query");

  // one-time setup code, like a constructor or something
  private void wireUpGraph() {
    Func1<String, List<Track>> searchTracksFunc = new Func1<String, List<Track>>() {
      @Override
      public ListenableFuture<List<Track>> run(String query) {
        return search.searchForTracks(query);
      }
    };
    Func1<List<Track>, List<DecoratedTrack>> decorateTracksFunc =
      new Func1<List<Track>, List<DecoratedTrack>>() {
        @Override
        public ListenableFuture<List<DecoratedTrack>> run(List<Track> tracks) {
          return decorationService.decorate(tracks);
        }
      };
    Func1<String, List<Album>> searchAlbumsFunc = new Func1<String, List<Album>>() {
      @Override
      public ListenableFuture<List<Album>> run(String query) {
        return search.searchForAlbums(query);
      }
    };
    Func2<List<DecoratedTrack>, List<Album>, SomeResult> combine =
      new Func2<List<DecoratedTrack>, List<Album>, SomeResult>() {
        @Override
        public ListenableFuture<SomeResult> run(List<DecoratedTrack> decorated, List<Album> albums) {
          return Futures.immediateFuture(new SomeResult(decorated, albums));
        }
      };

    Graph<List<Track>> searchTracks = Trickle.call(searchTracksFunc).with(QUERY);
    Graph<List<DecoratedTrack>> decorateTracks = Trickle.call(decorateTracksFunc).with(searchTracks);
    Graph<List<Album>> albums = Trickle.call(searchAlbumsFunc).with(QUERY);
    this.searchGraph = Trickle.call(combine).with(decorateTracks, albums);
  }

  // actual invocation method
  public ListenableFuture<SomeResult> search(String query) {
    return this.searchGraph.bind(QUERY, query).run();
  }

The code is not shorter, but there are some interesting differences:

  • The dependencies between different calls are shown much more clearly.
  • The individual steps are more clearly separated out, making them more easily testable.
  • Each step (node in Trickle lingo) is only invoked when the previous ones are completed, so you never get a Future as an input to business logic. This makes it very hard for application code to accidentally block a thread.
  • It’s forward-compatible with lambdas, meaning it’ll look a lot nicer with Java 8.
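To make the lambda point concrete, here is a rough sketch of how the node definitions shrink under Java 8. Func1 and Func2 below are local stand-ins with the same single-method shape as in the listing above; they return the JDK’s CompletableFuture rather than Guava’s ListenableFuture so that the snippet compiles on its own. The service methods and the hand-written wiring in search() are made up for illustration, and none of this is Trickle’s actual API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Local stand-ins, NOT Trickle's classes: same single-method shape as the
// Func1/Func2 in the listing above, but returning CompletableFuture so the
// sketch needs neither Guava nor Trickle on the classpath.
interface Func1<A, R> {
  CompletableFuture<R> run(A arg);
}

interface Func2<A, B, R> {
  CompletableFuture<R> run(A first, B second);
}

class LambdaSketch {
  // Hypothetical service calls that complete immediately.
  static CompletableFuture<List<String>> searchForTracks(String query) {
    return CompletableFuture.completedFuture(
        Arrays.asList(query + "-track-1", query + "-track-2"));
  }

  static CompletableFuture<List<String>> searchForAlbums(String query) {
    return CompletableFuture.completedFuture(Arrays.asList(query + "-album-1"));
  }

  // Each anonymous inner class from the listing collapses to a single line,
  // either as a method reference or as a lambda.
  static final Func1<String, List<String>> SEARCH_TRACKS = LambdaSketch::searchForTracks;
  static final Func1<String, List<String>> SEARCH_ALBUMS = query -> searchForAlbums(query);
  static final Func2<List<String>, List<String>, String> COMBINE =
      (tracks, albums) -> CompletableFuture.completedFuture(
          tracks.size() + " tracks, " + albums.size() + " albums");

  // Hand-wired for the sake of a runnable example; with Trickle, this wiring
  // is what the graph definition takes care of.
  static String search(String query) {
    CompletableFuture<List<String>> tracks = SEARCH_TRACKS.run(query);
    CompletableFuture<List<String>> albums = SEARCH_ALBUMS.run(query);
    return tracks
        .thenCombine(albums, COMBINE::run) // yields a future of a future
        .thenCompose(future -> future)     // flatten it
        .join();                           // block only here, in the demo
  }

  public static void main(String[] args) {
    System.out.println(search("abba"));
  }
}
```

The only thing that matters for the bullet above is that the function types have exactly one abstract method; that is what lets the compiler accept a lambda wherever the listing uses an anonymous inner class.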

For more examples and information about how it works, take a look at the README and wiki, because this post is about why, not how, I think you should use Trickle.

Why Trickle?

A risk you always run as an engineer is falling in love with your own ideas and pursuing something because you invented it rather than because it’s actually a good thing. Some of us had created systems for managing asynchronous call graphs at previous jobs, and while they had very large limitations (at least the ones I created did), it was clear that they made things quite a lot easier in the contexts where they were used. But even with that experience, we were not sure Trickle was a good idea.

So once we had an API that felt like it was worth trying out, we sent out an email to the backend guild at Spotify, asking for volunteers to compare it with the two best similar frameworks we had been able to find on the interwebs (we ruled out Disruptor and Akka because we felt that introducing them would be an unreasonably large change to our existing ecosystem). The comparison was done by implementing a particular call graph, and we then asked people to fill out a questionnaire measuring a) how easy it was to get started, b) how much the framework got out of the way, allowing you to focus on the core business logic, and c) how clean the resulting code was. Nine people took the survey, and the results were pretty interesting (1 is worst, 5 is best):

  Technology         Getting going  Focus on core  Cleanness
  ListenableFutures  4.0            3.6            2.7
  RxJava             2.8            3.7            3.1
  Trickle            3.9            3.8            4.4

The most common comment regarding ListenableFutures (5/9 said this) was: “I already knew it, so it was of course easy to get started”. The most common comment about Trickle (6/9) was “no documentation” – three of the people who said that also said “but it was still very easy to get going”. So Trickle, without documentation, was almost as easy to get going with as raw Futures that most of the people already knew, and it was a clear winner in code cleanness. Given that we considered the cleanness of the resulting code to be the most important criterion, it felt like we were onto something.

Since getting that feedback, we’ve iterated a couple of times more on the API, and to ensure that it is production quality, we’re using it in the service discovery system that our group owns (and which can bring down almost the entire Spotify backend if it fails). We’ve also added some documentation, but not too much – we want to make sure that the API is simple enough that you can use it without needing to read any documentation at all. As you can tell from the links above, we also open-sourced it. Finally, we did a micro benchmark to ensure we hadn’t done anything that would introduce crazy performance limitations. All micro benchmarks are liars, but the results look like we’re in a reasonable zone compared to the others:

  Benchmark                                  Mode   Samples  Mean     Mean error  Units
  c.s.t.Benchmark.benchmarkExecutorGuava     thrpt  5        68.778   4.066       ops/ms
  c.s.t.Benchmark.benchmarkExecutorRx        thrpt  5        20.242   0.710       ops/ms
  c.s.t.Benchmark.benchmarkExecutorTrickle   thrpt  5        52.148   1.776       ops/ms
  c.s.t.Benchmark.benchmarkImmediateGuava    thrpt  5        890.375  79.594      ops/ms
  c.s.t.Benchmark.benchmarkImmediateRx       thrpt  5        312.870  8.643       ops/ms
  c.s.t.Benchmark.benchmarkImmediateTrickle  thrpt  5        168.820  13.991      ops/ms

Trickle is significantly slower than plain ListenableFutures (especially when the futures aren’t real futures because the result is immediately available; this case is very fast in Guava, whereas Trickle doesn’t do any optimisations for that). This is not a surprise, since Trickle is built on top of ListenableFutures and does more stuff. The key result we wanted out of this was that it shouldn’t be orders of magnitude slower than plain ListenableFutures, and it’s not. If a single thread is capable of doing 52k operations/second in Trickle, that’s more than our use case requires, so at least this test didn’t indicate that we had done something very wrong. I’m skeptical about the RxJava performance results; its slowness when using real threads may well be due to some mistake I made when writing the RxJava code.

Trickle, while still immature, in particular regarding the exact details of the API, is now at a stage where we feel that it’s ready for general use. So if you’re using Futures.transform() and Futures.allAsList(), chances are good that you can make your code easier to read and work with by using Trickle instead. Or, if you’re doing things synchronously and wish that you could improve the performance of your services, perhaps Trickle could help you get a performance boost!

Exceptional iterators using Guava

Some years ago, I came across a little gem of a blog post about using a class called AbstractIterator that I hadn’t heard of before. I immediately loved it and started using it, along with the APIs defined in the JDK collections classes in general, a lot more frequently. I’m still doing that today, but for the first time, I came across a situation where AbstractIterator (nowadays in Guava, not Google Collections, of course) let me down.

In the system I’m currently working on, we need to process tens of thousands of feeds that are generated ‘a little randomly’, often by people without deep technical skills. This means we cannot know for sure that entries in the feeds will be consistent; we may be able to parse the first four entries, but not the fifth, and so on. The quantity of feeds, combined with the fact that we need to be forgiving of less-than-perfect input means that we need to be able to continue ingesting data even if a few entries are bad.

Ideally, we wanted to write code similar to this:


Iterator<FeedEntry> iterator = feedParser.iterator();

while (iterator.hasNext()) {
  try {
    FeedEntry entry = iterator.next();

    // ... process the entry
  }
  catch (BadEntryException e) {
    // .. track the error and continue processing
  }
}

Now, we also wanted to use AbstractIterator as the base class for the parser to use, maybe something like:


class FeedEntryIterator extends AbstractIterator<FeedEntry> {
  protected FeedEntry computeNext() {
     if (noMoreInput()) {
        return endOfData();
     }

     try {
        return parseInput();
     }
     catch (ParseException e) {
        throw new BadEntryException(e);
     }
  }
}

This, however, doesn’t work for two reasons:

  1. The BadEntryException will be thrown during the execution of hasNext(), since the AbstractIterator class calls the computeNext() method to check if any more data is available, storing the result in an intermediate member field until the next() method is called.
  2. If the computeNext() method throws an exception, the AbstractIterator is left in the FAILED state, from which it cannot recover.

One option to work around this that I thought of was delegation. Something like this works for many scenarios:


public class ForwardingTransformingIterator<F, T> implements Iterator<T> {
    private final Iterator<F> delegate;
    private final Function<F, T> transformer;

    public ForwardingTransformingIterator(Iterator<F> delegate, Function<F, T> transformer) {
        this.delegate = delegate;
        this.transformer = transformer;
    }

    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    @Override
    public T next() {
        return transformer.apply(delegate.next());
    }
}

The transformation function could then safely throw exceptions for entries it fails to parse. There’s even a shorthand for the above code in Guava: Iterators.transform(). The only issue is that the delegate iterator needs to return chunks of data that are the same size as what the parser/transform needs. So if the parser needs to be, for instance, something like a SAX parser, you’re in trouble.

When I had gotten this far, I figured the only solution was to modify the AbstractIterator class so that it can deal with exceptions directly. The source is a little too long to include in an already source-heavy post, but there’s a gist here. The essence of the change is to make the base class store exceptions of a particular type that are thrown by computeNext(), and re-throw them instead of returning a value when next() is called. It kind of works (we’re using this implementation right now), but it would be nice to find a better solution.
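For readers who don’t want to dig through the gist, the store-and-rethrow idea can be sketched in a few dozen lines. This is a simplified, JDK-only illustration written for this explanation; it is neither Guava’s AbstractIterator nor the code in the gist. The class names ExceptionStoringIterator and IntegerFeed, the FAILED_ENTRY state, and the BadEntryException definition are all assumptions, and the sketch needs Java 8 or later because it doesn’t implement remove().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical unchecked exception standing in for the post's BadEntryException.
class BadEntryException extends RuntimeException {
  BadEntryException(String message, Throwable cause) {
    super(message, cause);
  }
}

// Simplified sketch: hasNext() parks a BadEntryException, next() re-throws it.
abstract class ExceptionStoringIterator<T> implements Iterator<T> {
  private enum State { NOT_READY, READY, FAILED_ENTRY, DONE }

  private State state = State.NOT_READY;
  private T next;
  private BadEntryException storedFailure;

  // Return the next element, return endOfData(), or throw BadEntryException.
  protected abstract T computeNext();

  protected final T endOfData() {
    state = State.DONE;
    return null;
  }

  @Override
  public final boolean hasNext() {
    if (state == State.NOT_READY) {
      try {
        T computed = computeNext();
        if (state != State.DONE) {
          next = computed;
          state = State.READY;
        }
      } catch (BadEntryException e) {
        // A bad entry still counts as "something to fetch"; keep it for next().
        storedFailure = e;
        state = State.FAILED_ENTRY;
      }
    }
    return state != State.DONE;
  }

  @Override
  public final T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    State current = state;
    state = State.NOT_READY; // recoverable: the following call computes again
    if (current == State.FAILED_ENTRY) {
      BadEntryException failure = storedFailure;
      storedFailure = null;
      throw failure;
    }
    T result = next;
    next = null;
    return result;
  }
}

// Toy "feed" whose entries are strings that should parse as integers.
class IntegerFeed extends ExceptionStoringIterator<Integer> {
  private final Iterator<String> raw;

  IntegerFeed(List<String> entries) {
    this.raw = entries.iterator();
  }

  @Override
  protected Integer computeNext() {
    if (!raw.hasNext()) {
      return endOfData();
    }
    String entry = raw.next();
    try {
      return Integer.valueOf(entry);
    } catch (NumberFormatException e) {
      throw new BadEntryException("cannot parse: " + entry, e);
    }
  }

  // Same loop shape as the "ideally" snippet earlier in the post.
  static List<String> drain(List<String> entries) {
    List<String> log = new ArrayList<>();
    Iterator<Integer> iterator = new IntegerFeed(entries);
    while (iterator.hasNext()) {
      try {
        log.add("ok " + iterator.next());
      } catch (BadEntryException e) {
        log.add("bad");
      }
    }
    return log;
  }

  public static void main(String[] args) {
    System.out.println(drain(Arrays.asList("1", "x", "3"))); // [ok 1, bad, ok 3]
  }
}
```

Only BadEntryException is parked; any other exception from computeNext() still escapes from hasNext(), which seems like the right default for genuine bugs as opposed to bad input.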

I’ve created a Guava issue for this, so if you think this would be useful, go ahead and vote for it. And if you have suggestions for how to improve the code, feel free to make them here or at code.google.com!
