diff --git a/.clang-format b/.clang-format new file mode 100644 index 0000000..48ad661 --- /dev/null +++ b/.clang-format @@ -0,0 +1,137 @@ +--- +Language: Cpp +# BasedOnStyle: LLVM +AccessModifierOffset: -2 +AlignAfterOpenBracket: Align +AlignConsecutiveMacros: false +AlignConsecutiveAssignments: false +AlignConsecutiveDeclarations: false +AlignEscapedNewlines: Right +AlignOperands: true +AlignTrailingComments: true +AllowAllArgumentsOnNextLine: true +AllowAllConstructorInitializersOnNextLine: true +AllowAllParametersOfDeclarationOnNextLine: true +AllowShortBlocksOnASingleLine: Never +AllowShortCaseLabelsOnASingleLine: false +AllowShortFunctionsOnASingleLine: All +AllowShortLambdasOnASingleLine: All +AllowShortIfStatementsOnASingleLine: Never +AllowShortLoopsOnASingleLine: false +AlwaysBreakAfterDefinitionReturnType: None +AlwaysBreakAfterReturnType: None +AlwaysBreakBeforeMultilineStrings: false +AlwaysBreakTemplateDeclarations: MultiLine +BinPackArguments: true +BinPackParameters: true +BraceWrapping: + AfterCaseLabel: false + AfterClass: false + AfterControlStatement: false + AfterEnum: false + AfterFunction: false + AfterNamespace: false + AfterObjCDeclaration: false + AfterStruct: false + AfterUnion: false + AfterExternBlock: false + BeforeCatch: false + BeforeElse: false + IndentBraces: false + SplitEmptyFunction: true + SplitEmptyRecord: true + SplitEmptyNamespace: true +BreakBeforeBinaryOperators: None +BreakBeforeBraces: Attach +BreakBeforeInheritanceComma: false +BreakInheritanceList: BeforeColon +BreakBeforeTernaryOperators: true +BreakConstructorInitializersBeforeComma: false +BreakConstructorInitializers: BeforeColon +BreakAfterJavaFieldAnnotations: false +BreakStringLiterals: true +ColumnLimit: 80 +CommentPragmas: '^ IWYU pragma:' +CompactNamespaces: false +ConstructorInitializerAllOnOneLineOrOnePerLine: false +ConstructorInitializerIndentWidth: 4 +ContinuationIndentWidth: 4 +Cpp11BracedListStyle: true +DeriveLineEnding: true +DerivePointerAlignment: false +DisableFormat: false +ExperimentalAutoDetectBinPacking: false +FixNamespaceComments: true +ForEachMacros: + - foreach + - Q_FOREACH + - BOOST_FOREACH +IncludeBlocks: Preserve +IncludeCategories: + - Regex: '^"(llvm|llvm-c|clang|clang-c)/' + Priority: 2 + SortPriority: 0 + - Regex: '^(<|"(gtest|gmock|isl|json)/)' + Priority: 3 + SortPriority: 0 + - Regex: '.*' + Priority: 1 + SortPriority: 0 +IncludeIsMainRegex: '(Test)?$' +IncludeIsMainSourceRegex: '' +IndentCaseLabels: false +IndentGotoLabels: true +IndentPPDirectives: None +IndentWidth: 4 +IndentWrappedFunctionNames: false +JavaScriptQuotes: Leave +JavaScriptWrapImports: true +KeepEmptyLinesAtTheStartOfBlocks: true +MacroBlockBegin: '' +MacroBlockEnd: '' +MaxEmptyLinesToKeep: 1 +NamespaceIndentation: None +ObjCBinPackProtocolList: Auto +ObjCBlockIndentWidth: 4 +ObjCSpaceAfterProperty: false +ObjCSpaceBeforeProtocolList: true +PenaltyBreakAssignment: 2 +PenaltyBreakBeforeFirstCallParameter: 19 +PenaltyBreakComment: 300 +PenaltyBreakFirstLessLess: 120 +PenaltyBreakString: 1000 +PenaltyBreakTemplateDeclaration: 10 +PenaltyExcessCharacter: 1000000 +PenaltyReturnTypeOnItsOwnLine: 60 +PointerAlignment: Right +ReflowComments: true +SortIncludes: true +SortUsingDeclarations: true +SpaceAfterCStyleCast: false +SpaceAfterLogicalNot: false +SpaceAfterTemplateKeyword: true +SpaceBeforeAssignmentOperators: true +SpaceBeforeCpp11BracedList: false +SpaceBeforeCtorInitializerColon: true +SpaceBeforeInheritanceColon: true +SpaceBeforeParens: ControlStatements +SpaceBeforeRangeBasedForLoopColon: true +SpaceInEmptyBlock: false +SpaceInEmptyParentheses: false +SpacesBeforeTrailingComments: 1 +SpacesInAngles: false +SpacesInConditionalStatement: false +SpacesInContainerLiterals: true +SpacesInCStyleCastParentheses: false +SpacesInParentheses: false +SpacesInSquareBrackets: false +SpaceBeforeSquareBrackets: false +Standard: Latest +StatementMacros: + - Q_UNUSED + - QT_REQUIRE_VERSION +TabWidth: 8 +UseCRLF: false +UseTab: Never +... + diff --git a/.github/workflows/linter.yml b/.github/workflows/linter.yml new file mode 100644 index 0000000..0c7235f --- /dev/null +++ b/.github/workflows/linter.yml @@ -0,0 +1,30 @@ +name: lint-and-build + +on: push + +jobs: + clang-lint: + name: Lint code base + runs-on: ubuntu-20.04 + + steps: + - name: Checkout code + uses: actions/checkout@v2 + - name: Run clang-format-lint + uses: DoozyX/clang-format-lint-action@v0.11 + with: + source: '.' + exclude: './build' + clangFormatVersion: 10 + inplace: True + - name: Check build + run: docker run -i --volume $GITHUB_WORKSPACE:/src/sg-bridge:z --workdir /src/sg-bridge fedora:32 /bin/sh -c 'sh ./build/build_checks.sh' + - name: Commit in-place changes based on linting recommendations + uses: EndBug/add-and-commit@v4 + with: + author_name: InfraWatch CI + author_email: robot@infra.watch + message: 'Committing clang-format changes' + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 1f9ab53..0000000 --- a/.travis.yml +++ /dev/null @@ -1,18 +0,0 @@ -# vim: noai:ts=2:sw=2:ft=yaml: -language: minimal - -git: - depth: 1 - -sudo: required - -services: - - docker - -# setup dependencies required for testing -before_install: - - docker pull fedora:32 - -# execute unit testing and code coverage -install: - - docker run -uroot --network host -it --volume $PWD:/src/sg-bridge:z --workdir /src/sg-bridge fedora:32 /bin/sh -c 'sh ./build/build_checks.sh' diff --git a/amqp_rcv_th.c b/amqp_rcv_th.c index 93b32ce..4ed666f 100644 --- a/amqp_rcv_th.c +++ b/amqp_rcv_th.c @@ -28,8 +28,10 @@ static time_t start_time; * are processed. */ static void close_all(pn_connection_t *c, app_data_t *app) { - if (c) pn_connection_close(c); - if (app->listener) pn_listener_close(app->listener); + if (c) + pn_connection_close(c); + if (app->listener) + pn_listener_close(app->listener); } static void check_condition(pn_event_t *e, pn_condition_t *cond, @@ -44,7 +46,8 @@ static void check_condition(pn_event_t *e, pn_condition_t *cond, } /* This function handles events when we are acting as the receiver */ -static void handle_receive(app_data_t *app, pn_event_t *event, int *batch_done) { +static void handle_receive(app_data_t *app, pn_event_t *event, + int *batch_done) { /* printf("handle_receive %s\n", app->container_id);*/ *batch_done = 0; @@ -53,7 +56,8 @@ static void handle_receive(app_data_t *app, pn_event_t *event, int *batch_done) pn_link_t *l = pn_delivery_link(d); size_t size = pn_delivery_pending(d); - pn_rwbytes_t *m = rb_get_head(app->rbin); /* Append data to incoming message buffer */ + pn_rwbytes_t *m = + rb_get_head(app->rbin); /* Append data to incoming message buffer */ assert(m); ssize_t recv; // First time through m->size = 0 for a partial message... @@ -63,14 +67,14 @@ static void handle_receive(app_data_t *app, pn_event_t *event, int *batch_done) if (recv == PN_ABORTED) { printf("Message aborted\n"); fflush(stdout); - m->size = 0; /* Forget the data we accumulated */ - pn_delivery_settle(d); /* Free the delivery so we can - receive the next message */ - pn_link_flow(l, 1); /* Replace credit for aborted message */ + m->size = 0; /* Forget the data we accumulated */ + pn_delivery_settle(d); /* Free the delivery so we can + receive the next message */ + pn_link_flow(l, 1); /* Replace credit for aborted message */ } else if (recv < 0 && recv != PN_EOS) { /* Unexpected error */ pn_condition_format(pn_link_condition(l), "broker", "PN_DELIVERY error: %s", pn_code(recv)); - pn_link_close(l); /* Unexpected error, close the link */ + pn_link_close(l); /* Unexpected error, close the link */ } else if (!pn_delivery_partial(d)) { /* Message is complete */ // Place in the ring buffer HERE rb_put(app->rbin); @@ -83,10 +87,11 @@ static void handle_receive(app_data_t *app, pn_event_t *event, int *batch_done) int link_credit = pn_link_credit(l); int free = rb_free_size(app->rbin); int credit = free - link_credit + 1; - if ( credit > 0 ) { + if (credit > 0) { pn_link_flow(l, credit); } - if ((app->message_count > 0) && (app->sock_sent >= app->message_count)) { + if ((app->message_count > 0) && + (app->sock_sent >= app->message_count)) { close_all(pn_event_connection(event), app); exit_code = 1; @@ -102,144 +107,143 @@ static void handle_receive(app_data_t *app, pn_event_t *event, int *batch_done) */ static bool handle(app_data_t *app, pn_event_t *event, int *batch_done) { switch (pn_event_type(event)) { - case PN_DELIVERY: { - pn_link_t *l = pn_event_link(event); - if (l) { /* Only delegate link-related events */ - handle_receive(app, event, batch_done); - } - break; + case PN_DELIVERY: { + pn_link_t *l = pn_event_link(event); + if (l) { /* Only delegate link-related events */ + handle_receive(app, event, batch_done); } + break; + } - case PN_LISTENER_OPEN: { - char port[256]; /* Get the listening port */ - pn_netaddr_host_port(pn_listener_addr(pn_event_listener(event)), - NULL, 0, port, sizeof(port)); - printf("listening on %s\n", port); - fflush(stdout); - break; - } - case PN_LISTENER_ACCEPT: - pn_listener_accept2(pn_event_listener(event), NULL, NULL); - break; + case PN_LISTENER_OPEN: { + char port[256]; /* Get the listening port */ + pn_netaddr_host_port(pn_listener_addr(pn_event_listener(event)), NULL, + 0, port, sizeof(port)); + printf("listening on %s\n", port); + fflush(stdout); + break; + } + case PN_LISTENER_ACCEPT: + pn_listener_accept2(pn_event_listener(event), NULL, NULL); + break; - case PN_CONNECTION_INIT: - if (app->verbose) { - printf("PN_CONNECTION_INIT %s\n", app->container_id); - } - pn_connection_t *c = pn_event_connection(event); - pn_connection_set_container(c, app->container_id); - pn_connection_open(c); - pn_session_t *s = pn_session(c); - pn_session_open(s); - { - pn_link_t *l = pn_receiver(s, "sa_receiver"); - pn_terminus_set_address(pn_link_source(l), app->amqp_con.address); - pn_link_open(l); - /* cannot receive without granting credit: */ - pn_link_flow(l, rb_free_size(app->rbin)); - } - break; + case PN_CONNECTION_INIT: + if (app->verbose) { + printf("PN_CONNECTION_INIT %s\n", app->container_id); + } + pn_connection_t *c = pn_event_connection(event); + pn_connection_set_container(c, app->container_id); + pn_connection_open(c); + pn_session_t *s = pn_session(c); + pn_session_open(s); + { + pn_link_t *l = pn_receiver(s, "sa_receiver"); + pn_terminus_set_address(pn_link_source(l), app->amqp_con.address); + pn_link_open(l); + /* cannot receive without granting credit: */ + pn_link_flow(l, rb_free_size(app->rbin)); + } + break; - case PN_CONNECTION_BOUND: { - if (app->verbose) { - printf("PN_CONNECTION_BOUND %s\n", app->container_id); - } - /* Turn off security */ - pn_transport_t *t = pn_event_transport(event); - pn_transport_require_auth(t, false); - pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS"); - pn_transport_set_max_frame(t, app->ring_buffer_size); - break; + case PN_CONNECTION_BOUND: { + if (app->verbose) { + printf("PN_CONNECTION_BOUND %s\n", app->container_id); } - case PN_CONNECTION_LOCAL_OPEN: { - if (app->verbose) { - printf("PN_CONNECTION_LOCAL_OPEN %s\n", app->container_id); - } - break; + /* Turn off security */ + pn_transport_t *t = pn_event_transport(event); + pn_transport_require_auth(t, false); + pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS"); + pn_transport_set_max_frame(t, app->ring_buffer_size); + break; + } + case PN_CONNECTION_LOCAL_OPEN: { + if (app->verbose) { + printf("PN_CONNECTION_LOCAL_OPEN %s\n", app->container_id); } - case PN_CONNECTION_REMOTE_OPEN: { - if (app->verbose) { - printf("PN_CONNECTION_REMOTE_OPEN %s\n", app->container_id); - } - pn_connection_open( - pn_event_connection(event)); /* Complete the open */ - printf("%s ==> (%s)\n", app->container_id, app->amqp_con.url); - break; + break; + } + case PN_CONNECTION_REMOTE_OPEN: { + if (app->verbose) { + printf("PN_CONNECTION_REMOTE_OPEN %s\n", app->container_id); } + pn_connection_open(pn_event_connection(event)); /* Complete the open */ + printf("%s ==> (%s)\n", app->container_id, app->amqp_con.url); + break; + } - case PN_SESSION_LOCAL_OPEN: { - if (app->verbose) { - printf("PN_SESSION_LOCAL_OPEN %s\n", app->container_id); - } - pn_connection_t *c = pn_event_connection(event); - pn_session_t *s = pn_session(c); - pn_link_t *l = pn_receiver(s, "my_receiver"); - pn_terminus_set_address(pn_link_source(l), app->amqp_con.address); - - break; + case PN_SESSION_LOCAL_OPEN: { + if (app->verbose) { + printf("PN_SESSION_LOCAL_OPEN %s\n", app->container_id); } - case PN_SESSION_INIT: { - if (app->verbose) { - printf("PN_SESSION_INIT %s\n", app->container_id); - } - pn_session_set_incoming_capacity(pn_event_session(event), app->ring_buffer_size * app->ring_buffer_count); - pn_session_set_outgoing_window(pn_event_session(event), app->ring_buffer_count); - break; + pn_connection_t *c = pn_event_connection(event); + pn_session_t *s = pn_session(c); + pn_link_t *l = pn_receiver(s, "my_receiver"); + pn_terminus_set_address(pn_link_source(l), app->amqp_con.address); + + break; + } + case PN_SESSION_INIT: { + if (app->verbose) { + printf("PN_SESSION_INIT %s\n", app->container_id); } - case PN_SESSION_REMOTE_OPEN: { - if (app->verbose) { - printf("PN_SESSION_REMOTE_OPEN %s\n", app->container_id); - } - pn_session_open(pn_event_session(event)); - break; + pn_session_set_incoming_capacity(pn_event_session(event), + app->ring_buffer_size * + app->ring_buffer_count); + pn_session_set_outgoing_window(pn_event_session(event), + app->ring_buffer_count); + break; + } + case PN_SESSION_REMOTE_OPEN: { + if (app->verbose) { + printf("PN_SESSION_REMOTE_OPEN %s\n", app->container_id); } + pn_session_open(pn_event_session(event)); + break; + } - case PN_TRANSPORT_CLOSED: - check_condition( - event, pn_transport_condition(pn_event_transport(event)), app); - break; - - case PN_CONNECTION_REMOTE_CLOSE: - check_condition( - event, - pn_connection_remote_condition(pn_event_connection(event)), - app); - pn_connection_close( - pn_event_connection(event)); /* Return the close */ - break; - - case PN_SESSION_REMOTE_CLOSE: - check_condition( - event, pn_session_remote_condition(pn_event_session(event)), - app); - pn_session_close(pn_event_session(event)); /* Return the close */ - pn_session_free(pn_event_session(event)); - break; - - case PN_LINK_REMOTE_CLOSE: - case PN_LINK_REMOTE_DETACH: - check_condition( - event, pn_link_remote_condition(pn_event_link(event)), app); - pn_link_close(pn_event_link(event)); /* Return the close */ - pn_link_free(pn_event_link(event)); - break; - - case PN_PROACTOR_TIMEOUT: - break; - - case PN_LISTENER_CLOSE: - app->listener = NULL; /* Listener is closed */ - check_condition( - event, pn_listener_condition(pn_event_listener(event)), app); - break; - - case PN_PROACTOR_INACTIVE: - return false; - break; - - default: { - break; - } + case PN_TRANSPORT_CLOSED: + check_condition(event, + pn_transport_condition(pn_event_transport(event)), app); + break; + + case PN_CONNECTION_REMOTE_CLOSE: + check_condition( + event, pn_connection_remote_condition(pn_event_connection(event)), + app); + pn_connection_close(pn_event_connection(event)); /* Return the close */ + break; + + case PN_SESSION_REMOTE_CLOSE: + check_condition( + event, pn_session_remote_condition(pn_event_session(event)), app); + pn_session_close(pn_event_session(event)); /* Return the close */ + pn_session_free(pn_event_session(event)); + break; + + case PN_LINK_REMOTE_CLOSE: + case PN_LINK_REMOTE_DETACH: + check_condition(event, pn_link_remote_condition(pn_event_link(event)), + app); + pn_link_close(pn_event_link(event)); /* Return the close */ + pn_link_free(pn_event_link(event)); + break; + + case PN_PROACTOR_TIMEOUT: + break; + + case PN_LISTENER_CLOSE: + app->listener = NULL; /* Listener is closed */ + check_condition(event, pn_listener_condition(pn_event_listener(event)), + app); + break; + + case PN_PROACTOR_INACTIVE: + return false; + break; + + default: { + break; + } } return exit_code == 0; } @@ -297,7 +301,8 @@ void *amqp_rcv_th(void *app_ptr) { if (app->standalone) { app->listener = pn_listener(); } - pn_proactor_addr(addr, sizeof(addr), app->amqp_con.host, app->amqp_con.port); + pn_proactor_addr(addr, sizeof(addr), app->amqp_con.host, + app->amqp_con.port); if (app->standalone) { pn_proactor_listen(app->proactor, app->listener, addr, LISTEN_BACKLOG); } else { diff --git a/bridge.c b/bridge.c index d271c6c..b8a0876 100644 --- a/bridge.c +++ b/bridge.c @@ -61,16 +61,46 @@ struct option_info { }; struct option_info option_info[] = { - {{"amqp_url", required_argument, 0, ARG_AMQP_URL}, "host[:port]/path", "URL of the AMQP endpoint (%s)", DEFAULT_AMQP_URL}, - {{"gw_unix", optional_argument, 0, ARG_GW_UNIX}, "/path/to/socket", "Connect to gateway with unix socket (%s)", DEFAULT_UNIX_SOCKET_PATH}, - {{"gw_inet", optional_argument, 0, ARG_GW_INET}, "host[:port]", "Connect to gateway with inet socket (%s)", DEFAULT_INET_TARGET}, - {{"block", no_argument, 0, ARG_BLOCK}, "", "Outgoing socket connection will block (%s)", DEFAULT_SOCKET_BLOCK}, - {{"rbc", required_argument, 0, ARG_RING_BUFFER_COUNT}, "4096", "Number of message buffers between AMQP and Outgoing (%s)", DEFAULT_RING_BUFFER_COUNT}, - {{"rbs", required_argument, 0, ARG_RING_BUFFER_SIZE}, "2048", "Size of a message buffer between AMQP and Outgoing (%s)", DEFAULT_RING_BUFFER_SIZE}, - {{"stat_period", required_argument, 0, ARG_STAT_PERIOD}, "period_in_seconds", "How often to print stats, 0 for no stats (%s)", DEFAULT_STATS_PERIOD}, - {{"cid", required_argument, 0, ARG_CID}, "connection_id", "AMQP container ID (should be unique) (%s)", DEFAULT_CONTAINER_ID_PATTERN}, - {{"count", required_argument, 0, ARG_COUNT}, "stop_count", "Number of AMQP mesg to rcv before exit, 0 for continous (%s)", DEFAULT_STOP_COUNT}, - {{"verbose", no_argument, 0, ARG_VERBOSE}, "", "Print extra info, multiple instance increase verbosity.", ""}, + {{"amqp_url", required_argument, 0, ARG_AMQP_URL}, + "host[:port]/path", + "URL of the AMQP endpoint (%s)", + DEFAULT_AMQP_URL}, + {{"gw_unix", optional_argument, 0, ARG_GW_UNIX}, + "/path/to/socket", + "Connect to gateway with unix socket (%s)", + DEFAULT_UNIX_SOCKET_PATH}, + {{"gw_inet", optional_argument, 0, ARG_GW_INET}, + "host[:port]", + "Connect to gateway with inet socket (%s)", + DEFAULT_INET_TARGET}, + {{"block", no_argument, 0, ARG_BLOCK}, + "", + "Outgoing socket connection will block (%s)", + DEFAULT_SOCKET_BLOCK}, + {{"rbc", required_argument, 0, ARG_RING_BUFFER_COUNT}, + "4096", + "Number of message buffers between AMQP and Outgoing (%s)", + DEFAULT_RING_BUFFER_COUNT}, + {{"rbs", required_argument, 0, ARG_RING_BUFFER_SIZE}, + "2048", + "Size of a message buffer between AMQP and Outgoing (%s)", + DEFAULT_RING_BUFFER_SIZE}, + {{"stat_period", required_argument, 0, ARG_STAT_PERIOD}, + "period_in_seconds", + "How often to print stats, 0 for no stats (%s)", + DEFAULT_STATS_PERIOD}, + {{"cid", required_argument, 0, ARG_CID}, + "connection_id", + "AMQP container ID (should be unique) (%s)", + DEFAULT_CONTAINER_ID_PATTERN}, + {{"count", required_argument, 0, ARG_COUNT}, + "stop_count", + "Number of AMQP mesg to rcv before exit, 0 for continous (%s)", + DEFAULT_STOP_COUNT}, + {{"verbose", no_argument, 0, ARG_VERBOSE}, + "", + "Print extra info, multiple instance increase verbosity.", + ""}, {{"help", no_argument, 0, ARG_HELP}, "", "Print help.", ""}}; static void usage(char *program) { @@ -79,26 +109,30 @@ static void usage(char *program) { "The missing link between AMQP and golang.\n\n", program); - int widths[] = {0,0}; + int widths[] = {0, 0}; for (int i = 0; i < (sizeof(option_info) / sizeof(option_info[0])); i++) { int len; - if ( (len = strlen(option_info[i].lopt.name)) > widths[0] ) { + if ((len = strlen(option_info[i].lopt.name)) > widths[0]) { widths[0] = len; } - if ( (len = strlen(option_info[i].arg_example)) > widths[1] ) { + if ((len = strlen(option_info[i].arg_example)) > widths[1]) { widths[1] = len; } } fprintf(stdout, "args:\n"); for (int i = 0; i < (sizeof(option_info) / sizeof(option_info[0])); i++) { char help_buffer[200]; - snprintf(help_buffer, sizeof(help_buffer), option_info[i].arg_help, option_info[i].arg_default); - fprintf(stdout, " --%-*s %-*s %s\n", widths[0], option_info[i].lopt.name, widths[1], option_info[i].arg_example, help_buffer); + snprintf(help_buffer, sizeof(help_buffer), option_info[i].arg_help, + option_info[i].arg_default); + fprintf(stdout, " --%-*s %-*s %s\n", widths[0], + option_info[i].lopt.name, widths[1], option_info[i].arg_example, + help_buffer); } } -static int match_regex(char *regmatch, char *matches[], int n_matches, const char *to_match) { +static int match_regex(char *regmatch, char *matches[], int n_matches, + const char *to_match) { /* "M" contains the matches found. */ regmatch_t m[n_matches]; regex_t regex; @@ -123,7 +157,7 @@ static int match_regex(char *regmatch, char *matches[], int n_matches, const cha int match_len = m[i].rm_eo - m[i].rm_so; - matches[i] = malloc(match_len + 1); // make room for '\0' + matches[i] = malloc(match_len + 1); // make room for '\0' int k = 0; for (int j = m[i].rm_so; j < m[i].rm_eo; j++) { @@ -164,74 +198,73 @@ int main(int argc, char **argv) { longopts[i] = option_info[i].lopt; } - while ((opt = getopt_long(argc, argv, "hv", - longopts, &index)) != -1) { + while ((opt = getopt_long(argc, argv, "hv", longopts, &index)) != -1) { switch (opt) { - case ARG_BLOCK: - app.socket_flags ^= MSG_DONTWAIT; - break; - case ARG_AMQP_URL: - app.amqp_con.url = strdup(optarg); - break; - case ARG_GW_UNIX: - if (optarg != NULL) { - app.unix_socket_name = optarg; + case ARG_BLOCK: + app.socket_flags ^= MSG_DONTWAIT; + break; + case ARG_AMQP_URL: + app.amqp_con.url = strdup(optarg); + break; + case ARG_GW_UNIX: + if (optarg != NULL) { + app.unix_socket_name = optarg; + } + app.domain = AF_UNIX; + break; + case ARG_RING_BUFFER_COUNT: + if (optarg != NULL) { + app.ring_buffer_count = atoi(optarg); + } + break; + case ARG_RING_BUFFER_SIZE: + if (optarg != NULL) { + app.ring_buffer_size = atoi(optarg); + } + break; + case ARG_GW_INET: + if (optarg != NULL) { + char *matches[4]; + if (match_regex("^([^:]*)(:([0-9]+))*$", matches, 4, optarg) <= + 0) { + fprintf(stderr, "Invalid INET address: %s", optarg); + exit(1); } - app.domain = AF_UNIX; - break; - case ARG_RING_BUFFER_COUNT: - if (optarg != NULL) { - app.ring_buffer_count = atoi(optarg); - } - break; - case ARG_RING_BUFFER_SIZE: - if (optarg != NULL) { - app.ring_buffer_size = atoi(optarg); - } - break; - case ARG_GW_INET: - if (optarg != NULL) { - char *matches[4]; - if (match_regex("^([^:]*)(:([0-9]+))*$", matches, 4, optarg) <= 0) { - fprintf(stderr, "Invalid INET address: %s", optarg); - exit(1); - } - app.peer_host = matches[2]; - app.peer_port = matches[3]; - } - app.domain = AF_INET; - break; - case ARG_CID: - strncpy(cid_buf, optarg, sizeof(cid_buf) - 1); - break; - case ARG_COUNT: - app.message_count = atoi(optarg); - break; - case ARG_STANDALONE: - app.standalone = 1; - break; - case ARG_STAT_PERIOD: - app.stat_period = atoi(optarg); - break; - case ARG_VERBOSE: - case 'v': - app.verbose++; - break; - case 'h': - case ARG_HELP: - usage(argv[0]); - return 0; - default: - usage(argv[0]); - return 1; + app.peer_host = matches[2]; + app.peer_port = matches[3]; + } + app.domain = AF_INET; + break; + case ARG_CID: + strncpy(cid_buf, optarg, sizeof(cid_buf) - 1); + break; + case ARG_COUNT: + app.message_count = atoi(optarg); + break; + case ARG_STANDALONE: + app.standalone = 1; + break; + case ARG_STAT_PERIOD: + app.stat_period = atoi(optarg); + break; + case ARG_VERBOSE: + case 'v': + app.verbose++; + break; + case 'h': + case ARG_HELP: + usage(argv[0]); + return 0; + default: + usage(argv[0]); + return 1; } } char *matches[10]; memset(matches, 0, sizeof(matches)); - match_regex(AMQP_URL_REGEX, - matches, 10, app.amqp_con.url); + match_regex(AMQP_URL_REGEX, matches, 10, app.amqp_con.url); if (matches[2] != NULL) { app.amqp_con.user = strdup(matches[2]); } @@ -239,7 +272,7 @@ int main(int argc, char **argv) { app.amqp_con.password = strdup(matches[4]); } if (matches[5] == NULL || matches[8] == NULL) { - fprintf(stderr,"Invalid AMQP URL: %s", app.amqp_con.url); + fprintf(stderr, "Invalid AMQP URL: %s", app.amqp_con.url); exit(1); } app.amqp_con.host = strdup(matches[5]); @@ -247,11 +280,10 @@ int main(int argc, char **argv) { if (matches[7] != NULL) { app.amqp_con.port = strdup(matches[7]); } - if (app.standalone) { printf("Standalone mode\n"); - } + } app.rbin = rb_alloc(app.ring_buffer_count, app.ring_buffer_size); @@ -270,12 +302,14 @@ int main(int argc, char **argv) { while (1) { sleep(1); if (sleep_count == app.stat_period) { - printf("in: %ld(%ld), amqp_overrun: %ld(%ld), out: %ld(%ld), sock_overrun: %ld(%ld), batch_size: %f\n", + printf("in: %ld(%ld), amqp_overrun: %ld(%ld), out: %ld(%ld), " + "sock_overrun: %ld(%ld), batch_size: %f\n", app.amqp_received, app.amqp_received - last_amqp_received, app.rbin->overruns, app.rbin->overruns - last_overrun, app.sock_sent, app.sock_sent - last_out, - app.sock_would_block, app.sock_would_block - last_sock_overrun, - app.amqp_received/(float)app.amqp_total_batches); + app.sock_would_block, + app.sock_would_block - last_sock_overrun, + app.amqp_received / (float)app.amqp_total_batches); sleep_count = 0; } sleep_count++; diff --git a/bridge.h b/bridge.h index 0f3e558..8f841e6 100644 --- a/bridge.h +++ b/bridge.h @@ -16,7 +16,7 @@ #define DEFAULT_INET_HOST "127.0.0.1" #define DEFAULT_INET_PORT "30000" #define DEFAULT_INET_TARGET DEFAULT_INET_HOST ":" DEFAULT_INET_PORT -#define DEFAULT_CID "bridge-%x" +#define DEFAULT_CID "bridge-%x" #define DEFAULT_CONTAINER_ID_PATTERN "sa-%x" #define DEFAULT_STATS_PERIOD "0" #define DEFAULT_SOCKET_BLOCK "false" @@ -24,7 +24,8 @@ #define DEFAULT_RING_BUFFER_COUNT "5000" #define DEFAULT_RING_BUFFER_SIZE "2048" -#define AMQP_URL_REGEX "^amqp://(([a-z]+)(:([a-z]+))*@)*([a-zA-Z_0-9.-]+)(:([0-9]+))*(.+)$" +#define AMQP_URL_REGEX \ + "^amqp://(([a-z]+)(:([a-z]+))*@)*([a-zA-Z_0-9.-]+)(:([0-9]+))*(.+)$" //#define AMQP_URL_REGEX "^amqp://" typedef struct { @@ -36,12 +37,11 @@ typedef struct { char *url; } amqp_connection; - -typedef struct { +typedef struct { // Parameters section int standalone; int verbose; - int domain; // connection to SG, AF_UNIX || AF_INET + int domain; // connection to SG, AF_UNIX || AF_INET int stat_period; int ring_buffer_size; int ring_buffer_count; @@ -54,13 +54,13 @@ typedef struct { char *peer_host, *peer_port; - // Runtime + // Runtime pthread_t amqp_rcv_th; pthread_t socket_snd_th; int amqp_rcv_th_running; int socket_snd_th_running; - + pn_proactor_t *proactor; pn_listener_t *listener; pn_rwbytes_t msgout; /* Buffers for incoming/outgoing messages */ diff --git a/rb.c b/rb.c index e40cf08..84de234 100644 --- a/rb.c +++ b/rb.c @@ -104,8 +104,7 @@ pn_rwbytes_t *rb_put(rb_rwbytes_t *rb) { rb->ring_buffer[rb->head].size = 0; } - - return next_buffer; // May be NULL + return next_buffer; // May be NULL } pn_rwbytes_t *rb_get(rb_rwbytes_t *rb) { @@ -117,9 +116,9 @@ pn_rwbytes_t *rb_get(rb_rwbytes_t *rb) { next = (rb->tail + 1) % rb->count; while (next == rb->head) { - pthread_mutex_lock(&rb->rb_mutex); - pthread_cond_wait(&rb->rb_ready, &rb->rb_mutex); - pthread_mutex_unlock(&rb->rb_mutex); + pthread_mutex_lock(&rb->rb_mutex); + pthread_cond_wait(&rb->rb_ready, &rb->rb_mutex); + pthread_mutex_unlock(&rb->rb_mutex); next = (rb->tail + 1) % rb->count; rb->queue_block++; @@ -134,9 +133,7 @@ pn_rwbytes_t *rb_get(rb_rwbytes_t *rb) { return &rb->ring_buffer[rb->tail]; } -int rb_inuse_size(rb_rwbytes_t *rb) { - return rb->count - rb_free_size(rb); -} +int rb_inuse_size(rb_rwbytes_t *rb) { return rb->count - rb_free_size(rb); } int rb_free_size(rb_rwbytes_t *rb) { assert(rb->head != rb->tail); @@ -144,21 +141,13 @@ int rb_free_size(rb_rwbytes_t *rb) { int head = rb->head; int tail = rb->tail; - return head > tail ? rb->count - (head - tail) : tail - head ; + return head > tail ? rb->count - (head - tail) : tail - head; } -int rb_size(rb_rwbytes_t *rb) { - return rb->count; -} +int rb_size(rb_rwbytes_t *rb) { return rb->count; } -long rb_get_overruns(rb_rwbytes_t *rb) { - return rb->overruns; -} +long rb_get_overruns(rb_rwbytes_t *rb) { return rb->overruns; } -long rb_get_processed(rb_rwbytes_t *rb) { - return rb->processed; -} +long rb_get_processed(rb_rwbytes_t *rb) { return rb->processed; } -long rb_get_queue_block(rb_rwbytes_t *rb) { - return rb->queue_block; -} \ No newline at end of file +long rb_get_queue_block(rb_rwbytes_t *rb) { return rb->queue_block; } \ No newline at end of file diff --git a/rb.h b/rb.h index 8ed585f..1336348 100644 --- a/rb.h +++ b/rb.h @@ -2,8 +2,8 @@ #ifndef _RB_H #define _RB_H 1 -#include #include +#include #include @@ -34,13 +34,13 @@ typedef struct { extern rb_rwbytes_t *rb_alloc(int count, int buf_size); -extern pn_rwbytes_t *rb_get_head (rb_rwbytes_t *rb); +extern pn_rwbytes_t *rb_get_head(rb_rwbytes_t *rb); -extern pn_rwbytes_t *rb_get_tail( rb_rwbytes_t *rb); +extern pn_rwbytes_t *rb_get_tail(rb_rwbytes_t *rb); -extern pn_rwbytes_t *rb_put( rb_rwbytes_t *rb); +extern pn_rwbytes_t *rb_put(rb_rwbytes_t *rb); -extern pn_rwbytes_t *rb_get( rb_rwbytes_t *rb); +extern pn_rwbytes_t *rb_get(rb_rwbytes_t *rb); extern void rb_free(rb_rwbytes_t *rb); diff --git a/socket_snd_th.c b/socket_snd_th.c index df243d5..1310619 100644 --- a/socket_snd_th.c +++ b/socket_snd_th.c @@ -50,24 +50,22 @@ static int prepare_send_socket_inet(app_data_t *app) { memset(&hints, 0, sizeof(struct addrinfo)); - hints.ai_family = AF_UNSPEC, - hints.ai_socktype = SOCK_DGRAM, - hints.ai_protocol = 0, - hints.ai_flags = AI_ADDRCONFIG; + hints.ai_family = AF_UNSPEC, hints.ai_socktype = SOCK_DGRAM, + hints.ai_protocol = 0, hints.ai_flags = AI_ADDRCONFIG; - int err = getaddrinfo(app->peer_host, app->peer_port, &hints, &peer_addrinfo); + int err = + getaddrinfo(app->peer_host, app->peer_port, &hints, &peer_addrinfo); if (err != 0) { - fprintf( - stderr, - "%s: getaddrinfo returned non-zero value: %d\n", __func__, - errno); + fprintf(stderr, "%s: getaddrinfo returned non-zero value: %d\n", + __func__, errno); perror("Error"); freeaddrinfo(peer_addrinfo); return -1; } - app->send_sock = socket(peer_addrinfo->ai_family, peer_addrinfo->ai_socktype, - peer_addrinfo->ai_protocol); + app->send_sock = + socket(peer_addrinfo->ai_family, peer_addrinfo->ai_socktype, + peer_addrinfo->ai_protocol); if (app->send_sock == -1) { fprintf(stderr, "%s: socket returned -1\n", __func__); perror("Error"); @@ -80,9 +78,11 @@ static int prepare_send_socket_inet(app_data_t *app) { void *ptr = &((struct sockaddr_in *)peer_addrinfo->ai_addr)->sin_addr; inet_ntop(peer_addrinfo->ai_family, ptr, addrstr, sizeof(addrstr)); - - printf("%s ==> (%s:%d)\n", app->container_id, addrstr, - ntohs((((struct sockaddr_in *)((struct sockaddr *)peer_addrinfo->ai_addr))->sin_port))); + printf( + "%s ==> (%s:%d)\n", app->container_id, addrstr, + ntohs( + (((struct sockaddr_in *)((struct sockaddr *)peer_addrinfo->ai_addr)) + ->sin_port))); memcpy(&app->sa, peer_addrinfo->ai_addr, peer_addrinfo->ai_addrlen); app->sa_len = peer_addrinfo->ai_addrlen; @@ -100,26 +100,26 @@ static int process_message_binary(app_data_t *app, pn_data_t *body) { if (sent_bytes <= 0) { // MSG_DONTWAIT is set switch (errno) { - case EAGAIN: - // Normal backup - app->sock_would_block++; - break; - case EBADF: - case ENOTSOCK: - // sockfd is not a valid file descriptor - // TODO reopen socket - perror("SG Send"); - return 1; - break; - case ECONNREFUSED: - break; - default: - perror("SG Send"); - printf("%d ",errno); - return 1; + case EAGAIN: + // Normal backup + app->sock_would_block++; + break; + case EBADF: + case ENOTSOCK: + // sockfd is not a valid file descriptor + // TODO reopen socket + perror("SG Send"); + return 1; + break; + case ECONNREFUSED: + break; + default: + perror("SG Send"); + printf("%d ", errno); + return 1; } } else { - app->sock_sent++; + app->sock_sent++; } } return 0; @@ -198,25 +198,25 @@ void *socket_snd_th(void *app_ptr) { // Create the send socket switch (app->domain) { - case AF_UNIX: - if (prepare_send_socket_unix(app) == -1) { - fprintf(stderr, "Failed to create socket... exiting!"); - return NULL; - } - break; + case AF_UNIX: + if (prepare_send_socket_unix(app) == -1) { + fprintf(stderr, "Failed to create socket... exiting!"); + return NULL; + } + break; - case AF_INET: - if (prepare_send_socket_inet(app) == -1) { - fprintf(stderr, "Failed to create socket... exiting!"); - return NULL; - } - break; + case AF_INET: + if (prepare_send_socket_inet(app) == -1) { + fprintf(stderr, "Failed to create socket... exiting!"); + return NULL; + } + break; - default: - fprintf(stderr, "Unknown domain type: %d", app->domain); - break; + default: + fprintf(stderr, "Unknown domain type: %d", app->domain); + break; } - + clock_gettime(CLOCK_MONOTONIC, &app->rbin->total_t2); while (1) { diff --git a/utils.c b/utils.c index 879202e..a88c460 100644 --- a/utils.c +++ b/utils.c @@ -2,33 +2,29 @@ #include void time_diff(struct timespec t1, struct timespec t2, struct timespec *diff) { - if(t2.tv_nsec < t1.tv_nsec) - { - /* If nanoseconds in t1 are larger than nanoseconds in t2, it - means that something like the following happened: - t1.tv_sec = 1000 t1.tv_nsec = 100000 - t2.tv_sec = 1001 t2.tv_nsec = 10 - In this case, less than a second has passed but subtracting - the tv_sec parts will indicate that 1 second has passed. To - fix this problem, we subtract 1 second from the elapsed - tv_sec and add one second to the elapsed tv_nsec. See - below: - */ - diff->tv_sec += t2.tv_sec - t1.tv_sec - 1; - diff->tv_nsec += t2.tv_nsec - t1.tv_nsec + 1000000000; - } - else - { - diff->tv_sec += t2.tv_sec - t1.tv_sec; - diff->tv_nsec += t2.tv_nsec - t1.tv_nsec; - } - + if (t2.tv_nsec < t1.tv_nsec) { + /* If nanoseconds in t1 are larger than nanoseconds in t2, it + means that something like the following happened: + t1.tv_sec = 1000 t1.tv_nsec = 100000 + t2.tv_sec = 1001 t2.tv_nsec = 10 + In this case, less than a second has passed but subtracting + the tv_sec parts will indicate that 1 second has passed. To + fix this problem, we subtract 1 second from the elapsed + tv_sec and add one second to the elapsed tv_nsec. See + below: + */ + diff->tv_sec += t2.tv_sec - t1.tv_sec - 1; + diff->tv_nsec += t2.tv_nsec - t1.tv_nsec + 1000000000; + } else { + diff->tv_sec += t2.tv_sec - t1.tv_sec; + diff->tv_nsec += t2.tv_nsec - t1.tv_nsec; + } } char *time_snprintf(char *buf, size_t n, struct timespec t1) { - double pct = (t1.tv_sec * 1000000000.0) + t1.tv_nsec / 1000000000.0; + double pct = (t1.tv_sec * 1000000000.0) + t1.tv_nsec / 1000000000.0; - snprintf(buf, n, "%f", pct); + snprintf(buf, n, "%f", pct); - return buf; + return buf; }