hRaven integration: add tags to group map-reduce jobs under a single query
dcfocus committed Apr 22, 2019
1 parent 2c314fa commit ce7948e
Showing 1 changed file with 17 additions and 7 deletions.
24 changes: 17 additions & 7 deletions ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -24,6 +24,8 @@
import java.io.PrintStream;
import java.io.Serializable;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -37,7 +39,11 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hive.common.ValidTxnList;
@@ -90,29 +96,28 @@
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo;
import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHook;
import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContextImpl;
import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.ParseDriver;
import org.apache.hadoop.hive.ql.parse.ParseUtils;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.processors.CommandProcessor;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.security.authorization.AuthorizationUtils;
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivObjectActionType;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext;
import org.apache.hadoop.hive.ql.session.OperationLog;
import org.apache.hadoop.hive.ql.session.OperationLog.LoggingLevel;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -127,10 +132,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;

public class Driver implements CommandProcessor {

static final private String CLASS_NAME = Driver.class.getName();
@@ -536,6 +537,15 @@ public void run() {
conf.set("mapreduce.workflow.id", "hive_" + queryId);
conf.set("mapreduce.workflow.name", queryStr);

String timeStamp = Long.toString(System.currentTimeMillis());
conf.set("batch.desc", queryStr + '_' + timeStamp);
conf.set("hive.batch.desc", queryStr + '_' + timeStamp);
conf.set("hive.flow.submitted.timestamp", timeStamp);
try {
  MessageDigest digest = MessageDigest.getInstance("SHA-256");
  byte[] bytes = digest.digest(queryStr.getBytes(StandardCharsets.UTF_8));
  // Hex-encode the digest: decoding raw hash bytes as UTF-8 would corrupt them.
  StringBuilder querySignature = new StringBuilder(bytes.length * 2);
  for (byte b : bytes) {
    querySignature.append(String.format("%02x", b));
  }
  conf.set("hive.signature", querySignature.toString());
} catch (java.security.NoSuchAlgorithmException e) { // SHA-256 ships with every JDK
  throw new IllegalStateException(e);
}

// initialize FetchTask right here
if (plan.getFetchTask() != null) {
plan.getFetchTask().initialize(queryState, plan, null, ctx.getOpContext());
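The point of these properties is that every map-reduce job launched by one Hive query carries the same hive.signature value, so a job-history consumer such as hRaven can bucket the jobs back together afterwards. Below is a minimal sketch of that grouping step, assuming a list of per-job Configuration objects is already in hand; the QueryJobGrouper class and its method name are illustrative assumptions, not hRaven's actual API.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;

public class QueryJobGrouper {

  // Buckets job configurations by the per-query signature set in Driver above.
  // Jobs spawned by the same Hive query share a "hive.signature" value, so the
  // signature serves as a natural group key.
  public static Map<String, List<Configuration>> groupBySignature(List<Configuration> jobConfs) {
    Map<String, List<Configuration>> groups = new HashMap<>();
    for (Configuration jobConf : jobConfs) {
      // Jobs submitted outside Hive lack the tag; bucket them separately.
      String signature = jobConf.get("hive.signature", "untagged");
      groups.computeIfAbsent(signature, k -> new ArrayList<>()).add(jobConf);
    }
    return groups;
  }
}

Note that the signature hashes only the query text, so re-runs of the same statement land in the same bucket, while hive.flow.submitted.timestamp and the timestamped batch.desc still distinguish individual executions.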
