diff --git a/.github/workflows/t1-failure-no-template.yml b/.github/workflows/t1-failure-no-template.yml new file mode 100644 index 0000000..0a3ce0f --- /dev/null +++ b/.github/workflows/t1-failure-no-template.yml @@ -0,0 +1,18 @@ +name: 't1-failure-no-template' +on: + workflow_dispatch: + +jobs: + testJob: + runs-on: [ + "self-hosted", + "anka", + "anka-template:c092c6f6-198c-470f-9526-9c998efe7ab5", + "anka-template-tag:vanilla+port-forward-22+brew-git", + "run-id:${{ github.run_id }}", + "unique-id:1" + ] + steps: + - uses: actions/checkout@v3 + - run: | + echo "hello" \ No newline at end of file diff --git a/.github/workflows/t1-with-tag-1-failure.yml b/.github/workflows/t1-failure-tag-1-in-vm.yml similarity index 92% rename from .github/workflows/t1-with-tag-1-failure.yml rename to .github/workflows/t1-failure-tag-1-in-vm.yml index 92e47f3..bae920f 100644 --- a/.github/workflows/t1-with-tag-1-failure.yml +++ b/.github/workflows/t1-failure-tag-1-in-vm.yml @@ -1,4 +1,4 @@ -name: 't1-with-tag-1-failure' +name: 't1-failure-tag-1-in-vm' on: workflow_dispatch: diff --git a/.github/workflows/t2-dual-without-tag.yml b/.github/workflows/t2-dual-without-tag.yml index c330388..8bb3f65 100644 --- a/.github/workflows/t2-dual-without-tag.yml +++ b/.github/workflows/t2-dual-without-tag.yml @@ -7,7 +7,7 @@ jobs: runs-on: [ "self-hosted", "anka", - "anka-template:ae1d3b9b-314e-44bc-9e6b-0b8e2bbc823b", + "anka-template:d792c6f6-198c-470f-9526-9c998efe7ab4", "run-id:${{ github.run_id }}", "unique-id:1" ] @@ -22,7 +22,7 @@ jobs: runs-on: [ "self-hosted", "anka", - "anka-template:ae1d3b9b-314e-44bc-9e6b-0b8e2bbc823b", + "anka-template:c0847bc9-5d2d-4dbc-ba6a-240f7ff08032", "run-id:${{ github.run_id }}", "unique-id:2" ] diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..a88f45c --- /dev/null +++ b/LICENSE @@ -0,0 +1,557 @@ + Server Side Public License + VERSION 1, OCTOBER 16, 2018 + + Copyright © 2024 Veertu Inc. 
+ + Everyone is permitted to copy and distribute verbatim copies of this + license document, but changing it is not allowed. + + TERMS AND CONDITIONS + + 0. Definitions. + + “This License” refers to Server Side Public License. + + “Copyright” also means copyright-like laws that apply to other kinds of + works, such as semiconductor masks. + + “The Program” refers to any copyrightable work licensed under this + License. Each licensee is addressed as “you”. “Licensees” and + “recipients” may be individuals or organizations. + + To “modify” a work means to copy from or adapt all or part of the work in + a fashion requiring copyright permission, other than the making of an + exact copy. The resulting work is called a “modified version” of the + earlier work or a work “based on” the earlier work. + + A “covered work” means either the unmodified Program or a work based on + the Program. + + To “propagate” a work means to do anything with it that, without + permission, would make you directly or secondarily liable for + infringement under applicable copyright law, except executing it on a + computer or modifying a private copy. Propagation includes copying, + distribution (with or without modification), making available to the + public, and in some countries other activities as well. + + To “convey” a work means any kind of propagation that enables other + parties to make or receive copies. Mere interaction with a user through a + computer network, with no transfer of a copy, is not conveying. + + An interactive user interface displays “Appropriate Legal Notices” to the + extent that it includes a convenient and prominently visible feature that + (1) displays an appropriate copyright notice, and (2) tells the user that + there is no warranty for the work (except to the extent that warranties + are provided), that licensees may convey the work under this License, and + how to view a copy of this License. 
If the interface presents a list of + user commands or options, such as a menu, a prominent item in the list + meets this criterion. + + 1. Source Code. + + The “source code” for a work means the preferred form of the work for + making modifications to it. “Object code” means any non-source form of a + work. + + A “Standard Interface” means an interface that either is an official + standard defined by a recognized standards body, or, in the case of + interfaces specified for a particular programming language, one that is + widely used among developers working in that language. The “System + Libraries” of an executable work include anything, other than the work as + a whole, that (a) is included in the normal form of packaging a Major + Component, but which is not part of that Major Component, and (b) serves + only to enable use of the work with that Major Component, or to implement + a Standard Interface for which an implementation is available to the + public in source code form. A “Major Component”, in this context, means a + major essential component (kernel, window system, and so on) of the + specific operating system (if any) on which the executable work runs, or + a compiler used to produce the work, or an object code interpreter used + to run it. + + The “Corresponding Source” for a work in object code form means all the + source code needed to generate, install, and (for an executable work) run + the object code and to modify the work, including scripts to control + those activities. However, it does not include the work's System + Libraries, or general-purpose tools or generally available free programs + which are used unmodified in performing those activities but which are + not part of the work. 
For example, Corresponding Source includes + interface definition files associated with source files for the work, and + the source code for shared libraries and dynamically linked subprograms + that the work is specifically designed to require, such as by intimate + data communication or control flow between those subprograms and other + parts of the work. + + The Corresponding Source need not include anything that users can + regenerate automatically from other parts of the Corresponding Source. + + The Corresponding Source for a work in source code form is that same work. + + 2. Basic Permissions. + + All rights granted under this License are granted for the term of + copyright on the Program, and are irrevocable provided the stated + conditions are met. This License explicitly affirms your unlimited + permission to run the unmodified Program, subject to section 13. The + output from running a covered work is covered by this License only if the + output, given its content, constitutes a covered work. This License + acknowledges your rights of fair use or other equivalent, as provided by + copyright law. Subject to section 13, you may make, run and propagate + covered works that you do not convey, without conditions so long as your + license otherwise remains in force. You may convey covered works to + others for the sole purpose of having them make modifications exclusively + for you, or provide you with facilities for running those works, provided + that you comply with the terms of this License in conveying all + material for which you do not control copyright. Those thus making or + running the covered works for you must do so exclusively on your + behalf, under your direction and control, on terms that prohibit them + from making any copies of your copyrighted material outside their + relationship with you. + + Conveying under any other circumstances is permitted solely under the + conditions stated below. 
Sublicensing is not allowed; section 10 makes it + unnecessary. + + 3. Protecting Users' Legal Rights From Anti-Circumvention Law. + + No covered work shall be deemed part of an effective technological + measure under any applicable law fulfilling obligations under article 11 + of the WIPO copyright treaty adopted on 20 December 1996, or similar laws + prohibiting or restricting circumvention of such measures. + + When you convey a covered work, you waive any legal power to forbid + circumvention of technological measures to the extent such circumvention is + effected by exercising rights under this License with respect to the + covered work, and you disclaim any intention to limit operation or + modification of the work as a means of enforcing, against the work's users, + your or third parties' legal rights to forbid circumvention of + technological measures. + + 4. Conveying Verbatim Copies. + + You may convey verbatim copies of the Program's source code as you + receive it, in any medium, provided that you conspicuously and + appropriately publish on each copy an appropriate copyright notice; keep + intact all notices stating that this License and any non-permissive terms + added in accord with section 7 apply to the code; keep intact all notices + of the absence of any warranty; and give all recipients a copy of this + License along with the Program. You may charge any price or no price for + each copy that you convey, and you may offer support or warranty + protection for a fee. + + 5. Conveying Modified Source Versions. + + You may convey a work based on the Program, or the modifications to + produce it from the Program, in the form of source code under the terms + of section 4, provided that you also meet all of these conditions: + + a) The work must carry prominent notices stating that you modified it, + and giving a relevant date. 
+ + b) The work must carry prominent notices stating that it is released + under this License and any conditions added under section 7. This + requirement modifies the requirement in section 4 to “keep intact all + notices”. + + c) You must license the entire work, as a whole, under this License to + anyone who comes into possession of a copy. This License will therefore + apply, along with any applicable section 7 additional terms, to the + whole of the work, and all its parts, regardless of how they are + packaged. This License gives no permission to license the work in any + other way, but it does not invalidate such permission if you have + separately received it. + + d) If the work has interactive user interfaces, each must display + Appropriate Legal Notices; however, if the Program has interactive + interfaces that do not display Appropriate Legal Notices, your work + need not make them do so. + + A compilation of a covered work with other separate and independent + works, which are not by their nature extensions of the covered work, and + which are not combined with it such as to form a larger program, in or on + a volume of a storage or distribution medium, is called an “aggregate” if + the compilation and its resulting copyright are not used to limit the + access or legal rights of the compilation's users beyond what the + individual works permit. Inclusion of a covered work in an aggregate does + not cause this License to apply to the other parts of the aggregate. + + 6. Conveying Non-Source Forms. + + You may convey a covered work in object code form under the terms of + sections 4 and 5, provided that you also convey the machine-readable + Corresponding Source under the terms of this License, in one of these + ways: + + a) Convey the object code in, or embodied in, a physical product + (including a physical distribution medium), accompanied by the + Corresponding Source fixed on a durable physical medium customarily + used for software interchange. 
+ + b) Convey the object code in, or embodied in, a physical product + (including a physical distribution medium), accompanied by a written + offer, valid for at least three years and valid for as long as you + offer spare parts or customer support for that product model, to give + anyone who possesses the object code either (1) a copy of the + Corresponding Source for all the software in the product that is + covered by this License, on a durable physical medium customarily used + for software interchange, for a price no more than your reasonable cost + of physically performing this conveying of source, or (2) access to + copy the Corresponding Source from a network server at no charge. + + c) Convey individual copies of the object code with a copy of the + written offer to provide the Corresponding Source. This alternative is + allowed only occasionally and noncommercially, and only if you received + the object code with such an offer, in accord with subsection 6b. + + d) Convey the object code by offering access from a designated place + (gratis or for a charge), and offer equivalent access to the + Corresponding Source in the same way through the same place at no + further charge. You need not require recipients to copy the + Corresponding Source along with the object code. If the place to copy + the object code is a network server, the Corresponding Source may be on + a different server (operated by you or a third party) that supports + equivalent copying facilities, provided you maintain clear directions + next to the object code saying where to find the Corresponding Source. + Regardless of what server hosts the Corresponding Source, you remain + obligated to ensure that it is available for as long as needed to + satisfy these requirements. 
+ + e) Convey the object code using peer-to-peer transmission, provided you + inform other peers where the object code and Corresponding Source of + the work are being offered to the general public at no charge under + subsection 6d. + + A separable portion of the object code, whose source code is excluded + from the Corresponding Source as a System Library, need not be included + in conveying the object code work. + + A “User Product” is either (1) a “consumer product”, which means any + tangible personal property which is normally used for personal, family, + or household purposes, or (2) anything designed or sold for incorporation + into a dwelling. In determining whether a product is a consumer product, + doubtful cases shall be resolved in favor of coverage. For a particular + product received by a particular user, “normally used” refers to a + typical or common use of that class of product, regardless of the status + of the particular user or of the way in which the particular user + actually uses, or expects or is expected to use, the product. A product + is a consumer product regardless of whether the product has substantial + commercial, industrial or non-consumer uses, unless such uses represent + the only significant mode of use of the product. + + “Installation Information” for a User Product means any methods, + procedures, authorization keys, or other information required to install + and execute modified versions of a covered work in that User Product from + a modified version of its Corresponding Source. The information must + suffice to ensure that the continued functioning of the modified object + code is in no case prevented or interfered with solely because + modification has been made. 
+ + If you convey an object code work under this section in, or with, or + specifically for use in, a User Product, and the conveying occurs as part + of a transaction in which the right of possession and use of the User + Product is transferred to the recipient in perpetuity or for a fixed term + (regardless of how the transaction is characterized), the Corresponding + Source conveyed under this section must be accompanied by the + Installation Information. But this requirement does not apply if neither + you nor any third party retains the ability to install modified object + code on the User Product (for example, the work has been installed in + ROM). + + The requirement to provide Installation Information does not include a + requirement to continue to provide support service, warranty, or updates + for a work that has been modified or installed by the recipient, or for + the User Product in which it has been modified or installed. Access + to a network may be denied when the modification itself materially + and adversely affects the operation of the network or violates the + rules and protocols for communication across the network. + + Corresponding Source conveyed, and Installation Information provided, in + accord with this section must be in a format that is publicly documented + (and with an implementation available to the public in source code form), + and must require no special password or key for unpacking, reading or + copying. + + 7. Additional Terms. + + “Additional permissions” are terms that supplement the terms of this + License by making exceptions from one or more of its conditions. + Additional permissions that are applicable to the entire Program shall be + treated as though they were included in this License, to the extent that + they are valid under applicable law. 
If additional permissions apply only + to part of the Program, that part may be used separately under those + permissions, but the entire Program remains governed by this License + without regard to the additional permissions. When you convey a copy of + a covered work, you may at your option remove any additional permissions + from that copy, or from any part of it. (Additional permissions may be + written to require their own removal in certain cases when you modify the + work.) You may place additional permissions on material, added by you to + a covered work, for which you have or can give appropriate copyright + permission. + + Notwithstanding any other provision of this License, for material you add + to a covered work, you may (if authorized by the copyright holders of + that material) supplement the terms of this License with terms: + + a) Disclaiming warranty or limiting liability differently from the + terms of sections 15 and 16 of this License; or + + b) Requiring preservation of specified reasonable legal notices or + author attributions in that material or in the Appropriate Legal + Notices displayed by works containing it; or + + c) Prohibiting misrepresentation of the origin of that material, or + requiring that modified versions of such material be marked in + reasonable ways as different from the original version; or + + d) Limiting the use for publicity purposes of names of licensors or + authors of the material; or + + e) Declining to grant rights under trademark law for use of some trade + names, trademarks, or service marks; or + + f) Requiring indemnification of licensors and authors of that material + by anyone who conveys the material (or modified versions of it) with + contractual assumptions of liability to the recipient, for any + liability that these contractual assumptions directly impose on those + licensors and authors. 
+ + All other non-permissive additional terms are considered “further + restrictions” within the meaning of section 10. If the Program as you + received it, or any part of it, contains a notice stating that it is + governed by this License along with a term that is a further restriction, + you may remove that term. If a license document contains a further + restriction but permits relicensing or conveying under this License, you + may add to a covered work material governed by the terms of that license + document, provided that the further restriction does not survive such + relicensing or conveying. + + If you add terms to a covered work in accord with this section, you must + place, in the relevant source files, a statement of the additional terms + that apply to those files, or a notice indicating where to find the + applicable terms. Additional terms, permissive or non-permissive, may be + stated in the form of a separately written license, or stated as + exceptions; the above requirements apply either way. + + 8. Termination. + + You may not propagate or modify a covered work except as expressly + provided under this License. Any attempt otherwise to propagate or modify + it is void, and will automatically terminate your rights under this + License (including any patent licenses granted under the third paragraph + of section 11). + + However, if you cease all violation of this License, then your license + from a particular copyright holder is reinstated (a) provisionally, + unless and until the copyright holder explicitly and finally terminates + your license, and (b) permanently, if the copyright holder fails to + notify you of the violation by some reasonable means prior to 60 days + after the cessation. 
+ + Moreover, your license from a particular copyright holder is reinstated + permanently if the copyright holder notifies you of the violation by some + reasonable means, this is the first time you have received notice of + violation of this License (for any work) from that copyright holder, and + you cure the violation prior to 30 days after your receipt of the notice. + + Termination of your rights under this section does not terminate the + licenses of parties who have received copies or rights from you under + this License. If your rights have been terminated and not permanently + reinstated, you do not qualify to receive new licenses for the same + material under section 10. + + 9. Acceptance Not Required for Having Copies. + + You are not required to accept this License in order to receive or run a + copy of the Program. Ancillary propagation of a covered work occurring + solely as a consequence of using peer-to-peer transmission to receive a + copy likewise does not require acceptance. However, nothing other than + this License grants you permission to propagate or modify any covered + work. These actions infringe copyright if you do not accept this License. + Therefore, by modifying or propagating a covered work, you indicate your + acceptance of this License to do so. + + 10. Automatic Licensing of Downstream Recipients. + + Each time you convey a covered work, the recipient automatically receives + a license from the original licensors, to run, modify and propagate that + work, subject to this License. You are not responsible for enforcing + compliance by third parties with this License. + + An “entity transaction” is a transaction transferring control of an + organization, or substantially all assets of one, or subdividing an + organization, or merging organizations. 
If propagation of a covered work + results from an entity transaction, each party to that transaction who + receives a copy of the work also receives whatever licenses to the work + the party's predecessor in interest had or could give under the previous + paragraph, plus a right to possession of the Corresponding Source of the + work from the predecessor in interest, if the predecessor has it or can + get it with reasonable efforts. + + You may not impose any further restrictions on the exercise of the rights + granted or affirmed under this License. For example, you may not impose a + license fee, royalty, or other charge for exercise of rights granted + under this License, and you may not initiate litigation (including a + cross-claim or counterclaim in a lawsuit) alleging that any patent claim + is infringed by making, using, selling, offering for sale, or importing + the Program or any portion of it. + + 11. Patents. + + A “contributor” is a copyright holder who authorizes use under this + License of the Program or a work on which the Program is based. The work + thus licensed is called the contributor's “contributor version”. + + A contributor's “essential patent claims” are all patent claims owned or + controlled by the contributor, whether already acquired or hereafter + acquired, that would be infringed by some manner, permitted by this + License, of making, using, or selling its contributor version, but do not + include claims that would be infringed only as a consequence of further + modification of the contributor version. For purposes of this definition, + “control” includes the right to grant patent sublicenses in a manner + consistent with the requirements of this License. + + Each contributor grants you a non-exclusive, worldwide, royalty-free + patent license under the contributor's essential patent claims, to make, + use, sell, offer for sale, import and otherwise run, modify and propagate + the contents of its contributor version. 
+ + In the following three paragraphs, a “patent license” is any express + agreement or commitment, however denominated, not to enforce a patent + (such as an express permission to practice a patent or covenant not to + sue for patent infringement). To “grant” such a patent license to a party + means to make such an agreement or commitment not to enforce a patent + against the party. + + If you convey a covered work, knowingly relying on a patent license, and + the Corresponding Source of the work is not available for anyone to copy, + free of charge and under the terms of this License, through a publicly + available network server or other readily accessible means, then you must + either (1) cause the Corresponding Source to be so available, or (2) + arrange to deprive yourself of the benefit of the patent license for this + particular work, or (3) arrange, in a manner consistent with the + requirements of this License, to extend the patent license to downstream + recipients. “Knowingly relying” means you have actual knowledge that, but + for the patent license, your conveying the covered work in a country, or + your recipient's use of the covered work in a country, would infringe + one or more identifiable patents in that country that you have reason + to believe are valid. + + If, pursuant to or in connection with a single transaction or + arrangement, you convey, or propagate by procuring conveyance of, a + covered work, and grant a patent license to some of the parties receiving + the covered work authorizing them to use, propagate, modify or convey a + specific copy of the covered work, then the patent license you grant is + automatically extended to all recipients of the covered work and works + based on it. + + A patent license is “discriminatory” if it does not include within the + scope of its coverage, prohibits the exercise of, or is conditioned on + the non-exercise of one or more of the rights that are specifically + granted under this License. 
You may not convey a covered work if you are + a party to an arrangement with a third party that is in the business of + distributing software, under which you make payment to the third party + based on the extent of your activity of conveying the work, and under + which the third party grants, to any of the parties who would receive the + covered work from you, a discriminatory patent license (a) in connection + with copies of the covered work conveyed by you (or copies made from + those copies), or (b) primarily for and in connection with specific + products or compilations that contain the covered work, unless you + entered into that arrangement, or that patent license was granted, prior + to 28 March 2007. + + Nothing in this License shall be construed as excluding or limiting any + implied license or other defenses to infringement that may otherwise be + available to you under applicable patent law. + + 12. No Surrender of Others' Freedom. + + If conditions are imposed on you (whether by court order, agreement or + otherwise) that contradict the conditions of this License, they do not + excuse you from the conditions of this License. If you cannot use, + propagate or convey a covered work so as to satisfy simultaneously your + obligations under this License and any other pertinent obligations, then + as a consequence you may not use, propagate or convey it at all. For + example, if you agree to terms that obligate you to collect a royalty for + further conveying from those to whom you convey the Program, the only way + you could satisfy both those terms and this License would be to refrain + entirely from conveying the Program. + + 13. Offering the Program as a Service. + + If you make the functionality of the Program or a modified version + available to third parties as a service, you must make the Service Source + Code available via network download to everyone at no charge, under the + terms of this License. 
Making the functionality of the Program or + modified version available to third parties as a service includes, + without limitation, enabling third parties to interact with the + functionality of the Program or modified version remotely through a + computer network, offering a service the value of which entirely or + primarily derives from the value of the Program or modified version, or + offering a service that accomplishes for users the primary purpose of the + Program or modified version. + + “Service Source Code” means the Corresponding Source for the Program or + the modified version, and the Corresponding Source for all programs that + you use to make the Program or modified version available as a service, + including, without limitation, management software, user interfaces, + application program interfaces, automation software, monitoring software, + backup software, storage software and hosting software, all such that a + user could run an instance of the service using the Service Source Code + you make available. + + 14. Revised Versions of this License. + + Veertu Inc. may publish revised and/or new versions of the Server Side + Public License from time to time. Such new versions will be similar in + spirit to the present version, but may differ in detail to address new + problems or concerns. + + Each version is given a distinguishing version number. If the Program + specifies that a certain numbered version of the Server Side Public + License “or any later version” applies to it, you have the option of + following the terms and conditions either of that numbered version or of + any later version published by Veertu Inc. If the Program does not + specify a version number of the Server Side Public License, you may + choose any version ever published by Veertu Inc. 
+ + If the Program specifies that a proxy can decide which future versions of + the Server Side Public License can be used, that proxy's public statement + of acceptance of a version permanently authorizes you to choose that + version for the Program. + + Later license versions may give you additional or different permissions. + However, no additional obligations are imposed on any author or copyright + holder as a result of your choosing to follow a later version. + + 15. Disclaimer of Warranty. + + THERE IS NO WARRANTY FOR THE PROGRAM, TO THE EXTENT PERMITTED BY + APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT + HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM “AS IS” WITHOUT WARRANTY + OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, + THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE PROGRAM + IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF + ALL NECESSARY SERVICING, REPAIR OR CORRECTION. + + 16. Limitation of Liability. + + IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING + WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MODIFIES AND/OR CONVEYS + THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING + ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING OUT OF + THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED TO + LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU + OR THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER + PROGRAMS), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE + POSSIBILITY OF SUCH DAMAGES. + + 17. Interpretation of Sections 15 and 16. 
+ + If the disclaimer of warranty and limitation of liability provided above + cannot be given local legal effect according to their terms, reviewing + courts shall apply local law that most closely approximates an absolute + waiver of all civil liability in connection with the Program, unless a + warranty or assumption of liability accompanies a copy of the Program in + return for a fee. + + END OF TERMS AND CONDITIONS \ No newline at end of file diff --git a/README.md b/README.md index 72af236..e72d732 100644 --- a/README.md +++ b/README.md @@ -477,3 +477,7 @@ Any of the services you run are done from within worker context. Each service al For example, the `github` plugin will update the metrics for the service it is running in to be `running`, `pulling`, and `idle` when it is done or has yet to pick up a new job. To do this, it uses `metrics.UpdateService` with the worker and service context. See `github` plugin for an example. But metrics.UpdateService can also update things like `LastSuccess`, and `LastFailure`. See `metrics.UpdateService` for more information. + + +## Copyright +All rights reserved, Veertu Inc. 
\ No newline at end of file diff --git a/anklet.code-workspace b/anklet.code-workspace index 6d118a1..690c17e 100644 --- a/anklet.code-workspace +++ b/anklet.code-workspace @@ -5,6 +5,9 @@ }, { "path": "../ci-tools" + }, + { + "path": "../webhooks" } ], "settings": {} diff --git a/go.mod b/go.mod index 4e9fe64..343cee6 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,10 @@ module github.com/veertuinc/anklet go 1.22.2 require ( - github.com/bradleyfalzon/ghinstallation/v2 v2.10.0 + github.com/bradleyfalzon/ghinstallation/v2 v2.11.0 github.com/gofri/go-github-ratelimit v1.1.0 - github.com/google/go-github/v61 v61.0.0 + github.com/google/go-github/v63 v63.0.0 github.com/google/uuid v1.6.0 - github.com/norsegaud/go-daemon v0.1.10 github.com/redis/go-redis/v9 v9.5.1 github.com/shirou/gopsutil/v3 v3.24.4 gopkg.in/yaml.v2 v2.4.0 @@ -18,9 +17,8 @@ require ( github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/go-ole/go-ole v1.2.6 // indirect github.com/golang-jwt/jwt/v4 v4.5.0 // indirect - github.com/google/go-github/v60 v60.0.0 // indirect + github.com/google/go-github/v62 v62.0.0 // indirect github.com/google/go-querystring v1.1.0 // indirect - github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect @@ -31,3 +29,5 @@ require ( ) replace github.com/veertuinc/anklet/plugins/github => ./plugins/github + +replace github.com/veertuinc/anklet/plugins/controllers => ./plugins/controllers diff --git a/go.sum b/go.sum index b04df06..e8b7a5f 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/bradleyfalzon/ghinstallation/v2 v2.10.0 h1:XWuWBRFEpqVrHepQob9yPS3Xg4K3Wr9QCx4fu8HbUNg= -github.com/bradleyfalzon/ghinstallation/v2 v2.10.0/go.mod h1:qoGA4DxWPaYTgVCrmEspVSjlTu4WYAiSxMIhorMRXXc= 
+github.com/bradleyfalzon/ghinstallation/v2 v2.11.0 h1:R9d0v+iobRHSaE4wKUnXFiZp53AL4ED5MzgEMwGTZag= +github.com/bradleyfalzon/ghinstallation/v2 v2.11.0/go.mod h1:0LWKQwOHewXO/1acI6TtyE0Xc4ObDb2rFN7eHBAG71M= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= @@ -22,20 +22,16 @@ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/go-github/v60 v60.0.0 h1:oLG98PsLauFvvu4D/YPxq374jhSxFYdzQGNCyONLfn8= -github.com/google/go-github/v60 v60.0.0/go.mod h1:ByhX2dP9XT9o/ll2yXAu2VD8l5eNVg8hD4Cr0S/LmQk= -github.com/google/go-github/v61 v61.0.0 h1:VwQCBwhyE9JclCI+22/7mLB1PuU9eowCXKY5pNlu1go= -github.com/google/go-github/v61 v61.0.0/go.mod h1:0WR+KmsWX75G2EbpyGsGmradjo3IiciuI4BmdVCobQY= +github.com/google/go-github/v62 v62.0.0 h1:/6mGCaRywZz9MuHyw9gD1CwsbmBX8GWsbFkwMmHdhl4= +github.com/google/go-github/v62 v62.0.0/go.mod h1:EMxeUqGJq2xRu9DYBMwel/mr7kZrzUOfQmmpYrZn2a4= +github.com/google/go-github/v63 v63.0.0 h1:13xwK/wk9alSokujB9lJkuzdmQuVn2QCPeck76wR3nE= +github.com/google/go-github/v63 v63.0.0/go.mod h1:IqbcrgUmIcEaioWrGYei/09o+ge5vhffGOcxrO0AfmA= github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8= github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 
h1:iQTw/8FWTuc7uiaSepXwyf3o52HaUYcV+Tu66S3F5GA= -github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= -github.com/norsegaud/go-daemon v0.1.10 h1:JnYLPJ1uHU1SsZ29iDkXQbzvy0Fxecy2o2buDgnI528= -github.com/norsegaud/go-daemon v0.1.10/go.mod h1:Xfzc5NRusltO7M151oKbmC4bBXBi44uFviKbP2BUxQ8= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= diff --git a/internal/config/config.go b/internal/config/config.go index 48801d8..b6cd14d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -63,6 +63,9 @@ type Service struct { AppID int `yaml:"app_id"` InstallationID int64 `yaml:"installation_id"` Workflows Workflow `yaml:"workflows"` + Port string `yaml:"port"` + Secret string `yaml:"secret"` + HookID int64 `yaml:"hook_id"` } func LoadConfig(configPath string) (Config, error) { @@ -145,14 +148,14 @@ func LoadInEnvs(config Config) (Config, error) { if workDir != "" { config.WorkDir = workDir } - pidFileDir := os.Getenv("ANKLET_PID_FILE_DIR") - if pidFileDir != "" { - config.PidFileDir = pidFileDir - } - logFileDir := os.Getenv("ANKLET_LOG_FILE_DIR") - if logFileDir != "" { - config.Log.FileDir = logFileDir - } + // pidFileDir := os.Getenv("ANKLET_PID_FILE_DIR") + // if pidFileDir != "" { + // config.PidFileDir = pidFileDir + // } + // logFileDir := os.Getenv("ANKLET_LOG_FILE_DIR") + // if logFileDir != "" { + // config.Log.FileDir = logFileDir + // } return config, nil } diff --git a/internal/database/database.go 
b/internal/database/database.go index e1b9b70..2cf354e 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -2,6 +2,7 @@ package database import ( "context" + "encoding/json" "errors" "fmt" "log/slog" @@ -112,3 +113,27 @@ func AddUniqueRunKey(ctx context.Context) (bool, error) { } return true, errors.New("unique run key already exists") } + +func UnwrapPayload[T any](payload string) (T, error, error) { + var wrappedPayload map[string]interface{} + var t T + err := json.Unmarshal([]byte(payload), &wrappedPayload) + if err != nil { + return t, err, nil + } + payloadBytes, err := json.Marshal(wrappedPayload["payload"]) + if err != nil { + return t, err, nil + } + if err := json.Unmarshal(payloadBytes, &t); err != nil { + return t, err, nil + } + jobType, ok := wrappedPayload["type"].(string) + if !ok { + return t, nil, errors.New("job type not found or not a string") + } + if jobType == "anka.VM" { + return t, nil, errors.New("job type " + jobType + " is not what we expect") + } + return t, nil, nil +} diff --git a/internal/github/client.go b/internal/github/client.go index 6ebee06..3c6ae70 100644 --- a/internal/github/client.go +++ b/internal/github/client.go @@ -4,7 +4,7 @@ import ( "context" "net/http" - "github.com/google/go-github/v61/github" + "github.com/google/go-github/v63/github" "github.com/veertuinc/anklet/internal/config" ) diff --git a/internal/metrics/aggregator.go b/internal/metrics/aggregator.go index 51291b4..e4a5bd6 100644 --- a/internal/metrics/aggregator.go +++ b/internal/metrics/aggregator.go @@ -15,7 +15,7 @@ import ( // Start runs the HTTP server func (s *Server) StartAggregatorServer(workerCtx context.Context, logger *slog.Logger) { - http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { + http.HandleFunc("/metrics/v1", func(w http.ResponseWriter, r *http.Request) { databaseContainer, err := database.GetDatabaseFromContext(workerCtx) if err != nil { logger.ErrorContext(workerCtx, "error getting 
database client from context", "error", err) @@ -30,6 +30,10 @@ func (s *Server) StartAggregatorServer(workerCtx context.Context, logger *slog.L http.Error(w, "unsupported format, please use '?format=json' or '?format=prometheus'", http.StatusBadRequest) } }) + http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotImplemented) + w.Write([]byte("please use /metrics/v1")) + }) http.ListenAndServe(":"+s.Port, nil) } diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index f1cad8a..e7146c9 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -204,7 +204,7 @@ func NewServer(port string) *Server { // Start runs the HTTP server func (s *Server) Start(parentCtx context.Context, logger *slog.Logger) { - http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { + http.HandleFunc("/metrics/v1", func(w http.ResponseWriter, r *http.Request) { // update system metrics each call metricsData := GetMetricsDataFromContext(parentCtx) UpdateSystemMetrics(parentCtx, logger, metricsData) @@ -217,6 +217,10 @@ func (s *Server) Start(parentCtx context.Context, logger *slog.Logger) { http.Error(w, "unsupported format, please use '?format=json' or '?format=prometheus'", http.StatusBadRequest) } }) + http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotImplemented) + w.Write([]byte("please use /metrics/v1")) + }) http.ListenAndServe(":"+s.Port, nil) } diff --git a/internal/run/run.go b/internal/run/run.go index f76b217..69ea25d 100644 --- a/internal/run/run.go +++ b/internal/run/run.go @@ -6,10 +6,12 @@ import ( "github.com/veertuinc/anklet/internal/config" "github.com/veertuinc/anklet/internal/logging" + "github.com/veertuinc/anklet/internal/metrics" + github_controller "github.com/veertuinc/anklet/plugins/controllers" "github.com/veertuinc/anklet/plugins/github" ) -func Plugin(workerCtx context.Context, serviceCtx 
context.Context, logger *slog.Logger) { +func Plugin(workerCtx context.Context, serviceCtx context.Context, serviceCancel context.CancelFunc, logger *slog.Logger, firstServiceStarted chan bool) { service := config.GetServiceFromContext(serviceCtx) // fmt.Printf("%+v\n", service) serviceCtx = logging.AppendCtx(serviceCtx, slog.String("plugin", service.Plugin)) @@ -17,7 +19,26 @@ func Plugin(workerCtx context.Context, serviceCtx context.Context, logger *slog. panic("plugin is not set in yaml:services:" + service.Name + ":plugin") } if service.Plugin == "github" { - github.Run(workerCtx, serviceCtx, logger) + for { + select { + case <-serviceCtx.Done(): + serviceCancel() + return + default: + // notify the main thread that the service has started + select { + case <-firstServiceStarted: + default: + close(firstServiceStarted) + } + github.Run(workerCtx, serviceCtx, serviceCancel, logger) + metrics.UpdateService(workerCtx, serviceCtx, logger, metrics.Service{ + Status: "idle", + }) + } + } + } else if service.Plugin == "github_controller" { + github_controller.Run(workerCtx, serviceCtx, serviceCancel, logger, firstServiceStarted) } else { panic("plugin not found: " + service.Plugin) } diff --git a/main.go b/main.go index e2549c5..6c46d26 100644 --- a/main.go +++ b/main.go @@ -4,8 +4,8 @@ import ( "context" "flag" "fmt" - "log" "log/slog" + "net" "net/http" "net/url" "os" @@ -16,7 +16,6 @@ import ( "time" "github.com/gofri/go-github-ratelimit/github_ratelimit" - "github.com/norsegaud/go-daemon" "github.com/veertuinc/anklet/internal/anka" "github.com/veertuinc/anklet/internal/config" "github.com/veertuinc/anklet/internal/database" @@ -30,25 +29,25 @@ var ( runOnce = "false" versionFlag = flag.Bool("version", false, "Print the version") configFlag = flag.String("c", "", "Path to the config file (defaults to ~/.config/anklet/config.yml)") - signalFlag = flag.String("s", "", `Send signal to the daemon: - drain — graceful shutdown, will wait until all jobs finish before 
exiting - stop — best effort graceful shutdown, interrupting the job as soon as possible`) - attachFlag = flag.Bool("attach", false, "Attach to the anklet and don't background it (useful for containers)") - stop = make(chan struct{}) - done = make(chan struct{}) + // signalFlag = flag.String("s", "", `Send signal to the daemon: + // drain — graceful shutdown, will wait until all jobs finish before exiting + // stop — best effort graceful shutdown, interrupting the job as soon as possible`) + // attachFlag = flag.Bool("attach", false, "Attach to the anklet and don't background it (useful for containers)") + // stop = make(chan struct{}) + // done = make(chan struct{}) shutDownMessage = "anklet service shut down" ) -func termHandler(ctx context.Context, logger *slog.Logger) daemon.SignalHandlerFunc { - return func(sig os.Signal) error { - logger.WarnContext(ctx, "terminating anklet, please do not interrupt...") - stop <- struct{}{} - if sig == syscall.SIGQUIT { - <-done - } - return daemon.ErrStop - } -} +// func termHandler(ctx context.Context, logger *slog.Logger) daemon.SignalHandlerFunc { +// return func(sig os.Signal) error { +// logger.WarnContext(ctx, "terminating anklet, please do not interrupt...") +// stop <- struct{}{} +// if sig == syscall.SIGQUIT { +// <-done +// } +// return daemon.ErrStop +// } +// } func main() { @@ -64,8 +63,8 @@ func main() { fmt.Println(version) os.Exit(0) } - daemon.AddCommand(daemon.StringFlag(signalFlag, "drain"), syscall.SIGQUIT, termHandler(parentCtx, logger)) - daemon.AddCommand(daemon.StringFlag(signalFlag, "stop"), syscall.SIGTERM, termHandler(parentCtx, logger)) + // daemon.AddCommand(daemon.StringFlag(signalFlag, "drain"), syscall.SIGQUIT, termHandler(parentCtx, logger)) + // daemon.AddCommand(daemon.StringFlag(signalFlag, "stop"), syscall.SIGTERM, termHandler(parentCtx, logger)) homeDir, err := os.UserHomeDir() if err != nil { @@ -81,14 +80,14 @@ func main() { // obtain config loadedConfig, err := 
config.LoadConfig(configPath) if err != nil { - logger.InfoContext(parentCtx, "unable to load config.yml", "error", err) - // panic(err) + logger.ErrorContext(parentCtx, "unable to load config.yml (is it in the work_dir, or are you using an absolute path?)", "error", err) + panic(err) } - logger.InfoContext(parentCtx, "loaded config", slog.Any("config", loadedConfig)) loadedConfig, err = config.LoadInEnvs(loadedConfig) if err != nil { panic(err) } + logger.InfoContext(parentCtx, "loaded config", slog.Any("config", loadedConfig)) parentCtx = logging.AppendCtx(parentCtx, slog.String("ankletVersion", version)) @@ -98,12 +97,12 @@ func main() { } parentCtx = context.WithValue(parentCtx, config.ContextKey("suffix"), suffix) - if loadedConfig.Log.FileDir == "" { - loadedConfig.Log.FileDir = "./" - } - if loadedConfig.PidFileDir == "" { - loadedConfig.PidFileDir = "./" - } + // if loadedConfig.Log.FileDir == "" { + // loadedConfig.Log.FileDir = "./" + // } + // if loadedConfig.PidFileDir == "" { + // loadedConfig.PidFileDir = "./" + // } if loadedConfig.WorkDir == "" { loadedConfig.WorkDir = "./" } @@ -111,27 +110,27 @@ func main() { logger.DebugContext(parentCtx, "loaded config", slog.Any("config", loadedConfig)) parentCtx = context.WithValue(parentCtx, config.ContextKey("config"), &loadedConfig) - daemonContext := &daemon.Context{ - PidFileName: loadedConfig.PidFileDir + "anklet" + suffix + ".pid", - PidFilePerm: 0644, - LogFileName: loadedConfig.Log.FileDir + "anklet" + suffix + ".log", - LogFilePerm: 0640, - WorkDir: loadedConfig.WorkDir, - Umask: 027, - Args: []string{"anklet", "-c", configPath}, - } + // daemonContext := &daemon.Context{ + // PidFileName: loadedConfig.PidFileDir + "anklet" + suffix + ".pid", + // PidFilePerm: 0644, + // LogFileName: loadedConfig.Log.FileDir + "anklet" + suffix + ".log", + // LogFilePerm: 0640, + // WorkDir: loadedConfig.WorkDir, + // Umask: 027, + // Args: []string{"anklet", "-c", configPath}, + // } - if len(daemon.ActiveFlags()) > 
0 { - d, err := daemonContext.Search() - if err != nil { - log.Fatalf("Unable send signal to the daemon: %s", err.Error()) - } - err = daemon.SendCommands(d) - if err != nil { - log.Fatalln(err.Error()) - } - return - } + // if len(daemon.ActiveFlags()) > 0 { + // d, err := daemonContext.Search() + // if err != nil { + // log.Fatalf("Unable send signal to the daemon: %s", err.Error()) + // } + // err = daemon.SendCommands(d) + // if err != nil { + // log.Fatalln(err.Error()) + // } + // return + // } pluginsPath := filepath.Join(homeDir, ".config", "anklet", "plugins") parentCtx = context.WithValue(parentCtx, config.ContextKey("globals"), config.Globals{ @@ -143,71 +142,83 @@ func main() { httpTransport := http.DefaultTransport parentCtx = context.WithValue(parentCtx, config.ContextKey("httpTransport"), httpTransport) - if !loadedConfig.Metrics.Aggregator { - githubServiceExists := false - for _, service := range loadedConfig.Services { - if service.Plugin == "github" { - githubServiceExists = true - } - } - if githubServiceExists { - rateLimiter, err := github_ratelimit.NewRateLimitWaiterClient(httpTransport) - if err != nil { - logger.ErrorContext(parentCtx, "error creating github_ratelimit.NewRateLimitWaiterClient", "err", err) - return - } - parentCtx = context.WithValue(parentCtx, config.ContextKey("rateLimiter"), rateLimiter) + githubServiceExists := false + for _, service := range loadedConfig.Services { + if service.Plugin == "github" || service.Plugin == "github_controller" { + githubServiceExists = true } } - - if !*attachFlag { - d, err := daemonContext.Reborn() + if githubServiceExists { + rateLimiter, err := github_ratelimit.NewRateLimitWaiterClient(httpTransport) if err != nil { - log.Fatalln(err) - } - if d != nil { + logger.ErrorContext(parentCtx, "error creating github_ratelimit.NewRateLimitWaiterClient", "err", err) return } - defer daemonContext.Release() + parentCtx = context.WithValue(parentCtx, config.ContextKey("rateLimiter"), rateLimiter) } 
- go worker(parentCtx, logger, loadedConfig) + // if !*attachFlag { + // d, err := daemonContext.Reborn() + // if err != nil { + // log.Fatalln(err) + // } + // if d != nil { + // return + // } + // defer daemonContext.Release() + // } - err = daemon.ServeSignals() - if err != nil { - log.Printf("Error: %s", err.Error()) - } + // Capture ctrl+c and handle sending cancellation + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + + //go + worker(parentCtx, logger, loadedConfig, sigChan) + + // err = daemon.ServeSignals() + // if err != nil { + // log.Printf("Error: %s", err.Error()) + // } } -func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config.Config) { +func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config.Config, sigChan chan os.Signal) { globals := config.GetGlobalsFromContext(parentCtx) toRunOnce := globals.RunOnce workerCtx, workerCancel := context.WithCancel(parentCtx) suffix := parentCtx.Value(config.ContextKey("suffix")).(string) logger.InfoContext(workerCtx, "starting anklet"+suffix) + returnToMainQueue := make(chan bool, 1) + workerCtx = context.WithValue(workerCtx, config.ContextKey("returnToMainQueue"), returnToMainQueue) var wg sync.WaitGroup - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGQUIT) go func() { defer signal.Stop(sigChan) defer close(sigChan) for sig := range sigChan { switch sig { - case syscall.SIGTERM: - logger.WarnContext(workerCtx, "best effort graceful shutdown, interrupting the job as soon as possible...") - workerCancel() - case syscall.SIGQUIT: + // case syscall.SIGTERM: + // logger.WarnContext(workerCtx, "best effort graceful shutdown, interrupting the job as soon as possible...") + // workerCancel() + case syscall.SIGQUIT: // doesn't work for controllers since they don't loop logger.WarnContext(workerCtx, "graceful shutdown, waiting for jobs to finish...") toRunOnce = "true" + 
default: + logger.WarnContext(workerCtx, "best effort graceful shutdown, interrupting the job as soon as possible...") + workerCancel() + returnToMainQueue <- true } } }() - // Setup Metrics Server and context metricsPort := "8080" if loadedConfig.Metrics.Port != "" { metricsPort = loadedConfig.Metrics.Port } + ln, err := net.Listen("tcp", ":"+metricsPort) + if err != nil { + logger.ErrorContext(workerCtx, "port already in use", "port", metricsPort, "error", err) + panic(fmt.Sprintf("port %s is already in use", metricsPort)) + } + ln.Close() metricsService := metrics.NewServer(metricsPort) if loadedConfig.Metrics.Aggregator { workerCtx = logging.AppendCtx(workerCtx, slog.Any("metrics_urls", loadedConfig.Metrics.MetricsURLs)) @@ -254,6 +265,10 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config. }(metricsURL) } } else { + // firstServiceStarted: always make sure the first service in the config starts first before any others. + // this allows users to mix a controller with multiple other plugins, + // and let the controller do its thing to prepare the db first. + firstServiceStarted := make(chan bool, 1) metricsData := &metrics.MetricsDataLock{} workerCtx = context.WithValue(workerCtx, config.ContextKey("metrics"), metricsData) go metricsService.Start(workerCtx, logger) @@ -261,25 +276,24 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config. 
metrics.UpdateSystemMetrics(workerCtx, logger, metricsData) ///////////// // Services - for _, service := range loadedConfig.Services { + for index, service := range loadedConfig.Services { wg.Add(1) + if index != 0 { + waitLoop: + for { + select { + case <-firstServiceStarted: + break waitLoop + case <-workerCtx.Done(): + return + default: + time.Sleep(100 * time.Millisecond) + } + } + } go func(service config.Service) { defer wg.Done() serviceCtx, serviceCancel := context.WithCancel(workerCtx) // Inherit from parent context - // sigChan := make(chan os.Signal, 1) - // signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGQUIT) - // go func() { - // defer signal.Stop(sigChan) - // defer close(sigChan) - // for sig := range sigChan { - // switch sig { - // case syscall.SIGTERM: - // serviceCancel() - // case syscall.SIGQUIT: - // runOnce = "true" - // } - // } - // }() if service.Name == "" { panic("name is required for services") @@ -291,7 +305,9 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config. ankaCLI, err := anka.NewCLI(serviceCtx) if err != nil { - panic(fmt.Sprintf("unable to create anka cli: %v", err)) + serviceCancel() + logger.ErrorContext(serviceCtx, "unable to create anka cli", "error", err) + return } serviceCtx = context.WithValue(serviceCtx, config.ContextKey("ankacli"), ankaCLI) @@ -304,7 +320,8 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config. serviceCtx = context.WithValue(serviceCtx, config.ContextKey("database"), databaseClient) } - logger.InfoContext(serviceCtx, "started service") + logger.InfoContext(serviceCtx, "starting service") + metricsData.AddService(metrics.Service{ Name: service.Name, PluginName: service.Plugin, @@ -317,17 +334,18 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config. 
for { select { case <-serviceCtx.Done(): - serviceCancel() - logger.WarnContext(serviceCtx, shutDownMessage) metrics.UpdateService(workerCtx, serviceCtx, logger, metrics.Service{ Status: "stopped", }) + logger.WarnContext(serviceCtx, shutDownMessage) + serviceCancel() return default: - run.Plugin(workerCtx, serviceCtx, logger) + run.Plugin(workerCtx, serviceCtx, serviceCancel, logger, firstServiceStarted) if workerCtx.Err() != nil || toRunOnce == "true" { serviceCancel() - break + logger.WarnContext(serviceCtx, shutDownMessage) + return } metrics.UpdateService(workerCtx, serviceCtx, logger, metrics.Service{ Status: "idle", @@ -335,6 +353,7 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config. select { case <-time.After(time.Duration(service.SleepInterval) * time.Second): case <-serviceCtx.Done(): + fmt.Println("serviceCtx.Done()") break } } diff --git a/plugins/controllers/github.go b/plugins/controllers/github.go new file mode 100644 index 0000000..a280f96 --- /dev/null +++ b/plugins/controllers/github.go @@ -0,0 +1,534 @@ +package github + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "net/http" + "strings" + "sync" + "time" + + "github.com/bradleyfalzon/ghinstallation/v2" + "github.com/google/go-github/v63/github" + "github.com/veertuinc/anklet/internal/config" + "github.com/veertuinc/anklet/internal/database" + internalGithub "github.com/veertuinc/anklet/internal/github" + "github.com/veertuinc/anklet/internal/logging" + "github.com/veertuinc/anklet/internal/metrics" +) + +// Server defines the structure for the API server +type Server struct { + Port string +} + +// NewServer creates a new instance of Server +func NewServer(port string) *Server { + return &Server{ + Port: port, + } +} + +func exists_in_array_exact(array_to_search_in []string, desired []string) bool { + for _, desired_string := range desired { + found := false + for _, item := range array_to_search_in { + if item == desired_string { + found = true 
+ break + } + } + if !found { + return false + } + } + return true +} + +// passing the the queue and ID to check for +func InQueue(serviceCtx context.Context, logger *slog.Logger, jobID int64, queue string) (bool, error) { + databaseContainer, err := database.GetDatabaseFromContext(serviceCtx) + if err != nil { + logger.ErrorContext(serviceCtx, "error getting database client from context", "error", err) + return false, err + } + queued, err := databaseContainer.Client.LRange(serviceCtx, queue, 0, -1).Result() + if err != nil { + logger.ErrorContext(serviceCtx, "error getting list of queued jobs", "err", err) + return false, err + } + for _, queueItem := range queued { + workflowJobEvent, err, typeErr := database.UnwrapPayload[github.WorkflowJobEvent](queueItem) + if err != nil { + logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err) + return false, err + } + if typeErr != nil { // not the type we want + continue + } + if *workflowJobEvent.WorkflowJob.ID == jobID { + return true, fmt.Errorf("job already in queue") + } + } + return false, nil +} + +// https://github.com/gofri/go-github-ratelimit has yet to support primary rate limits, so we have to do it ourselves. 
+func ExecuteGitHubClientFunction[T any](serviceCtx context.Context, logger *slog.Logger, executeFunc func() (*T, *github.Response, error)) (context.Context, *T, *github.Response, error) { + result, response, err := executeFunc() + if response != nil { + serviceCtx = logging.AppendCtx(serviceCtx, slog.Int("api_limit_remaining", response.Rate.Remaining)) + serviceCtx = logging.AppendCtx(serviceCtx, slog.String("api_limit_reset_time", response.Rate.Reset.Time.Format(time.RFC3339))) + serviceCtx = logging.AppendCtx(serviceCtx, slog.Int("api_limit", response.Rate.Limit)) + if response.Rate.Remaining <= 10 { // handle primary rate limiting + sleepDuration := time.Until(response.Rate.Reset.Time) + time.Second // Adding a second to ensure we're past the reset time + logger.WarnContext(serviceCtx, "GitHub API rate limit exceeded, sleeping until reset") + metricsData := metrics.GetMetricsDataFromContext(serviceCtx) + service := config.GetServiceFromContext(serviceCtx) + metricsData.UpdateService(serviceCtx, logger, metrics.Service{ + Name: service.Name, + Status: "limit_paused", + }) + select { + case <-time.After(sleepDuration): + metricsData.UpdateService(serviceCtx, logger, metrics.Service{ + Name: service.Name, + Status: "running", + }) + return ExecuteGitHubClientFunction(serviceCtx, logger, executeFunc) // Retry the function after waiting + case <-serviceCtx.Done(): + return serviceCtx, nil, nil, serviceCtx.Err() + } + } + } + if err != nil { + if err.Error() != "context canceled" { + if !strings.Contains(err.Error(), "try again later") { + logger.Error("error executing GitHub client function: " + err.Error()) + } + } + return serviceCtx, nil, nil, err + } + return serviceCtx, result, response, nil +} + +// Start runs the HTTP server +func Run(workerCtx context.Context, serviceCtx context.Context, serviceCancel context.CancelFunc, logger *slog.Logger, firstServiceStarted chan bool) { + service := config.GetServiceFromContext(serviceCtx) + databaseContainer, err := 
database.GetDatabaseFromContext(serviceCtx) + if err != nil { + logger.ErrorContext(serviceCtx, "error getting database client from context", "error", err) + return + } + rateLimiter := internalGithub.GetRateLimitWaiterClientFromContext(serviceCtx) + httpTransport := config.GetHttpTransportFromContext(serviceCtx) + var githubClient *github.Client + if service.PrivateKey != "" { + itr, err := ghinstallation.NewKeyFromFile(httpTransport, int64(service.AppID), int64(service.InstallationID), service.PrivateKey) + if err != nil { + logger.ErrorContext(serviceCtx, "error creating github app installation token", "err", err) + return + } + rateLimiter.Transport = itr + githubClient = github.NewClient(rateLimiter) + } else { + githubClient = github.NewClient(rateLimiter).WithAuthToken(service.Token) + } + + server := &http.Server{Addr: ":" + service.Port} + http.HandleFunc("/jobs/v1/receiver", func(w http.ResponseWriter, r *http.Request) { + databaseContainer, err := database.GetDatabaseFromContext(serviceCtx) + if err != nil { + logger.ErrorContext(serviceCtx, "error getting database client from context", "error", err) + return + } + payload, err := github.ValidatePayload(r, []byte(service.Secret)) + if err != nil { + logger.ErrorContext(serviceCtx, "error validating payload", "error", err) + return + } + event, err := github.ParseWebHook(github.WebHookType(r), payload) + if err != nil { + logger.ErrorContext(serviceCtx, "error parsing event", "error", err) + return + } + switch workflowJob := event.(type) { + case *github.WorkflowJobEvent: + logger.DebugContext(serviceCtx, "received workflow job to consider", + "workflowJob.Action", *workflowJob.Action, + "workflowJob.WorkflowJob.Labels", workflowJob.WorkflowJob.Labels, + "workflowJob.WorkflowJob.ID", *workflowJob.WorkflowJob.ID, + ) + if *workflowJob.Action == "queued" { + if exists_in_array_exact(workflowJob.WorkflowJob.Labels, []string{"self-hosted", "anka"}) { + // make sure it doesn't already exist + inQueue, err := 
InQueue(serviceCtx, logger, *workflowJob.WorkflowJob.ID, "anklet/jobs/github/queued") + if err != nil { + logger.ErrorContext(serviceCtx, "error searching in queue", "error", err) + return + } + if !inQueue { // if it doesn't exist already + // push it to the queue + wrappedJobPayload := map[string]interface{}{ + "type": "WorkflowJobPayload", + "payload": workflowJob, + } + wrappedPayloadJSON, err := json.Marshal(wrappedJobPayload) + if err != nil { + logger.ErrorContext(serviceCtx, "error converting job payload to JSON", "error", err) + return + } + push := databaseContainer.Client.RPush(serviceCtx, "anklet/jobs/github/queued", wrappedPayloadJSON) + if push.Err() != nil { + logger.ErrorContext(serviceCtx, "error pushing job to queue", "error", push.Err()) + return + } + logger.InfoContext(serviceCtx, "job pushed to queued queue", "json", string(wrappedPayloadJSON)) + } + } + } else if *workflowJob.Action == "completed" { + if exists_in_array_exact(workflowJob.WorkflowJob.Labels, []string{"self-hosted", "anka"}) { + + queues := []string{} + // get all keys from database for the main queue and service queues as well as completed + queuedKeys, err := databaseContainer.Client.Keys(serviceCtx, "anklet/jobs/github/queued*").Result() + if err != nil { + logger.ErrorContext(serviceCtx, "error getting list of keys", "err", err) + return + } + queues = append(queues, queuedKeys...) 
+ + results := make(chan bool, len(queues)) + var wg sync.WaitGroup + for _, queue := range queues { + wg.Add(1) + go func(queue string) { + defer wg.Done() + inQueue, err := InQueue(serviceCtx, logger, *workflowJob.WorkflowJob.ID, queue) + if err != nil { + logger.WarnContext(serviceCtx, err.Error(), "queue", queue) + } + results <- inQueue + }(queue) + } + go func() { + wg.Wait() + close(results) + }() + inAQueue := false + for result := range results { + if result { + inAQueue = true + break + } + } + if inAQueue { // only add completed if it's in a queue + inCompletedQueue, err := InQueue(serviceCtx, logger, *workflowJob.WorkflowJob.ID, "anklet/jobs/github/completed") + if err != nil { + logger.ErrorContext(serviceCtx, "error searching in queue", "error", err) + return + } + if !inCompletedQueue { + // push it to the queue + wrappedJobPayload := map[string]interface{}{ + "type": "WorkflowJobPayload", + "payload": workflowJob, + } + wrappedPayloadJSON, err := json.Marshal(wrappedJobPayload) + if err != nil { + logger.ErrorContext(serviceCtx, "error converting job payload to JSON", "error", err) + return + } + push := databaseContainer.Client.RPush(serviceCtx, "anklet/jobs/github/completed", wrappedPayloadJSON) + if push.Err() != nil { + logger.ErrorContext(serviceCtx, "error pushing job to queue", "error", push.Err()) + return + } + logger.InfoContext(serviceCtx, "job pushed to completed queue", "json", string(wrappedPayloadJSON)) + } + } + + // // make sure we don't orphan completed if there is nothing in queued or other lists for it + // inQueueQueue, err := InQueue(serviceCtx, logger, *workflowJob.WorkflowJob.ID, "anklet/jobs/github/queue") + // if err != nil { + // logger.ErrorContext(serviceCtx, "error searching in queue", "error", err) + // return + // } + // if !inQueueQueue { + // // make sure it doesn't already exist + // inCompletedQueue, err := InQueue(serviceCtx, logger, *workflowJob.WorkflowJob.ID, "anklet/jobs/github/completed") + // if err != nil 
{ + // logger.ErrorContext(serviceCtx, "error searching in queue", "error", err) + // return + // } + // if !inCompletedQueue { + // // push it to the queue + // wrappedJobPayload := map[string]interface{}{ + // "type": "WorkflowJobPayload", + // "payload": workflowJob, + // } + // wrappedPayloadJSON, err := json.Marshal(wrappedJobPayload) + // if err != nil { + // logger.ErrorContext(serviceCtx, "error converting job payload to JSON", "error", err) + // return + // } + // push := databaseContainer.Client.RPush(serviceCtx, "anklet/jobs/github/completed", wrappedPayloadJSON) + // if push.Err() != nil { + // logger.ErrorContext(serviceCtx, "error pushing job to queue", "error", push.Err()) + // return + // } + // logger.InfoContext(serviceCtx, "job pushed to completed queue", "json", string(wrappedPayloadJSON)) + // } + // } + } + } + } + w.WriteHeader(http.StatusOK) + // w.Write([]byte("v1 jobs endpoint")) + }) + http.HandleFunc("/jobs", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotImplemented) + w.Write([]byte("please use /jobs/v1")) + }) + go func() { + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logger.ErrorContext(serviceCtx, "controller listener error", "error", err) + } + }() + // redeliver all hooks that failed while the service was down + githubWrapperClient := internalGithub.NewGitHubClientWrapper(githubClient) + serviceCtx = context.WithValue(serviceCtx, config.ContextKey("githubwrapperclient"), githubWrapperClient) + // Redeliver queued jobs + limitForHooks := time.Now().Add(-time.Hour * 24) // the time we want the stop search for redeliveries + var allHooks []map[string]interface{} + opts := &github.ListCursorOptions{PerPage: 10} + doneWithHooks := false + logger.InfoContext(serviceCtx, "listing hook deliveries to see if there any that we missed...") + for { + serviceCtx, hookDeliveries, response, err := ExecuteGitHubClientFunction(serviceCtx, logger, func() (*[]*github.HookDelivery, 
*github.Response, error) { + hookDeliveries, response, err := githubClient.Repositories.ListHookDeliveries(serviceCtx, service.Owner, service.Repo, service.HookID, opts) + if err != nil { + return nil, nil, err + } + return &hookDeliveries, response, nil + }) + if err != nil { + logger.ErrorContext(serviceCtx, "error listing hooks", "err", err) + return + } + for _, hookDelivery := range *hookDeliveries { + // fmt.Println("hookDelivery", *hookDelivery.ID, *hookDelivery.StatusCode, *hookDelivery.Redelivery, *hookDelivery.DeliveredAt) + if hookDelivery.StatusCode != nil && !*hookDelivery.Redelivery && *hookDelivery.Action != "in_progress" { + serviceCtx, gottenHookDelivery, _, err := ExecuteGitHubClientFunction(serviceCtx, logger, func() (*github.HookDelivery, *github.Response, error) { + gottenHookDelivery, response, err := githubClient.Repositories.GetHookDelivery(serviceCtx, service.Owner, service.Repo, service.HookID, *hookDelivery.ID) + if err != nil { + return nil, nil, err + } + return gottenHookDelivery, response, nil + }) + if err != nil { + logger.ErrorContext(serviceCtx, "error listing hooks", "err", err) + return + } + var workflowJobEvent github.WorkflowJobEvent + err = json.Unmarshal(*gottenHookDelivery.Request.RawPayload, &workflowJobEvent) + if err != nil { + logger.ErrorContext(serviceCtx, "error unmarshalling hook request raw payload to HookResponse", "err", err) + return + } + if len(allHooks) > 0 && limitForHooks.After(gottenHookDelivery.DeliveredAt.Time) { + doneWithHooks = true + break + } + if !exists_in_array_exact(workflowJobEvent.WorkflowJob.Labels, []string{"self-hosted", "anka"}) { + continue + } + allHooks = append(allHooks, map[string]interface{}{ + "hookDelivery": gottenHookDelivery, + "workflowJobEvent": workflowJobEvent, + }) + } + } + if response.Cursor == "" || doneWithHooks { + break + } + opts.Cursor = response.Cursor + } + + // get all keys from database for the main queue and service queues as well as completed + queuedKeys, err 
:= databaseContainer.Client.Keys(serviceCtx, "anklet/jobs/github/queued*").Result() + if err != nil { + logger.ErrorContext(serviceCtx, "error getting list of keys", "err", err) + return + } + var allQueuedJobs = make(map[string][]string) + for _, key := range queuedKeys { + queuedJobs, err := databaseContainer.Client.LRange(serviceCtx, key, 0, -1).Result() + if err != nil { + logger.ErrorContext(serviceCtx, "error getting list of queued jobs for key: "+key, "err", err) + return + } + allQueuedJobs[key] = queuedJobs + } + completedKeys, err := databaseContainer.Client.Keys(serviceCtx, "anklet/jobs/github/completed*").Result() + if err != nil { + logger.ErrorContext(serviceCtx, "error getting list of keys", "err", err) + return + } + var allCompletedJobs = make(map[string][]string) + for _, key := range completedKeys { + completedJobs, err := databaseContainer.Client.LRange(serviceCtx, key, 0, -1).Result() + if err != nil { + logger.ErrorContext(serviceCtx, "error getting list of queued jobs for key: "+key, "err", err) + return + } + allCompletedJobs[key] = completedJobs + } + + if len(allHooks) > 0 { + MainLoop: + for i := len(allHooks) - 1; i >= 0; i-- { // make sure we process/redeliver queued before completed + hookWrapper := allHooks[i] + hookDelivery := hookWrapper["hookDelivery"].(*github.HookDelivery) + workflowJobEvent := hookWrapper["workflowJobEvent"].(github.WorkflowJobEvent) + + inQueued := false + // inQueuedListKey := "" + // inQueuedListIndex := 0 + inCompleted := false + inCompletedListKey := "" + inCompletedIndex := 0 + + // logger.DebugContext(serviceCtx, "hookDelivery", "hookDelivery", hookDelivery) + // logger.DebugContext(serviceCtx, "workflowJobEvent", "workflowJobEvent", workflowJobEvent) + + // Check if the job is already in the queue + if workflowJobEvent.WorkflowJob == nil || workflowJobEvent.WorkflowJob.ID == nil { + logger.WarnContext(serviceCtx, "WorkflowJob or WorkflowJob.ID is nil") + continue + } + + // Queued deliveries + // // 
always get queued jobs so that completed cleanup (when there is no queued but there is a completed) works + for _, queuedJobs := range allQueuedJobs { + for _, queuedJob := range queuedJobs { + if queuedJob == "" { + continue + } + wrappedPayload, err, typeErr := database.UnwrapPayload[github.WorkflowJobEvent](queuedJob) + if err != nil { + logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err) + return + } + if typeErr != nil { // not the type we want + continue + } + if wrappedPayload.WorkflowJob.ID == nil { + continue + } + if workflowJobEvent.WorkflowJob.ID == nil { + continue + } + if *wrappedPayload.WorkflowJob.ID == *workflowJobEvent.WorkflowJob.ID { + inQueued = true + // inQueuedListKey = key + // inQueuedListIndex = index + break + } + } + } + + // Completed deliveries + if *hookDelivery.Action == "completed" { + for key, completedJobs := range allCompletedJobs { + for index, completedJob := range completedJobs { + wrappedPayload, err, typeErr := database.UnwrapPayload[github.WorkflowJobEvent](completedJob) + if err != nil { + logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err) + return + } + if typeErr != nil { // not the type we want + continue + } + if *wrappedPayload.WorkflowJob.ID == *workflowJobEvent.WorkflowJob.ID { + inCompleted = true + inCompletedListKey = key + inCompletedIndex = index + break + } + } + } + } + + // if in queued, but also has completed; continue and do nothing + if inQueued && inCompleted { + continue + } + + // if in completed, but has no queued; remove from completed db + if inCompleted && !inQueued { + _, err = databaseContainer.Client.LRem(serviceCtx, inCompletedListKey, 1, allCompletedJobs[inCompletedListKey][inCompletedIndex]).Result() + if err != nil { + logger.ErrorContext(serviceCtx, "error removing completedJob from anklet/jobs/github/completed", "err", err, "completedJob", allCompletedJobs[inCompletedListKey][inCompletedIndex]) + return + } + continue + } + + // handle queued that 
have already been successfully delivered before. + if *hookDelivery.Action == "queued" { + // check if a completed hook exists, so we don't re-queue something already finished + for _, job := range allHooks { + otherHookDelivery := job["hookDelivery"].(*github.HookDelivery) + otherWorkflowJobEvent := job["workflowJobEvent"].(github.WorkflowJobEvent) + if *otherHookDelivery.Action == "completed" && *workflowJobEvent.WorkflowJob.ID == *otherWorkflowJobEvent.WorkflowJob.ID { + continue MainLoop + } + } + } + + // if in queued, and also has a successful completed, something is wrong and we need to re-deliver it. + if *hookDelivery.Action == "completed" && inQueued && *hookDelivery.StatusCode == 200 && !inCompleted { + logger.InfoContext(serviceCtx, "hook delivery has completed but is still in queued; redelivering") + } else if *hookDelivery.StatusCode == 200 || inCompleted { // all other cases (like when it's queued); continue + continue + } + + // Note; We cannot (and probably should not) stop completed from being redelivered. 
+ + // Redeliver the hook + serviceCtx, redelivery, _, _ := ExecuteGitHubClientFunction(serviceCtx, logger, func() (*github.HookDelivery, *github.Response, error) { + redelivery, response, err := githubClient.Repositories.RedeliverHookDelivery(serviceCtx, service.Owner, service.Repo, service.HookID, *hookDelivery.ID) + if err != nil { + return nil, nil, err + } + return redelivery, response, nil + }) + // err doesn't matter here and it will always throw "job scheduled on GitHub side; try again later" + logger.InfoContext(serviceCtx, "hook redelivered", + "redelivery", redelivery, + "hookDelivery.GUID", *hookDelivery.GUID, + "workflowJobEvent.WorkflowJob.ID", *workflowJobEvent.WorkflowJob.ID, + "hookDelivery.Action", *hookDelivery.Action, + "hookDelivery.StatusCode", *hookDelivery.StatusCode, + "hookDelivery.Redelivery", *hookDelivery.Redelivery, + "inCompleted", inCompleted, + ) + } + } + // notify the main thread that the service has started + select { + case <-firstServiceStarted: + default: + close(firstServiceStarted) + } + logger.InfoContext(serviceCtx, "started service") + // wait for the context to be canceled + <-serviceCtx.Done() + logger.InfoContext(serviceCtx, "shutting down controller") + if err := server.Shutdown(serviceCtx); err != nil { + logger.ErrorContext(serviceCtx, "controller shutdown error", "error", err) + } +} diff --git a/plugins/github/README.md b/plugins/github/README.md index 8ab8127..6c16c8f 100644 --- a/plugins/github/README.md +++ b/plugins/github/README.md @@ -13,8 +13,8 @@ services: token: github_pat_XXX # Instead of PAT, you can create a github app for your org/repo and use its credentials instead. 
// WorkflowRunJobDetail is the flattened subset of a GitHub Actions workflow
// job that the plugin tracks in order to schedule an Anka VM runner for it.
// The Anka*/RunID/UniqueID values come from the job's runs-on labels
// (e.g. "anka-template:<uuid>", "run-id:<id>", "unique-id:<n>") — see
// extractLabelValue; the rest mirrors fields of the webhook's WorkflowJob.
type WorkflowRunJobDetail struct {
	JobID           int64    // WorkflowJob.ID from the webhook payload
	JobName         string   // WorkflowJob.Name
	JobURL          string   // HTML URL of the job, used for log context
	WorkflowName    string   // name of the parent workflow
	AnkaTemplate    string   // VM template UUID, from the "anka-template:" label
	AnkaTemplateTag string   // template tag, from the "anka-template-tag:" label
	RunID           int64    // workflow run ID the job belongs to
	UniqueID        string   // from the "unique-id:" label; disambiguates jobs within one run
	Labels          []string // full runs-on label set as received in the webhook
}
-} +// func exists_in_array_exact(array_to_search_in []string, desired []string) bool { +// for _, desired_string := range desired { +// found := false +// for _, item := range array_to_search_in { +// if item == desired_string { +// found = true +// break +// } +// } +// if !found { +// return false +// } +// } +// return true +// } + +// func exists_in_array_regex(array_to_search_in []string, desired []string) bool { +// if len(desired) == 0 || desired[0] == "" { +// return false +// } +// for _, desired_string := range desired { +// // fmt.Printf(" desired_string: %s\n", desired_string) +// found := false +// for _, item := range array_to_search_in { +// // fmt.Printf(" item: %s\n", item) +// // Check if the desired_string is a valid regex pattern +// if rege, err := regexp.Compile(desired_string); err == nil { +// // If it's a valid regex, check for a regex match +// sanitizedSplit := slices.DeleteFunc(rege.Split(item, -1), func(e string) bool { +// return e == "" +// }) +// // fmt.Printf(" sanitizedSplit: %+v\n", sanitizedSplit) +// if len(sanitizedSplit) == 0 { +// // fmt.Println(" regex match") +// found = true +// break +// } +// } +// } +// if !found { +// return false +// } +// } +// return true +// } + +// func does_not_exist_in_array_regex(array_to_search_in []string, excluded []string) bool { +// if len(excluded) == 0 || excluded[0] == "" { +// return true +// } +// for _, excluded_string := range excluded { +// // fmt.Printf(" excluded_string: %s\n", excluded_string) +// found := false +// for _, item := range array_to_search_in { +// // fmt.Printf(" item: %s\n", item) +// // Check if the desired_string is a valid regex pattern +// if rege, err := regexp.Compile(excluded_string); err == nil { +// // If it's a valid regex, check for a regex match +// sanitizedSplit := slices.DeleteFunc(rege.Split(item, -1), func(e string) bool { +// return e == "" +// }) +// // fmt.Printf(" sanitizedSplit: %+v\n", sanitizedSplit) +// if len(sanitizedSplit) > 0 { +// // 
// extractLabelValue scans labels for the first entry that begins with prefix
// and returns the remainder after the prefix (e.g. prefix "run-id:" applied
// to "run-id:123" yields "123"). It returns "" when no label matches.
func extractLabelValue(labels []string, prefix string) string {
	for _, candidate := range labels {
		if !strings.HasPrefix(candidate, prefix) {
			continue
		}
		return candidate[len(prefix):]
	}
	return ""
}
// sendCancelWorkflowRun asks GitHub to cancel the given workflow run and then
// polls (every 10s) until the run reports "completed"/"cancelled", so callers
// can safely clean up afterwards. The cancel request is sent at most once
// (cancelSent guards against a 409 from re-cancelling an already-finished run).
// Returns the first API error encountered, or nil once the run is done.
func sendCancelWorkflowRun(serviceCtx context.Context, logger *slog.Logger, workflowRunID int64) error {
	githubClient := internalGithub.GetGitHubClientFromContext(serviceCtx)
	service := config.GetServiceFromContext(serviceCtx)
	cancelSent := false
	for {
		// NOTE(review): the API calls use context.Background() rather than
		// serviceCtx — presumably so cancellation/cleanup still completes after
		// serviceCtx is canceled; confirm this is intentional.
		serviceCtx, workflowRun, _, err := executeGitHubClientFunction[github.WorkflowRun](serviceCtx, logger, func() (*github.WorkflowRun, *github.Response, error) {
			workflowRun, resp, err := githubClient.Actions.GetWorkflowRunByID(context.Background(), service.Owner, service.Repo, workflowRunID)
			return workflowRun, resp, err
		})
		if err != nil {
			logger.ErrorContext(serviceCtx, "error getting workflow run by ID", "err", err)
			return err
		}
		// Done when the run finished on its own, was cancelled, or we've already
		// issued our one cancel request.
		if *workflowRun.Status == "completed" ||
			(workflowRun.Conclusion != nil && *workflowRun.Conclusion == "cancelled") ||
			cancelSent {
			break
		} else {
			logger.WarnContext(serviceCtx, "workflow run is still active... waiting for cancellation so we can clean up...", "workflow_run_id", workflowRunID)
			if !cancelSent { // this has to happen here so that it doesn't error with "409 Cannot cancel a workflow run that is completed. " if the job is already cancelled
				serviceCtx, cancelResponse, _, cancelErr := executeGitHubClientFunction[github.Response](serviceCtx, logger, func() (*github.Response, *github.Response, error) {
					resp, err := githubClient.Actions.CancelWorkflowRunByID(context.Background(), service.Owner, service.Repo, workflowRunID)
					return resp, nil, err
				})
				// don't use cancelResponse.Response.StatusCode or else it'll error with SIGSEV
				// "try again later" means GitHub accepted the cancel but it is queued
				// server-side, so it is not treated as a failure here.
				if cancelErr != nil && !strings.Contains(cancelErr.Error(), "try again later") {
					logger.ErrorContext(serviceCtx, "error executing githubClient.Actions.CancelWorkflowRunByID", "err", cancelErr, "response", cancelResponse)
					return cancelErr
				}
				cancelSent = true
				logger.WarnContext(serviceCtx, "sent cancel workflow run", "workflow_run_id", workflowRunID)
			}
			time.Sleep(10 * time.Second)
		}
	}
	return nil
}
// setLoggingContext returns serviceCtx with the job's identifying fields
// attached as slog attributes, so every subsequent log line emitted with this
// context carries the workflow/job/template identifiers automatically.
func setLoggingContext(serviceCtx context.Context, workflowRunJob WorkflowRunJobDetail) context.Context {
	serviceCtx = logging.AppendCtx(serviceCtx, slog.String("workflowName", workflowRunJob.WorkflowName))
	// not available in webhooks
	// serviceCtx = logging.AppendCtx(serviceCtx, slog.String("workflowRunName", workflowRunJob.WorkflowRunName))
	serviceCtx = logging.AppendCtx(serviceCtx, slog.Int64("workflowRunId", workflowRunJob.RunID))
	serviceCtx = logging.AppendCtx(serviceCtx, slog.Int64("workflowJobId", workflowRunJob.JobID))
	serviceCtx = logging.AppendCtx(serviceCtx, slog.String("workflowJobName", workflowRunJob.JobName))
	serviceCtx = logging.AppendCtx(serviceCtx, slog.String("uniqueId", workflowRunJob.UniqueID))
	serviceCtx = logging.AppendCtx(serviceCtx, slog.String("ankaTemplate", workflowRunJob.AnkaTemplate))
	serviceCtx = logging.AppendCtx(serviceCtx, slog.String("ankaTemplateTag", workflowRunJob.AnkaTemplateTag))
	serviceCtx = logging.AppendCtx(serviceCtx, slog.String("jobURL", workflowRunJob.JobURL))
	return serviceCtx
}
struct{}, + runOnce bool, + failureChannel chan bool, +) { service := config.GetServiceFromContext(serviceCtx) - var allWorkflowRunJobDetails []WorkflowRunJobDetail - // WORKFLOWS - serviceCtx, workflows, _, err := ExecuteGitHubClientFunction[*github.Workflows](serviceCtx, logger, func() (**github.Workflows, *github.Response, error) { - workflows, resp, err := githubClient.Actions.ListWorkflows(context.Background(), service.Owner, service.Repo, &github.ListOptions{}) - return &workflows, resp, err - }) - - if serviceCtx.Err() != nil { - logger.WarnContext(serviceCtx, "context canceled during workflows listing") - return []WorkflowRunJobDetail{}, nil, serviceCtx, nil - } + databaseContainer, err := database.GetDatabaseFromContext(serviceCtx) if err != nil { - logger.ErrorContext(serviceCtx, "error executing githubClient.Actions.ListWorkflows", "err", err) - return []WorkflowRunJobDetail{}, nil, serviceCtx, errors.New("error executing githubClient.Actions.ListWorkflows") + logger.ErrorContext(serviceCtx, "error getting database from context", "err", err) + logging.Panic(workerCtx, serviceCtx, "error getting database from context") } - // fmt.Printf("%+v\n", service.Workflows) - var workflowsToScan []string - for _, workflow := range (*workflows).Workflows { - if *workflow.State == "active" { - - // fmt.Printf("1: %s\n", *workflow.Name) - // check service.Workflows.Include and Exclude - if exists_in_array_regex([]string{*workflow.Name}, service.Workflows.Exclude) { - if does_not_exist_in_array_regex([]string{*workflow.Name}, service.Workflows.Include) { - continue - } - } - - workflowsToScan = append(workflowsToScan, *workflow.Name) - - // WORKFLOW RUNS - serviceCtx, workflow_runs, _, err := ExecuteGitHubClientFunction[*github.WorkflowRuns](serviceCtx, logger, func() (**github.WorkflowRuns, *github.Response, error) { - workflow_runs, resp, err := githubClient.Actions.ListWorkflowRunsByID(context.Background(), service.Owner, service.Repo, *workflow.ID, 
&github.ListWorkflowRunsOptions{ - ListOptions: github.ListOptions{PerPage: 30}, - Status: "queued", - }) - return &workflow_runs, resp, err // Adjusted to return the direct result - }) - if err != nil { - if strings.Contains(err.Error(), "context canceled") { - logger.WarnContext(serviceCtx, "context canceled during githubClient.Actions.ListWorkflowRunsByID", "err", err) - } else { - logger.ErrorContext(serviceCtx, "error executing githubClient.Actions.ListWorkflowRunsByID", "err", err) - } - return []WorkflowRunJobDetail{}, nil, serviceCtx, errors.New("error executing githubClient.Actions.ListWorkflowRunsByID") + defer func() { + if checkForCompletedJobsMu != nil { + checkForCompletedJobsMu.Unlock() + } + // ensure, outside of needing to return on error, that the following always runs + select { + case <-ranOnce: + // already closed, do nothing + default: + close(ranOnce) + } + }() + for { + // BE VERY CAREFUL when you use return here. You could orphan the job if you're not careful. + checkForCompletedJobsMu.Lock() + // do not use 'continue' in the loop or else the ranOnce won't happen + // fmt.Println("CheckForCompletedJobs loop start: " + fmt.Sprint(runOnce)) + select { + case <-failureChannel: + logger.ErrorContext(serviceCtx, "CheckForCompletedJobs"+service.Name+" failureChannel") + returnToMainQueue, ok := workerCtx.Value(config.ContextKey("returnToMainQueue")).(chan bool) + if !ok { + logger.ErrorContext(serviceCtx, "error getting returnToMainQueue from context") + return } - for _, workflowRun := range (*workflow_runs).WorkflowRuns { - serviceCtx, workflowRunJobs, _, err := ExecuteGitHubClientFunction[github.Jobs](serviceCtx, logger, func() (*github.Jobs, *github.Response, error) { - workflowRunJobs, resp, err := githubClient.Actions.ListWorkflowJobs(context.Background(), service.Owner, service.Repo, *workflowRun.ID, &github.ListWorkflowJobsOptions{ - ListOptions: github.ListOptions{PerPage: 30}, - }) - return workflowRunJobs, resp, err - }) + 
returnToMainQueue <- true + return + case <-completedJobChannel: + return + case <-serviceCtx.Done(): + logger.WarnContext(serviceCtx, "CheckForCompletedJobs"+service.Name+" serviceCtx.Done()") + return + default: + } + // get the job ID + existingJobString, err := databaseContainer.Client.LIndex(serviceCtx, "anklet/jobs/github/queued/"+service.Name, 0).Result() + if runOnce && err == redis.Nil { // handle no job for service; needed so the github plugin resets and looks for new jobs again + logger.ErrorContext(serviceCtx, "CheckForCompletedJobs"+service.Name+" err == redis.Nil") + return + } else { + if err == nil { + // check if there is already a completed job queued for the service + // // this can happen if the service crashes or is stopped before it finalizes cleanup + count, err := databaseContainer.Client.LLen(serviceCtx, "anklet/jobs/github/completed/"+service.Name).Result() if err != nil { - if strings.Contains(err.Error(), "context canceled") { - logger.WarnContext(serviceCtx, "context canceled during githubClient.Actions.ListWorkflowJobs", "err", err) - return []WorkflowRunJobDetail{}, nil, serviceCtx, nil - } else { - logger.ErrorContext(serviceCtx, "error executing githubClient.Actions.ListWorkflowJobs", "err", err) - return []WorkflowRunJobDetail{}, nil, serviceCtx, errors.New("error executing githubClient.Actions.ListWorkflowJobs") - } + logger.ErrorContext(serviceCtx, "error getting count of objects in anklet/jobs/github/completed/"+service.Name, "err", err) + return } - for _, job := range workflowRunJobs.Jobs { - if *job.Status == "queued" { // I don't know why, but we'll get completed jobs back in the list - if exists_in_array_exact(job.Labels, []string{"self-hosted", "anka"}) { - serviceCtx = setLoggingContext(serviceCtx, WorkflowRunJobDetail{ - Job: *job, - WorkflowRunName: *workflowRun.Name, - }) - - // this ensures that jobs in the same workspace don't compete for the same runner - runID := extractLabelValue(job.Labels, "run-id:") - if runID 
== "" { - logger.WarnContext(serviceCtx, "run-id label not found or empty; something wrong with your workflow yaml") - continue - } - if runID != strconv.FormatInt(*job.RunID, 10) { // make sure the user set it properly - logger.WarnContext(serviceCtx, "run-id label does not match the job's run ID; potential misconfiguration in workflow yaml") - continue - } - - // get the unique unique-id for this job - // this ensures that multiple jobs in the same workflow run don't compete for the same runner - uniqueID := extractLabelValue(job.Labels, "unique-id:") - if uniqueID == "" { - logger.WarnContext(serviceCtx, "unique-id label not found or empty; something wrong with your workflow yaml") - continue - } - - ankaTemplate := extractLabelValue(job.Labels, "anka-template:") - if ankaTemplate == "" { - logger.WarnContext(serviceCtx, "warning: unable to find Anka Template specified in labels - skipping") - continue - } - ankaTemplateTag := extractLabelValue(job.Labels, "anka-template-tag:") - if ankaTemplateTag == "" { - ankaTemplateTag = "(using latest)" - } + existingJobEvent, err, typeErr := database.UnwrapPayload[github.WorkflowJobEvent](existingJobString) + if err != nil || typeErr != nil { + logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err) + return + } + if count > 0 { + select { + case completedJobChannel <- existingJobEvent: + default: + // remove the completed job we found + _, err = databaseContainer.Client.Del(serviceCtx, "anklet/jobs/github/completed/"+service.Name).Result() + if err != nil { + logger.ErrorContext(serviceCtx, "error removing completedJob from anklet/jobs/github/completed/"+service.Name, "err", err) + return + } + } + } else { + completedJobs, err := databaseContainer.Client.LRange(serviceCtx, "anklet/jobs/github/completed", 0, -1).Result() + if err != nil { + logger.ErrorContext(serviceCtx, "error getting list of completed jobs", "err", err) + return + } + if existingJobEvent.WorkflowJob == nil { + 
logger.ErrorContext(serviceCtx, "existingJobEvent.WorkflowJob is nil") + return + } + for _, completedJob := range completedJobs { + completedJobWebhookEvent, err, typeErr := database.UnwrapPayload[github.WorkflowJobEvent](completedJob) + if err != nil || typeErr != nil { + logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err) + return + } - // if a node is pulling, the job doesn't change from queued, so let's do a check to see if a node picked it up or not - exists, err := dbFunctions.CheckIfKeyExists(serviceCtx, fmt.Sprintf("%s:%s", runID, uniqueID)) + if *completedJobWebhookEvent.WorkflowJob.ID == *existingJobEvent.WorkflowJob.ID { + // remove the completed job we found + _, err = databaseContainer.Client.LRem(serviceCtx, "anklet/jobs/github/completed", 1, completedJob).Result() if err != nil { - if strings.Contains(err.Error(), "context canceled") { - logger.WarnContext(serviceCtx, "context was canceled while checking if key exists in database", "err", err) - return []WorkflowRunJobDetail{}, nil, serviceCtx, nil - } else { - logger.ErrorContext(serviceCtx, "error checking if key exists in database", "err", err) - return []WorkflowRunJobDetail{}, nil, serviceCtx, errors.New("error checking if key exists in database") - } + logger.ErrorContext(serviceCtx, "error removing completedJob from anklet/jobs/github/completed", "err", err, "completedJob", completedJobWebhookEvent) + return } - - if !exists { - allWorkflowRunJobDetails = append(allWorkflowRunJobDetails, WorkflowRunJobDetail{ - Job: *job, - WorkflowRunName: *workflowRun.Name, - AnkaTemplate: ankaTemplate, - AnkaTemplateTag: ankaTemplateTag, - RunID: runID, - UniqueID: uniqueID, - }) + // delete the existing service task + // _, err = databaseContainer.Client.Del(serviceCtx, serviceQueueDatabaseKeyName).Result() + // if err != nil { + // logger.ErrorContext(serviceCtx, "error deleting all objects from "+serviceQueueDatabaseKeyName, "err", err) + // return + // } + // add a task for the 
completed job so we know the clean up + _, err = databaseContainer.Client.LPush(serviceCtx, "anklet/jobs/github/completed/"+service.Name, completedJob).Result() + if err != nil { + logger.ErrorContext(serviceCtx, "error inserting completed job into list", "err", err) + return } + completedJobChannel <- completedJobWebhookEvent + return } } } } } + // ensure, outside of needing to return on error, that the following always runs + select { + case <-ranOnce: + // already closed, do nothing + default: + close(ranOnce) + } + if runOnce { + return + } + if checkForCompletedJobsMu != nil { + checkForCompletedJobsMu.Unlock() + } + time.Sleep(3 * time.Second) } +} - sort.Slice(allWorkflowRunJobDetails, func(i, j int) bool { - if allWorkflowRunJobDetails[i].Job.CreatedAt.Equal(*allWorkflowRunJobDetails[j].Job.CreatedAt) { - return *allWorkflowRunJobDetails[i].Job.Name < *allWorkflowRunJobDetails[j].Job.Name +// cleanup will pop off the last item from the list and, based on its type, perform the appropriate cleanup action +// this assumes the plugin code created a list item to represent the thing to clean up +func cleanup( + workerCtx context.Context, + serviceCtx context.Context, + logger *slog.Logger, + completedJobChannel chan github.WorkflowJobEvent, + cleanupMu *sync.Mutex, +) { + cleanupMu.Lock() + // create an independent copy of the serviceCtx so we can do cleanup even if serviceCtx got "context canceled" + cleanupContext := context.Background() + service := config.GetServiceFromContext(serviceCtx) + returnToMainQueue, ok := workerCtx.Value(config.ContextKey("returnToMainQueue")).(chan bool) + if !ok { + logger.ErrorContext(serviceCtx, "error getting returnToMainQueue from context") + return + } + serviceDatabase, err := database.GetDatabaseFromContext(serviceCtx) + if err != nil { + logger.ErrorContext(serviceCtx, "error getting database from context", "err", err) + return + } + cleanupContext = context.WithValue(cleanupContext, config.ContextKey("database"), 
serviceDatabase) + cleanupContext, cancel := context.WithCancel(cleanupContext) + defer func() { + if cleanupMu != nil { + cleanupMu.Unlock() + } + cancel() + }() + databaseContainer, err := database.GetDatabaseFromContext(cleanupContext) + if err != nil { + logger.ErrorContext(serviceCtx, "error getting database from context", "err", err) + return + } + for { + var jobJSON string + exists, err := databaseContainer.Client.Exists(cleanupContext, "anklet/jobs/github/queued/"+service.Name+"/cleaning").Result() + if err != nil { + logger.ErrorContext(cleanupContext, "error checking if cleaning up already in progress", "err", err) + } + if exists == 1 { + logger.InfoContext(serviceCtx, "cleaning up already in progress; getting job") + jobJSON, err = databaseContainer.Client.LIndex(cleanupContext, "anklet/jobs/github/queued/"+service.Name+"/cleaning", 0).Result() + if err != nil { + logger.ErrorContext(serviceCtx, "error getting job from the list", "err", err) + return + } + } else { + // pop the job from the list and push it to the cleaning list + jobJSON, err = databaseContainer.Client.RPopLPush(cleanupContext, "anklet/jobs/github/queued/"+service.Name, "anklet/jobs/github/queued/"+service.Name+"/cleaning").Result() + if err == redis.Nil { + return // nothing to clean up + } else if err != nil { + logger.ErrorContext(serviceCtx, "error popping job from the list", "err", err) + return + } + } + var typedJob map[string]interface{} + if err := json.Unmarshal([]byte(jobJSON), &typedJob); err != nil { + logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err) + return } - return allWorkflowRunJobDetails[i].Job.CreatedAt.Time.Before(allWorkflowRunJobDetails[j].Job.CreatedAt.Time) - }) - return allWorkflowRunJobDetails, workflowsToScan, serviceCtx, nil + var payload map[string]interface{} + payloadJSON, err := json.Marshal(typedJob) + if err != nil { + logger.ErrorContext(serviceCtx, "error marshalling payload", "err", err) + return + } + if err := 
json.Unmarshal(payloadJSON, &payload); err != nil { + logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err) + return + } + payloadBytes, err := json.Marshal(payload["payload"]) + if err != nil { + logger.ErrorContext(serviceCtx, "error marshalling payload", "err", err) + return + } + switch typedJob["type"] { + case "anka.VM": + var vm anka.VM + err = json.Unmarshal(payloadBytes, &vm) + if err != nil { + logger.ErrorContext(serviceCtx, "error unmarshalling payload to webhook.WorkflowJobPayload", "err", err) + return + } + ankaCLI := anka.GetAnkaCLIFromContext(serviceCtx) + ankaCLI.AnkaDelete(workerCtx, serviceCtx, &vm) + databaseContainer.Client.Del(cleanupContext, "anklet/jobs/github/queued/"+service.Name+"/cleaning") + continue // required to keep processing tasks in the db list + case "WorkflowJobPayload": // MUST COME LAST + var workflowJobEvent github.WorkflowJobEvent + err = json.Unmarshal(payloadBytes, &workflowJobEvent) + if err != nil { + logger.ErrorContext(serviceCtx, "error unmarshalling payload to webhook.WorkflowJobPayload", "err", err) + return + } + // return it to the queue if the job isn't completed yet + // if we don't, we could suffer from a situation where a completed job comes in and is orphaned + select { + case <-completedJobChannel: + databaseContainer.Client.Del(cleanupContext, "anklet/jobs/github/completed/"+service.Name) + databaseContainer.Client.Del(cleanupContext, "anklet/jobs/github/queued/"+service.Name+"/cleaning") + break // break loop and delete /queued/servicename + default: + select { + case <-returnToMainQueue: + logger.WarnContext(serviceCtx, "pushing job back to anklet/jobs/github/queued") + _, err := databaseContainer.Client.RPopLPush(cleanupContext, "anklet/jobs/github/queued/"+service.Name+"/cleaning", "anklet/jobs/github/queued").Result() + if err != nil { + logger.ErrorContext(serviceCtx, "error pushing job back to queued", "err", err) + return + } + databaseContainer.Client.Del(cleanupContext, 
"anklet/jobs/github/queued/"+service.Name+"/cleaning") + default: + logger.WarnContext(serviceCtx, "pushing job back to anklet/jobs/github/queued/"+service.Name) + _, err := databaseContainer.Client.RPopLPush(cleanupContext, "anklet/jobs/github/queued/"+service.Name+"/cleaning", "anklet/jobs/github/queued/"+service.Name).Result() + if err != nil { + logger.ErrorContext(serviceCtx, "error pushing job back to queued", "err", err) + return + } + databaseContainer.Client.Del(cleanupContext, "anklet/jobs/github/queued/"+service.Name+"/cleaning") + } + } + default: + logger.ErrorContext(serviceCtx, "unknown job type", "job", typedJob) + return + } + return // don't delete the queued/servicename + } + // databaseContainer.Client.Del(cleanupContext, "anklet/jobs/github/queued/"+service.Name) } -func Run(workerCtx context.Context, serviceCtx context.Context, logger *slog.Logger) { +func Run(workerCtx context.Context, serviceCtx context.Context, serviceCancel context.CancelFunc, logger *slog.Logger) { + logger.InfoContext(serviceCtx, "checking for jobs....") service := config.GetServiceFromContext(serviceCtx) @@ -350,90 +524,162 @@ func Run(workerCtx context.Context, serviceCtx context.Context, logger *slog.Log githubWrapperClient := internalGithub.NewGitHubClientWrapper(githubClient) serviceCtx = context.WithValue(serviceCtx, config.ContextKey("githubwrapperclient"), githubWrapperClient) - serviceCtx = logging.AppendCtx(serviceCtx, slog.String("repo", service.Repo)) serviceCtx = logging.AppendCtx(serviceCtx, slog.String("owner", service.Owner)) - repositoryURL := fmt.Sprintf("https://github.com/%s/%s", service.Owner, service.Repo) - // obtain all queued workflow runs and jobs - allWorkflowRunJobDetails, workflowsToScan, serviceCtx, err := getWorkflowRunJobs(serviceCtx, logger) + checkForCompletedJobsMu := &sync.Mutex{} + cleanupMu := &sync.Mutex{} + + failureChannel := make(chan bool, 1) + + completedJobChannel := make(chan github.WorkflowJobEvent, 1) + // wait group so we 
can wait for the goroutine to finish before exiting the service + var wg sync.WaitGroup + wg.Add(1) + + databaseContainer, err := database.GetDatabaseFromContext(serviceCtx) if err != nil { - logger.ErrorContext(serviceCtx, "error getting workflow run jobs", "err", err) + logger.ErrorContext(serviceCtx, "error getting database from context", "err", err) return } - if serviceCtx.Err() != nil { - logger.WarnContext(serviceCtx, "context canceled after getWorkflowRunJobs") + + defer func() { + wg.Wait() + cleanup(workerCtx, serviceCtx, logger, completedJobChannel, cleanupMu) + close(completedJobChannel) + }() + + // check constantly for a cancelled webhook to be received for our job + ranOnce := make(chan struct{}) + go func() { + CheckForCompletedJobs(workerCtx, serviceCtx, logger, checkForCompletedJobsMu, completedJobChannel, ranOnce, false, failureChannel) + wg.Done() + }() + <-ranOnce // wait for the goroutine to run at least once + // finalize cleanup if the service crashed mid-cleanup + cleanup(workerCtx, serviceCtx, logger, completedJobChannel, cleanupMu) + select { + case <-completedJobChannel: + logger.InfoContext(serviceCtx, "completed job found at start") + completedJobChannel <- github.WorkflowJobEvent{} + return + case <-serviceCtx.Done(): + logger.WarnContext(serviceCtx, "context canceled before completed job found") return + default: } - logger.DebugContext(serviceCtx, "workflows to scan", "workflows", workflowsToScan) - - // simplifiedWorkflowRuns := make([]map[string]interface{}, 0) - // for _, workflowRunJob := range allWorkflowRunJobDetails { - // simplifiedRun := map[string]interface{}{ - // "name": workflowRunJob.Job.Name, - // "created_at": workflowRunJob.Job.CreatedAt, - // "workflow_name": workflowRunJob.Job.WorkflowName, - // "workflow_run_name": workflowRunJob.WorkflowRunName, - // "run_id": workflowRunJob.Job.RunID, - // "unique_id": workflowRunJob.UniqueID, - // "html_url": workflowRunJob.Job.HTMLURL, - // "labels": 
workflowRunJob.Job.Labels, - // "status": workflowRunJob.Job.Status, - // } - // simplifiedWorkflowRuns = append(simplifiedWorkflowRuns, simplifiedRun) - // } - // allWorkflowRunJobsJSON, _ := json.MarshalIndent(simplifiedWorkflowRuns, "", " ") - // fmt.Printf("%s\n", allWorkflowRunJobsJSON) - - // Loop over all items, so we don't have to re-request the whole list of queued jobs if one is already running on another host - for _, workflowRunJob := range allWorkflowRunJobDetails { - serviceCtx = setLoggingContext(serviceCtx, workflowRunJob) - - // Check if the job is already running, and ensure in DB to prevent other runners from getting it - uniqueKey := fmt.Sprintf("%s:%s", workflowRunJob.RunID, workflowRunJob.UniqueID) - serviceCtx = dbFunctions.UpdateUniqueRunKey(serviceCtx, uniqueKey) - if already, err := dbFunctions.CheckIfKeyExists(serviceCtx, uniqueKey); err != nil { - logger.ErrorContext(serviceCtx, "error checking if already in db", "err", err) - return - } else if already { - logger.DebugContext(serviceCtx, "job already running, skipping") - // this would cause a double run problem if a job finished on hostA and hostB had an array of workflowRunJobs with queued still for the same job - // we get the latest workflow run jobs each run to prevent this - // also, we don't return and use continue below so that we can just use the next job in the list and not have to re-parse the entire thing or make more api calls - continue - } else if !already { - added, err := dbFunctions.AddUniqueRunKey(serviceCtx) - if added && err != nil { - logger.DebugContext(serviceCtx, "unique key already in db") - continue // go to next item so we don't have to query for all running jobs again if another host already picked it up - } - if !added && err != nil { - logger.ErrorContext(serviceCtx, "error adding unique run key", "err", err) - return - } + + var wrappedPayloadJSON string + // allow picking up where we left off + wrappedPayloadJSON, err = 
databaseContainer.Client.LIndex(serviceCtx, "anklet/jobs/github/queued/"+service.Name, -1).Result() + if err != nil && err != redis.Nil { + logger.ErrorContext(serviceCtx, "error getting last object from anklet/jobs/github/queued/"+service.Name, "err", err) + return + } + if wrappedPayloadJSON == "" { // if we haven't done anything before, get something from the main queue + eldestQueuedJob, err := databaseContainer.Client.LPop(serviceCtx, "anklet/jobs/github/queued").Result() + if err == redis.Nil { + logger.DebugContext(serviceCtx, "no queued jobs found") + completedJobChannel <- github.WorkflowJobEvent{} // send true to the channel to stop the check for completed jobs goroutine + return } - defer dbFunctions.RemoveUniqueKeyFromDB(serviceCtx) + if err != nil { + logger.ErrorContext(serviceCtx, "error getting queued jobs", "err", err) + return + } + databaseContainer.Client.RPush(serviceCtx, "anklet/jobs/github/queued/"+service.Name, eldestQueuedJob) + wrappedPayloadJSON = eldestQueuedJob + } - logger.InfoContext(serviceCtx, "handling anka workflow run job") - metrics.UpdateService(workerCtx, serviceCtx, logger, metrics.Service{ - Status: "running", - }) + queuedJob, err, typeErr := database.UnwrapPayload[github.WorkflowJobEvent](wrappedPayloadJSON) + if err != nil || typeErr != nil { + logger.ErrorContext(serviceCtx, "error unmarshalling job", "err", err) + return + } + serviceCtx = logging.AppendCtx(serviceCtx, slog.Int64("workflowJobID", *queuedJob.WorkflowJob.ID)) + serviceCtx = logging.AppendCtx(serviceCtx, slog.String("workflowJobName", *queuedJob.WorkflowJob.Name)) + serviceCtx = logging.AppendCtx(serviceCtx, slog.Int64("workflowJobRunID", *queuedJob.WorkflowJob.RunID)) + + logger.DebugContext(serviceCtx, "queued job found", "queuedJob", queuedJob.Action) + + // check if the job is already completed, so we don't orphan if there is + // a job in anklet/jobs/github/queued and also a anklet/jobs/github/completed + CheckForCompletedJobs(workerCtx, serviceCtx, 
logger, checkForCompletedJobsMu, completedJobChannel, ranOnce, true, failureChannel) + select { + case <-completedJobChannel: + logger.InfoContext(serviceCtx, "completed job found by CheckForCompletedJobs") + completedJobChannel <- github.WorkflowJobEvent{} // send true to the channel to stop the check for completed jobs goroutine + return + case <-serviceCtx.Done(): + logger.WarnContext(serviceCtx, "context canceled before completed job found") + return + default: + } + + // get the unique unique-id for this job + // this ensures that multiple jobs in the same workflow run don't compete for the same runner + uniqueID := extractLabelValue(queuedJob.WorkflowJob.Labels, "unique-id:") + if uniqueID == "" { + logger.WarnContext(serviceCtx, "unique-id label not found or empty; something wrong with your workflow yaml") + return + } + serviceCtx = logging.AppendCtx(serviceCtx, slog.String("uniqueID", uniqueID)) + ankaTemplate := extractLabelValue(queuedJob.WorkflowJob.Labels, "anka-template:") + if ankaTemplate == "" { + logger.WarnContext(serviceCtx, "warning: unable to find Anka Template specified in labels - skipping") + return + } + serviceCtx = logging.AppendCtx(serviceCtx, slog.String("ankaTemplate", ankaTemplate)) + ankaTemplateTag := extractLabelValue(queuedJob.WorkflowJob.Labels, "anka-template-tag:") + if ankaTemplateTag == "" { + ankaTemplateTag = "(using latest)" + } + serviceCtx = logging.AppendCtx(serviceCtx, slog.String("ankaTemplateTag", ankaTemplateTag)) + + workflowJob := WorkflowRunJobDetail{ + JobID: *queuedJob.WorkflowJob.ID, + JobName: *queuedJob.WorkflowJob.Name, + JobURL: *queuedJob.WorkflowJob.HTMLURL, + WorkflowName: *queuedJob.WorkflowJob.WorkflowName, + AnkaTemplate: ankaTemplate, + AnkaTemplateTag: ankaTemplateTag, + RunID: *queuedJob.WorkflowJob.RunID, + UniqueID: uniqueID, + Labels: queuedJob.WorkflowJob.Labels, + } - // get anka CLI - ankaCLI := anka.GetAnkaCLIFromContext(serviceCtx) + // get anka CLI + ankaCLI := 
anka.GetAnkaCLIFromContext(serviceCtx) - // See if VM Template existing already - templateTagExistsError := ankaCLI.EnsureVMTemplateExists(workerCtx, serviceCtx, workflowRunJob.AnkaTemplate, workflowRunJob.AnkaTemplateTag) - if templateTagExistsError != nil { - logger.WarnContext(serviceCtx, "error ensuring vm template exists", "err", templateTagExistsError) - return + logger.InfoContext(serviceCtx, "handling anka workflow run job") + metrics.UpdateService(workerCtx, serviceCtx, logger, metrics.Service{ + Status: "running", + }) + + skipPrep := false // allows us to wait for the cancellation we sent to be received so we can clean up properly + + // See if VM Template existing already + //TODO: be able to interrupt this + templateTagExistsError := ankaCLI.EnsureVMTemplateExists(workerCtx, serviceCtx, workflowJob.AnkaTemplate, workflowJob.AnkaTemplateTag) + if templateTagExistsError != nil { + logger.WarnContext(serviceCtx, "error ensuring vm template exists on host", "err", templateTagExistsError) + err := sendCancelWorkflowRun(serviceCtx, logger, workflowJob.RunID) + if err != nil { + logger.ErrorContext(serviceCtx, "error sending cancel workflow run", "err", err) } + skipPrep = true + } - logger.DebugContext(serviceCtx, "handling job") + if serviceCtx.Err() != nil { + logger.WarnContext(serviceCtx, "context canceled during vm template check") + return + } + + if !skipPrep { // Get runner registration token - serviceCtx, repoRunnerRegistration, response, err := ExecuteGitHubClientFunction[github.RegistrationToken](serviceCtx, logger, func() (*github.RegistrationToken, *github.Response, error) { + serviceCtx, repoRunnerRegistration, response, err := executeGitHubClientFunction[github.RegistrationToken](serviceCtx, logger, func() (*github.RegistrationToken, *github.Response, error) { repoRunnerRegistration, resp, err := githubClient.Actions.CreateRegistrationToken(context.Background(), service.Owner, service.Repo) return repoRunnerRegistration, resp, err }) @@ 
-452,10 +698,35 @@ func Run(workerCtx context.Context, serviceCtx context.Context, logger *slog.Log } // Obtain Anka VM (and name) - serviceCtx, vm, err := ankaCLI.ObtainAnkaVM(workerCtx, serviceCtx, workflowRunJob.AnkaTemplate) - defer ankaCLI.AnkaDelete(workerCtx, serviceCtx, vm) + serviceCtx, vm, err := ankaCLI.ObtainAnkaVM(workerCtx, serviceCtx, workflowJob.AnkaTemplate) + wrappedVM := map[string]interface{}{ + "type": "anka.VM", + "payload": vm, + } + wrappedVmJSON, wrappedVmErr := json.Marshal(wrappedVM) + if wrappedVmErr != nil { + logger.ErrorContext(serviceCtx, "error marshalling vm to json", "err", wrappedVmErr) + ankaCLI.AnkaDelete(workerCtx, serviceCtx, vm) + failureChannel <- true + return + } + dbErr := databaseContainer.Client.RPush(serviceCtx, "anklet/jobs/github/queued/"+service.Name, wrappedVmJSON).Err() + if dbErr != nil { + logger.ErrorContext(serviceCtx, "error pushing vm data to database", "err", dbErr) + failureChannel <- true + return + } if err != nil { + // this is thrown, for example, when there is no capacity on the host + // we must be sure to create the DB entry so cleanup happens properly logger.ErrorContext(serviceCtx, "error obtaining anka vm", "err", err) + failureChannel <- true + return + } + + if serviceCtx.Err() != nil { + logger.WarnContext(serviceCtx, "context canceled after ObtainAnkaVM") + failureChannel <- true return } @@ -469,82 +740,108 @@ func Run(workerCtx context.Context, serviceCtx context.Context, logger *slog.Log ) if err != nil { logger.ErrorContext(serviceCtx, "error executing anka copy", "err", err) + failureChannel <- true return } - if serviceCtx.Err() != nil { + select { + case <-completedJobChannel: + logger.InfoContext(serviceCtx, "completed job found before installing runner") + completedJobChannel <- github.WorkflowJobEvent{} // send true to the channel to stop the check for completed jobs goroutine + return + case <-serviceCtx.Done(): logger.WarnContext(serviceCtx, "context canceled before install 
runner") return + default: } + installRunnerErr := ankaCLI.AnkaRun(serviceCtx, "./install-runner.bash") if installRunnerErr != nil { logger.ErrorContext(serviceCtx, "error executing install-runner.bash", "err", installRunnerErr) + failureChannel <- true return } // Register runner - if serviceCtx.Err() != nil { + select { + case <-completedJobChannel: + logger.InfoContext(serviceCtx, "completed job found before registering runner") + completedJobChannel <- github.WorkflowJobEvent{} // send true to the channel to stop the check for completed jobs goroutine + return + case <-serviceCtx.Done(): logger.WarnContext(serviceCtx, "context canceled before register runner") return + default: } registerRunnerErr := ankaCLI.AnkaRun(serviceCtx, "./register-runner.bash", - vm.Name, *repoRunnerRegistration.Token, repositoryURL, strings.Join(workflowRunJob.Job.Labels, ","), + vm.Name, *repoRunnerRegistration.Token, repositoryURL, strings.Join(workflowJob.Labels, ","), ) if registerRunnerErr != nil { logger.ErrorContext(serviceCtx, "error executing register-runner.bash", "err", registerRunnerErr) + failureChannel <- true return } - defer removeSelfHostedRunner(serviceCtx, *vm, *workflowRunJob.Job.RunID) + defer removeSelfHostedRunner(serviceCtx, *vm, workflowJob.RunID) // Install and Start runner - if serviceCtx.Err() != nil { + select { + case <-completedJobChannel: + logger.InfoContext(serviceCtx, "completed job found before starting runner") + completedJobChannel <- github.WorkflowJobEvent{} // send true to the channel to stop the check for completed jobs goroutine + return + case <-serviceCtx.Done(): logger.WarnContext(serviceCtx, "context canceled before start runner") return + default: } startRunnerErr := ankaCLI.AnkaRun(serviceCtx, "./start-runner.bash") if startRunnerErr != nil { logger.ErrorContext(serviceCtx, "error executing start-runner.bash", "err", startRunnerErr) + failureChannel <- true return } - if serviceCtx.Err() != nil { + select { + case <-completedJobChannel: 
+ logger.InfoContext(serviceCtx, "completed job found before jobCompleted checks") + completedJobChannel <- github.WorkflowJobEvent{} // send true to the channel to stop the check for completed jobs goroutine + return + case <-serviceCtx.Done(): logger.WarnContext(serviceCtx, "context canceled before jobCompleted checks") return + default: } - // Watch for job completion - jobCompleted := false - logCounter := 0 - for !jobCompleted { - if serviceCtx.Err() != nil { - logger.WarnContext(serviceCtx, "context canceled while watching for job completion") - break - } - serviceCtx, currentJob, response, err := ExecuteGitHubClientFunction[github.WorkflowJob](serviceCtx, logger, func() (*github.WorkflowJob, *github.Response, error) { - currentJob, resp, err := githubClient.Actions.GetWorkflowJobByID(context.Background(), service.Owner, service.Repo, *workflowRunJob.Job.ID) - return currentJob, resp, err - }) - if err != nil { - logger.ErrorContext(serviceCtx, "error executing githubClient.Actions.GetWorkflowJobByID", "err", err, "response", response) - return - } - if *currentJob.Status == "completed" { + } // skipPrep + + logger.InfoContext(serviceCtx, "watching for job completion") + + // Watch for job completion + jobCompleted := false + logCounter := 0 + for !jobCompleted { + select { + case completedJobEvent := <-completedJobChannel: + if *completedJobEvent.Action == "completed" { jobCompleted = true - serviceCtx = logging.AppendCtx(serviceCtx, slog.String("conclusion", *currentJob.Conclusion)) - logger.InfoContext(serviceCtx, "job completed", "job_id", *workflowRunJob.Job.ID) - if *currentJob.Conclusion == "success" { + serviceCtx = logging.AppendCtx(serviceCtx, slog.String("conclusion", *completedJobEvent.WorkflowJob.Conclusion)) + logger.InfoContext(serviceCtx, "job completed", + "job_id", completedJobEvent.WorkflowJob.ID, + "conclusion", *completedJobEvent.WorkflowJob.Conclusion, + ) + if *completedJobEvent.WorkflowJob.Conclusion == "success" { metricsData := 
metrics.GetMetricsDataFromContext(workerCtx) metricsData.IncrementTotalSuccessfulRunsSinceStart() metricsData.UpdateService(serviceCtx, logger, metrics.Service{ Name: service.Name, LastSuccessfulRun: time.Now(), - LastSuccessfulRunJobUrl: *workflowRunJob.Job.HTMLURL, + LastSuccessfulRunJobUrl: *completedJobEvent.WorkflowJob.URL, }) - } else if *currentJob.Conclusion == "failure" { + } else if *completedJobEvent.WorkflowJob.Conclusion == "failure" { metricsData := metrics.GetMetricsDataFromContext(workerCtx) metricsData.IncrementTotalFailedRunsSinceStart() metricsData.UpdateService(serviceCtx, logger, metrics.Service{ Name: service.Name, LastFailedRun: time.Now(), - LastFailedRunJobUrl: *workflowRunJob.Job.HTMLURL, + LastFailedRunJobUrl: *completedJobEvent.WorkflowJob.URL, }) } } else if logCounter%2 == 0 { @@ -552,15 +849,57 @@ func Run(workerCtx context.Context, serviceCtx context.Context, logger *slog.Log logger.WarnContext(serviceCtx, "context canceled during job status check") return } - logger.InfoContext(serviceCtx, "job still in progress", "job_id", *workflowRunJob.Job.ID) - time.Sleep(5 * time.Second) // Wait before checking the job status again + } + jobCompleted = true + completedJobChannel <- github.WorkflowJobEvent{} // so cleanup can also see it as completed + return + case <-serviceCtx.Done(): + logger.WarnContext(serviceCtx, "context canceled while watching for job completion") + return + default: + time.Sleep(10 * time.Second) + if logCounter%2 == 0 { + logger.InfoContext(serviceCtx, "job still in progress", "job_id", workflowJob.JobID) } logCounter++ } - // Important return! 
- // only handle a single job for this service, then return so we get fresh context - // If we don't, we can pick up a job that has already run on another host but is still in the list of jobs we queried at the beginning - return + // serviceCtx, currentJob, response, err := ExecuteGitHubClientFunction[github.WorkflowJob](serviceCtx, logger, func() (*github.WorkflowJob, *github.Response, error) { + // currentJob, resp, err := githubClient.Actions.GetWorkflowJobByID(context.Background(), service.Owner, service.Repo, workflowRunJob.JobID) + // return currentJob, resp, err + // }) + // if err != nil { + // logger.ErrorContext(serviceCtx, "error executing githubClient.Actions.GetWorkflowJobByID", "err", err, "response", response) + // return + // } + // if *currentJob.Status == "completed" { + // jobCompleted = true + // serviceCtx = logging.AppendCtx(serviceCtx, slog.String("conclusion", *currentJob.Conclusion)) + // logger.InfoContext(serviceCtx, "job completed", "job_id", workflowRunJob.JobID) + // if *currentJob.Conclusion == "success" { + // metricsData := metrics.GetMetricsDataFromContext(workerCtx) + // metricsData.IncrementTotalSuccessfulRunsSinceStart() + // metricsData.UpdateService(serviceCtx, logger, metrics.Service{ + // Name: service.Name, + // LastSuccessfulRun: time.Now(), + // LastSuccessfulRunJobUrl: workflowRunJob.JobURL, + // }) + // } else if *currentJob.Conclusion == "failure" { + // metricsData := metrics.GetMetricsDataFromContext(workerCtx) + // metricsData.IncrementTotalFailedRunsSinceStart() + // metricsData.UpdateService(serviceCtx, logger, metrics.Service{ + // Name: service.Name, + // LastFailedRun: time.Now(), + // LastFailedRunJobUrl: workflowRunJob.JobURL, + // }) + // } + // } else if logCounter%2 == 0 { + // if serviceCtx.Err() != nil { + // logger.WarnContext(serviceCtx, "context canceled during job status check") + // return + // } + // logger.InfoContext(serviceCtx, "job still in progress", "job_id", workflowRunJob.JobID) + // 
time.Sleep(5 * time.Second) // Wait before checking the job status again + // } } } @@ -570,8 +909,8 @@ func removeSelfHostedRunner(serviceCtx context.Context, vm anka.VM, workflowRunI logger := logging.GetLoggerFromContext(serviceCtx) service := config.GetServiceFromContext(serviceCtx) githubClient := internalGithub.GetGitHubClientFromContext(serviceCtx) - serviceCtx, runnersList, response, err := ExecuteGitHubClientFunction[github.Runners](serviceCtx, logger, func() (*github.Runners, *github.Response, error) { - runnersList, resp, err := githubClient.Actions.ListRunners(context.Background(), service.Owner, service.Repo, &github.ListOptions{}) + serviceCtx, runnersList, response, err := executeGitHubClientFunction[github.Runners](serviceCtx, logger, func() (*github.Runners, *github.Response, error) { + runnersList, resp, err := githubClient.Actions.ListRunners(context.Background(), service.Owner, service.Repo, &github.ListRunnersOptions{}) return runnersList, resp, err }) if err != nil { @@ -592,36 +931,12 @@ func removeSelfHostedRunner(serviceCtx context.Context, vm anka.VM, workflowRunI "ankaTemplateTag": "(using latest)", "err": "DELETE https://api.github.com/repos/veertuinc/anklet/actions/runners/142: 422 Bad request - Runner \"anklet-vm-\u003cuuid\u003e\" is still running a job\" []", */ - cancelSent := false - for { - serviceCtx, workflowRun, _, err := ExecuteGitHubClientFunction[github.WorkflowRun](serviceCtx, logger, func() (*github.WorkflowRun, *github.Response, error) { - workflowRun, resp, err := githubClient.Actions.GetWorkflowRunByID(context.Background(), service.Owner, service.Repo, workflowRunID) - return workflowRun, resp, err - }) - if err != nil { - logger.ErrorContext(serviceCtx, "error getting workflow run by ID", "err", err) - return - } - if *workflowRun.Status == "completed" || (workflowRun.Conclusion != nil && *workflowRun.Conclusion == "cancelled") { - break - } else { - logger.WarnContext(serviceCtx, "workflow run is still active... 
waiting for cancellation so we can clean up the runner...", "workflow_run_id", workflowRunID) - if !cancelSent { // this has to happen here so that it doesn't error with "409 Cannot cancel a workflow run that is completed. " if the job is already cancelled - serviceCtx, cancelResponse, _, cancelErr := ExecuteGitHubClientFunction[github.Response](serviceCtx, logger, func() (*github.Response, *github.Response, error) { - resp, err := githubClient.Actions.CancelWorkflowRunByID(context.Background(), service.Owner, service.Repo, workflowRunID) - return resp, nil, err - }) - // don't use cancelResponse.Response.StatusCode or else it'll error with SIGSEV - if cancelErr != nil && !strings.Contains(cancelErr.Error(), "try again later") { - logger.ErrorContext(serviceCtx, "error executing githubClient.Actions.CancelWorkflowRunByID", "err", cancelErr, "response", cancelResponse) - break - } - cancelSent = true - } - time.Sleep(10 * time.Second) - } + err := sendCancelWorkflowRun(serviceCtx, logger, workflowRunID) + if err != nil { + logger.ErrorContext(serviceCtx, "error sending cancel workflow run", "err", err) + return } - serviceCtx, _, _, err = ExecuteGitHubClientFunction[github.Response](serviceCtx, logger, func() (*github.Response, *github.Response, error) { + serviceCtx, _, _, err = executeGitHubClientFunction[github.Response](serviceCtx, logger, func() (*github.Response, *github.Response, error) { response, err := githubClient.Actions.RemoveRunner(context.Background(), service.Owner, service.Repo, *runner.ID) return response, nil, err }) diff --git a/plugins/github/install-runner.bash b/plugins/github/install-runner.bash index 6b1ae42..3536c51 100755 --- a/plugins/github/install-runner.bash +++ b/plugins/github/install-runner.bash @@ -3,7 +3,7 @@ set -exo pipefail RUNNER_HOME="${RUNNER_HOME:-"$HOME/actions-runner"}" mkdir -p "${RUNNER_HOME}" cd "${RUNNER_HOME}" -GITHUB_RUNNER_VERSION="2.316.0" +GITHUB_RUNNER_VERSION="2.319.0" RUNNER_ARCH=${RUNNER_ARCH:-"$(uname -m)"} 
if [ "$RUNNER_ARCH" = "x86_64" ] || [ "$RUNNER_ARCH" = "x64" ]; then RUNNER_ARCH="x64"