core

This plugin provides a set of native statements. They are always available because they are loaded automatically.

installation

npm install @ezs/core

details

Several statements create sub-flows (sub pipelines), either from a statement file or from nested statements. They are all used the same way (with the same parameters), so some of them may look similar, but they behave differently:

  • [delegate]: 1 sub pipeline for all items
  • [swing]: 1 sub pipeline for all items that match a condition
  • [spawn]: 1 sub pipeline per item
  • [loop]: 1 sub pipeline per item
  • [expand]: 1 sub pipeline for N items (N = size); only the selected field is sent to the pipeline
  • [combine]: 1 sub pipeline for all items; only the selected field is compared with the result of the sub pipeline
  • [singleton]: 1 sub pipeline for the first item

usage

assign

Assigns a value to a field of the current object. If the field already exists, its value is overwritten; otherwise the field is created.

Input:

[{
    "nom": "un",
    "valeur": 1
},
{
    "nom": "deux",
    "valeur": 2
},
{
    "nom": "trois",
    "valeur": 3
},
{
    "nom": "quatre",
    "valeur": 4
}]

Script:

[assign]
path = valeur
value = get("valeur").multiply(2)

Output:

[{
    "nom": "un",
    "valeur": 2
},
{
    "nom": "deux",
    "valeur": 4
},
{
    "nom": "trois",
    "valeur": 6
},
{
    "nom": "quatre",
    "valeur": 8
}]

The path can be the simple name of a field at the root of the item being processed, or a path in dotted notation, using a syntax close to that of the Lodash get function.

Parameters

  • path String? path of the field to assign
  • value String? value to assign

Returns Object
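
The dotted-path behaviour described for assign can be pictured with a small plain JavaScript sketch. It does not use @ezs/core or Lodash; setPath is a hypothetical helper written only for this illustration, and the multiplication stands in for the get("valeur").multiply(2) expression of the script above.

```javascript
// Hypothetical helper (not part of @ezs/core): set a value at a dotted path,
// creating intermediate objects when they are missing.
function setPath(target, path, value) {
    const keys = path.split('.');
    let node = target;
    for (const key of keys.slice(0, -1)) {
        if (typeof node[key] !== 'object' || node[key] === null) {
            node[key] = {};
        }
        node = node[key];
    }
    node[keys[keys.length - 1]] = value;
    return target;
}

const assignInput = [
    { nom: 'un', valeur: 1 },
    { nom: 'deux', valeur: 2 },
];

// Existing field: the value is overwritten (here, doubled).
const assignDoubled = assignInput.map((item) => setPath({ ...item }, 'valeur', item.valeur * 2));

// Missing field with a dotted path: the nested field is created.
const assignNested = setPath({ nom: 'un' }, 'meta.source', 'demo');
```

The sketch copies each item before modifying it, which is a choice made for the example only and says nothing about how the statement itself handles objects.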

combine

Takes an Object and substitutes a field with the corresponding value found in an external pipeline. That pipeline must produce a stream of special objects { id, value }.

Input:

[
          { year: 2000, dept: 54 },
          { year: 2001, dept: 55 },
          { year: 2003, dept: 54 },
]

Script:

[use]
plugin = analytics

[combine]
path = dept
file = ./departement.ini

Output:

 [
          { year: 2000, dept: { id: 54, value: 'Meurthe et moselle' } },
          { year: 2001, dept: { id: 55, value: 'Meuse' } },
          { year: 2003, dept: { id: 54, value: 'Meurthe et moselle' } },
 ]

Parameters

  • path String? the path to substitute
  • default String? value to use if there is no substitution (otherwise the value stays unchanged)
  • primer String Data to send to the external pipeline (optional, default n/a)
  • file String? the external pipeline is described in a file
  • script String? the external pipeline is described in a string of characters
  • commands String? the external pipeline is described in an object
  • command String? the external pipeline is described in a URL-like command
  • logger String? A dedicated pipeline described in a file to trap or log errors
  • cacheName String? Enable cache, with dedicated name

Returns Object

concat

Takes all Strings, concatenates them and throws a single String.

Input:

[
     "a",
     "b",
     "c"
]

Script:

[concat]
beginWith = <
joinWith = |
endWith = >

Output:

[
     "<a|b|c>"
]

Parameters

  • beginWith String? value added at the beginning
  • joinWith String? value used to join two chunks
  • endWith String? value added at the end

Returns String
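
As a rough illustration of how the three concat parameters combine, here is a plain JavaScript sketch that works on an array instead of a stream. concatChunks is a hypothetical name, and the empty-string defaults are an assumption made for the example, not something stated above.

```javascript
// Hypothetical helper: mimic the effect of [concat] on an in-memory array.
// Assumption: each parameter defaults to an empty string when omitted.
function concatChunks(chunks, { beginWith = '', joinWith = '', endWith = '' } = {}) {
    return beginWith + chunks.join(joinWith) + endWith;
}

const concatResult = concatChunks(['a', 'b', 'c'], {
    beginWith: '<',
    joinWith: '|',
    endWith: '>',
});
// concatResult is the single string "<a|b|c>", as in the Output above.
```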

debug

Take Object, print it (with its index number), and throw the same object.

Parameters

  • level String DEBUG ezs level (depends on the DEBUG environment variable, see cli parameters) (optional, default info)
  • text String text before the dump (optional, default valueOf)
  • path String? path of field to print

Returns Object

dedupe

Takes an Object and checks that its identifier has not already been used.

Parameters

  • data
  • feed
  • path String path containing the object Identifier (optional, default uri)
  • ignore Boolean Just ignore duplicate object (optional, default false)

Returns Object
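
The duplicate check can be sketched in plain JavaScript with a Set of identifiers already seen. dedupeItems is a hypothetical name, the sketch only reads a top-level field, and replacing a duplicate with an Error when ignore is false is an assumption made for this example; the text above only says that ignore makes the statement skip duplicates.

```javascript
// Hypothetical helper: mimic [dedupe] on an in-memory array.
// Assumption: without `ignore`, a duplicate is reported as an Error in the output.
function dedupeItems(items, { path = 'uri', ignore = false } = {}) {
    const seen = new Set();
    const output = [];
    for (const item of items) {
        const id = item[path];
        if (!seen.has(id)) {
            seen.add(id);
            output.push(item);
        } else if (!ignore) {
            output.push(new Error(`Duplicate identifier: ${id}`));
        }
        // With ignore = true, the duplicate is silently dropped.
    }
    return output;
}

const dedupeInput = [{ uri: 'a' }, { uri: 'b' }, { uri: 'a' }];
const dedupeSilent = dedupeItems(dedupeInput, { ignore: true });
const dedupeStrict = dedupeItems(dedupeInput);
```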

breaker

Break the stream if the control file cannot be checked

Parameters

  • fusible String? file to check

Returns Object

delegate

Delegate processing to an external pipeline.

Note: works like spawn, but all chunks share the same external pipeline.

Parameters

  • file String? the external pipeline is described in a file
  • script String? the external pipeline is described in a string of characters
  • commands String? the external pipeline is described in an object
  • command String? the external pipeline is described in a URL-like command
  • logger String? A dedicated pipeline described in a file to trap or log errors

Returns Object

dispatch

Dispatch processing to an external pipeline on one or more servers.

Parameters

  • file String? the external pipeline is described in a file
  • script String? the external pipeline is described in a string of characters
  • commands String? the external pipeline is described in an object
  • command String? the external pipeline is described in a URL-like command

Returns Object

dump

Take all Objects and generate a JSON array

[
    { "a": 1 },
    { "a": 2 },
    { "a": 3 },
    { "a": 4 },
    { "a": 5 }
]

Script:

[dump]
indent = true

Output:

 [{
   "a": 1
  },
  {
   "a": 2
  },
  {
   "a": 3
  },
  {
   "a": 4
  },
  {
   "a": 5
  }
]

Parameters

  • indent boolean indent JSON (optional, default false)

Returns String

env

Creates an environment variable that is global to the whole script.

It is generally used at the beginning of the script (after [use]).

To read the variable, use the env() function.

Input:

[{
    "nom": "un",
    "valeur": 1
},
{
    "nom": "deux",
    "valeur": 2
}]

Script:

[use]
plugin = basics

[env]
path = nom
value = NOM GÉNÉRIQUE

[JSONParse]

[assign]
path = nom
value = env("nom")

[dump]
indent = true

Output:

[{
    "nom": "NOM GÉNÉRIQUE",
    "valeur": 1
},
{
    "nom": "NOM GÉNÉRIQUE",
    "valeur": 2
}]

Parameters

  • path String? name of the variable to create
  • value String? value of the variable

Returns Object

exchange

Replaces a whole object with another one (in the JSON sense).

Input:

[{
   "nom": "un",
   "valeur": 1
},
{
   "nom": "deux",
   "valeur": 2
}]

Script:

[use]
plugin = basics

[JSONParse]

[exchange]
value = get("nom")

[dump]

Output:

["un","deux"]

Here, the object {"nom":"un","valeur":1} has been replaced by the "object" "un" (in the JSON sense, a string, just like a number, is an object).

Note: assign cannot replace the whole object, only one of its properties.

Input:

[{
   "a": "abcdefg", "b": "1234567", "c": "XXXXXXX"
},
{
   "a": "abcdefg", "b": "1234567", "c": "XXXXXXX"
}]

Script:

[exchange]
value = omit('c')

Output:

[{
   "a": "abcdefg",
   "b": "1234567"
},
{
   "a": "abcdefg",
   "b": "1234567"
}]

Here, an object with three properties has been replaced by the same object without the property c (see the Lodash omit function).

Parameters

  • value String? the replacement value for the current object

Returns Object

expand

Takes an Object and substitutes a field with the corresponding value found in an external pipeline. The external pipeline receives a special object { id, value }, where id is the item identifier and value is the value found at the item path. The external pipeline can expand value with another one.

Input:

[
          { year: 2000, dept: 54 },
          { year: 2001, dept: 55 },
          { year: 2003, dept: 54 },
]

Script:

[use]
plugin = analytics

[expand]
path = dept
file = ./departement.ini

Output:

 [
          { year: 2000, dept: { id: 54, value: 'Meurthe et moselle' } },
          { year: 2001, dept: { id: 55, value: 'Meuse' } },
          { year: 2003, dept: { id: 54, value: 'Meurthe et moselle' } },
 ]

Parameters

  • path String? the path to substitute
  • size Number Number of chunks sent together to the external pipeline (optional, default 1)
  • file String? the external pipeline is described in a file
  • script String? the external pipeline is described in a string of characters
  • commands String? the external pipeline is described in an object
  • command String? the external pipeline is described in a URL-like command
  • logger String? A dedicated pipeline described in a file to trap or log errors
  • cacheName String? Enable cache, with dedicated name
  • token String? add token values in the subpipeline (optional)

Returns Object

extract

Extracts the values of some fields from the current object and sends those values directly to the output stream.

Note: extract cannot emit undefined or null values.

Input:

[{
   "nom": "un",
   "valeur": 1,
   "important": false
},
{
   "nom": "deux",
   "valeur": 2,
   "important": true
}]

Script:

[use]
plugin = basics

[JSONParse]

[extract]
path = valeur
path = nom

[dump]

Output:

[[1,"un"],[2,"deux"]]

Parameters

  • path String? path of a field to extract

Returns Object

fork

Forks the current pipeline.

Note: each chunk is sent to the same external pipeline.

Parameters

  • standalone Boolean The current pipeline will be able to end without waiting for the end of the external pipeline (optional, default false)
  • file String? the external pipeline is described in a file
  • script String? the external pipeline is described in a string of characters
  • commands String? the external pipeline is described in an object
  • command String? the external pipeline is described in a URL-like command
  • logger String? A dedicated pipeline described in a file to trap or log errors
  • target String choose the key to set with the forked request identifier (optional, default x-request-id)

Returns Object

group

Take all chunks, and throw them grouped by length.

See also ungroup.

[
     "a",
     "b",
     "c",
     "d",
     "e",
     "f",
     "g",
     "h"
]

Script:

[group]
length = 3

Output:

[
     [ "a", "b", "c" ],
     [ "d", "e", "f" ],
     [ "g", "h" ]
]

Parameters

  • length Number? Size of each partition

Returns String
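
The grouping shown above can be reproduced on a plain array with the following sketch. groupChunks is a hypothetical helper that works in memory, whereas the statement itself works on a stream; the last group is simply shorter when the number of chunks is not a multiple of length.

```javascript
// Hypothetical helper: split an array into consecutive groups of `length` items.
function groupChunks(chunks, length) {
    const groups = [];
    for (let i = 0; i < chunks.length; i += length) {
        groups.push(chunks.slice(i, i + length));
    }
    return groups;
}

const grouped = groupChunks(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'], 3);
// grouped is [["a","b","c"], ["d","e","f"], ["g","h"]], as in the Output above.
```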

identify

Take Object, and compute & add an identifier

Parameters

  • data
  • feed
  • scheme String scheme to use (uid or sha) (optional, default uid)
  • path String path containing the object Identifier (optional, default uri)

Returns String

ignore

Takes all the chunks and ignores the first N chunks.

Input file:

[{
   "a": 1
},
{
   "a": 2
},
{
   "a": 3
},
{
   "a": 4
},
{
   "a": 5
}]

Script:

[ignore]
length = 3

Output:

[{
   "a": 4
},
{
   "a": 5
}]

Parameters

  • length Number? Length of the feed to ignore

Returns any
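
A minimal sketch of the same skipping logic, written as a plain JavaScript generator so that chunks are handled one at a time. ignoreFirst is a hypothetical name and the code does not use @ezs/core.

```javascript
// Hypothetical helper: skip the first `length` chunks, pass the others through.
function* ignoreFirst(iterable, length) {
    let index = 0;
    for (const chunk of iterable) {
        if (index >= length) {
            yield chunk;
        }
        index += 1;
    }
}

const ignoreInput = [{ a: 1 }, { a: 2 }, { a: 3 }, { a: 4 }, { a: 5 }];
const ignoreOutput = [...ignoreFirst(ignoreInput, 3)];
// ignoreOutput is [{ a: 4 }, { a: 5 }], as in the Output above.
```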

keep

Throw input Object but keep only specific fields.

Input file:

[{
   "a": "abcdefg",
   "b": "1234567",
   "c": "XXXXXXX"
},
{
   "a": "abcdefg",
   "b": "1234567",
   "c": "XXXXXXX"
}]

Script:

[keep]
path = a
path = b

Output:

[{
   "a": "abcdefg",
   "b": "1234567"
},
{
   "a": "abcdefg",
   "b": "1234567"
}]

Parameters

  • path String? path of field to keep

Returns Object
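
The effect of repeating path in [keep] can be sketched as follows. keepFields is a hypothetical helper, and for brevity it only handles fields at the root of the object, which is a simplification made for this example.

```javascript
// Hypothetical helper: build a new object containing only the listed root fields.
function keepFields(item, paths) {
    const result = {};
    for (const path of paths) {
        if (path in item) {
            result[path] = item[path];
        }
    }
    return result;
}

const keepInput = [
    { a: 'abcdefg', b: '1234567', c: 'XXXXXXX' },
    { a: 'abcdefg', b: '1234567', c: 'XXXXXXX' },
];
const keepOutput = keepInput.map((item) => keepFields(item, ['a', 'b']));
```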

loop

Loops on an external pipeline until test is true.

Note: works like delegate, but each chunk use its own external pipeline

Parameters

  • test String? if test is true
  • reverse Boolean to reverse the test (optional, default false)
  • maxDepth Number to limit the number of loops (optional, default 100000)
  • file String? the external pipeline is described in a file
  • script String? the external pipeline is described in a string of characters
  • commands String? the external pipeline is described in an object
  • command String? the external pipeline is described in a URL-like command
  • logger String? A dedicated pipeline described in a file to trap or log errors
  • fusible String? Can be set with the ezs server fusible, see env('request.fusible')

Returns Object

map

From an array field, delegates the processing of each item to an external pipeline.

Note: works like delegate, but each chunk uses its own external pipeline.

Parameters

  • path String? the path to substitute
  • file String? the external pipeline is described in a file
  • script String? the external pipeline is described in a string of characters
  • commands String? the external pipeline is described in an object
  • command String? the external pipeline is described in a URL-like command
  • logger String? A dedicated pipeline described in a file to trap or log errors

Returns Object

metrics

  • See: ../server/knownPipeline.js

Take Object, and throw the same object.

This statement is only used if:

  • EZS_METRICS is enabled
  • ezs is running in server mode

WARNING: avoid setting bucket to "input" or "output", as these labels are used by ezs. If you do, you risk distorting the associated metrics.

Parameters

  • pathName String to identify the script (optional, default auto)
  • bucket String to identify the moment of measurement (optional, default unknow)

Returns Object

overturn

Takes an Object and substitutes a field twice with the corresponding value found in an external pipeline. The external pipeline receives a special object { id, value, token }:

  • id is the item identifier
  • value is the value found at the item path
  • token is an array containing the stream id and a number (0 for the first time, 1 for the second time)

The external pipeline can overturn value with another.

It works like [expand], but the second call starts only when all the values of the stream have been sent once.

Input:

[
          { year: 2000, dept: 'Meuse' },
          { year: 2001, dept: 'Moselle' },
          { year: 2003, dept: 'Vosges'},
]

Script #1:

[overturn]
path = dept

[overturn/assign]
path = value
value = get('value').split('').reverse().join('')

Output:

 [
          { year: 2000, dept: 'Meuse' },
          { year: 2001, dept: 'Moselle' },
          { year: 2003, dept: 'Vosges' },
 ]

Script #2:

[overturn]
path = dept

[overturn/drop]
path = token.1
if = 0

[overturn/assign]
path = value
value = get('value').split('').reverse().join('')

Output:

 [
          { year: 2000, dept: 'esueM' },
          { year: 2001, dept: 'ellesoM' },
          { year: 2003, dept: 'segsoV' },
 ]

Parameters

  • path String? the path to overturn
  • size Number Number of chunks sent together to the external pipeline (optional, default 1)
  • file String? the external pipeline is described in a file
  • script String? the external pipeline is described in a string of characters
  • commands String? the external pipeline is described in an object
  • command String? the external pipeline is described in a URL-like command

Returns Object

pack

Take all Object, throw encoded String

Returns String

parallel

Takes an Object and delegates processing to X internal pipelines.

Parameters

  • file String? the external pipeline is described in a file
  • script String? the external pipeline is described in a string of characters
  • commands String? the external pipeline is described in an object
  • command String? the external pipeline is described in a URL-like command
  • logger String? A dedicated pipeline described in a file to trap or log errors

Returns Object

pop

  • See: shift

Return the last Object and close the feed

Input file:

[{
   "a": 1
},
{
   "a": 2
},
{
   "a": 3
},
{
   "a": 4
},
{
   "a": 5
}]

Script:

[pop]

Output:

[{
   "a": 5
}]

Returns Object

remove

Take Object and remove it from the feed if test is true.

Input file:

[{
   a: "a"
},
{
   a: 2
},
{
   a: "b"
},
{
   a: 4
},
{
   a: "c"
}]

Script:

[remove]
test = get('a').isInteger()
reverse = true

Output:

[
    {
       a: 2
    },
    {
       a: 4
    }
]

Parameters

  • test String? if test is true
  • reverse String reverse the test (optional, default false)

Returns Object
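
The interaction between test and reverse can be easy to misread, so here is a plain JavaScript sketch of the logic used in the example above. removeWhen is a hypothetical helper, and the test is passed as a function instead of an ezs expression.

```javascript
// Hypothetical helper: drop the items for which the test is true.
// With reverse = true, the test is inverted: items failing the test are dropped.
function removeWhen(items, test, reverse = false) {
    return items.filter((item) => {
        const matched = Boolean(test(item));
        const shouldRemove = reverse ? !matched : matched;
        return !shouldRemove;
    });
}

const removeInput = [{ a: 'a' }, { a: 2 }, { a: 'b' }, { a: 4 }, { a: 'c' }];
const isIntegerA = (item) => Number.isInteger(item.a);

// reverse = true: everything that is NOT an integer is removed.
const removeReversed = removeWhen(removeInput, isIntegerA, true);
// reverse = false: the integers themselves are removed.
const removeDirect = removeWhen(removeInput, isIntegerA);
```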

replace

Take Object and replace it with a new object with some fields.

See also exchange and assign.

Input file:

[{
   "a": 1
},
{
   "a": 2
},
{
   "a": 3
},
{
   "a": 4
},
{
   "a": 5
}]

Script:

[replace]
path = b.c
value = 'X'

Output:

[{
   "b": { "c": "X" }
},
{
   "b": { "c": "X" }
},
{
   "b": { "c": "X" }
},
{
   "b": { "c": "X" }
},
{
   "b": { "c": "X" }
}]

Parameters

  • path String? path of the new field
  • value String? value of the new field

Returns Object

shift

Return the first Object and close the feed

Input file:

[{
   "a": 1
},
{
   "a": 2
},
{
   "a": 3
},
{
   "a": 4
},
{
   "a": 5
}]

Script:

[shift]

Output:

[{
   "a": 1
}]

Returns Object

shuffle

Take Object, shuffle data of the whole object or only some fields specified by path

Input file:

[{
   "a": "abcdefg",
   "b": "1234567"
},
{
   "a": "abcdefg",
   "b": "1234567"
}]

Script:

[shuffle]
path = a

Output:

[{
   "a": "cadbefg",
   "b": "1234567"
},
{
   "a": "dcaegbf",
   "b": "1234567"
}]

Parameters

  • path String? path of field to shuffle

Returns Object

singleton

Takes only the first Object and delegates its processing to an external pipeline.

Parameters

  • file String? the external pipeline is described in a file
  • script String? the external pipeline is described in a string of characters
  • commands String? the external pipeline is described in an object
  • command String? the external pipeline is described in a URL-like command
  • logger String? A dedicated pipeline described in a file to trap or log errors

Returns Object

spawn

Delegate processing to an external pipeline, throw each chunk from the result.

Note: works like delegate, but each chunk uses its own external pipeline.

Parameters

  • file String? the external pipeline is described in a file
  • script String? the external pipeline is described in a string of characters
  • commands String? the external pipeline is described in an object
  • command String? the external pipeline is described in a URL-like command
  • logger String? A dedicated pipeline described in a file to trap or log errors
  • cache String? Use a specific ezs statement to run commands (advanced)

Returns Object

swing

Delegate processing to an external pipeline under specific conditions.

Note: works like spawn, but each chunk shares the same external pipeline.

Parameters

  • test String? if test is true
  • reverse String reverse the test (optional, default false)
  • file String? the external pipeline is described in a file
  • script String? the external pipeline is described in a string of characters
  • commands String? the external pipeline is described in an object
  • command String? the external pipeline is described in a URL-like command
  • logger String? A dedicated pipeline described in a file to trap or log errors

Returns Object

throttle

Take Object and return the same object

[
         { id: 'x', value: 2 },
         { id: 't', value: 2 },
]

Script:

[use]
plugin = analytics

[throttle]
bySecond = 2

Output:

[
         { id: 'x', value: 2 },
         { id: 't', value: 2 },
]

Parameters

  • bySecond Number Number of objects per second (optional, default 1)

Returns Object

time

Measure the execution time of a script, on each chunk of input.

Parameters

Examples

Input

[1]

Program

const script = `
[transit]
`;
from([1])
    .pipe(ezs('time', { script }))

Output

[{
  data: 1,
  time: 15 // milliseconds
}]

Returns object

tracer

Take Object, print a character and throw the same object. Useful to see the progress in the stream.

Parameters

  • print String character to print at each object (optional, default .)
  • last String character to print at last call (optional, default .)
  • first String character to print at first call (optional, default .)

Returns Object

transit

Take Object and throw the same object again.

Input file:

[{
   "a": 1
},
{
   "a": 2
}]

Script:

[transit]

Output:

[{
   "a": 1
},
{
   "a": 2
}]

Returns Object

truncate

Takes all the chunks, and closes the feed when the total length is equal to the parameter.

Input file:

[{
   "a": 1
},
{
   "a": 2
},
{
   "a": 3
},
{
   "a": 4
},
{
   "a": 5
}]

Script:

[truncate]
length = 3

Output:

[{
   "a": 1
},
{
   "a": 2
},
{
   "a": 3
}]

Parameters

  • length Number? Length of the feed

Returns any
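
The early closing of the feed can be sketched with a plain JavaScript generator that stops reading its source as soon as the requested number of chunks has been sent. truncateAfter is a hypothetical name and the code does not use @ezs/core.

```javascript
// Hypothetical helper: pass chunks through, then stop once `length` chunks were sent.
function* truncateAfter(iterable, length) {
    if (length <= 0) {
        return;
    }
    let count = 0;
    for (const chunk of iterable) {
        yield chunk;
        count += 1;
        if (count >= length) {
            return; // stop consuming the source: the feed is closed
        }
    }
}

const truncateInput = [{ a: 1 }, { a: 2 }, { a: 3 }, { a: 4 }, { a: 5 }];
const truncateOutput = [...truncateAfter(truncateInput, 3)];
// truncateOutput is [{ a: 1 }, { a: 2 }, { a: 3 }], as in the Output above.
```

Because the generator returns instead of skipping the remaining chunks, it also terminates on a source that never ends.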

ungroup

Take all chunks, and throw one item for every chunk.

See also group.

[
     [ "a", "b", "c" ],
     [ "d", "e", "f" ],
     [ "g", "h" ]
]

Script:

[ungroup]

Output:

[
     "a",
     "b",
     "c",
     "d",
     "e",
     "f",
     "g",
     "h"
]

Returns Array<any>
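
As the reverse of group, ungroup can be sketched with a plain JavaScript generator that flattens one level of arrays. ungroupChunks is a hypothetical name, and passing a non-array chunk through unchanged is an assumption made for this example.

```javascript
// Hypothetical helper: send one item for every element of each array chunk.
// Assumption: a chunk that is not an array is passed through as-is.
function* ungroupChunks(iterable) {
    for (const chunk of iterable) {
        if (Array.isArray(chunk)) {
            yield* chunk;
        } else {
            yield chunk;
        }
    }
}

const ungroupInput = [['a', 'b', 'c'], ['d', 'e', 'f'], ['g', 'h']];
const ungroupOutput = [...ungroupChunks(ungroupInput)];
// ungroupOutput is ["a","b","c","d","e","f","g","h"], as in the Output above.
```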

unpack

Take Strings or Buffers and throw Objects built by JSON.parse on each line.

Returns object

use

Loads one or more plugins so that their statements can be used in the script.

Script:

[use]
plugin = basics
plugin = analytics

Parameters

  • plugin String? name of a plugin to load (can be repeated)

Returns String

validate

From an Object, throw the same object if all rules pass

See

Input file:

[{
   "a": 1,
   "b": "titi"
},
{
   "a": 2,
   "b": "toto"
},
{
   "a": false
},
]

Script:

[validate]
path = a
rule = required|number

path = b
rule = required|string

Output:

[{
   "a": 1,
   "b": "titi"
},
{
   "a": 2,
   "b": "toto"
}]

Parameters

  • path String? path of the field
  • rule String? rule to validate the field

Returns Object