Skip to content

Commit

Permalink
[INLONG-8645][Agent] Delete the capacity of loading trigger from loca…
Browse files Browse the repository at this point in the history
…l file (#8646)
  • Loading branch information
justinwwhuang authored Aug 7, 2023
1 parent 1ad696b commit 8877026
Show file tree
Hide file tree
Showing 9 changed files with 15 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class JobConstants extends CommonConstants {
public static final String JOB_MQ_TOPIC = "job.topicInfo";

// File job
public static final String JOB_TRIGGER = "job.fileJob.trigger";
public static final String JOB_FILE_JOB_TRIGGER = "job.fileJob.trigger";
public static final String JOB_DIR_FILTER_PATTERN = "job.fileJob.dir.pattern"; // deprecated
public static final String JOB_DIR_FILTER_PATTERNS = "job.fileJob.dir.patterns";
public static final String JOB_DIR_FILTER_BLACKLIST = "job.fileJob.dir.blackList";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@

import org.apache.inlong.agent.common.AbstractDaemon;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.conf.ProfileFetcher;
import org.apache.inlong.agent.conf.TriggerProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.core.conf.ConfigJetty;
import org.apache.inlong.agent.core.job.JobManager;
Expand All @@ -31,22 +29,16 @@
import org.apache.inlong.agent.db.CommandDb;
import org.apache.inlong.agent.db.Db;
import org.apache.inlong.agent.db.JobProfileDb;
import org.apache.inlong.agent.db.LocalProfile;
import org.apache.inlong.agent.db.TriggerProfileDb;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.lang.reflect.Constructor;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CONF_PARENT;
import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_CONF_PARENT;
import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;

/**
* Agent Manager, the bridge for job manager, task manager, db e.t.c it manages agent level operations and communicates
* with outside system.
Expand All @@ -63,7 +55,6 @@ public class AgentManager extends AbstractDaemon {
private final AgentConfiguration conf;
private final ExecutorService agentConfMonitor;
private final Db db;
private final LocalProfile localProfile;
private final CommandDb commandDb;
private final JobProfileDb jobProfileDb;
// jetty for config operations via http.
Expand All @@ -75,8 +66,6 @@ public AgentManager() {
this.db = initDb();
commandDb = new CommandDb(db);
jobProfileDb = new JobProfileDb(db);
String parentConfPath = conf.get(AGENT_CONF_PARENT, DEFAULT_AGENT_CONF_PARENT);
localProfile = new LocalProfile(parentConfPath);
triggerManager = new TriggerManager(this, new TriggerProfileDb(db));
jobManager = new JobManager(this, jobProfileDb);
taskManager = new TaskManager(this);
Expand Down Expand Up @@ -208,20 +197,6 @@ public void start() throws Exception {
LOGGER.info("starting task position manager");
positionManager.start();
LOGGER.info("starting read job from local");
// read job profiles from local
List<JobProfile> profileList = localProfile.readFromLocal();
for (JobProfile profile : profileList) {
if (profile.hasKey(JOB_TRIGGER)) {
TriggerProfile triggerProfile = TriggerProfile.parseJobProfile(profile);
// there is no need to store this profile in triggerDB, because
// this profile comes from local file.
triggerManager.restoreTrigger(triggerProfile);
} else {
// job db store instance info, so it's suitable to use submitJobProfile
// to store instance into job db.
jobManager.submitFileJobProfile(profile);
}
}
LOGGER.info("starting fetcher");
if (fetcher != null) {
fetcher.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@

import java.io.Closeable;

import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_JOB_TRIGGER;
import static org.apache.inlong.agent.constant.JobConstants.JOB_SOURCE_TYPE;
import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;

/**
* start http server and get job/agent config via http
Expand Down Expand Up @@ -87,7 +87,7 @@ public void storeJobConf(JobProfile jobProfile) {
// store job conf to bdb
if (jobProfile != null) {
// trigger job is a special kind of job
if (jobProfile.hasKey(JOB_TRIGGER)) {
if (jobProfile.hasKey(JOB_FILE_JOB_TRIGGER)) {
triggerManager.submitTrigger(
TriggerProfile.parseJsonStr(jobProfile.toJsonStr()), true);
} else {
Expand Down Expand Up @@ -123,7 +123,7 @@ public void storeAgentConf(String confJsonStr) {
*/
public void deleteJobConf(JobProfile jobProfile) {
if (jobProfile != null) {
if (jobProfile.hasKey(JOB_TRIGGER)) {
if (jobProfile.hasKey(JOB_FILE_JOB_TRIGGER)) {
triggerManager.deleteTrigger(TriggerProfile.parseJobProfile(jobProfile).getTriggerId(), false);
} else {
jobManager.deleteJob(jobProfile.getInstanceId(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public TriggerManager(AgentManager manager, TriggerProfileDb triggerProfileDb) {
*/
public boolean restoreTrigger(TriggerProfile triggerProfile) {
try {
Class<?> triggerClass = Class.forName(triggerProfile.get(JobConstants.JOB_TRIGGER));
Class<?> triggerClass = Class.forName(triggerProfile.get(JobConstants.JOB_FILE_JOB_TRIGGER));
Trigger trigger = (Trigger) triggerClass.newInstance();
String triggerId = triggerProfile.get(JOB_ID);
if (triggerMap.containsKey(triggerId)) {
Expand Down Expand Up @@ -171,7 +171,7 @@ private Runnable jobFetchThread() {
// necessary to filter the stated monitored file task.

boolean alreadyExistTask = job.exist(tasks -> tasks.stream()
.filter(task -> !task.getJobConf().hasKey(JobConstants.JOB_TRIGGER))
.filter(task -> !task.getJobConf().hasKey(JobConstants.JOB_FILE_JOB_TRIGGER))
.filter(task -> subTaskFile.equals(
task.getJobConf().get(JobConstants.JOB_DIR_FILTER_PATTERNS, "")))
.findAny().isPresent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@
import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_TDM_IP_CHECK_HTTP_PATH;
import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_TDM_VIP_HTTP_PATH;
import static org.apache.inlong.agent.constant.FetcherConstants.VERSION;
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_JOB_TRIGGER;
import static org.apache.inlong.agent.constant.JobConstants.JOB_OP;
import static org.apache.inlong.agent.constant.JobConstants.JOB_RETRY_TIME;
import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;
import static org.apache.inlong.agent.plugin.fetcher.ManagerResultFormatter.getResultData;
import static org.apache.inlong.agent.plugin.utils.PluginUtils.copyJobProfile;
import static org.apache.inlong.agent.utils.AgentUtils.fetchLocalIp;
Expand Down Expand Up @@ -267,8 +267,8 @@ private void dealWithFetchResult(TaskResult taskResult) {
.map(TriggerProfile::getTriggerProfiles)
.forEach(profile -> {
LOGGER.info("the triggerProfile: {}", profile.toJsonStr());
if (profile.hasKey(JOB_TRIGGER)) {
dealWithTdmTriggerProfile(profile);
if (profile.hasKey(JOB_FILE_JOB_TRIGGER)) {
dealWithFileTriggerProfile(profile);
} else {
dealWithJobProfile(profile);
}
Expand Down Expand Up @@ -377,7 +377,7 @@ private boolean makeUpFiles(TriggerProfile triggerProfile, String dataTime) {
/**
* the trigger profile returned from manager should be parsed
*/
public void dealWithTdmTriggerProfile(TriggerProfile triggerProfile) {
public void dealWithFileTriggerProfile(TriggerProfile triggerProfile) {
ManagerOpEnum opType = ManagerOpEnum.getOpType(triggerProfile.getInt(JOB_OP));
boolean success = true;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
import static org.apache.inlong.agent.constant.CommonConstants.POSITION_SUFFIX;
import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_LINE_FILTER;
import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_READ_WAIT_TIMEOUT;
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_JOB_TRIGGER;
import static org.apache.inlong.agent.constant.JobConstants.JOB_LINE_FILTER_PATTERN;
import static org.apache.inlong.agent.constant.JobConstants.JOB_READ_WAIT_TIMEOUT;
import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;

/**
* Read text files
Expand All @@ -53,7 +53,7 @@ public TextFileSource() {
@Override
public List<Reader> split(JobProfile jobConf) {
super.init(jobConf);
if (jobConf.hasKey(JOB_TRIGGER)) {
if (jobConf.hasKey(JOB_FILE_JOB_TRIGGER)) {
// trigger as a special reader.
return Collections.singletonList(new TriggerFileReader());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public boolean isSourceExist() {

@Override
public void init(JobProfile jobConf) {
this.triggerId = jobConf.get(JobConstants.JOB_TRIGGER);
this.triggerId = jobConf.get(JobConstants.JOB_FILE_JOB_TRIGGER);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ private void registerAllSubDir(
Map<String, String> taskProfile = new HashMap<>();
String md5 = AgentUtils.getFileMd5(path.toFile());
taskProfile.put(path.toFile().getAbsolutePath() + ".md5", md5);
taskProfile.put(JobConstants.JOB_TRIGGER, null); // del trigger id
taskProfile.put(JobConstants.JOB_FILE_JOB_TRIGGER, null); // del trigger id
taskProfile.put(JobConstants.JOB_DIR_FILTER_PATTERNS, path.toFile().getAbsolutePath());
LOGGER.info("trigger_{} generate job profile to read file {}",
trigger.getTriggerProfile().getTriggerId(), path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public static JobProfile copyJobProfile(TriggerProfile triggerProfile, File pend
JobProfile copiedProfile = TriggerProfile.parseJsonStr(triggerProfile.toJsonStr());
String md5 = AgentUtils.getFileMd5(pendingFile);
copiedProfile.set(pendingFile.getAbsolutePath() + ".md5", md5);
copiedProfile.set(JobConstants.JOB_TRIGGER, null); // del trigger id
copiedProfile.set(JobConstants.JOB_FILE_JOB_TRIGGER, null); // del trigger id
copiedProfile.set(JobConstants.JOB_DIR_FILTER_PATTERNS, pendingFile.getAbsolutePath());
return copiedProfile;
}
Expand Down

0 comments on commit 8877026

Please sign in to comment.