diff --git a/examples/pipelines/contextful/data/pathway-docs.jsonl b/examples/pipelines/contextful/data/pathway-docs.jsonl index 8aadede..929737d 100644 --- a/examples/pipelines/contextful/data/pathway-docs.jsonl +++ b/examples/pipelines/contextful/data/pathway-docs.jsonl @@ -382,7 +382,7 @@ {"doc": "---\ntitle: Pathway API\nsidebar: 'API'\nnavigation: true\n---\n# Pathway API\nReference for all the Pathway classes and functions.\nSee Table API for the main Table class.\nclass pw.AsyncTransformer(input_table)\nAllows to perform async transformations on a table.\n`invoke()` will be called asynchronously for each row of an input_table.\nOutput table can be acccesed via `result`.\nExample:\n```python\npw.debug.compute_and_print(t2, include_id=False)\n```\n::\nResult\n```\ncolA | colB\n1 | 5\n2 | 9\n```\n::\n::\n"} {"doc": "---\ntitle: Pathway API\nsidebar: 'API'\nnavigation: true\n---\n# Pathway API\nReference for all the Pathway classes and functions.\nSee Table API for the main Table class.\nclass pw.AsyncTransformer(input_table)\nAllows to perform async transformations on a table.\n`invoke()` will be called asynchronously for each row of an input_table.\nOutput table can be acccesed via `result`.\nExample:\n```python\nt3 = t2.select(colB = pw.unwrap(t2.colB))\nt3.schema\n```\n::\nResult\n```\n}>\n```\n::\n::\n"} {"doc": "---\ntitle: Pathway API\nsidebar: 'API'\nnavigation: true\n---\n# Pathway API\nReference for all the Pathway classes and functions.\nSee Table API for the main Table class.\nclass pw.AsyncTransformer(input_table)\nAllows to perform async transformations on a table.\n`invoke()` will be called asynchronously for each row of an input_table.\nOutput table can be acccesed via `result`.\nExample:\n```python\npw.debug.compute_and_print(t3, include_id=False)\n```\n::\nResult\n```\ncolB\n5\n9\n```\n::\n::\n"} -{"doc": "---\ntitle: pathway.io.http package\nsidebar: 'API'\nnavigation: false\n---\n# pathway.io.http package\nclass pw.io.http.RetryPolicy(first_delay_ms, backoff_factor, jitter_ms)\nClass representing policy of delays or backoffs for the retries.\nFunctions\npw.io.http.read(url, *, schema=None, method='GET', payload=None, headers=None, response_mapper=None, format='json', delimiter=None, n_retries=0, retry_policy=, connect_timeout_ms=None, request_timeout_ms=None, allow_redirects=True, retry_codes=(429, 500, 502, 503, 504), autocommit_duration_ms=10000, debug_data=None, value_columns=None, primary_key=None, types=None, default_values=None)\nReads a table from an HTTP stream.\n* Parameters\n * url (`str`) \u2013 the full URL of streaming endpoint to fetch data from.\n * schema (`Optional`\\`type`\\[[`Schema`\\]\\]) \u2013 Schema of the resulting table.\n * method (`str`) \u2013 request method for streaming. It should be one of\n HTTP request methods.\n * payload (`Optional`\\[`Any`\\]) \u2013 data to be send in the body of the request.\n * headers (`Optional`\\[`dict`\\[`str`, `str`\\]\\]) \u2013 request headers in the form of dict. Wildcards are allowed both, in\n keys and in values.\n * response_mapper (`Optional`\\[`Callable`\\[\\[`str` | `bytes`\\], `bytes`\\]\\]) \u2013 in case a response needs to be processed, this method can be\n provided. It will be applied to each slice of a stream.\n * format (`str`) \u2013 format of the data, \u201cjson\u201d or \u201craw\u201d. In case of a \u201craw\u201d format,\n table with single \u201cdata\u201d column will be produced. For \u201cjson\u201d format, bytes\n encoded json is expected.\n * delimiter (`UnionType`\\[`str`, `bytes`, `None`\\]) \u2013 delimiter used to split stream into messages.\n * n_retries (`int`) \u2013 how many times to retry the failed request.\n * retry_policy (`RetryPolicy`) \u2013 policy of delays or backoffs for the retries.\n * connect_timeout_ms (`Optional`\\[`int`\\]) \u2013 connection timeout, specified in milliseconds. In case\n it\u2019s None, no restrictions on connection duration will be applied.\n * request_timeout_ms (`Optional`\\[`int`\\]) \u2013 request timeout, specified in milliseconds. In case\n it\u2019s None, no restrictions on request duration will be applied.\n * allow_redirects (`bool`) \u2013 whether to allow redirects.\n * retry_codes (`Optional`\\[`tuple`\\]) \u2013 HTTP status codes that trigger retries.\n * content_type \u2013 content type of the data to send. In case the chosen format is\n JSON, it will be defaulted to \u201capplication/json\u201d.\n * autocommit_duration_ms (`int`) \u2013 the maximum time between two commits. Every\n autocommit_duration_ms milliseconds, the updates received by the connector are\n committed and pushed into Pathway\u2019s computation graph.\n * debug_data \u2013 static data replacing original one when debug mode is active.\n * value_columns (`Optional`\\[`list`\\[`str`\\]\\]) \u2013 columns to extract for a table. \\[will be deprecated soon\\]\n * primary_key (`Optional`\\[`list`\\[`str`\\]\\]) \u2013 in case the table should have a primary key generated according to\n a subset of its columns, the set of columns should be specified in this field.\n Otherwise, the primary key will be generated as uuid4. \\[will be deprecated soon\\]\n * types (`Optional`\\[`dict`\\[`str`, `PathwayType`\\]\\]) \u2013 dictionary containing the mapping between the columns and the data types\n (`pw.Type`) of the values of those columns. This parameter is optional, and\n if not provided the default type is `pw.Type.ANY`. \\[will be deprecated soon\\]\n * default_values (`Optional`\\[`dict`\\[`str`, `Any`\\]\\]) \u2013 dictionary containing default values for columns replacing\n blank entries. The default value of the column must be specified explicitly,\n otherwise there will be no default value. \\[will be deprecated soon\\]\nExamples:\nRaw format:\n```python\nimport os\nimport pathway as pw\ntable = pw.io.http.read(\n \"https://localhost:8000/stream\",\n method=\"GET\",\n headers={\"Authorization\": f\"Bearer {os.environ['BEARER_TOKEN']}\"},\n format=\"raw\",\n)\n```\nJSON with response mapper:\nInput can be adjusted using a mapping function that will be applied to each\nslice of a stream. The mapping function should return bytes.\n```python\ndef mapper(msg: bytes) -> bytes:\n result = json.loads(msg.decode())\n return json.dumps({\"key\": result[\"id\"], \"text\": result[\"data\"]}).encode()\nclass InputSchema(pw.Schema):\n key: int\n text: str\nt = pw.io.http.read(\n \"https://localhost:8000/stream\",\n method=\"GET\",\n headers={\"Authorization\": f\"Bearer {os.environ['BEARER_TOKEN']}\"},\n schema=InputSchema,\n response_mapper=mapper\n)\n```\npw.io.http.rest_connector(host, port, *, route='/', schema=None, autocommit_duration_ms=1500, keep_queries=None, delete_completed_queries=None)\nRuns a lightweight HTTP server and inputs a collection from the HTTP endpoint,\nconfigured by the parameters of this method.\nOn the output, the method provides a table and a callable, which needs to accept\nthe result table of the computation, which entries will be tracked and put into\nrespective request\u2019s responses.\n* Parameters\n * host (`str`) \u2013 TCP/IP host or a sequence of hosts for the created endpoint;\n * port (`int`) \u2013 port for the created endpoint;\n * route (`str`) \u2013 route which will be listened to by the web server;\n * schema (`Optional`\\`type`\\[[`Schema`\\]\\]) \u2013 schema of the resulting table;\n * autocommit_duration_ms \u2013 the maximum time between two commits. Every\n autocommit_duration_ms milliseconds, the updates received by the connector are\n committed and pushed into Pathway\u2019s computation graph;\n * keep_queries (`Optional`\\[`bool`\\]) \u2013 whether to keep queries after processing; defaults to False. \\[deprecated\\]\n * delete_completed_queries (`Optional`\\[`bool`\\]) \u2013 whether to send a deletion entry after the query is processed.\n Allows to remove it from the system if it is stored by operators such as `join` or `groupby`;\n* Returns\n *table* \u2013 the table read;\n response_writer: a callable, where the result table should be provided.\npw.io.http.write(table, url, *, method='POST', format='json', request_payload_template=None, n_retries=0, retry_policy=, connect_timeout_ms=None, request_timeout_ms=None, content_type=None, headers=None, allow_redirects=True, retry_codes=(429, 500, 502, 503, 504))\nSends the stream of updates from the table to the specified HTTP API.\n* Parameters\n * table (`Table`) \u2013 table to be tracked.\n * method (`str`) \u2013 request method for streaming. It should be one of\n HTTP request methods.\n * url (`str`) \u2013 the full URL of the endpoint to push data into. Can contain wildcards.\n * format (`str`) \u2013 the payload format, one of {\u201cjson\u201d, \u201ccustom\u201d}. If \u201cjson\u201d is\n specified, the plain JSON will be formed and sent. Otherwise, the contents of the\n field request_payload_template will be used.\n * request_payload_template (`Optional`\\[`str`\\]) \u2013 the template to format and send in case \u201ccustom\u201d was\n specified in the format field. Can include wildcards.\n * n_retries (`int`) \u2013 how many times to retry the failed request.\n * retry_policy (`RetryPolicy`) \u2013 policy of delays or backoffs for the retries.\n * connect_timeout_ms (`Optional`\\[`int`\\]) \u2013 connection timeout, specified in milliseconds. In case\n it\u2019s None, no restrictions on connection duration will be applied.\n * request_timeout_ms (`Optional`\\[`int`\\]) \u2013 request timeout, specified in milliseconds. In case it\u2019s\n None, no restrictions on request duration will be applied.\n * allow_redirects (`bool`) \u2013 Whether to allow redirects.\n * retry_codes (`Optional`\\[`tuple`\\]) \u2013 HTTP status codes that trigger retries.\n * content_type (`Optional`\\[`str`\\]) \u2013 content type of the data to send. In case the chosen format is\n JSON, it will be defaulted to \u201capplication/json\u201d.\n * headers (`Optional`\\[`dict`\\[`str`, `str`\\]\\]) \u2013 request headers in the form of dict. Wildcards are allowed both, in\n keys and in values.\nWildcards:\nWildcards are the proposed way to customize the HTTP requests composed. The\nengine will replace all entries of `{table.}` with a value from the\ncolumn `` in the row sent. This wildcard resolving will happen in url,\nrequest payload template and headers.\nExamples:\nFor the sake of demonstation, let\u2019s try different ways to send the stream of changes\non a table `pets`, containing data about pets and their owners. The table contains\njust two columns: the pet and the owner\u2019s name.\n```python\nimport pathway as pw\npets = pw.debug.table_from_markdown(\"owner pet \\n Alice dog \\n Bob cat \\n Alice cat\")\n```\nConsider that there is a need to send the stream of changes on such table to the\nexternal API endpoint (let\u2019s pick some exemplary URL for the sake of demonstation).\nTo keep things simple, we can suppose that this API accepts flat JSON objects, which\nare sent in POST requests. Then, the communication can be done with a simple code\nsnippet:\n```python\npw.io.http.write(pets, \"http://www.example.com/api/event\")\n```\nNow let\u2019s do something more custom. Suppose that the API endpoint requires us to\ncommunicate via PUT method and to pass the values as CGI-parameters. In this case,\nwildcards are the way to go:\n```python\npw.io.http.write(\n pets,\n \"http://www.example.com/api/event?owner={table.owner}&pet={table.pet}\",\n method=\"PUT\"\n)\n```\nA custom payload can also be formed from the outside. What if the endpoint requires\nthe data in tskv format in request body?\nFirst of all, let\u2019s form a template for the message body:\n```python\nmessage_template_tokens = [\n \"owner={table.owner}\",\n \"pet={table.pet}\",\n \"time={table.time}\",\n \"diff={table.diff}\",\n]\nmessage_template = \"\\t\".join(message_template_tokens)\n```\nNow, we can use this template and the custom format, this way:\n```python\npw.io.http.write(\n pets,\n \"http://www.example.com/api/event\",\n method=\"POST\",\n format=\"custom\",\n request_payload_template=message_template\n)\n```\n"} +{"doc": "---\ntitle: pathway.io.http package\nsidebar: 'API'\nnavigation: false\n---\n# pathway.io.http package\nclass pw.io.http.RetryPolicy(first_delay_ms, backoff_factor, jitter_ms)\nClass representing policy of delays or backoffs for the retries.\nFunctions\npw.io.http.read(url, *, schema=None, method='GET', payload=None, headers=None, response_mapper=None, format='json', delimiter=None, n_retries=0, retry_policy=, connect_timeout_ms=None, request_timeout_ms=None, allow_redirects=True, retry_codes=(429, 500, 502, 503, 504), autocommit_duration_ms=10000, debug_data=None, value_columns=None, primary_key=None, types=None, default_values=None)\nReads a table from an HTTP stream.\n* Parameters\n * url (`str`) \u2013 the full URL of streaming endpoint to fetch data from.\n * schema (`Optional`\\`type`\\[[`Schema`\\]\\]) \u2013 Schema of the resulting table.\n * method (`str`) \u2013 request method for streaming. It should be one of\n HTTP request methods.\n * payload (`Optional`\\[`Any`\\]) \u2013 data to be send in the body of the request.\n * headers (`Optional`\\[`dict`\\[`str`, `str`\\]\\]) \u2013 request headers in the form of dict. Wildcards are allowed both, in\n keys and in values.\n * response_mapper (`Optional`\\[`Callable`\\[\\[`str` | `bytes`\\], `bytes`\\]\\]) \u2013 in case a response needs to be processed, this method can be\n provided. It will be applied to each slice of a stream.\n * format (`str`) \u2013 format of the data, \u201cjson\u201d or \u201craw\u201d. In case of a \u201craw\u201d format,\n table with single \u201cdata\u201d column will be produced. For \u201cjson\u201d format, bytes\n encoded json is expected.\n * delimiter (`UnionType`\\[`str`, `bytes`, `None`\\]) \u2013 delimiter used to split stream into messages.\n * n_retries (`int`) \u2013 how many times to retry the failed request.\n * retry_policy (`RetryPolicy`) \u2013 policy of delays or backoffs for the retries.\n * connect_timeout_ms (`Optional`\\[`int`\\]) \u2013 connection timeout, specified in milliseconds. In case\n it\u2019s None, no restrictions on connection duration will be applied.\n * request_timeout_ms (`Optional`\\[`int`\\]) \u2013 request timeout, specified in milliseconds. In case\n it\u2019s None, no restrictions on request duration will be applied.\n * allow_redirects (`bool`) \u2013 whether to allow redirects.\n * retry_codes (`Optional`\\[`tuple`\\]) \u2013 HTTP status codes that trigger retries.\n * content_type \u2013 content type of the data to send. In case the chosen format is\n JSON, it will be defaulted to \u201capplication/json\u201d.\n * autocommit_duration_ms (`int`) \u2013 the maximum time between two commits. Every\n autocommit_duration_ms milliseconds, the updates received by the connector are\n committed and pushed into Pathway\u2019s computation graph.\n * debug_data \u2013 static data replacing original one when debug mode is active.\n * value_columns (`Optional`\\[`list`\\[`str`\\]\\]) \u2013 columns to extract for a table. \\[will be deprecated soon\\]\n * primary_key (`Optional`\\[`list`\\[`str`\\]\\]) \u2013 in case the table should have a primary key generated according to\n a subset of its columns, the set of columns should be specified in this field.\n Otherwise, the primary key will be generated as uuid4. \\[will be deprecated soon\\]\n * types (`Optional`\\[`dict`\\[`str`, `PathwayType`\\]\\]) \u2013 dictionary containing the mapping between the columns and the data types\n (`pw.Type`) of the values of those columns. This parameter is optional, and\n if not provided the default type is `pw.Type.ANY`. \\[will be deprecated soon\\]\n * default_values (`Optional`\\[`dict`\\[`str`, `Any`\\]\\]) \u2013 dictionary containing default values for columns replacing\n blank entries. The default value of the column must be specified explicitly,\n otherwise there will be no default value. \\[will be deprecated soon\\]\nExamples:\nRaw format:\n```python\nimport os\nimport pathway as pw\ntable = pw.io.http.read(\n \"https://localhost:8000/stream\",\n method=\"GET\",\n headers={\"Authorization\": f\"Bearer {os.environ['BEARER_TOKEN']}\"},\n format=\"raw\",\n)\n```\nJSON with response mapper:\nInput can be adjusted using a mapping function that will be applied to each\nslice of a stream. The mapping function should return bytes.\n```python\ndef mapper(msg: bytes) -> bytes:\n result = json.loads(msg.decode())\n return json.dumps({\"key\": result[\"id\"], \"text\": result[\"data\"]}).encode()\nclass InputSchema(pw.Schema):\n key: int\n text: str\nt = pw.io.http.read(\n \"https://localhost:8000/stream\",\n method=\"GET\",\n headers={\"Authorization\": f\"Bearer {os.environ['BEARER_TOKEN']}\"},\n schema=InputSchema,\n response_mapper=mapper\n)\n```\npw.io.http.rest_connector(host, port, *, route='/', schema=None, autocommit_duration_ms=1500, keep_queries=None, delete_completed_queries=None)\nRuns a lightweight HTTP server and inputs a collection from the HTTP endpoint,\nconfigured by the parameters of this method.\nOn the output, the method provides a table and a callable, which needs to accept\nthe result table of the computation, which entries will be tracked and put into\nrespective request\u2019s responses.\n* Parameters\n * host (`str`) \u2013 TCP/IP host or a sequence of hosts for the created endpoint;\n * port (`int`) \u2013 port for the created endpoint;\n * route (`str`) \u2013 route which will be listened to by the web server;\n * schema (`Optional`\\`type`\\[[`Schema`\\]\\]) \u2013 schema of the resulting table;\n * autocommit_duration_ms \u2013 the maximum time between two commits. Every\n autocommit_duration_ms milliseconds, the updates received by the connector are\n committed and pushed into Pathway\u2019s computation graph;\n * keep_queries (`Optional`\\[`bool`\\]) \u2013 whether to keep queries after processing; defaults to False. \\[deprecated\\]\n * delete_completed_queries (`Optional`\\[`bool`\\]) \u2013 whether to send a deletion entry after the query is processed.\n Allows to remove it from the system if it is stored by operators such as `join` or `groupby`;\n* Returns\n *table* \u2013 the table read;\n response_writer: a callable, where the result table should be provided.\npw.io.http.write(table, url, *, method='POST', format='json', request_payload_template=None, n_retries=0, retry_policy=, connect_timeout_ms=None, request_timeout_ms=None, content_type=None, headers=None, allow_redirects=True, retry_codes=(429, 500, 502, 503, 504))\nSends the stream of updates from the table to the specified HTTP API.\n* Parameters\n * table (`Table`) \u2013 table to be tracked.\n * method (`str`) \u2013 request method for streaming. It should be one of\n HTTP request methods.\n * url (`str`) \u2013 the full URL of the endpoint to push data into. Can contain wildcards.\n * format (`str`) \u2013 the payload format, one of {\u201cjson\u201d, \u201ccustom\u201d}. If \u201cjson\u201d is\n specified, the plain JSON will be formed and sent. Otherwise, the contents of the\n field request_payload_template will be used.\n * request_payload_template (`Optional`\\[`str`\\]) \u2013 the template to format and send in case \u201ccustom\u201d was\n specified in the format field. Can include wildcards.\n * n_retries (`int`) \u2013 how many times to retry the failed request.\n * retry_policy (`RetryPolicy`) \u2013 policy of delays or backoffs for the retries.\n * connect_timeout_ms (`Optional`\\[`int`\\]) \u2013 connection timeout, specified in milliseconds. In case\n it\u2019s None, no restrictions on connection duration will be applied.\n * request_timeout_ms (`Optional`\\[`int`\\]) \u2013 request timeout, specified in milliseconds. In case it\u2019s\n None, no restrictions on request duration will be applied.\n * allow_redirects (`bool`) \u2013 Whether to allow redirects.\n * retry_codes (`Optional`\\[`tuple`\\]) \u2013 HTTP status codes that trigger retries.\n * content_type (`Optional`\\[`str`\\]) \u2013 content type of the data to send. In case the chosen format is\n JSON, it will be defaulted to \u201capplication/json\u201d.\n * headers (`Optional`\\[`dict`\\[`str`, `str`\\]\\]) \u2013 request headers in the form of dict. Wildcards are allowed both, in\n keys and in values.\nWildcards:\nWildcards are the proposed way to customize the HTTP requests composed. The\nengine will replace all entries of `{table.}` with a value from the\ncolumn `` in the row sent. This wildcard resolving will happen in url,\nrequest payload template and headers.\nExamples:\nFor the sake of demonstration, let\u2019s try different ways to send the stream of changes\non a table `pets`, containing data about pets and their owners. The table contains\njust two columns: the pet and the owner\u2019s name.\n```python\nimport pathway as pw\npets = pw.debug.table_from_markdown(\"owner pet \\n Alice dog \\n Bob cat \\n Alice cat\")\n```\nConsider that there is a need to send the stream of changes on such table to the\nexternal API endpoint (let\u2019s pick some exemplary URL for the sake of demonstration).\nTo keep things simple, we can suppose that this API accepts flat JSON objects, which\nare sent in POST requests. Then, the communication can be done with a simple code\nsnippet:\n```python\npw.io.http.write(pets, \"http://www.example.com/api/event\")\n```\nNow let\u2019s do something more custom. Suppose that the API endpoint requires us to\ncommunicate via PUT method and to pass the values as CGI-parameters. In this case,\nwildcards are the way to go:\n```python\npw.io.http.write(\n pets,\n \"http://www.example.com/api/event?owner={table.owner}&pet={table.pet}\",\n method=\"PUT\"\n)\n```\nA custom payload can also be formed from the outside. What if the endpoint requires\nthe data in tskv format in request body?\nFirst of all, let\u2019s form a template for the message body:\n```python\nmessage_template_tokens = [\n \"owner={table.owner}\",\n \"pet={table.pet}\",\n \"time={table.time}\",\n \"diff={table.diff}\",\n]\nmessage_template = \"\\t\".join(message_template_tokens)\n```\nNow, we can use this template and the custom format, this way:\n```python\npw.io.http.write(\n pets,\n \"http://www.example.com/api/event\",\n method=\"POST\",\n format=\"custom\",\n request_payload_template=message_template\n)\n```\n"} {"doc": "pathway.xpacks.spatial.h3 module\npw.xpacks.spatial.h3.h3_cover_geojson(geojson, h3_level)\nCovers geojson with H3 cells at the given level.\nBuilt-in h3.polyfill is not enough as it outputs H3 cells for which their centroids fall into geojson.\n"} {"doc": "pathway.xpacks.spatial.geofencing module\nclass pw.xpacks.spatial.geofencing.GeofenceIndex(data, geojson_geometry, resolution_meters, instance=None)\nH3-based geospatial index allowing for efficient point location inside geofences.\nGeofences are mapped to the corresponding cells id at a fixed hierarchy level.\nSee https://h3geo.org/docs/highlights/indexing/ for the description of H3 index structure.\nParameters:\ndata (pw.Table): The table containing the data to be indexed.\ngeometry (pw.ColumnExpression): The column expression representing geofences as geojsons.\nresolution_meters (float): approximately determines how large covering H3 cells should be\ninstance (pw.ColumnExpression or None): The column expression representing the instance of the index\n> allowing for creating multiple indexes at once.\nCaveats:\nGeofences crossing antimeridian are not yet supported.\njoin_enclosing_geofences(query_table, *, lat, lon, instance=None)\nEfficiently joins (via left_join) rows of query table with rows of indexed geofences\nfor which the query point is inside a target geofence.\nParameters:\nquery_table (pw.Table): The table containing the queries.\nlat (pw.ColumnExpression): The column expression representing latitudes (degrees) in the query_table.\nlon (pw.ColumnExpression): The column expression representing longitudes (degrees) in the query_table.\ninstance (pw.ColumnExpression or None): The column expression representing the instance of the index\n> allowing for parallel queries to multiple indexes at once.\n* Returns\n *pw.JoinResult* \u2013 result of a join between query_table and indexed data table\nExample:\nCode\n```python\nimport pathway as pw\nqueries = pw.debug.table_from_markdown('''\n | lon | lat | sample_data\n1 | 11.0 | 1.0 | foo\n2 | 11.0 | 21.0 | bar\n3 | 20.0 | 1.0 | baz\n''')\n@pw.udf\ndef json_parse(col: str) -> pw.Json:\n return pw.Json.parse(col)\ndata = pw.debug.table_from_markdown('''\n | other_data | geometry\n111 | AAA | {\"coordinates\":[[[10.0,0.0],[12.0,0.0],[12.0,2.0],[10.0,2.0]]],\"type\":\"Polygon\"}\n222 | BBB | {\"coordinates\":[[[10.0,20.0],[12.0,20.0],[12.0,22.0],[10.0,22.0]]],\"type\":\"Polygon\"}\n''').with_columns(geometry=json_parse(pw.this.geometry))\nindex = pw.xpacks.spatial.geofencing.GeofenceIndex(\n data, data.geometry, resolution_meters=100_000,\n)\nres = index.join_enclosing_geofences(\n queries,\n lat=queries.lat,\n lon=queries.lon,\n).select(\n queries.sample_data,\n pw.right.other_data,\n)\npw.debug.compute_and_print(res, include_id=False)\n```\n::\nResult\n```\nsample_data | other_data\nbar | BBB\nbaz |\nfoo | AAA\n```\n::\n::\nFunctions\npw.xpacks.spatial.geofencing.is_in_geofence(lat, lon, geojson_geometry)\nTest if point is inside a geojson polygon\n"} {"doc": "pathway.xpacks.spatial.index module\nclass pw.xpacks.spatial.index.H3Index(data, lat, lon, radius_meters, instance=None)\nH3-based geospatial index allowing for finding nearby lat lon points.\nLat lon points are mapped to the corresponding cell id at a fixed hierarchy level.\nThey are also mapped to the neighboring cells for fast closeby points retrieval.\nSee https://h3geo.org/docs/highlights/indexing/ for the description of H3 index structure.\nParameters:\ndata (pw.Table): The table containing the data to be indexed.\nlat (pw.ColumnExpression): The column expression representing latitudes (degrees) in the data.\nlon (pw.ColumnExpression): The column expression representing longitudes (degrees) in the data.\nradius_meters (float): maximum distance supported\ninstance (pw.ColumnExpression or None): The column expression representing the instance of the index\n> allowing for creating multiple indexes at once.\njoin_on_distance(query_table, query_lat, query_lon, distance_meters=None, instance=None)\nThis method efficiently joins (via left_join) rows of query table with rows of indexed data\nsuch that two points are within a certain distance.\nParameters:\nquery_table (pw.Table): The table containing the queries.\nlat (pw.ColumnExpression): The column expression representing latitudes (degrees) in the query_table.\nlon (pw.ColumnExpression): The column expression representing longitudes (degrees) in the query_table.\ninstance (pw.ColumnExpression or None): The column expression representing the instance of the index\n> allowing for parallel queries to multiple indexes at once.\n* Returns\n *pw.JoinResult* \u2013 result of a (distance-limited) join between query_table and indexed data table\nExample:\nCode\n```python\nimport pathway as pw\nqueries = pw.debug.table_from_markdown('''\n | instance | lat | lon | sample_data\n1 | 1 | 51.1000 | 17.0300 | foo\n2 | 1 | 51.1010 | 17.0310 | bar\n3 | 2 | 40.0000 | 179.999 | baz\n4 | 2 | 10.0000 | 10.0000 | zzz\n''')\ndata = pw.debug.table_from_markdown('''\n | instance | lat | lon | other_data\n111 | 1 | 51.0990 | 17.0290 | AAA\n112 | 1 | 51.1000 | 17.0300 | BBB\n113 | 1 | 51.1010 | 17.0310 | CCC\n114 | 1 | 51.1020 | 17.0320 | DDD\n311 | 2 | 40.0000 | 179.999 | EEE\n313 | 2 | 40.0000 | -179.999 | FFF\n314 | 2 | 40.0000 | -179.980 | GGG\n412 | 2 | 51.1000 | 17.0300 | HHH\n''')\nindex = pw.xpacks.spatial.index.H3Index(\n data, data.lat, data.lon, instance=data.instance, radius_meters=200,\n)\nres = index.join_on_distance(\n queries,\n queries.lat,\n queries.lon,\n instance=queries.instance,\n).select(\n instance=queries.instance,\n sample_data=queries.sample_data,\n other_data=pw.right.other_data,\n dist_meters=pw.left.dist_meters.num.fill_na(-1).num.round(1),\n)\npw.debug.compute_and_print(res, include_id=False)\n```\n::\nResult\n```\ninstance | sample_data | other_data | dist_meters\n1 | bar | BBB | 131.5\n1 | bar | CCC | 0.0\n1 | bar | DDD | 131.5\n1 | foo | AAA | 131.5\n1 | foo | BBB | 0.0\n1 | foo | CCC | 131.5\n2 | baz | EEE | 0.0\n2 | baz | FFF | 170.8\n2 | zzz | | -1.0\n```\n::\n::\n"} diff --git a/examples/pipelines/contextful_geometric/data/pathway-docs.jsonl b/examples/pipelines/contextful_geometric/data/pathway-docs.jsonl index 8aadede..929737d 100644 --- a/examples/pipelines/contextful_geometric/data/pathway-docs.jsonl +++ b/examples/pipelines/contextful_geometric/data/pathway-docs.jsonl @@ -382,7 +382,7 @@ {"doc": "---\ntitle: Pathway API\nsidebar: 'API'\nnavigation: true\n---\n# Pathway API\nReference for all the Pathway classes and functions.\nSee Table API for the main Table class.\nclass pw.AsyncTransformer(input_table)\nAllows to perform async transformations on a table.\n`invoke()` will be called asynchronously for each row of an input_table.\nOutput table can be acccesed via `result`.\nExample:\n```python\npw.debug.compute_and_print(t2, include_id=False)\n```\n::\nResult\n```\ncolA | colB\n1 | 5\n2 | 9\n```\n::\n::\n"} {"doc": "---\ntitle: Pathway API\nsidebar: 'API'\nnavigation: true\n---\n# Pathway API\nReference for all the Pathway classes and functions.\nSee Table API for the main Table class.\nclass pw.AsyncTransformer(input_table)\nAllows to perform async transformations on a table.\n`invoke()` will be called asynchronously for each row of an input_table.\nOutput table can be acccesed via `result`.\nExample:\n```python\nt3 = t2.select(colB = pw.unwrap(t2.colB))\nt3.schema\n```\n::\nResult\n```\n}>\n```\n::\n::\n"} {"doc": "---\ntitle: Pathway API\nsidebar: 'API'\nnavigation: true\n---\n# Pathway API\nReference for all the Pathway classes and functions.\nSee Table API for the main Table class.\nclass pw.AsyncTransformer(input_table)\nAllows to perform async transformations on a table.\n`invoke()` will be called asynchronously for each row of an input_table.\nOutput table can be acccesed via `result`.\nExample:\n```python\npw.debug.compute_and_print(t3, include_id=False)\n```\n::\nResult\n```\ncolB\n5\n9\n```\n::\n::\n"} -{"doc": "---\ntitle: pathway.io.http package\nsidebar: 'API'\nnavigation: false\n---\n# pathway.io.http package\nclass pw.io.http.RetryPolicy(first_delay_ms, backoff_factor, jitter_ms)\nClass representing policy of delays or backoffs for the retries.\nFunctions\npw.io.http.read(url, *, schema=None, method='GET', payload=None, headers=None, response_mapper=None, format='json', delimiter=None, n_retries=0, retry_policy=, connect_timeout_ms=None, request_timeout_ms=None, allow_redirects=True, retry_codes=(429, 500, 502, 503, 504), autocommit_duration_ms=10000, debug_data=None, value_columns=None, primary_key=None, types=None, default_values=None)\nReads a table from an HTTP stream.\n* Parameters\n * url (`str`) \u2013 the full URL of streaming endpoint to fetch data from.\n * schema (`Optional`\\`type`\\[[`Schema`\\]\\]) \u2013 Schema of the resulting table.\n * method (`str`) \u2013 request method for streaming. It should be one of\n HTTP request methods.\n * payload (`Optional`\\[`Any`\\]) \u2013 data to be send in the body of the request.\n * headers (`Optional`\\[`dict`\\[`str`, `str`\\]\\]) \u2013 request headers in the form of dict. Wildcards are allowed both, in\n keys and in values.\n * response_mapper (`Optional`\\[`Callable`\\[\\[`str` | `bytes`\\], `bytes`\\]\\]) \u2013 in case a response needs to be processed, this method can be\n provided. It will be applied to each slice of a stream.\n * format (`str`) \u2013 format of the data, \u201cjson\u201d or \u201craw\u201d. In case of a \u201craw\u201d format,\n table with single \u201cdata\u201d column will be produced. For \u201cjson\u201d format, bytes\n encoded json is expected.\n * delimiter (`UnionType`\\[`str`, `bytes`, `None`\\]) \u2013 delimiter used to split stream into messages.\n * n_retries (`int`) \u2013 how many times to retry the failed request.\n * retry_policy (`RetryPolicy`) \u2013 policy of delays or backoffs for the retries.\n * connect_timeout_ms (`Optional`\\[`int`\\]) \u2013 connection timeout, specified in milliseconds. In case\n it\u2019s None, no restrictions on connection duration will be applied.\n * request_timeout_ms (`Optional`\\[`int`\\]) \u2013 request timeout, specified in milliseconds. In case\n it\u2019s None, no restrictions on request duration will be applied.\n * allow_redirects (`bool`) \u2013 whether to allow redirects.\n * retry_codes (`Optional`\\[`tuple`\\]) \u2013 HTTP status codes that trigger retries.\n * content_type \u2013 content type of the data to send. In case the chosen format is\n JSON, it will be defaulted to \u201capplication/json\u201d.\n * autocommit_duration_ms (`int`) \u2013 the maximum time between two commits. Every\n autocommit_duration_ms milliseconds, the updates received by the connector are\n committed and pushed into Pathway\u2019s computation graph.\n * debug_data \u2013 static data replacing original one when debug mode is active.\n * value_columns (`Optional`\\[`list`\\[`str`\\]\\]) \u2013 columns to extract for a table. \\[will be deprecated soon\\]\n * primary_key (`Optional`\\[`list`\\[`str`\\]\\]) \u2013 in case the table should have a primary key generated according to\n a subset of its columns, the set of columns should be specified in this field.\n Otherwise, the primary key will be generated as uuid4. \\[will be deprecated soon\\]\n * types (`Optional`\\[`dict`\\[`str`, `PathwayType`\\]\\]) \u2013 dictionary containing the mapping between the columns and the data types\n (`pw.Type`) of the values of those columns. This parameter is optional, and\n if not provided the default type is `pw.Type.ANY`. \\[will be deprecated soon\\]\n * default_values (`Optional`\\[`dict`\\[`str`, `Any`\\]\\]) \u2013 dictionary containing default values for columns replacing\n blank entries. The default value of the column must be specified explicitly,\n otherwise there will be no default value. \\[will be deprecated soon\\]\nExamples:\nRaw format:\n```python\nimport os\nimport pathway as pw\ntable = pw.io.http.read(\n \"https://localhost:8000/stream\",\n method=\"GET\",\n headers={\"Authorization\": f\"Bearer {os.environ['BEARER_TOKEN']}\"},\n format=\"raw\",\n)\n```\nJSON with response mapper:\nInput can be adjusted using a mapping function that will be applied to each\nslice of a stream. The mapping function should return bytes.\n```python\ndef mapper(msg: bytes) -> bytes:\n result = json.loads(msg.decode())\n return json.dumps({\"key\": result[\"id\"], \"text\": result[\"data\"]}).encode()\nclass InputSchema(pw.Schema):\n key: int\n text: str\nt = pw.io.http.read(\n \"https://localhost:8000/stream\",\n method=\"GET\",\n headers={\"Authorization\": f\"Bearer {os.environ['BEARER_TOKEN']}\"},\n schema=InputSchema,\n response_mapper=mapper\n)\n```\npw.io.http.rest_connector(host, port, *, route='/', schema=None, autocommit_duration_ms=1500, keep_queries=None, delete_completed_queries=None)\nRuns a lightweight HTTP server and inputs a collection from the HTTP endpoint,\nconfigured by the parameters of this method.\nOn the output, the method provides a table and a callable, which needs to accept\nthe result table of the computation, which entries will be tracked and put into\nrespective request\u2019s responses.\n* Parameters\n * host (`str`) \u2013 TCP/IP host or a sequence of hosts for the created endpoint;\n * port (`int`) \u2013 port for the created endpoint;\n * route (`str`) \u2013 route which will be listened to by the web server;\n * schema (`Optional`\\`type`\\[[`Schema`\\]\\]) \u2013 schema of the resulting table;\n * autocommit_duration_ms \u2013 the maximum time between two commits. Every\n autocommit_duration_ms milliseconds, the updates received by the connector are\n committed and pushed into Pathway\u2019s computation graph;\n * keep_queries (`Optional`\\[`bool`\\]) \u2013 whether to keep queries after processing; defaults to False. \\[deprecated\\]\n * delete_completed_queries (`Optional`\\[`bool`\\]) \u2013 whether to send a deletion entry after the query is processed.\n Allows to remove it from the system if it is stored by operators such as `join` or `groupby`;\n* Returns\n *table* \u2013 the table read;\n response_writer: a callable, where the result table should be provided.\npw.io.http.write(table, url, *, method='POST', format='json', request_payload_template=None, n_retries=0, retry_policy=, connect_timeout_ms=None, request_timeout_ms=None, content_type=None, headers=None, allow_redirects=True, retry_codes=(429, 500, 502, 503, 504))\nSends the stream of updates from the table to the specified HTTP API.\n* Parameters\n * table (`Table`) \u2013 table to be tracked.\n * method (`str`) \u2013 request method for streaming. It should be one of\n HTTP request methods.\n * url (`str`) \u2013 the full URL of the endpoint to push data into. Can contain wildcards.\n * format (`str`) \u2013 the payload format, one of {\u201cjson\u201d, \u201ccustom\u201d}. If \u201cjson\u201d is\n specified, the plain JSON will be formed and sent. Otherwise, the contents of the\n field request_payload_template will be used.\n * request_payload_template (`Optional`\\[`str`\\]) \u2013 the template to format and send in case \u201ccustom\u201d was\n specified in the format field. Can include wildcards.\n * n_retries (`int`) \u2013 how many times to retry the failed request.\n * retry_policy (`RetryPolicy`) \u2013 policy of delays or backoffs for the retries.\n * connect_timeout_ms (`Optional`\\[`int`\\]) \u2013 connection timeout, specified in milliseconds. In case\n it\u2019s None, no restrictions on connection duration will be applied.\n * request_timeout_ms (`Optional`\\[`int`\\]) \u2013 request timeout, specified in milliseconds. In case it\u2019s\n None, no restrictions on request duration will be applied.\n * allow_redirects (`bool`) \u2013 Whether to allow redirects.\n * retry_codes (`Optional`\\[`tuple`\\]) \u2013 HTTP status codes that trigger retries.\n * content_type (`Optional`\\[`str`\\]) \u2013 content type of the data to send. In case the chosen format is\n JSON, it will be defaulted to \u201capplication/json\u201d.\n * headers (`Optional`\\[`dict`\\[`str`, `str`\\]\\]) \u2013 request headers in the form of dict. Wildcards are allowed both, in\n keys and in values.\nWildcards:\nWildcards are the proposed way to customize the HTTP requests composed. The\nengine will replace all entries of `{table.}` with a value from the\ncolumn `` in the row sent. This wildcard resolving will happen in url,\nrequest payload template and headers.\nExamples:\nFor the sake of demonstation, let\u2019s try different ways to send the stream of changes\non a table `pets`, containing data about pets and their owners. The table contains\njust two columns: the pet and the owner\u2019s name.\n```python\nimport pathway as pw\npets = pw.debug.table_from_markdown(\"owner pet \\n Alice dog \\n Bob cat \\n Alice cat\")\n```\nConsider that there is a need to send the stream of changes on such table to the\nexternal API endpoint (let\u2019s pick some exemplary URL for the sake of demonstation).\nTo keep things simple, we can suppose that this API accepts flat JSON objects, which\nare sent in POST requests. Then, the communication can be done with a simple code\nsnippet:\n```python\npw.io.http.write(pets, \"http://www.example.com/api/event\")\n```\nNow let\u2019s do something more custom. Suppose that the API endpoint requires us to\ncommunicate via PUT method and to pass the values as CGI-parameters. In this case,\nwildcards are the way to go:\n```python\npw.io.http.write(\n pets,\n \"http://www.example.com/api/event?owner={table.owner}&pet={table.pet}\",\n method=\"PUT\"\n)\n```\nA custom payload can also be formed from the outside. What if the endpoint requires\nthe data in tskv format in request body?\nFirst of all, let\u2019s form a template for the message body:\n```python\nmessage_template_tokens = [\n \"owner={table.owner}\",\n \"pet={table.pet}\",\n \"time={table.time}\",\n \"diff={table.diff}\",\n]\nmessage_template = \"\\t\".join(message_template_tokens)\n```\nNow, we can use this template and the custom format, this way:\n```python\npw.io.http.write(\n pets,\n \"http://www.example.com/api/event\",\n method=\"POST\",\n format=\"custom\",\n request_payload_template=message_template\n)\n```\n"} +{"doc": "---\ntitle: pathway.io.http package\nsidebar: 'API'\nnavigation: false\n---\n# pathway.io.http package\nclass pw.io.http.RetryPolicy(first_delay_ms, backoff_factor, jitter_ms)\nClass representing policy of delays or backoffs for the retries.\nFunctions\npw.io.http.read(url, *, schema=None, method='GET', payload=None, headers=None, response_mapper=None, format='json', delimiter=None, n_retries=0, retry_policy=, connect_timeout_ms=None, request_timeout_ms=None, allow_redirects=True, retry_codes=(429, 500, 502, 503, 504), autocommit_duration_ms=10000, debug_data=None, value_columns=None, primary_key=None, types=None, default_values=None)\nReads a table from an HTTP stream.\n* Parameters\n * url (`str`) \u2013 the full URL of streaming endpoint to fetch data from.\n * schema (`Optional`\\`type`\\[[`Schema`\\]\\]) \u2013 Schema of the resulting table.\n * method (`str`) \u2013 request method for streaming. It should be one of\n HTTP request methods.\n * payload (`Optional`\\[`Any`\\]) \u2013 data to be send in the body of the request.\n * headers (`Optional`\\[`dict`\\[`str`, `str`\\]\\]) \u2013 request headers in the form of dict. Wildcards are allowed both, in\n keys and in values.\n * response_mapper (`Optional`\\[`Callable`\\[\\[`str` | `bytes`\\], `bytes`\\]\\]) \u2013 in case a response needs to be processed, this method can be\n provided. It will be applied to each slice of a stream.\n * format (`str`) \u2013 format of the data, \u201cjson\u201d or \u201craw\u201d. In case of a \u201craw\u201d format,\n table with single \u201cdata\u201d column will be produced. For \u201cjson\u201d format, bytes\n encoded json is expected.\n * delimiter (`UnionType`\\[`str`, `bytes`, `None`\\]) \u2013 delimiter used to split stream into messages.\n * n_retries (`int`) \u2013 how many times to retry the failed request.\n * retry_policy (`RetryPolicy`) \u2013 policy of delays or backoffs for the retries.\n * connect_timeout_ms (`Optional`\\[`int`\\]) \u2013 connection timeout, specified in milliseconds. In case\n it\u2019s None, no restrictions on connection duration will be applied.\n * request_timeout_ms (`Optional`\\[`int`\\]) \u2013 request timeout, specified in milliseconds. In case\n it\u2019s None, no restrictions on request duration will be applied.\n * allow_redirects (`bool`) \u2013 whether to allow redirects.\n * retry_codes (`Optional`\\[`tuple`\\]) \u2013 HTTP status codes that trigger retries.\n * content_type \u2013 content type of the data to send. In case the chosen format is\n JSON, it will be defaulted to \u201capplication/json\u201d.\n * autocommit_duration_ms (`int`) \u2013 the maximum time between two commits. Every\n autocommit_duration_ms milliseconds, the updates received by the connector are\n committed and pushed into Pathway\u2019s computation graph.\n * debug_data \u2013 static data replacing original one when debug mode is active.\n * value_columns (`Optional`\\[`list`\\[`str`\\]\\]) \u2013 columns to extract for a table. \\[will be deprecated soon\\]\n * primary_key (`Optional`\\[`list`\\[`str`\\]\\]) \u2013 in case the table should have a primary key generated according to\n a subset of its columns, the set of columns should be specified in this field.\n Otherwise, the primary key will be generated as uuid4. \\[will be deprecated soon\\]\n * types (`Optional`\\[`dict`\\[`str`, `PathwayType`\\]\\]) \u2013 dictionary containing the mapping between the columns and the data types\n (`pw.Type`) of the values of those columns. This parameter is optional, and\n if not provided the default type is `pw.Type.ANY`. \\[will be deprecated soon\\]\n * default_values (`Optional`\\[`dict`\\[`str`, `Any`\\]\\]) \u2013 dictionary containing default values for columns replacing\n blank entries. The default value of the column must be specified explicitly,\n otherwise there will be no default value. \\[will be deprecated soon\\]\nExamples:\nRaw format:\n```python\nimport os\nimport pathway as pw\ntable = pw.io.http.read(\n \"https://localhost:8000/stream\",\n method=\"GET\",\n headers={\"Authorization\": f\"Bearer {os.environ['BEARER_TOKEN']}\"},\n format=\"raw\",\n)\n```\nJSON with response mapper:\nInput can be adjusted using a mapping function that will be applied to each\nslice of a stream. The mapping function should return bytes.\n```python\ndef mapper(msg: bytes) -> bytes:\n result = json.loads(msg.decode())\n return json.dumps({\"key\": result[\"id\"], \"text\": result[\"data\"]}).encode()\nclass InputSchema(pw.Schema):\n key: int\n text: str\nt = pw.io.http.read(\n \"https://localhost:8000/stream\",\n method=\"GET\",\n headers={\"Authorization\": f\"Bearer {os.environ['BEARER_TOKEN']}\"},\n schema=InputSchema,\n response_mapper=mapper\n)\n```\npw.io.http.rest_connector(host, port, *, route='/', schema=None, autocommit_duration_ms=1500, keep_queries=None, delete_completed_queries=None)\nRuns a lightweight HTTP server and inputs a collection from the HTTP endpoint,\nconfigured by the parameters of this method.\nOn the output, the method provides a table and a callable, which needs to accept\nthe result table of the computation, which entries will be tracked and put into\nrespective request\u2019s responses.\n* Parameters\n * host (`str`) \u2013 TCP/IP host or a sequence of hosts for the created endpoint;\n * port (`int`) \u2013 port for the created endpoint;\n * route (`str`) \u2013 route which will be listened to by the web server;\n * schema (`Optional`\\`type`\\[[`Schema`\\]\\]) \u2013 schema of the resulting table;\n * autocommit_duration_ms \u2013 the maximum time between two commits. Every\n autocommit_duration_ms milliseconds, the updates received by the connector are\n committed and pushed into Pathway\u2019s computation graph;\n * keep_queries (`Optional`\\[`bool`\\]) \u2013 whether to keep queries after processing; defaults to False. \\[deprecated\\]\n * delete_completed_queries (`Optional`\\[`bool`\\]) \u2013 whether to send a deletion entry after the query is processed.\n Allows to remove it from the system if it is stored by operators such as `join` or `groupby`;\n* Returns\n *table* \u2013 the table read;\n response_writer: a callable, where the result table should be provided.\npw.io.http.write(table, url, *, method='POST', format='json', request_payload_template=None, n_retries=0, retry_policy=, connect_timeout_ms=None, request_timeout_ms=None, content_type=None, headers=None, allow_redirects=True, retry_codes=(429, 500, 502, 503, 504))\nSends the stream of updates from the table to the specified HTTP API.\n* Parameters\n * table (`Table`) \u2013 table to be tracked.\n * method (`str`) \u2013 request method for streaming. It should be one of\n HTTP request methods.\n * url (`str`) \u2013 the full URL of the endpoint to push data into. Can contain wildcards.\n * format (`str`) \u2013 the payload format, one of {\u201cjson\u201d, \u201ccustom\u201d}. If \u201cjson\u201d is\n specified, the plain JSON will be formed and sent. Otherwise, the contents of the\n field request_payload_template will be used.\n * request_payload_template (`Optional`\\[`str`\\]) \u2013 the template to format and send in case \u201ccustom\u201d was\n specified in the format field. Can include wildcards.\n * n_retries (`int`) \u2013 how many times to retry the failed request.\n * retry_policy (`RetryPolicy`) \u2013 policy of delays or backoffs for the retries.\n * connect_timeout_ms (`Optional`\\[`int`\\]) \u2013 connection timeout, specified in milliseconds. In case\n it\u2019s None, no restrictions on connection duration will be applied.\n * request_timeout_ms (`Optional`\\[`int`\\]) \u2013 request timeout, specified in milliseconds. In case it\u2019s\n None, no restrictions on request duration will be applied.\n * allow_redirects (`bool`) \u2013 Whether to allow redirects.\n * retry_codes (`Optional`\\[`tuple`\\]) \u2013 HTTP status codes that trigger retries.\n * content_type (`Optional`\\[`str`\\]) \u2013 content type of the data to send. In case the chosen format is\n JSON, it will be defaulted to \u201capplication/json\u201d.\n * headers (`Optional`\\[`dict`\\[`str`, `str`\\]\\]) \u2013 request headers in the form of dict. Wildcards are allowed both, in\n keys and in values.\nWildcards:\nWildcards are the proposed way to customize the HTTP requests composed. The\nengine will replace all entries of `{table.}` with a value from the\ncolumn `` in the row sent. This wildcard resolving will happen in url,\nrequest payload template and headers.\nExamples:\nFor the sake of demonstration, let\u2019s try different ways to send the stream of changes\non a table `pets`, containing data about pets and their owners. The table contains\njust two columns: the pet and the owner\u2019s name.\n```python\nimport pathway as pw\npets = pw.debug.table_from_markdown(\"owner pet \\n Alice dog \\n Bob cat \\n Alice cat\")\n```\nConsider that there is a need to send the stream of changes on such table to the\nexternal API endpoint (let\u2019s pick some exemplary URL for the sake of demonstration).\nTo keep things simple, we can suppose that this API accepts flat JSON objects, which\nare sent in POST requests. Then, the communication can be done with a simple code\nsnippet:\n```python\npw.io.http.write(pets, \"http://www.example.com/api/event\")\n```\nNow let\u2019s do something more custom. Suppose that the API endpoint requires us to\ncommunicate via PUT method and to pass the values as CGI-parameters. In this case,\nwildcards are the way to go:\n```python\npw.io.http.write(\n pets,\n \"http://www.example.com/api/event?owner={table.owner}&pet={table.pet}\",\n method=\"PUT\"\n)\n```\nA custom payload can also be formed from the outside. What if the endpoint requires\nthe data in tskv format in request body?\nFirst of all, let\u2019s form a template for the message body:\n```python\nmessage_template_tokens = [\n \"owner={table.owner}\",\n \"pet={table.pet}\",\n \"time={table.time}\",\n \"diff={table.diff}\",\n]\nmessage_template = \"\\t\".join(message_template_tokens)\n```\nNow, we can use this template and the custom format, this way:\n```python\npw.io.http.write(\n pets,\n \"http://www.example.com/api/event\",\n method=\"POST\",\n format=\"custom\",\n request_payload_template=message_template\n)\n```\n"} {"doc": "pathway.xpacks.spatial.h3 module\npw.xpacks.spatial.h3.h3_cover_geojson(geojson, h3_level)\nCovers geojson with H3 cells at the given level.\nBuilt-in h3.polyfill is not enough as it outputs H3 cells for which their centroids fall into geojson.\n"} {"doc": "pathway.xpacks.spatial.geofencing module\nclass pw.xpacks.spatial.geofencing.GeofenceIndex(data, geojson_geometry, resolution_meters, instance=None)\nH3-based geospatial index allowing for efficient point location inside geofences.\nGeofences are mapped to the corresponding cells id at a fixed hierarchy level.\nSee https://h3geo.org/docs/highlights/indexing/ for the description of H3 index structure.\nParameters:\ndata (pw.Table): The table containing the data to be indexed.\ngeometry (pw.ColumnExpression): The column expression representing geofences as geojsons.\nresolution_meters (float): approximately determines how large covering H3 cells should be\ninstance (pw.ColumnExpression or None): The column expression representing the instance of the index\n> allowing for creating multiple indexes at once.\nCaveats:\nGeofences crossing antimeridian are not yet supported.\njoin_enclosing_geofences(query_table, *, lat, lon, instance=None)\nEfficiently joins (via left_join) rows of query table with rows of indexed geofences\nfor which the query point is inside a target geofence.\nParameters:\nquery_table (pw.Table): The table containing the queries.\nlat (pw.ColumnExpression): The column expression representing latitudes (degrees) in the query_table.\nlon (pw.ColumnExpression): The column expression representing longitudes (degrees) in the query_table.\ninstance (pw.ColumnExpression or None): The column expression representing the instance of the index\n> allowing for parallel queries to multiple indexes at once.\n* Returns\n *pw.JoinResult* \u2013 result of a join between query_table and indexed data table\nExample:\nCode\n```python\nimport pathway as pw\nqueries = pw.debug.table_from_markdown('''\n | lon | lat | sample_data\n1 | 11.0 | 1.0 | foo\n2 | 11.0 | 21.0 | bar\n3 | 20.0 | 1.0 | baz\n''')\n@pw.udf\ndef json_parse(col: str) -> pw.Json:\n return pw.Json.parse(col)\ndata = pw.debug.table_from_markdown('''\n | other_data | geometry\n111 | AAA | {\"coordinates\":[[[10.0,0.0],[12.0,0.0],[12.0,2.0],[10.0,2.0]]],\"type\":\"Polygon\"}\n222 | BBB | {\"coordinates\":[[[10.0,20.0],[12.0,20.0],[12.0,22.0],[10.0,22.0]]],\"type\":\"Polygon\"}\n''').with_columns(geometry=json_parse(pw.this.geometry))\nindex = pw.xpacks.spatial.geofencing.GeofenceIndex(\n data, data.geometry, resolution_meters=100_000,\n)\nres = index.join_enclosing_geofences(\n queries,\n lat=queries.lat,\n lon=queries.lon,\n).select(\n queries.sample_data,\n pw.right.other_data,\n)\npw.debug.compute_and_print(res, include_id=False)\n```\n::\nResult\n```\nsample_data | other_data\nbar | BBB\nbaz |\nfoo | AAA\n```\n::\n::\nFunctions\npw.xpacks.spatial.geofencing.is_in_geofence(lat, lon, geojson_geometry)\nTest if point is inside a geojson polygon\n"} {"doc": "pathway.xpacks.spatial.index module\nclass pw.xpacks.spatial.index.H3Index(data, lat, lon, radius_meters, instance=None)\nH3-based geospatial index allowing for finding nearby lat lon points.\nLat lon points are mapped to the corresponding cell id at a fixed hierarchy level.\nThey are also mapped to the neighboring cells for fast closeby points retrieval.\nSee https://h3geo.org/docs/highlights/indexing/ for the description of H3 index structure.\nParameters:\ndata (pw.Table): The table containing the data to be indexed.\nlat (pw.ColumnExpression): The column expression representing latitudes (degrees) in the data.\nlon (pw.ColumnExpression): The column expression representing longitudes (degrees) in the data.\nradius_meters (float): maximum distance supported\ninstance (pw.ColumnExpression or None): The column expression representing the instance of the index\n> allowing for creating multiple indexes at once.\njoin_on_distance(query_table, query_lat, query_lon, distance_meters=None, instance=None)\nThis method efficiently joins (via left_join) rows of query table with rows of indexed data\nsuch that two points are within a certain distance.\nParameters:\nquery_table (pw.Table): The table containing the queries.\nlat (pw.ColumnExpression): The column expression representing latitudes (degrees) in the query_table.\nlon (pw.ColumnExpression): The column expression representing longitudes (degrees) in the query_table.\ninstance (pw.ColumnExpression or None): The column expression representing the instance of the index\n> allowing for parallel queries to multiple indexes at once.\n* Returns\n *pw.JoinResult* \u2013 result of a (distance-limited) join between query_table and indexed data table\nExample:\nCode\n```python\nimport pathway as pw\nqueries = pw.debug.table_from_markdown('''\n | instance | lat | lon | sample_data\n1 | 1 | 51.1000 | 17.0300 | foo\n2 | 1 | 51.1010 | 17.0310 | bar\n3 | 2 | 40.0000 | 179.999 | baz\n4 | 2 | 10.0000 | 10.0000 | zzz\n''')\ndata = pw.debug.table_from_markdown('''\n | instance | lat | lon | other_data\n111 | 1 | 51.0990 | 17.0290 | AAA\n112 | 1 | 51.1000 | 17.0300 | BBB\n113 | 1 | 51.1010 | 17.0310 | CCC\n114 | 1 | 51.1020 | 17.0320 | DDD\n311 | 2 | 40.0000 | 179.999 | EEE\n313 | 2 | 40.0000 | -179.999 | FFF\n314 | 2 | 40.0000 | -179.980 | GGG\n412 | 2 | 51.1000 | 17.0300 | HHH\n''')\nindex = pw.xpacks.spatial.index.H3Index(\n data, data.lat, data.lon, instance=data.instance, radius_meters=200,\n)\nres = index.join_on_distance(\n queries,\n queries.lat,\n queries.lon,\n instance=queries.instance,\n).select(\n instance=queries.instance,\n sample_data=queries.sample_data,\n other_data=pw.right.other_data,\n dist_meters=pw.left.dist_meters.num.fill_na(-1).num.round(1),\n)\npw.debug.compute_and_print(res, include_id=False)\n```\n::\nResult\n```\ninstance | sample_data | other_data | dist_meters\n1 | bar | BBB | 131.5\n1 | bar | CCC | 0.0\n1 | bar | DDD | 131.5\n1 | foo | AAA | 131.5\n1 | foo | BBB | 0.0\n1 | foo | CCC | 131.5\n2 | baz | EEE | 0.0\n2 | baz | FFF | 170.8\n2 | zzz | | -1.0\n```\n::\n::\n"}