Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support kubernetes workload identity #4

Closed
wants to merge 11 commits into from
4 changes: 4 additions & 0 deletions lib/resty/pubsub/constants.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ _M.PUBSUB_BASE_DOMAIN = "pubsub.googleapis.com"

_M.IS_EMULATOR = false

_M.DISABLE_SSL = false

_M.OAUTH_BASE_URI = "https://www.googleapis.com/oauth2/v4/token"

_M.OAUTH_SCOPES = {
Expand All @@ -45,4 +47,6 @@ _M.OAUTH_TOKEN_EXPIRY = 3600 -- in seconds

_M.OAUTH_TOKEN_DICT = ngx.shared.OAUTH_TOKEN

_M.WORKLOAD_IDENTITY_TOKEN_URL = "http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default/token"

return _M
2 changes: 1 addition & 1 deletion lib/resty/pubsub/oauth_client.lua
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ local function refresh_oauth_token(self)
return cjson.decode(res.body)["access_token"], nil
end

function _M.get_oauth_token(self)
function _M.get_token(self)

if self.oauth_token_dict == nil then
return nil, "Provided oauth lua dictionary not found, please refer to documentation for adding it to nginx configuration"
Expand Down
56 changes: 44 additions & 12 deletions lib/resty/pubsub/producer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ local constants = require "resty.pubsub.constants"
local ringbuffer = require "resty.pubsub.ringbuffer"
local request = require "resty.pubsub.request"
local oauthclient = require "resty.pubsub.oauth_client"
local workloadidentityclient = require "resty.pubsub.workload_identity_client"

local setmetatable = setmetatable

Expand Down Expand Up @@ -190,6 +191,7 @@ local function normalize_configs(self, pubsub_config)
pubsub_config.pubsub_base_domain = pubsub_config.pubsub_base_domain or constants.PUBSUB_BASE_DOMAIN
pubsub_config.pubsub_base_port = pubsub_config.pubsub_base_port or constants.PUBSUB_BASE_PORT
pubsub_config.is_emulator = pubsub_config.is_emulator or constants.IS_EMULATOR
pubsub_config.disable_ssl = pubsub_config.disable_ssl or constants.DISABLE_SSL

if pubsub_config.producer_config == nil then
pubsub_config.producer_config = {}
Expand All @@ -203,13 +205,40 @@ local function normalize_configs(self, pubsub_config)
pubsub_config.producer_config.keepalive_max_idle_timeout = pubsub_config.producer_config.keepalive_max_idle_timeout or constants.KEEPALIVE_MAX_IDLE_TIMEOUT
pubsub_config.producer_config.keepalive_pool_size = pubsub_config.producer_config.keepalive_pool_size or constants.KEEPALIVE_POLL_SIZE

pubsub_config.oauth_config.oauth_base_uri = pubsub_config.oauth_config.oauth_base_uri or constants.OAUTH_BASE_URI
pubsub_config.oauth_config.oauth_scopes = pubsub_config.oauth_config.oauth_scopes or constants.OAUTH_SCOPES
pubsub_config.oauth_config.oauth_token_dict = pubsub_config.oauth_config.oauth_token_dict or constants.OAUTH_TOKEN_DICT
if pubsub_config.oauth_config ~= nil then
pubsub_config.oauth_config.oauth_base_uri = pubsub_config.oauth_config.oauth_base_uri or constants.OAUTH_BASE_URI
pubsub_config.oauth_config.oauth_scopes = pubsub_config.oauth_config.oauth_scopes or constants.OAUTH_SCOPES
pubsub_config.oauth_config.oauth_token_dict = pubsub_config.oauth_config.oauth_token_dict or constants.OAUTH_TOKEN_DICT
end

if pubsub_config.workload_identity_config ~= nil then
pubsub_config.workload_identity_config.token_url = pubsub_config.workload_identity_config.token_url or constants.WORKLOAD_IDENTITY_TOKEN_URL
pubsub_config.workload_identity_config.token_dict = pubsub_config.workload_identity_config.token_dict or constants.OAUTH_TOKEN_DICT
end

return pubsub_config
end

local function validate_oauth_config(self, oauth_config)
if oauth_config == nil or oauth_config == {} then
return false, "Oauth Config not provided"
end

if oauth_config.service_account_key_path == nil then
return false, "Service Account key Path not provided"
end

return true, nil
end

local function validate_workload_identity_config(self, workload_identity_config)
if workload_identity_config == nil or workload_identity_config == {} then
return false, "Workload Identity Config not provided"
end

return true, nil
end

-- Check if necessary config is provided
local function validate_config(self, pubsub_config)
if not pubsub_config.project_id then
Expand All @@ -226,12 +255,11 @@ local function validate_config(self, pubsub_config)
end

if not pubsub_config.is_emulator then
if pubsub_config.oauth_config == nil or pubsub_config.oauth_config == {} then
return false, "Oauth Config not provided"
end
local valid_oauth_config, oauth_config_validation_error = validate_oauth_config(self, pubsub_config.oauth_config)
local valid_workload_identity_config, workload_identity_config_validation_error = validate_workload_identity_config(self, pubsub_config.workload_identity_config)

if pubsub_config.oauth_config.service_account_key_path == nil then
return false, "Service Account key Path not provided"
if not valid_oauth_config and not valid_workload_identity_config then
return false, oauth_config_validation_error or workload_identity_config_validation_error
end
end

Expand Down Expand Up @@ -270,17 +298,21 @@ function _M.new(self, project_id_or_config, pubsub_config, producer_config,

ngx.log(ngx.DEBUG, "Creating producer for topic: ", pubsub_config.topic)

-- Creating an instance of OAUTH 2.0 client for generating oauth token
local oauth_client = oauthclient:new(pubsub_config.oauth_config, pubsub_config.topic)
local auth_client

if pubsub_config.oauth_config ~= nil then
auth_client = oauthclient:new(pubsub_config.oauth_config, pubsub_config.topic)
elseif pubsub_config.workload_identity_config ~= nil then
auth_client = workloadidentityclient:new(pubsub_config.workload_identity_config, pubsub_config.topic)
end

local instance = {
producer_config = pubsub_config.producer_config,
success_callback = pubsub_config.success_callback,
error_callback = pubsub_config.error_callback,
last_flush = ngx.time(), -- We also need to track when the last batch flush was occured
ring_buffer = ringbuffer:new(pubsub_config.producer_config.max_batch_size, pubsub_config.producer_config.max_buffering), -- For storing buffered data
request = request:new(pubsub_config, oauth_client), -- For sending request to pubsub domain
oauth_client = oauth_client
request = request:new(pubsub_config, auth_client), -- For sending request to pubsub domain
}

_timer_flush(nil, instance, pubsub_config.producer_config.timer_interval)
Expand Down
15 changes: 8 additions & 7 deletions lib/resty/pubsub/request.lua
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,19 @@ local setmetatable = setmetatable
local _M = {}
local mt = { __index = _M }

function _M.new(self, pubsub_config, oauth_client)
function _M.new(self, pubsub_config, auth_client)

local instance = {
project_id = pubsub_config.project_id,
pubsub_topic = pubsub_config.topic,
pubsub_base_domain = pubsub_config.pubsub_base_domain,
pubsub_base_port = pubsub_config.pubsub_base_port,
is_emulator = pubsub_config.is_emulator,
disable_ssl = pubsub_config.disable_ssl,
http_timeout = pubsub_config.producer_config.http_timeout,
keepalive_max_idle_timeout = pubsub_config.producer_config.keepalive_max_idle_timeout,
keepalive_pool_size = pubsub_config.producer_config.keepalive_pool_size,
oauth_client = oauth_client
auth_client = auth_client
}

return setmetatable(instance, mt)
Expand All @@ -58,11 +59,11 @@ function _M.batch_send(self, encoded_messages)
}

if not self.is_emulator then
local oauth_token, oauth_err = self.oauth_client:get_oauth_token()
if oauth_err ~= nil then
return false, self.pubsub_topic, oauth_err, encoded_messages
local auth_token, auth_err = self.auth_client:get_token()
if auth_err ~= nil then
return false, self.pubsub_topic, auth_err, encoded_messages
end
requestBody["headers"]["Authorization"] = "Bearer " .. oauth_token
requestBody["headers"]["Authorization"] = "Bearer " .. auth_token
end

local httpc = http.new()
Expand All @@ -77,7 +78,7 @@ function _M.batch_send(self, encoded_messages)
return false, self.pubsub_topic, connect_err, encoded_messages
end

if not self.is_emulator then
if not self.is_emulator and not self.disable_ssl then
local handshake_res, handshake_err = httpc:ssl_handshake(nil, self.pubsub_base_domain, false)
if not handshake_res then
ngx.log(ngx.ERR, "Got error in handshake: ", handshake_err)
Expand Down
88 changes: 88 additions & 0 deletions lib/resty/pubsub/workload_identity_client.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
--[[
The MIT License (MIT)

Copyright (c) 2020 Wingify Software Pvt. Ltd.

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
]]

local ngx = require "ngx"
local cjson = require "cjson"
local io = require "io"
local resty_rsa = require "resty.rsa"
local http = require "resty.http"
local constants = require "resty.pubsub.constants"

local setmetatable = setmetatable

local _M = {}
local mt = { __index = _M }

function _M.new(self, workload_identity_config, topic)

local instance = {
topic = topic,
token_url = workload_identity_config.token_url,
token_expires = 0, -- We need to maintain token expiry time so that we can update it before expiring
token_dict = workload_identity_config.token_dict
}

return setmetatable(instance, mt)
end

-- A method which sets the token to a lua dictionary.
local function token_setter(self, token, expires_in)
self.token_dict:set("token:" .. self.topic, token)
end

-- A method which gets token from a lua dictionary.
local function token_getter(self)
return self.token_dict:get("token:" .. self.topic)
end

function _M.get_token(self)

if self.token_dict == nil then
return nil, "Provided token lua dictionary not found, please refer to documentation for adding it to nginx configuration"
end

local status, token = pcall(function ()
if token_getter(self) == nil or self.token_expires == nil or (ngx.time() > self.token_expires) then
local httpc = http.new()
local res, err = httpc:request_uri(self.token_url, {
headers = {
["Metadata-Flavor"] = "Google"
}
})

if not res then
return {nil, err}
end

if res.status >=400 then
return {nil, cjson.decode(res.body)}
end

local decoded_response = cjson.decode(res.body)
local token = decoded_response["access_token"]
self.token_expires = ngx.time() + decoded_response["expires_in"]

token_setter(self, token)
return {token, nil}
else
return {token_getter(self), nil}
end
end)

if not status then
return nil, token -- If something fails while executing callback, token object will comprise of the callback error
else
return table.unpack(token) -- Else return a table consisting of data & error (if any)
end
end

return _M
1 change: 1 addition & 0 deletions lua-resty-pubsub.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ build = {
modules = {
["resty.pubsub.constants"] = "lib/resty/pubsub/constants.lua",
["resty.pubsub.oauth_client"] = "lib/resty/pubsub/oauth_client.lua",
["resty.pubsub.workload_identity_client"] = "lib/resty/pubsub/workload_identity_client.lua",
["resty.pubsub.producer"] = "lib/resty/pubsub/producer.lua",
["resty.pubsub.request"] = "lib/resty/pubsub/request.lua",
["resty.pubsub.ringbuffer"] = "lib/resty/pubsub/ringbuffer.lua"
Expand Down
82 changes: 82 additions & 0 deletions t/integration_test.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use Test::Nginx::Socket "no_plan";

use Cwd qw(cwd);

my $pwd = cwd();

our $HttpConfig = qq{
lua_package_path "$pwd/lib/?.lua;;";
lua_package_cpath "/usr/local/openresty/lualib/?.so;;";
lua_shared_dict OAUTH_TOKEN 1m;
};

run_tests();

__DATA__

=== TEST 1: Create Pubsub Producer

--- http_config eval: $::HttpConfig
--- config
location = /token {
default_type application/json;
return 200 '{"access_token":"foobar", "expires_in": 3599, "token_type": "Bearer"}';
}
location = /v1/projects/test/topics/test:publish {
return 200 "ok";
}
location = /t {
content_by_lua '
local producer = require "resty.pubsub.producer"

local create_producer = function()
local pubsub_config = {
project_id = "test",
topic = "test",
pubsub_base_domain = "127.0.0.1",
pubsub_base_port = 1984,
is_emulator = false,
disable_ssl = true,
producer_config = {
max_batch_size = 1, -- number of packets
timer_interval = 1, -- in milliseconds
last_flush_interval = 1, -- in milliseconds
},
workload_identity_config = {
token_url = "http://127.0.0.1:1984/token",
token_dict = OAUTH_TOKEN
}
}

local p, err = producer:new(pubsub_config)

if err ~= nil then
return
end

pcall(function()
local ok, send_err = p:send("Some Random Text", {
attr1 = "Test1",
attr2 = "Test2"
})

if send_err ~= nil then
ngx.print("Error: ", send_err)
return
end

os.execute("sleep 1")

ngx.print("Success")
end)


end

create_producer()
';
}
--- request
GET /t
--- response_body
Success
Loading