Skip to content

Commit

Permalink
[INLONG-11400][Manager] Support Airflow schedule engine
Browse files Browse the repository at this point in the history
  • Loading branch information
ZKpLo committed Nov 19, 2024
1 parent 81fb821 commit 3b3e1ab
Show file tree
Hide file tree
Showing 37 changed files with 2,879 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.airflow;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
@ApiModel(description = "Full representation of the connection.")
public class AirflowConnection {

@JsonProperty("connection_id")
@ApiModelProperty("The connection ID.")
private String connectionId;

@JsonProperty("conn_type")
@ApiModelProperty("The connection type.")
private String connType;

@JsonProperty("description")
@ApiModelProperty("The description of the connection.")
private String description;

@JsonProperty("host")
@ApiModelProperty("Host of the connection.")
private String host;

@JsonProperty("login")
@ApiModelProperty("Login of the connection.")
private String login;

@JsonProperty("schema")
@ApiModelProperty("Schema of the connection.")
private String schema;

@JsonProperty("port")
@ApiModelProperty("Port of the connection.")
private Integer port;

@JsonProperty("password")
@ApiModelProperty("Password of the connection.")
private String password;

@JsonProperty("extra")
@ApiModelProperty("Additional information description of the connection.")
private String extra;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.airflow;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
@ApiModel(description = "DAG Description Information.")
public class DAG {

@JsonProperty("dag_id")
@ApiModelProperty("The ID of the DAG.")
private String dagId;

@JsonProperty("root_dag_id")
@ApiModelProperty("If the DAG is SubDAG then it is the top level DAG identifier. Otherwise, null.")
private String rootDagId;

@JsonProperty("is_paused")
@ApiModelProperty("Whether the DAG is paused.")
private Boolean isPaused;

@JsonProperty("is_active")
@ApiModelProperty("Whether the DAG is currently seen by the scheduler(s).")
private Boolean isActive;

@JsonProperty("description")
@ApiModelProperty("User-provided DAG description, which can consist of several sentences or paragraphs that describe DAG contents.")
private String description;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.airflow;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import java.util.List;

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
@ApiModel(description = "Collection of DAGs.")
public class DAGCollection {

@JsonProperty("dags")
@ApiModelProperty("List of DAGs.")
private List<DAG> dags = null;

@JsonProperty("total_entries")
@ApiModelProperty("The length of DAG list.")
private Integer totalEntries;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.airflow;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
@ApiModel(description = "DAGRun Description Information.")
public class DAGRun {

@JsonProperty("conf")
@ApiModelProperty("JSON object describing additional configuration parameters.")
private Object conf;

@JsonProperty("dag_id")
@ApiModelProperty("Airflow DAG id.")
private String dagId;

@JsonProperty("dag_run_id")
@ApiModelProperty("Airflow DAGRun id (Nullable).")
private String dagRunId;

@JsonProperty("end_date")
@ApiModelProperty("The end time of this DAGRun.")
private String endDate;

@JsonProperty("start_date")
@ApiModelProperty("The start time of this DAGRun.")
private String startDate;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.airflow;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
@ApiModel(description = "DAGRunConf Description Information.")
public class DAGRunConf {

@JsonProperty("inlong_group_id")
@ApiModelProperty("Specify the Inlong group ID")
private String inlongGroupId;

@JsonProperty("start_time")
@ApiModelProperty("The start time of DAG scheduling.")
private long startTime;

@JsonProperty("end_time")
@ApiModelProperty("The end time of DAG scheduling.")
private long endTime;

@JsonProperty("boundary_type")
@ApiModelProperty("The offline task boundary type.")
private String boundaryType;

@JsonProperty("cron_expr")
@ApiModelProperty("Cron expression.")
private String cronExpr;

@JsonProperty("seconds_interval")
@ApiModelProperty("Time interval (in seconds).")
private String secondsInterval;

@JsonProperty("connection_id")
@ApiModelProperty("Airflow Connection Id of Inlong Manager.")
private String connectionId;

@JsonProperty("timezone")
@ApiModelProperty("The timezone.")
private String timezone;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.airflow;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import java.math.BigDecimal;

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
@ApiModel(description = "[RFC7807](https://tools.ietf.org/html/rfc7807) compliant response. ")
public class Error {

@JsonProperty("detail")
@ApiModelProperty("Error Details.")
private String detail;

@JsonProperty("instance")
@ApiModelProperty("Error of the instance.")
private String instance;

@JsonProperty("status")
@ApiModelProperty("Error of the status.")
private BigDecimal status;

@JsonProperty("title")
@ApiModelProperty("Error of the title.")
private String title;

@JsonProperty("type")
@ApiModelProperty("Error of the type.")
private String type;
}
24 changes: 24 additions & 0 deletions inlong-manager/manager-schedule/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,29 @@
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<exclusions>
<exclusion>
<groupId>com.vaadin.external.google</groupId>
<artifactId>android-json</artifactId>
</exclusion>
<exclusion>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
</exclusion>
<exclusion>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
public enum ScheduleEngineType {

NONE("None"),
AIRFLOW("Airflow"),
QUARTZ("Quartz");

private final String type;
Expand Down
Loading

0 comments on commit 3b3e1ab

Please sign in to comment.