A small Java 8 util library, complementary to Guava (BiStream, Substring, MoreStreams, Parallelizer).

Disclaimer: This is not an official Google product.

Mug

A small Java 8 utilities library (javadoc), with 0 deps.

Installation

Maven

Add the following to pom.xml:

  <dependency>
    <groupId>com.google.mug</groupId>
    <artifactId>mug</artifactId>
    <version>6.6</version>
  </dependency>

Protobuf utils:

  <dependency>
    <groupId>com.google.mug</groupId>
    <artifactId>mug-protobuf</artifactId>
    <version>6.6</version>
  </dependency>

Guava add-ons:

  <dependency>
    <groupId>com.google.mug</groupId>
    <artifactId>mug-guava</artifactId>
    <version>6.6</version>
  </dependency>

Gradle

Add to build.gradle:

  implementation 'com.google.mug:mug:6.6'
  implementation 'com.google.mug:mug-guava:6.6'
  implementation 'com.google.mug:mug-protobuf:6.6'

Stream

BiStream streams pairs of objects.

This class closely mirrors the JDK Stream API (the few extra methods of "its own" are very straightforward). If you are familiar with JDK streams, the learning curve is minimal.

Example 1: to concatenate Maps:

import static com.google.mu.util.stream.BiStream.concat;

Map<AccountId, Account> allAccounts = concat(primaryAccounts, secondaryAccounts).toMap();

Example 2: to combine two streams:

BiStream.zip(requests, responses)
    .mapToObj(RequestAndResponseLog::new);

Example 3: to build a Map fluently:

Map<DoctorId, Patient> patientsByDoctorId = BiStream.zip(doctors, patients)
    .filter((doctor, patient) -> patient.likes(doctor))
    .mapKeys(Doctor::getId)
    .collect(toMap());

Example 4: to build Guava ImmutableListMultimap fluently:

ImmutableListMultimap<ZipCode, Address> addressesByZipCode = BiStream.from(addresses)
    .mapKeys(Address::getZipCode)
    .collect(ImmutableListMultimap::toImmutableListMultimap);

Example 5: to split a Map into sub-maps:

import static com.google.mu.util.stream.BiCollectors.groupingBy;

Map<Address, PhoneNumber> phonebooks = ...;
Map<State, Map<Address, PhoneNumber>> statePhonebooks = BiStream.from(phonebooks)
    .collect(groupingBy(Address::state, Collectors::toMap))
    .toMap();

Example 6: to merge Map entries:

import static com.google.mu.util.stream.BiCollectors.toMap;
import static com.google.mu.util.stream.MoreCollectors.flatteningMaps;

Map<Account, Money> totalPayouts = projects.stream()
    .map(Project::payments)  // Stream<Map<Account, Money>>
    .collect(flatteningMaps(toMap(Money::add)));

Example 7: to apply grouping over Map entries:

import static com.google.mu.util.stream.BiCollectors.toMap;
import static com.google.mu.util.stream.MoreCollectors.flatteningMaps;
import static java.util.stream.Collectors.summingInt;

Map<EmployeeId, Integer> workerHours = projects.stream()
    .map(Project::getTaskAssignments)  // Stream<Map<Employee, Task>>
    .collect(flatteningMaps(toMap(summingInt(Task::hours))));

Example 8: to turn a Collection<Pair<K, V>> to BiStream<K, V>:

BiStream<K, V> stream = BiStream.from(pairs, Pair::getKey, Pair::getValue);

Q: Why not Map<Foo, Bar> or Multimap<Foo, Bar>?

A: Sometimes Foo and Bar are just an arbitrary pair of objects, with no key-value relationship. Or you may not trust Foo#equals() and hashCode(). Instead, drop-in replace your Stream<Pair<Foo, Bar>>/List<Pair<Foo, Bar>> with BiStream<Foo, Bar>/BiCollection<Foo, Bar> to get better readability.
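
To make the first point concrete, here is a minimal plain-JDK sketch (it does not use Mug; PairsVersusMap and its methods are hypothetical names). It shows that a Map silently collapses pairs whose left-hand values are equal, while a plain sequence of pairs keeps every one of them:

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of Mug.
final class PairsVersusMap {
  /** Keeps every row as a pair, including rows with the same left-hand value. */
  static List<Map.Entry<String, String>> asPairs(String[][] rows) {
    List<Map.Entry<String, String>> pairs = new ArrayList<>();
    for (String[] row : rows) {
      pairs.add(new SimpleImmutableEntry<>(row[0], row[1]));
    }
    return pairs;
  }

  /** Stores the same rows in a Map; a repeated left-hand value overwrites the earlier entry. */
  static Map<String, String> asMap(String[][] rows) {
    Map<String, String> map = new LinkedHashMap<>();
    for (String[] row : rows) {
      map.put(row[0], row[1]);
    }
    return map;
  }
}
```

With the rows (alice, bob), (alice, carol), (dave, erin), asPairs() returns three pairs while asMap() returns two entries, because the second "alice" row replaces the first.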

Q: Why not Stream<FooAndBar>?

A: When you already have a proper domain object, sure. But you might find it cumbersome to define a bunch of FooAndBar, PatioChairAndKitchenSink one-off classes especially if the relationship between the two types is only relevant in the local code context.

Q: Why not Stream<Pair<Foo, Bar>>?

A: It's distracting to read code littered with opaque method names like getFirst() and getSecond().

StringFormat

Extracts structured data from string:

new StringFormat("/users/{user}/.{hidden_file_name}")
    .parse(filePath, (user, fileName) -> ...);
new StringFormat("{hour}:{minute}:{second}.{millis}")
    .parse("10:26:30.748", (hour, minute, second, millis) -> ...);
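
For comparison, the second template can be approximated with java.util.regex. This is only a sketch of the idea (one capture group per placeholder); it assumes every field is a run of digits, which may not match how StringFormat treats placeholders, and TimeTemplateSketch is a hypothetical name:

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration class; not how StringFormat is implemented.
final class TimeTemplateSketch {
  // One capture group per placeholder of "{hour}:{minute}:{second}.{millis}".
  // Assumption: each field consists of digits only.
  private static final Pattern TIME = Pattern.compile("(\\d+):(\\d+):(\\d+)\\.(\\d+)");

  /** Returns {hour, minute, second, millis}, or empty if the whole input doesn't match. */
  static Optional<int[]> parse(String input) {
    Matcher matcher = TIME.matcher(input);
    if (!matcher.matches()) {
      return Optional.empty();
    }
    int[] fields = new int[4];
    for (int i = 0; i < fields.length; i++) {
      fields[i] = Integer.parseInt(matcher.group(i + 1));
    }
    return Optional.of(fields);
  }
}
```

The template form keeps the field names visible at the call site, whereas the regex version forces the reader to count capture groups.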

Substring

Example 1: strip off a prefix if existent:

String httpStripped = Substring.prefix("http://").removeFrom(uri);

Example 2: strip off any scheme prefix from a uri:

String schemeStripped = Substring.upToIncluding(first("://")).removeFrom(uri);

Example 3: split a string in the format of "name=value" into name and value:

Substring.first('=').split("name=value").map((name, value) -> ...);

Example 4: replace trailing "//" with "/":

Substring.suffix("//").replaceFrom(path, "/");

Example 5: strip off the suffix starting with a dash (-) character:

last('-').toEnd().removeFrom(str);

Example 6: extract a substring using regex:

String quoted = Substring.first(Pattern.compile("'(.*?)'"), 1)
    .from(str)
    .orElseThrow(...);

Example 7: find the substring between the first and last curly braces ({) :

String body = Substring.between(first('{'), last('}'))
    .from(source)
    .orElseThrow(...);
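
For readers who want to see what Example 7 saves, here is a rough hand-written equivalent using indexOf() and lastIndexOf(). It is a sketch of the same intent rather than the library's implementation, and BetweenBracesSketch is a hypothetical name:

```java
import java.util.Optional;

// Hypothetical illustration class; not part of Mug.
final class BetweenBracesSketch {
  /** Returns the text between the first '{' and the last '}', if both exist in that order. */
  static Optional<String> betweenBraces(String source) {
    int open = source.indexOf('{');
    int close = source.lastIndexOf('}');
    // Covers a missing '{', a missing '}' (close == -1), and a '}' that precedes the '{'.
    if (open < 0 || close < open) {
      return Optional.empty();
    }
    return Optional.of(source.substring(open + 1, close));
  }
}
```

The index arithmetic (open + 1, the ordering check) is exactly the kind of detail that is easy to get off by one when written by hand.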

Optionals

Example 1: to combine two Optional instances into a single one:

Optional<Couple> couple = Optionals.both(optionalHusband, optionalWife).map(Couple::new);

Example 2: to run code when two Optional instances are both present:

Optionals.both(findTeacher(), findStudent()).ifPresent(Teacher::teach);

Example 3: or else run a fallback code block:

import static com.google.mu.util.Optionals.ifPresent;

Optional<Teacher> teacher = findTeacher(...);
Optional<Student> student = findStudent(...);
ifPresent(teacher, student, Teacher::teach)             // teach if both present
    .or(() -> ifPresent(teacher, Teacher::workOut))     // teacher work out if present
    .or(() -> ifPresent(student, Student::doHomework))  // student do homework if present
    .orElse(() -> log("no teacher. no student"));       // or else log

Example 4: wrap a value in Optional if it exists:

import static com.google.mu.util.Optionals.optionally;

Optional<String> id = optionally(request.hasId(), request::getId);

All Optionals utilities propagate checked exceptions from the lambdas or method references.
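
As a point of reference, combining two Optional instances with the plain JDK takes a nested flatMap()/map(). The sketch below shows that idiom; BothSketch and mapBoth() are hypothetical names, and unlike the utilities above this version does not propagate checked exceptions:

```java
import java.util.Optional;
import java.util.function.BiFunction;

// Hypothetical illustration class; not part of Mug.
final class BothSketch {
  /** Applies the function only when both optionals are present. */
  static <A, B, R> Optional<R> mapBoth(
      Optional<A> left, Optional<B> right, BiFunction<A, B, R> function) {
    // flatMap unwraps the left value; map unwraps the right value and wraps the result.
    return left.flatMap(a -> right.map(b -> function.apply(a, b)));
  }
}
```

The nesting grows with every additional Optional, which is the readability problem that both() and ifPresent() are meant to remove.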

MoreStreams

Example 1: to group consecutive elements in a stream:

List<StockPrice> pricesOrderedByTime = ...;

List<List<StockPrice>> priceSequences =
    MoreStreams.groupConsecutive(
            pricesOrderedByTime.stream(), (p1, p2) -> closeEnough(p1, p2), toList())
        .collect(toList());

Example 2: to split a stream into smaller-size chunks (batches):

int batchSize = 5;
MoreStreams.dice(requests, batchSize)
    .map(BatchRequest::new)
    .forEach(batchClient::sendBatchRequest);
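
To illustrate what "chunks" means in Example 2, here is a small eager version over a List using only the JDK. It is a sketch under the assumption that the last batch may be smaller than the batch size; BatchingSketch is a hypothetical name, and unlike a stream-based approach it needs the whole input in memory:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; not part of Mug.
final class BatchingSketch {
  /** Splits items into consecutive batches of at most batchSize elements each. */
  static <T> List<List<T>> chunk(List<T> items, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
    }
    List<List<T>> batches = new ArrayList<>();
    for (int start = 0; start < items.size(); start += batchSize) {
      int end = Math.min(start + batchSize, items.size());
      // Copy the view so each batch is independent of the source list.
      batches.add(new ArrayList<>(items.subList(start, end)));
    }
    return batches;
  }
}
```

Seven elements with a batch size of 3 produce batches of sizes 3, 3 and 1.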

Example 3: to iterate over Streams in the presence of checked exceptions or control flow:

The Stream API provides forEach() to iterate over a stream, if you don't have to throw checked exceptions.

When a checked exception is in the way, or if you need control flow (continue, return, etc.), iterateThrough() and iterateOnce() can help. The following code uses iterateThrough() to write objects into an ObjectOutputStream, with IOException propagated:

Stream<?> stream = ...;
ObjectOutput out = ...;
iterateThrough(stream, out::writeObject);

Or, with control flow:

for (Object obj : iterateOnce(stream)) {
  if (...) continue;
  else if (...) return;
  out.writeObject(obj);
}
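
The same loop shape is possible with the JDK alone, because stream::iterator fits the single abstract method of Iterable. The sketch below uses that idiom; IterateOnceSketch and the "STOP" sentinel are hypothetical, chosen only to exercise continue and return:

```java
import java.io.IOException;
import java.util.stream.Stream;

// Hypothetical illustration class; not part of Mug.
final class IterateOnceSketch {
  /** Appends non-empty lines to out, stopping at the first "STOP" line. */
  static void writeLines(Stream<String> lines, Appendable out) throws IOException {
    // A Stream is not an Iterable, but its iterator() method can stand in for one.
    // The resulting Iterable can be traversed only once.
    Iterable<String> once = lines::iterator;
    for (String line : once) {
      if (line.isEmpty()) {
        continue;  // ordinary control flow works, unlike inside forEach()
      }
      if (line.equals("STOP")) {
        return;
      }
      out.append(line).append('\n');  // an IOException thrown here propagates unchanged
    }
  }
}
```

A named helper such as iterateOnce() mainly makes the single-use restriction explicit at the call site.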

Example 4: to generate a BFS stream:

Stream<V> bfs = MoreStreams.generate(root, node -> node.children().stream())
    .map(Node::value);
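
For reference, breadth-first order is what a FIFO queue produces when each visited node enqueues its children. The eager sketch below assumes a tree (no cycles) described by a hypothetical parent-to-children map; BreadthFirstSketch is not part of the library:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical illustration class; not part of Mug.
final class BreadthFirstSketch {
  /** Returns nodes level by level, assuming the structure is a tree without cycles. */
  static List<String> bfs(String root, Map<String, List<String>> children) {
    List<String> order = new ArrayList<>();
    Queue<String> queue = new ArrayDeque<>();
    queue.add(root);
    while (!queue.isEmpty()) {
      String node = queue.remove();
      order.add(node);
      // Children go to the back of the queue, so the whole current level is visited first.
      queue.addAll(children.getOrDefault(node, Collections.emptyList()));
    }
    return order;
  }
}
```

This version materializes the whole traversal up front; a stream-based traversal can instead stop as soon as the consumer has seen enough.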

Example 5: to merge maps:

interface Page {
  Map<Day, Long> getTrafficHistogram();
}

List<Page> pages = ...;

// Merge traffic histogram across all pages of the web site
Map<Day, Long> siteTrafficHistogram = pages.stream()
    .map(Page::getTrafficHistogram)
    .collect(flatteningMaps(groupingBy(day -> day, Long::sum)))
    .toMap();

BinarySearch

The JDK offers binary search algorithms out of the box, for sorted arrays and lists.

But the binary search algorithm is applicable to more use cases. For example:

  • You may want to search in a double array with a tolerance factor.
    Optional<Integer> index =
        BinarySearch.inSortedArrayWithTolerance(doubles, 0.0001).find(3.14);
  • Or search for the range of indexes when the array can have duplicates (at least according to the tolerance factor).
    Range<Integer> indexRange =
        BinarySearch.inSortedArrayWithTolerance(doubles, 0.0001).rangeOf(3.14);
  • Or search for the solution to a monotonic polynomial equation.
    long polynomial(int x) {
      return 5 * x * x * x + 3 * x + 2;
    }
    
    Optional<Integer> solvePolynomial(long y) {
      return BinarySearch.forInts()
          .find((low, x, high) -> Long.compare(y, polynomial(x)));
    }
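
The last bullet can be spelled out by hand to show what the search does under the hood. This is a plain-JDK sketch, not the library's code: MonotonicSearchSketch is a hypothetical name, the caller supplies the search range, and the function is assumed to be non-decreasing over it:

```java
import java.util.OptionalInt;
import java.util.function.IntToLongFunction;

// Hypothetical illustration class; not part of Mug.
final class MonotonicSearchSketch {
  /** Same polynomial as above, computed in long arithmetic. */
  static long polynomial(int x) {
    return 5L * x * x * x + 3L * x + 2;
  }

  /** Finds x in [low, high] with f(x) == y, assuming f is non-decreasing and 0 <= low <= high. */
  static OptionalInt solve(IntToLongFunction f, long y, int low, int high) {
    while (low <= high) {
      int mid = low + (high - low) / 2;  // safe for non-negative bounds; avoids low + high overflow
      long value = f.applyAsLong(mid);
      if (value == y) {
        return OptionalInt.of(mid);
      }
      if (value < y) {
        low = mid + 1;   // the answer, if any, lies to the right
      } else {
        high = mid - 1;  // the answer, if any, lies to the left
      }
    }
    return OptionalInt.empty();
  }
}
```

For example, solve(MonotonicSearchSketch::polynomial, 334, 0, 1000) finds x = 4 because 5·64 + 12 + 2 = 334, while a target of 335 has no integer solution and yields an empty result.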

Iteration

Example 1: turn your recursive algorithm into a lazy Stream:

class DepthFirst<N> extends Iteration<N> {
  private final Set<N> visited = new HashSet<>();
  
  DepthFirst<N> postOrder(N node) {
    if (visited.add(node)) {
      for (N successor : node.successors()) {
        yield(() -> postOrder(successor));
      }
      yield(node);
    }
    return this;
  }
}

Stream<N> postOrder = new DepthFirst<N>().postOrder(root).iterate();

Example 2: implement Fibonacci sequence as a stream:

class Fibonacci extends Iteration<Long> {
  Fibonacci from(long v0, long v1) {
    yield(v0);
    yield(() -> from(v1, v0 + v1));
    return this;
  }
}

Stream<Long> fibonacci = new Fibonacci().from(0, 1).iterate();
    => [0, 1, 1, 2, 3, 5, 8, ...]
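
As a cross-check of the sequence, the same recurrence (emit v0, then continue from (v1, v0 + v1)) can be written with the JDK's Stream.iterate(). This is a sketch for comparison only; FibonacciSketch is a hypothetical name:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical illustration class; not part of Mug.
final class FibonacciSketch {
  /** Returns the first n Fibonacci numbers, starting from 0 and 1. */
  static List<Long> firstN(int n) {
    // Each state is the pair {v0, v1}; the next state is {v1, v0 + v1}.
    return Stream.iterate(new long[] {0, 1}, pair -> new long[] {pair[1], pair[0] + pair[1]})
        .limit(n)
        .map(pair -> pair[0])
        .collect(Collectors.toList());
  }
}
```

firstN(8) evaluates to [0, 1, 1, 2, 3, 5, 8, 13]. Stream.iterate() handles this linear recurrence well, but it has no counterpart for tree-shaped recursion such as the post-order traversal in Example 1.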

Selection

Have you ever needed to specify a whitelist of things, while also needing to allow everything once the feature is ready for prime time?

A common work-around is to use a Set to capture the whitelist, and then treat empty set as all.

This has a few caveats:

  1. The code that checks the whitelist is error prone. You'll need to remember to check the special case for an empty Set:

     if (allowedUsers.isEmpty() || allowedUsers.contains(userId)) {
       ...;
     }

  2. If "empty" means all, how do you disable the feature completely, blocking all users?
  3. If allowedUsers in the above example is a flag, it isn't obvious to the staff configuring the flag what an empty value means. When your system has a dozen components, each with its own flavor of whitelist or allowlist flag, it's easy to misunderstand and misconfigure.

Instead, use Selection to differentiate and make explicit the all vs. none cases. Code checking against a Selection is straight-forward:

if (allowedUsers.has(userId)) {
  ...
}

To use it in a flag, you can use the Selection.parser() which will accept the '*' character as indication of accepting all, while setting it empty means to accept none. For example --allowed_users=*, --allowed_users=joe,amy.
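
The all/none/some distinction described above can be modeled in a few lines. The class below is a hypothetical stand-in written for illustration (it is not Selection's implementation, and its parsing rules are assumptions): "*" means everything, an empty flag means nothing, anything else is a comma-separated list:

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical illustration class; not the library's Selection.
final class SelectionSketch {
  private final boolean all;
  private final Set<String> allowed;

  private SelectionSketch(boolean all, Set<String> allowed) {
    this.all = all;
    this.allowed = allowed;
  }

  /** Parses "*" as all, an empty string as none, and "a,b" as exactly those names. */
  static SelectionSketch parse(String flag) {
    String trimmed = flag.trim();
    if (trimmed.equals("*")) {
      return new SelectionSketch(true, Collections.emptySet());
    }
    Set<String> names = new LinkedHashSet<>();
    for (String part : trimmed.split(",")) {
      if (!part.trim().isEmpty()) {
        names.add(part.trim());
      }
    }
    return new SelectionSketch(false, names);
  }

  /** No special case for "empty" at the call site: empty simply allows nothing. */
  boolean has(String candidate) {
    return all || allowed.contains(candidate);
  }
}
```

Because "all" is a distinct state rather than an overloaded empty set, disabling a feature for everyone and enabling it for everyone are both expressible.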

Sometimes you may have a short list whose elements represent structured data points. For example, if you are trying to parse a human name, which can be a first name only, a first name followed by a last name, or a first, middle and last name, you can do:

import static com.google.mu.util.MoreCollections.findOnlyElements;

String fullName = ...;
List<String> nameParts =
    Substring.first(' ').repeatedly().splitThenTrim(fullName).collect(toList());
Optional<Result> result =
    findOnlyElements(nameParts, firstName -> ...)
        .or(() -> findOnlyElements(nameParts, (firstName, lastName) -> ...))
        .or(() -> findOnlyElements(nameParts, (firstName, middleName, lastName) -> ...));

In the above example, the short list may have 1, 2 or 3 elements. If you know the exact number of elements, you can use the onlyElements() Collector instead.

For example, you may need to parse out the hour, minute and second from a string that looks like 12:05:10:

import static com.google.mu.util.stream.MoreCollectors.onlyElements;

HourMinuteSecond result =
    Substring.first(':')
        .repeatedly()
        .split("12:05:10")
        .collect(onlyElements((h, m, s) -> new HourMinuteSecond(h, m, s)));
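
Without such a collector, the "exactly three parts" requirement has to be checked by hand. The sketch below shows that manual version with String.split(); HourMinuteSecondSketch is a hypothetical class, and throwing IllegalArgumentException on a wrong part count is an assumption about the desired behavior:

```java
// Hypothetical illustration class; not part of Mug.
final class HourMinuteSecondSketch {
  final int hour;
  final int minute;
  final int second;

  HourMinuteSecondSketch(int hour, int minute, int second) {
    this.hour = hour;
    this.minute = minute;
    this.second = second;
  }

  /** Parses "hh:mm:ss", rejecting input that doesn't have exactly three parts. */
  static HourMinuteSecondSketch parse(String text) {
    // The -1 limit keeps trailing empty parts, so "12:05:" is still seen as three parts.
    String[] parts = text.split(":", -1);
    if (parts.length != 3) {
      throw new IllegalArgumentException(
          "expected 3 parts but got " + parts.length + ": " + text);
    }
    return new HourMinuteSecondSketch(
        Integer.parseInt(parts[0]), Integer.parseInt(parts[1]), Integer.parseInt(parts[2]));
  }
}
```

The arity check and the positional indexing are what a lambda with three named parameters replaces.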

Parallelizer

Runs IO-bound operations in the structured concurrency model, while limiting max concurrency.

For example, the following snippet uploads a large number (think of millions) of pictures in parallel:

ExecutorService threadPool = Executors.newCachedThreadPool();
try {
  new Parallelizer(threadPool, numThreads)
      .parallelize(pictures.stream(), this::upload);
} finally {
  threadPool.shutdownNow();
}

Note that this code will terminate if any picture fails to upload. If upload() throws IOException and an IOException should not terminate the batch upload, the exception needs to be caught and handled:

  new Parallelizer(threadPool, numThreads)
      .parallelize(pictures, pic -> {
        try {
          upload(pic);
        } catch (IOException e) {
          log(e);
        }
      });

Why not parallel stream?

Like:

pictures.parallel().forEach(this::upload);

Reasons not to:

  • Parallelizer works with any existing ExecutorService.
  • Parallelizer supports an in-flight tasks limit.
  • Parallelizer can be interrupted.
  • Thread unsafe input streams or Iterators are okay because they are always consumed in the main thread.
  • When any parallel work fails, the pipeline is aborted and all pending and in-flight tasks are canceled.
  • Exceptions from worker threads are wrapped so that stack trace isn't misleading.
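
Two of the points above (the in-flight limit and consuming the input on the calling thread) can be sketched with a Semaphore and any ExecutorService. This is a simplified illustration, not Parallelizer's implementation: BoundedParallelSketch is a hypothetical name, and the sketch deliberately leaves out cancellation and exception propagation:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

// Hypothetical illustration class; not how Parallelizer is implemented.
final class BoundedParallelSketch {
  /** Runs task for every input, with at most maxInFlight tasks running at any time. */
  static <T> void forEachBounded(
      Iterable<T> inputs, int maxInFlight, ExecutorService executor, Consumer<? super T> task)
      throws InterruptedException {
    Semaphore permits = new Semaphore(maxInFlight);
    for (T input : inputs) {  // the input is only ever read on the calling thread
      permits.acquire();      // blocks while maxInFlight tasks are still running
      executor.execute(() -> {
        try {
          task.accept(input);
        } finally {
          permits.release();  // frees a slot even if the task throws
        }
      });
    }
    // Taking every permit back means all submitted tasks have finished.
    permits.acquire(maxInFlight);
    permits.release(maxInFlight);
  }
}
```

Because the loop blocks on acquire(), at most maxInFlight inputs are pulled ahead of the workers, so the input never has to fit in memory at once.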

Why not just submit to a fixed thread pool?

Like:

ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
try {
  pictures.forEach(pic -> threadPool.submit(() -> upload(pic)));
  threadPool.shutdown();
  threadPool.awaitTermination(100, SECONDS);
} finally {
  threadPool.shutdownNow();
}

Reasons not to:

  1. The thread pool queues all pending tasks. If the input stream is too large to fit in memory, you'll get an OutOfMemoryError.
  2. Exceptions (including NullPointerException, OutOfMemoryError) are silently swallowed (the executor may log stack trace). To propagate the exceptions, the Future objects need to be stored in a list and then Future#get() needs to be called on every future object after all tasks have been submitted to the executor.
  3. Tasks submitted to an executor are not "structured" but independent. One task failing doesn't automatically terminate the pipeline.
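
The workaround described in item 2 looks roughly like the sketch below: keep every Future and call get() on each one so that failures surface. FuturePropagationSketch is a hypothetical name; note that this still queues all tasks up front and does not cancel the remaining tasks when one fails, so items 1 and 3 are unaddressed:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical illustration class; not part of Mug.
final class FuturePropagationSketch {
  /** Submits all tasks, then waits for each; the first failure is rethrown. */
  static <T> List<T> runAll(ExecutorService executor, List<Callable<T>> tasks)
      throws InterruptedException, ExecutionException {
    List<Future<T>> futures = new ArrayList<>();
    for (Callable<T> task : tasks) {
      futures.add(executor.submit(task));  // a failure here stays hidden inside the Future
    }
    List<T> results = new ArrayList<>();
    for (Future<T> future : futures) {
      // get() rethrows a task's exception wrapped in ExecutionException.
      results.add(future.get());
    }
    return results;
  }
}
```

Callers then have to unwrap ExecutionException.getCause() themselves to find out what actually went wrong.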

Read in parallel

The above examples are all about "writing". If for example you have a command line tool that needs to read from a distributed storage, you can do:

public static void main(String[] args) throws Exception {
  Parallelizer parallelizer = Parallelizer.newDaemonParallelizer(10); // at most 10 concurrently
  Map<PictureId, Picture> pics = CharStreams.readLines(new InputStreamReader(System.in, UTF_8)).stream()
      .map(PictureId::parse)
      .distinct()
      .collect(parallelizer.inParallel(picId -> readPictureFromDistributedStorage(picId)))
      .toMap();
  ...
}

Retryer

  • Retry blockingly or async
  • Configurable and extensible backoff strategies
  • Retry on exception or by return value
  • Everything is @Immutable and @ThreadSafe

To retry blockingly

Blocking the thread for retry isn't always a good idea at server side. It is however simple and being able to propagate exceptions directly up the call stack is convenient:

Account fetchAccountWithRetry() throws IOException {
  return new Retryer()
      .upon(IOException.class, Delay.ofMillis(1).exponentialBackoff(1.5, 4))
      .retryBlockingly(this::getAccount);
}

To retry asynchronously

CompletionStage<Account> fetchAccountWithRetry(ScheduledExecutorService executor) {
  return new Retryer()
      .upon(IOException.class, Delay.ofMillis(1).exponentialBackoff(1.5, 4))
      .retry(this::getAccount, executor);
}

To retry an already asynchronous operation

If getAccount() itself already runs asynchronously and returns CompletionStage<Account>, it can be retried using the retryAsync() method.

For demonstration purposes, let's use a Fibonacci backoff strategy, with a bit of randomization in the backoff to avoid bursty traffic:

CompletionStage<Account> fetchAccountWithRetry(ScheduledExecutorService executor) {
  Random rnd = new Random();
  return new Retryer()
      .upon(IOException.class,
            Delay.ofMillis(30).fibonacci(4).stream()
                .map(d -> d.randomized(rnd, 0.3)))
      .retryAsync(this::getAccount, executor);
}

A side note: using Stream to transform will eagerly evaluate all list elements before retryAsync() is called. If that isn't desirable (like, you have nCopies(10000000, delay)), it's best to use some kind of lazy List transformation library. For example, if you use Guava, then:

Lists.transform(nCopies(1000000, Delay.ofMillis(30)), d -> d.randomized(rnd, 0.3))

To retry based on return value

Sometimes the API you work with may return error codes instead of throwing exceptions. Retries can be based on return values too:

new Retryer()
    .uponReturn(ErrorCode.BAD, Delay.ofMillis(10).exponentialBackoff(1.5, 4))
    .retryBlockingly(this::depositMyMoney);

Or, use a predicate:

new Retryer()
    .ifReturns(r -> r == null, Delay.ofMillis(10).exponentialBackoff(1.5, 4))
    .retryBlockingly(this::depositMyMoney);

Backoffs are just List<Delay>

exponentialBackoff(), fibonacci(), timed() and randomized() are provided out of the box for convenience only. At the end of the day, backoffs are just old-school boring Lists. This makes the backoff strategies extensible. You can create the List in any way you are used to, using any Java library. For example, there isn't a uniformDelay() in this library, because there is already Collections.nCopies(n, delay).

Or, to concatenate two different backoff strategies together (first uniform and then exponential), the Java 8 Stream API has a good tool for the job:

new Retryer()
    .upon(RpcException.class,
          Stream.concat(nCopies(3, Delay.ofMillis(1)).stream(),
                        Delay.ofMillis(2).exponentialBackoff(1.5, 4).stream()))
    .retry(...);

What about retrying infinitely? Collections.nCopies(Integer.MAX_VALUE, delay) isn't infinite but close. The JDK only uses O(1) time and space to create it; the same goes for Delay#exponentialBackoff() and Delay#fibonacci().
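
The "backoffs are just lists" idea can be shown with java.time.Duration and plain collections. The sketch below is not the library's code: BackoffListSketch is a hypothetical name, and it assumes the i-th exponential delay is the first delay multiplied by multiplier^i, which may differ from how exponentialBackoff() defines its sequence:

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; not part of Mug.
final class BackoffListSketch {
  /** Builds delays first, first*multiplier, first*multiplier^2, ... (assumed semantics). */
  static List<Duration> exponential(Duration first, double multiplier, int size) {
    List<Duration> delays = new ArrayList<>(size);
    double millis = first.toMillis();
    for (int i = 0; i < size; i++) {
      delays.add(Duration.ofMillis(Math.round(millis)));
      millis *= multiplier;
    }
    return delays;
  }

  /** Concatenates two strategies: plain list operations are all that is needed. */
  static List<Duration> concat(List<Duration> first, List<Duration> second) {
    List<Duration> all = new ArrayList<>(first);
    all.addAll(second);
    return all;
  }
}
```

For instance, concat(Collections.nCopies(2, Duration.ofMillis(10)), exponential(Duration.ofMillis(20), 2.0, 2)) yields delays of 10, 10, 20 and 40 milliseconds. Unlike nCopies(), this eager version allocates one element per delay, so it is not suitable for near-infinite lists.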

To handle retry events

Sometimes the program may need custom handling of retry events, like, for example, to increment a stats counter based on the error code in the exception. Requirements like this can be done with a custom Delay implementation:

class RpcDelay extends Delay<RpcException> {

  @Override public Duration duration() {
    ...
  }

  @Override public void beforeDelay(RpcException e) {
    updateStatsCounter(e.getErrorCode(), "before delay", duration());
  }

  @Override public void afterDelay(RpcException e) {
    updateStatsCounter(e.getErrorCode(), "after delay", duration());
  }
}

return new Retryer()
    .upon(RpcException.class,
          Delay.ofMillis(10).exponentialBackoff(...).stream()
              .map(Delay::duration)
              .map(RpcDelay::new))
    .retry(this::sendRpcRequest, executor);

Or, to get access to the retry attempt number, which is also the list's index, here's an example:

class RpcDelay extends Delay<RpcException> {
  RpcDelay(int attempt, Duration duration) {...}

  @Override public void beforeDelay(RpcException e) {
    updateStatsCounter(e.getErrorCode(), "before delay " + attempt, duration());
  }

  @Override public void afterDelay(RpcException e) {...}
}

List<Delay<?>> delays = Delay.ofMillis(10).fibonacci(...);
return new Retryer()
    .upon(RpcException.class,
          IntStream.range(0, delays.size())
              .mapToObj(i -> new RpcDelay(i, delays.get(i).duration())))
    .retry(...);

To keep track of exceptions

If the method succeeds after retry, the exceptions are by default logged. As shown above, one can override beforeDelay() and afterDelay() to change or suppress the logging.

If the method fails after retry, the exceptions can also be accessed programmatically through exception.getSuppressed().
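
To show how suppressed exceptions can carry the retry history, here is a minimal blocking retry loop written against the JDK only. It is a sketch, not Retryer's implementation: RetrySketch is a hypothetical name, and attaching the earlier failures to the final exception is an assumption about which exception holds the history:

```java
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical illustration class; not how Retryer is implemented.
final class RetrySketch {
  /** Retries on IOException, sleeping delays.get(i) after the i-th failure. */
  static <T> T retryOnIoException(Callable<T> action, List<Duration> delays) throws Exception {
    List<IOException> earlierFailures = new ArrayList<>();
    for (int attempt = 0; ; attempt++) {
      try {
        return action.call();
      } catch (IOException e) {
        if (attempt >= delays.size()) {
          // Out of delays: give up, attaching the earlier failures for diagnosis.
          for (IOException earlier : earlierFailures) {
            e.addSuppressed(earlier);
          }
          throw e;
        }
        earlierFailures.add(e);
        Thread.sleep(delays.get(attempt).toMillis());
      }
    }
  }
}
```

With two delays, an action that always fails is attempted three times, and the IOException that finally escapes reports the two earlier failures through getSuppressed().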
