-
Notifications
You must be signed in to change notification settings - Fork 16
/
Copy pathsqs_grep.node.txt
166 lines (119 loc) · 8.71 KB
/
sqs_grep.node.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
┏━━━━━━━━━━━━━━┓
┃ SQS_GREP ┃
┗━━━━━━━━━━━━━━┛
VERSION ==> #1.18.3
SUMMARY ==> #Scans every MESSAGE in the QUEUE
#For each MESSAGE, can output, copy, delete, move, publish to SNS
#Can filter and transform them
#Can interrupt manually, by time, or NUM of MESSAGEs
┌─────────┐
│ CLI │
└─────────┘
sqs-grep #
--OPT #OPTS.* below
--inputCredentials|i #BOOL (def: false). Sets OPTS.secretAccessKey|accessKeyId via stdin prompt
┌──────────────────┐
│ PROGRAMMATIC │
└──────────────────┘
new SqsGrep(OPTS) #SGREP
SGREP.options #OPTS
SGREP.run()->>SGREP_RES #Scans the whole QUEUE by repeating ReceiveMessage()
#Uses MESSAGE:
# - MaxNumberOfMessages 10
# - not WaitTimeSeconds|MessageSystemAttributeNames|ReceiveRequestAttemptId
SGREP[_RES].qtyScanned #NUM of MESSAGEs received
#Also printed in logs
SGREP[_RES].qtyMatched #NUM of MESSAGEs matched
#Also printed in logs
┌─────────┐
│ API │
└─────────┘
OPTS.secretAccessKey
OPTS.accessKeyId
OPTS.sessionToken #COPTS.* (def: use SDK's default)
OPTS.region|r #COPTS.region (def: 'us-east-1')
OPTS.endpointUrl #COPTS.endpoint
OPTS.maxRetries #NUM (def: 3). COPTS.maxAttempts
SGREP|OPTS.sqs|sns #SQS|SNS CLIENT (def: automatically used)
#If set, cannot set OPTS.secretAccessKey|accessKeyId|sessionToken|region|endpointUrl|maxRetries|verbose
#Not with CLI
┌───────────┐
│ INPUT │
└───────────┘
'QUEUE' #In OPTS.queue|moveTo|copyTo, either:
# - 'QUEUE_URL'
# - 'QUEUE' name: calls GetQueueUrl()
#Always calls GetQueueAttributes()
OPTS.queue|q #'QUEUE'
OPTS.inputFile #'PATH.json' with newline-separated MESSAGE_RES
#If set, does not use OPTS.queue nor calls ReceiveMessage()
#Can use OPTS.outputFile + OPTS.full to produce it, i.e. to cache
┌────────────┐
│ OUTPUT │
└────────────┘
OPTS.outputFile|o #'PATH.json' with each 'MESSAGE_BODY', newline-separated
OPTS.full|f #BOOL (def: false). Makes each line in OPTS.outputFile be MESSAGE_RES instead (only Body|Attributes|MessageAttributes)
OPTS.silent|s #BOOL (def: false). Make OPTS.outputFile an noop
SGREP|OPTS.log #FUNC(STR) (def: console.log). Progress|info logs
#Not with CLI
OPTS.verbose #BOOL (def: false). Passes OPTS.log to COPTS.logger, i.e. logs API calls
┌──────────┐
│ STOP │
└──────────┘
OPTS.maxMessages|m #NUM (def: 0). Stops when matching NUM MESSAGEs
OPTS.emptyReceives|e #NUM (def: 5). Stops when NUM consecutive ReceiveMessage() return 0 MESSAGEs
OPTS.timeout|t #NUM (in secs, def: 1m). Stops when reaching timeout
#MESSAGE.VisibilityTimeout is also set according to it
#Cannot be 0
SGREP.interrupt() #Manually stops SGREP.run()
#With CLI, called on SIGINT
┌──────────────┐
│ THROTTLE │
└──────────────┘
OPTS.wait|w #NUM (def: 0) of secs to wait each time ReceiveMessage() returns 0 MESSAGEs
OPTS.maxTPS #NUM (def: 0, i.e. no limits). Throttles max NUM matched MESSAGEs per second
OPTS.parallel|j #NUM (def: 1) of parallel API calls at once
┌────────────┐
│ FILTER │
└────────────┘
OPTS.all #BOOL (def: false). If true, no filters
#Must be true if no OPTS.body|attribute
OPTS.body|b #'REGEXP'. Filter by 'MESSAGE_BODY'
OPTS.attribute|a #OBJ_ARR: attr 'MESSAGE_ATTR', regexp REGEXP
#Filter by any of those MESSAGE_ATTRs
#Not with ATTR_VALUE.DataType 'Binary'
#In CLI: multiple 'MESSAGE_ATTR=REGEXP'
OPTS.negate|n #BOOL (def: false). Inverts OPTS.body|attribute
┌───────────────┐
│ TRANSFORM │
└───────────────┘
OPTS.scriptFile #'PATH.js', loaded with require(), exporting EXPORT
EXPORT.preProcessMessage #FUNC(MESSAGE_RES)->> on every received MESSAGE
#Can modify MESSAGE_RES.*
#`this` is SGREP
EXPORT.preProcessMatchedMessage #Same but on every matched MESSAGE
┌────────────┐
│ ACTION │
└────────────┘
OPTS.copyTo #'QUEUE2' (def: none). Copy matched MESSAGEs to another 'QUEUE2' by calling SendMessage()
#Sets default for MESSAGE_GROUP_ID (def: 'fifo') and MESSAGE_DID (def: 'MESSAGE_MID')
#Keeps MESSAGE_BODY, MESSAGE_ATTRS, MESSAGE_GROUP_ID, MESSAGE_DID
#Does not keep other SYS_ATTRS
OPTS.delete #BOOL (def: false). Delete matched MESSAGEs by calling DeleteMessage()
OPTS.moveTo #'QUEUE2' (def: none). Like OPTS.copyTo + OPTS.delete
OPTS.redrive #BOOL (def: false). Same as OPTS.moveTo SOURCE_QUEUE
#I.e. OPTS.queue must be a DLQ
#Uses ListDeadLetterSourceQueues()
OPTS.stripAttributes #BOOL. If false (def), keeps MESSAGE_ATTRS with OPTS.copyTo|moveTo|publishTo|republish
┌─────────┐
│ SNS │
└─────────┘
OPTS.publishTo #'TOPIC_ARN' (def: none). Copy to SNS using Publish()
#Keeps MESSAGE_BODY, MESSAGE_ATTRS
#Does not set MessageGroupId, MessageStructure, PhoneNumber, Subject, TargetArn
#If MESSAGE_BODY is a SNS NOTIFICATION
# - i.e. was sent from a SNS SUB to SQS
# - re-send the original SNS MESSAGE_BODY|MESSAGE_ATTRS
# - i.e. the one wrapped inside the SQS MESSAGE_BODY
OPTS.republish #BOOL (def: false). Same but MESSAGE must have been sent from a SNS SUB to SQS
#Other MESSAGEs are ignored