Skip to content

Commit

Permalink
Add ability to override disable flag and actions on a rule (confluent…
Browse files Browse the repository at this point in the history
…inc#2377)

* Add ability to override disable flag and actions on a rule

* Add test
  • Loading branch information
rayokota authored Dec 10, 2024
1 parent 16456fd commit 87979d8
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 11 deletions.
47 changes: 43 additions & 4 deletions src/Confluent.SchemaRegistry/AsyncSerde.cs
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ protected async Task<object> ExecuteRules(
for (int i = 0; i < rules.Count; i++)
{
Rule rule = rules[i];
if (rule.Disabled)
if (IsDisabled(rule))
{
continue;
}
Expand Down Expand Up @@ -406,21 +406,21 @@ protected async Task<object> ExecuteRules(
default:
throw new ArgumentException("Unsupported rule kind " + rule.Kind);
}
await RunAction(ctx, ruleMode, rule, message != null ? rule.OnSuccess : rule.OnFailure,
await RunAction(ctx, ruleMode, rule, message != null ? GetOnSuccess(rule) : GetOnFailure(rule),
message, null, message != null ? null : ErrorAction.ActionType,
ruleRegistry)
.ConfigureAwait(continueOnCapturedContext: false);
}
catch (RuleException ex)
{
await RunAction(ctx, ruleMode, rule, rule.OnFailure, message,
await RunAction(ctx, ruleMode, rule, GetOnFailure(rule), message,
ex, ErrorAction.ActionType, ruleRegistry)
.ConfigureAwait(continueOnCapturedContext: false);
}
}
else
{
await RunAction(ctx, ruleMode, rule, rule.OnFailure, message,
await RunAction(ctx, ruleMode, rule, GetOnFailure(rule), message,
new RuleException("Could not find rule executor of type " + rule.Type),
ErrorAction.ActionType, ruleRegistry)
.ConfigureAwait(continueOnCapturedContext: false);
Expand All @@ -429,6 +429,45 @@ await RunAction(ctx, ruleMode, rule, rule.OnFailure, message,
return message;
}

private string GetOnSuccess(Rule rule)
{
if (ruleRegistry.TryGetOverride(rule.Type, out RuleOverride ruleOverride))
{
if (ruleOverride.OnSuccess != null)
{
return ruleOverride.OnSuccess;
}
}

return rule.OnSuccess;
}

private string GetOnFailure(Rule rule)
{
if (ruleRegistry.TryGetOverride(rule.Type, out RuleOverride ruleOverride))
{
if (ruleOverride.OnFailure != null)
{
return ruleOverride.OnFailure;
}
}

return rule.OnFailure;
}

private bool IsDisabled(Rule rule)
{
if (ruleRegistry.TryGetOverride(rule.Type, out RuleOverride ruleOverride))
{
if (ruleOverride.Disabled.HasValue)
{
return ruleOverride.Disabled.Value;
}
}

return rule.Disabled;
}

private static IRuleExecutor GetRuleExecutor(RuleRegistry ruleRegistry, string type)
{
if (ruleRegistry.TryGetExecutor(type, out IRuleExecutor result))
Expand Down
40 changes: 40 additions & 0 deletions src/Confluent.SchemaRegistry/RuleOverride.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2024 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.

namespace Confluent.SchemaRegistry
{
/// <summary>
/// A rule override.
/// </summary>
public class RuleOverride
{
public string Type { get; set; }

public string OnSuccess { get; set; }

public string OnFailure { get; set; }

public bool? Disabled { get; set; }

public RuleOverride(string type, string onSuccess, string onFailure, bool? disabled)
{
Type = type;
OnSuccess = onSuccess;
OnFailure = onFailure;
Disabled = disabled;
}
}
}
54 changes: 49 additions & 5 deletions src/Confluent.SchemaRegistry/RuleRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,16 @@ public class RuleRegistry
{
private readonly SemaphoreSlim ruleExecutorsMutex = new SemaphoreSlim(1);
private readonly SemaphoreSlim ruleActionsMutex = new SemaphoreSlim(1);
private readonly SemaphoreSlim ruleOverridesMutex = new SemaphoreSlim(1);

private IDictionary<string, IRuleExecutor> ruleExecutors = new Dictionary<string, IRuleExecutor>();
private IDictionary<string, IRuleAction> ruleActions = new Dictionary<string, IRuleAction>();
private IDictionary<string, RuleOverride> ruleOverrides = new Dictionary<string, RuleOverride>();

private static readonly RuleRegistry GLOBAL_INSTANCE = new RuleRegistry();

public static RuleRegistry GlobalInstance => GLOBAL_INSTANCE;

public static List<IRuleAction> GetRuleActions()
{
return GlobalInstance.GetActions();
}

public void RegisterExecutor(IRuleExecutor executor)
{
ruleExecutorsMutex.Wait();
Expand Down Expand Up @@ -123,6 +120,48 @@ public List<IRuleAction> GetActions()
}
}

public void RegisterOverride(RuleOverride ruleOverride)
{
ruleOverridesMutex.Wait();
try
{
if (!ruleOverrides.ContainsKey(ruleOverride.Type))
{
ruleOverrides.Add(ruleOverride.Type, ruleOverride);
}
}
finally
{
ruleOverridesMutex.Release();
}
}

public bool TryGetOverride(string name, out RuleOverride ruleOverride)
{
ruleOverridesMutex.Wait();
try
{
return ruleOverrides.TryGetValue(name, out ruleOverride);
}
finally
{
ruleOverridesMutex.Release();
}
}

public List<RuleOverride> GetOverrides()
{
ruleOverridesMutex.Wait();
try
{
return new List<RuleOverride>(ruleOverrides.Values);
}
finally
{
ruleOverridesMutex.Release();
}
}

public static void RegisterRuleExecutor(IRuleExecutor executor)
{
GlobalInstance.RegisterExecutor(executor);
Expand All @@ -132,5 +171,10 @@ public static void RegisterRuleAction(IRuleAction action)
{
GlobalInstance.RegisterAction(action);
}

public static void RegisterRuleOverride(RuleOverride ruleOverride)
{
GlobalInstance.RegisterOverride(ruleOverride);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -275,12 +275,12 @@ public void ISpecificRecordCELFieldTransform()
schema.RuleSet = new RuleSet(new List<Rule>(),
new List<Rule>
{
new Rule("testCEL", RuleKind.Transform, RuleMode.Write, "CEL_FIELD", null, null,
new Rule("testCEL", RuleKind.Transform, RuleMode.Write, "CEL_FIELD", null, null,
"typeName == 'STRING' ; value + '-suffix'", null, null, false)
}
);
store[schemaStr] = 1;
subjectStore["topic-value"] = new List<RegisteredSchema> { schema };
subjectStore["topic-value"] = new List<RegisteredSchema> { schema };
var config = new AvroSerializerConfig
{
AutoRegisterSchemas = false,
Expand All @@ -305,6 +305,46 @@ public void ISpecificRecordCELFieldTransform()
Assert.Equal(user.favorite_number, result.favorite_number);
}

[Fact]
public void ISpecificRecordCELFieldTransformDisable()
{
var schemaStr = User._SCHEMA.ToString();
var schema = new RegisteredSchema("topic-value", 1, 1, schemaStr, SchemaType.Avro, null);
schema.RuleSet = new RuleSet(new List<Rule>(),
new List<Rule>
{
new Rule("testCEL", RuleKind.Transform, RuleMode.Write, "CEL_FIELD", null, null,
"typeName == 'STRING' ; value + '-suffix'", null, null, false)
}
);
store[schemaStr] = 1;
subjectStore["topic-value"] = new List<RegisteredSchema> { schema };
var config = new AvroSerializerConfig
{
AutoRegisterSchemas = false,
UseLatestVersion = true
};
RuleRegistry registry = new RuleRegistry();
registry.RegisterOverride(new RuleOverride("CEL_FIELD", null, null, true));
var serializer = new AvroSerializer<User>(schemaRegistryClient, config, registry);
var deserializer = new AvroDeserializer<User>(schemaRegistryClient, null);

var user = new User
{
favorite_color = "blue",
favorite_number = 100,
name = "awesome"
};

Headers headers = new Headers();
var bytes = serializer.SerializeAsync(user, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result;
var result = deserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result;

Assert.Equal("awesome", result.name);
Assert.Equal("blue", result.favorite_color);
Assert.Equal(user.favorite_number, result.favorite_number);
}

[Fact]
public void ISpecificRecordCELFieldCondition()
{
Expand Down

0 comments on commit 87979d8

Please sign in to comment.