diff --git a/docs/en_US/guide/sources/builtin/http_pull.md b/docs/en_US/guide/sources/builtin/http_pull.md index 73a1c0428b..55f02cdaff 100644 --- a/docs/en_US/guide/sources/builtin/http_pull.md +++ b/docs/en_US/guide/sources/builtin/http_pull.md @@ -50,6 +50,9 @@ default: # # Expire time of the token in string, time unit is second, allow template # expire: '3600' # # Refresh token fetch method +# # Request header +# headers: +# Accept: application/json # refresh: # # Url to refresh the token, always use POST method # url: https://127.0.0.1/api/refresh diff --git a/docs/zh_CN/guide/sources/builtin/http_pull.md b/docs/zh_CN/guide/sources/builtin/http_pull.md index dd500810cd..d727d365ef 100644 --- a/docs/zh_CN/guide/sources/builtin/http_pull.md +++ b/docs/zh_CN/guide/sources/builtin/http_pull.md @@ -49,6 +49,9 @@ default: # body: '{"username": "admin","password": "123456"}' # # 令牌的过期时间,以字符串表示,时间单位为秒,允许使用模板 # expire: '3600' +# # 请求头 +# headers: +# Accept: application/json # # 如何刷新令牌 # refresh: # # 刷新令牌的 URL,总是使用 POST 方法发送请求 diff --git a/etc/sources/httppull.yaml b/etc/sources/httppull.yaml index 2b9467f4c6..33f6930711 100644 --- a/etc/sources/httppull.yaml +++ b/etc/sources/httppull.yaml @@ -32,6 +32,9 @@ default: # body: '{"username": "admin","password": "123456"}' # # Expire time of the token in string, time unit is second, allow template # expire: '3600' +# # Request header +# headers: +# Accept: application/json # # Refresh token fetch method # refresh: # # Url to refresh the token, always use POST method diff --git a/internal/io/http/client.go b/internal/io/http/client.go index 48ff667625..dec1e93ed0 100644 --- a/internal/io/http/client.go +++ b/internal/io/http/client.go @@ -52,9 +52,10 @@ type ClientConf struct { } type AccessTokenConf struct { - Url string `json:"url"` - Body string `json:"body"` - Expire string `json:"expire"` + Url string `json:"url"` + Body string `json:"body"` + Expire string `json:"expire"` + Headers map[string]string `json:"headers"` ExpireInSecond int } @@ -217,7 +218,7 @@ func (cc *ClientConf) InitConf(device string, props map[string]interface{}) erro // initialize the oAuth access token func (cc *ClientConf) auth(ctx api.StreamContext) error { - resp, err := httpx.Send(conf.Log, cc.client, "json", http.MethodPost, cc.accessConf.Url, nil, cc.accessConf.Body) + resp, err := httpx.Send(conf.Log, cc.client, "json", http.MethodPost, cc.accessConf.Url, cc.accessConf.Headers, cc.accessConf.Body) if err != nil { return err } diff --git a/internal/pkg/httpx/http.go b/internal/pkg/httpx/http.go index 80df39fa4f..884e6f77dd 100644 --- a/internal/pkg/httpx/http.go +++ b/internal/pkg/httpx/http.go @@ -17,7 +17,6 @@ package httpx import ( "bytes" "crypto/tls" - "encoding/json" "errors" "fmt" "io" @@ -83,37 +82,6 @@ func Send(logger api.Logger, client *http.Client, bodyType string, method string return client.Do(req) } -func convertToMap(v interface{}, sendSingle bool) (map[string]interface{}, error) { - switch t := v.(type) { - case []byte: - r := make(map[string]interface{}) - if err := json.Unmarshal(t, &r); err != nil { - if sendSingle { - return nil, fmt.Errorf("fail to decode content: %v", err) - } else { - r["result"] = string(t) - } - } - return r, nil - case map[string]interface{}: - return t, nil - case []map[string]interface{}: - r := make(map[string]interface{}) - if sendSingle { - return nil, fmt.Errorf("invalid content: %v", t) - } else { - j, err := json.Marshal(t) - if err != nil { - return nil, err - } - r["result"] = string(j) - } - return r, nil - default: - return nil, fmt.Errorf("invalid content: %v", v) - } -} - func IsValidUrl(uri string) bool { pu, err := url.ParseRequestURI(uri) if err != nil { diff --git a/pkg/connection/conn.go b/pkg/connection/conn.go index 7417a9aa81..c4ac51fe9e 100644 --- a/pkg/connection/conn.go +++ b/pkg/connection/conn.go @@ -139,13 +139,19 @@ func (meta *Meta) GetStatus() (s string, e string) { if ss != nil { s = ss.(string) if s == api.ConnectionConnected { - e = "" - // if connected, cw, cw.conn should exist - if _, isStateful := meta.cw.conn.(modules.StatefulDialer); !isStateful { - err := meta.cw.conn.Ping(context.Background()) - if err != nil { - s = api.ConnectionDisconnected - e = err.Error() + if meta.cw.IsInitialized() { + conn, err := meta.cw.Wait(context.Background()) + if err != nil || conn == nil { + return + } + e = "" + // if connected, cw, cw.conn should exist + if _, isStateful := conn.(modules.StatefulDialer); !isStateful { + err := conn.Ping(context.Background()) + if err != nil { + s = api.ConnectionDisconnected + e = err.Error() + } } } }