-
-
Notifications
You must be signed in to change notification settings - Fork 117
/
Copy pathdevice.py
2669 lines (2338 loc) · 143 KB
/
device.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#
# Cpppo -- Communication Protocol Python Parser and Originator
#
# Copyright (c) 2013, Hard Consulting Corporation.
#
# Cpppo is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version. See the LICENSE file at the top of the source tree.
#
# Cpppo is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
from __future__ import absolute_import, print_function, division
try:
from future_builtins import zip, map # Use Python 3 "lazy" zip, map
except ImportError:
pass
__author__ = "Perry Kundert"
__email__ = "[email protected]"
__copyright__ = "Copyright (c) 2013 Hard Consulting Corporation"
__license__ = "Dual License: GPLv3 (or later) and Commercial (see LICENSE)"
"""
enip.device -- support for implementing an EtherNet/IP device Objects and Attributes
"""
__all__ = ['dialect', 'lookup_reset', 'lookup', 'resolve', 'resolve_element',
'redirect_tag', 'resolve_tag',
'parse_int', 'parse_path', 'parse_path_elements', 'parse_path_component',
'port_link', 'parse_route_path', 'parse_connection_path',
'RequestUnrecognized', 'Object', 'Attribute',
'Connection_Manager', 'Message_Router', 'Identity', 'TCPIP']
import ast
import contextlib
import itertools
import json
import logging
import random
import struct
import sys
import threading
import traceback
try:
import reprlib
except ImportError:
import repr as reprlib
import configparser # Python2 requires 'pip install configparser'
from ...dotdict import dotdict
from ...automata import ( type_str_base,
peekable, rememberable,
decide,
dfa, dfa_post, state )
from ... import misc
from . import defaults
from .parser import ( UDINT, DINT, DWORD, INT, UINT, WORD, USINT,
EPATH, EPATH_padded, SSTRING, STRING, IFACEADDRS,
typed_data,
octets, octets_encode, octets_noop, octets_drop, move_if,
enip_format, status )
# Default "dialect" of EtherNet/IP CIP protocol. If no Message Router object is available (eg. we
# are a "Client", not a "Controller"), then we need to know the dialect of EtherNet/IP CIP to use.
# For that, we need a parser defined. All client/connector instances use the same globally-defined
# cpppo.server.enip.client.dialect parser. The default dialect is "Logix" (eg. Read/Write Tag
# [Fragmented], etc.)
dialect = None # Default: typically logix.Logix
log = logging.getLogger( "enip.dev" )
#
# directory -- All available device Objects and Attributes (including the "class" instance 0)
# lookup -- Find a class/instance/attribute
#
# Object/Instance/Attribute lookup. The Object is stored at (invalid)
# attribute_id 0. For example:
#
# directory.6.0 Class 6, Instance 0: (metaclass) directory of Object/Attributes
# directory.6.1 Class 6, Instance 1: (instance) directory of Object/Attributes
# directory.6.1.0 Class 6, Instance 1: device.Object (python instance)
# directory.6.1.1 Class 6, Instance 1, Attribute 1 device.Attribute (python instance)
#
directory = dotdict()
def __directory_path( class_id, instance_id=0, attribute_id=None ):
"""It is not possible to in produce a path with an attribute_id=0; this is
not a valid Attribute ID. The '0' entry is reserved for the Object, which is
only accessible with attribute_id=None."""
assert attribute_id != 0, \
"Class %5d/0x%04x, Instance %3d; Invalid Attribute ID 0"
return str( class_id ) \
+ '.' + str( instance_id ) \
+ '.' + ( str( attribute_id if attribute_id else 0 ))
def lookup( class_id, instance_id=0, attribute_id=None ):
"""Lookup by path ("#.#.#" string type), or numeric class/instance/attribute ID"""
exception = None
try:
key = class_id
if not isinstance( class_id, type_str_base ):
assert type( class_id ) is int
key = __directory_path(
class_id=class_id, instance_id=instance_id, attribute_id=attribute_id )
res = directory.get( key, None )
except Exception as exc:
exception = exc
res = None
finally:
log.detail( "Class %5d/0x%04X, Instance %3d, Attribute %5r ==> %s",
class_id, class_id, instance_id, attribute_id,
res if not exception else ( "Failed: %s" % exception ))
return res
#
# symbol -- All known symbolic address
# redirect_tag -- Direct a tag to a class, instance and attribute
# resolve* -- Resolve the class, instance [and attribute] from a path or tag.
#
# A path is something of the form:
#
# {
# 'size':6,
# 'segment':[
# {'symbolic':'SCADA'},
# {'element':123}]
# }
#
# Multiple symbolic and element entries are allowed. This is used for addressing structures:
#
# boo[3].foo
#
# or for indexing multi-dimensional arrays:
#
# table[3][4]
#
# or returning arbitrary sets of elements from an array:
#
# array[3,56,179]
#
# The initial segments of the path must address a class and instance.
#
#TODO: A Tag must be able to (optionally) specify an element
symbol = {}
symbol_keys = ('class', 'instance', 'attribute')
def lookup_reset():
"""Clear any known CIP Objects, and any Tags referencing to their Attributes. Note that each CIP
Object-derived class will retain its .max_instance variable, so future instances will get new
(higher) Instance IDs, unless you provide an instance_id=... to the constructor.
WARNING: This is really mostly for testing multiple CIP Object configurations in a single Python
interpreter run, and is not recommended for production usage. It suffers from removing
references to Object and Attribute instances that are necessary for internal consistency in the
CIP heirarchy, so if the directory is reset, avoid interogating any Object or Attribute from the
prior configuration. See MaxInstance.value, for example.
"""
global directory
global symbol
directory = dotdict()
symbol = {}
def redirect_tag( tag, address ):
"""Establish (or change) a tag, redirecting it to the specified class/instance/attribute address.
Make sure we stay with only str type tags (mostly for Python2, in case somehow we get a Unicode
tag). Multi-segment symbolic tags are expected to be looked up as: symbol["<symbol1>.<symbol2>"]
All Tag lookups are case-insensitive, so are stored lower-case.
"""
assert isinstance( address, dict )
assert all( k in symbol_keys for k in address )
assert all( k in address for k in symbol_keys )
symbol[str( tag ).lower()] = address
return tuple( address[k] for k in symbol_keys )
def resolve_tag( tag ):
"""Return the (class_id, instance_id, attribute_id) tuple corresponding to tag, or None if not specified"""
address = symbol.get( str( tag ).lower(), None )
if address:
return tuple( address[k] for k in symbol_keys )
return None
def resolve( path, attribute=False ):
"""Given a path, returns the fully resolved (class,instance[,attribute]) tuple required to lookup an
Object/Attribute. Won't allow over-writing existing elements (eg. 'class') with symbolic data
results; build up Tags from multiple symbolic paths, eg. "Symbol.Subsymbol". We only recognize
{'symbolic':<str>}, ... and {'class':<int>}, {'instance':<int>}, {'attribute':<int>} paths.
Other valid paths segments (eg. {'port':...}, {'connection':...}) are not presently usable in
our Controller communication simulation.
Call with attribute=<Truthy> to force resolving to the Attribute level; otherwise, always returns
None for the attribute. If an attribute is required, but is not supplied in the path, then we
can default one. This is an (unexpected) feature of the C*Logix PLCs; you can do a Read Tag
Fragmented asking for eg. @0x6b/0x0008[0] (a Class, Instance and Element but no Attribute!).
Presumably there is an implied default Attribute number; we're guessing it's 1? Undocumented.
"""
result = { 'class': None, 'instance': None, 'attribute': None }
tag = '' # developing symbolic tag "Symbol.Subsymbol"
for term in path['segment']:
if ( result['class'] is not None # Got Class already
and result['instance'] is not None # Got Instance already
and (
result['attribute'] is not None # Got Attribute already
or not attribute # or no Attribute desired (must return None)
or ( attribute is not True # or a default attribute is supplied
and 'attribute' not in term ) # and the term didn't contain a supplied one
)
):
break # All desired terms specified; done! (ie. ignore subsequent 'element')
working = dict( term )
while working:
# Each term is something like {'class':5}, {'instance':1}, or (from symbol table):
# {'class':5,'instance':1}. Pull each key (eg. 'class') from working into result,
# but only if not already defined.
for key in result:
if key in working:
# If we hit non-symbolic segments, any tag resolution had better be complete
assert result[key] is None, \
"Failed to override %r==%r with %r from path segment %r in path %r" % (
key, result[key], working[key], term, path['segment'] )
result[key] = working.pop( key ) # single 'class'/'instance'/'attribute' seg.
if working:
assert 'symbolic' in working, \
( "Unrecognized symbolic name %r found in path %r" % ( tag, path['segment'] )
if tag
else "Invalid term %r found in path %r" % ( working, path['segment'] ))
tag += ( '.' if tag else '' ) + str( working['symbolic'] )
working = None
if tag.lower() in symbol:
working = dict( symbol[tag.lower()] )
tag = ''
# Any tag not recognized will remain after all resolution complete
assert not tag, \
"Unrecognized symbolic name %r found in path %r" % ( tag, path['segment'] )
# Handle the case where a default Attribute value was provided, and none was supplied
if result['attribute'] is None and attribute and attribute is not True:
assert isinstance( attribute, int )
result['attribute'] = attribute
# Make sure we got everything we required
assert ( result['class'] is not None and result['instance'] is not None
and ( not attribute or result['attribute'] is not None )), \
"Failed to resolve required Class (%r), Instance (%r) %s Attribute(%r) from path: %r" % (
result['class'], result['instance'], "and the" if attribute else "but not",
result['attribute'], path['segment'] )
result = result['class'], result['instance'], result['attribute'] if attribute else None
log.detail( "Class %5d/0x%04x, Instance %3d, Attribute %5r <== %r",
result[0], result[0], result[1], result[2], path['segment'] )
return result
def resolve_element( path ):
"""Resolve an element index tuple from the path; defaults to (0, ) (the 0th element of a
single-dimensional array).
"""
element = []
for term in path['segment']:
if 'element' in term:
element.append( term['element'] )
break
return tuple( element ) if element else (0, )
def parse_int( x, base=10 ):
"""Try parsing in the target base, but then also try deducing the base (eg. if we are provided with
an explicit base such as 0x..., 0o..., 0b...).
The reason this is necessary (instead of just using int( x, base=0 ) directly) is because we
don't want leading zeros (eg. "012") to be interpreted as indicating octal (which is the default).
"""
try:
return int( x, base=base )
except ValueError:
return int( x, base=0 )
#
# Parsing of a symbolic tag like: 'Tag.Sub_Tag[<index>].Yet_More[<index>-<index>]', or a numeric tag
# like: '@<class>/<instance>/<attribute>/<element>' or "@<class>/{"connection":123}/<attribute>".
#
# parse_path -- Returns a list containing EPATH segments.
# parse_path_elements -- Returns '.'-separated EPATH segments, w/ element, count if any (otherwise None)
# parse_path_component -- Parses a single 'str' EPATH component
#
def parse_path( path, elm=None ):
"""Convert a "."-separated sequence of "Tag" or "@<class>/<instance>/<attribute>" to a list of
EtherNet/IP EPATH segments (if a string is supplied). Numeric form allows
<class>[/<instance>[/<attribute>[/<element>]]] by default, or any segment type at all by
providing it in JSON form, eg. .../{"connection":100}.
Resultant path will be a list of the form [{'symbolic': "Tag"}, {'element': 3}], or [{'class':
511}, {'instance': 1}, {'attribute': 2}].
If strings are supplied for path or element, any numeric data (eg. class, instance, attribute or
element numbers) default to integer (eg. 26), but may be escaped with the normal base indicators
(eg. 0x1A, 0o49, 0b100110). Leading zeros do NOT imply octal.
Also supported is the manual assembly of the path segments: @{"class":0x04}/5/{"connection":100}
A trailing element count may be included in the path, but this interface provides no mechanism
to return an element count. A default <element> 'elm' keyword (if non-None) may be supplied.
"""
return parse_path_elements( path, elm=elm )[0]
def parse_path_elements( path, elm=None, cnt=None ):
"""Returns (<path>,<element>,<count>). If an element is specified (eg. Tag[#]), then it will be
added to the path (or replace any existing element segment at the end of the path) and returned,
otherwise None will be returned. If a count is specified (eg. Tag[#-#] or ...*#), then it will
be returned; otherwise a None will be returned.
Any "."-separated EPATH component (except the last) including an element index must specify
exactly None/one element, eg: "Tag.SubTag[5].AnotherTag[3-4]".
A default <element> 'elm' and/or <count> 'cnt' (if non-None) may be specified.
"""
if isinstance( path, list ) and len( path ) == 1 and isinstance( path[0], type_str_base ):
# Unpack single-element list containing a string
path = path[0]
elif not isinstance( path, type_str_base ):
# Already better be a list-like CIP path...
assert isinstance( path, list ) and all( isinstance( p, dict ) for p in path ), \
"parse_path unrecognized: %r" % ( path, )
return path,None,None
segments = []
p = path.split( '.' )
while len( p ) > 1:
s,e,c = parse_path_component( p.pop( 0 ))
assert c in (None,1), "Only final path segment may specify multiple elements: %r" % ( path )
segments += s
s,elm,cnt = parse_path_component( p[0], elm=elm, cnt=cnt )
return segments+s,elm,cnt
def parse_path_component( path, elm=None, cnt=None ):
"""Parses a single str "@class/instance/attribute" or "Tag" segment, optionally followed by a
"[<begin>-<end>]" and/or "*<count>". Returns <path>,<element>,<count>. Priority for computing
element count is the "[<begin>-<end>]" range, any specified "*<count>", and finally the supplied
'cnt' (default: None).
"""
if '*' in path:
path,cnt = path.split( '*', 1 )
cnt = parse_int( cnt )
if '[' in path:
path,elm = path.split( '[', 1 )
elm,rem = elm.split( ']' )
assert not rem, "Garbage after [...]: %r" % ( rem )
lst = None
if '-' in elm:
elm,lst = elm.split( '-' )
lst = int( lst )
elm = int( elm )
if lst is not None:
cnt = lst + 1 - elm
assert cnt > 0, "Invalid element range %d-%d" % ( elm, lst )
segments = []
if path.startswith( '@' ):
# Numeric and/or JSON. @<class>/<instance>/<attribute>/<element> (up to 4 segments)
try:
defaults = ('class','instance','attribute','element')
for i,seg in enumerate( path[1:].split( '/' )):
if seg.startswith( '{' ):
trm = json.loads( seg )
else:
assert i < len( defaults ), "No default segment type beyond %r" % ( defaults )
trm = {defaults[i]: parse_int( seg )}
segments.append( trm )
except Exception as exc:
raise Exception( "Invalid @%s; 1-4 (default decimal) terms, eg. 26, 0x1A, {\"connection\":100}, 0o46, 0b100110: %s" % (
'/'.join( '<%s>' % d for d in defaults ), exc ))
else:
# Symbolic Tag
segments.append( { "symbolic": path } )
if elm is not None:
if not segments or 'element' not in segments[-1]:
segments.append( {} )
segments[-1]['element'] = elm
return segments,elm,cnt
def port_link( pl ):
"""Convert "1/1" or "2/1.2.3.4" (or validate provided dict) to: {"port": 1, "link": 1} or {"port":
2, "link": "1.2.3.4"}. Link may be integer or IPv4 dotted-quad address. Raises an Exception if
not valid port/link types. This result could be one element of a route_path list.
"""
if isinstance( pl, type_str_base ):
pl = map( str.strip, str( pl ).split( '/', 1 ))
if not isinstance( pl, dict ):
# If its not already a dict, it better be an iterable satisfying exactly [<port>, <link>]
try:
port,link = pl
except:
raise AssertionError( "port/link: must have exactly 2 components, not: %r" % ( pl ))
pl = { "port": port, "link": link }
assert isinstance( pl, dict ) and 'port' in pl and 'link' in pl, \
"""port/link: must be dict containing { "port": <int>, "link": <int>/"<ip>" }"""
try:
pl["port"] = int( pl["port"] )
except:
raise AssertionError( "port/link: port must be an integer" )
assert pl["port"] > 0, \
"port/link: port number must be > 0"
try:
pl["link"] = int( pl["link"] )
except: # Not an int; must be an IPv{4,6} address; canonicalize
try:
pl["link"] = str( misc.ip( pl["link"] ))
except Exception as exc:
raise AssertionError( "port/link: %r: %s" % ( pl["Link"], exc ))
return pl
def parse_route_path( route_path, trailer_parser=None ):
"""A route path is None/0/False, or list of port/link[/port/link] segments. Allows a single
port/link element to be specified bare, and will en-list it, eg: "--route_path=1/2".
Must either result in a Falsey, or a valid sequence of port/link[/port/link...], followed by
whatever sequence trailer_parser produces (if supplied).
"""
if isinstance( route_path, type_str_base ):
try:
route_path = json.loads( route_path )
if route_path and isinstance( route_path, dict ):
# a dict; validate as eg. [{"port":<int>,"link":<int>/"<ip>"}]
route_path = [route_path]
assert isinstance( route_path, list ), \
"route_path invalid; must resolve to list, not: %r" % ( route_path, )
except Exception as exc:
# Handle multiple route_path strings like: "1/0/2/1.2.3.4", by splitting on even '/'.
# Ceases splitting when port_link fails to recognize a component; the remainder is
# re-joined and appended for processing in final stage, below.
assert route_path[:1] not in '[{"', \
"route_path JSON invalid: %r; %s" % ( route_path, exc, ) # JSON was intended, but was invalid
rps = []
pls = iter( route_path.split( '/' ))
pl = list( itertools.islice( pls, 2 ))
while pl:
try:
rps.append( port_link( pl ))
except Exception as exc:
# Done processing; this wasn't a valid port_link element
break
pl = list( itertools.islice( pls, 2 ))
# Done all port_link segments; put any remaining back on the end
trailer = '/'.join( pl + list( pls ))
if trailer:
rps.append( trailer )
log.info( "Converted route_path %r to %r", route_path, rps )
route_path = rps
else:
# Was JSON; better be one of the known types
assert isinstance( route_path, (type(None),bool,int,list)), \
"route_path: must be null/0/false/true or a (sequence of) port/link, not: %r" % ( route_path )
if route_path:
# not a JSON 0/false/null (0/False/None); must be a sequence of str/dict port_link elements,
# followed optionally by something acceptable to trailer_parser (producing a sequence)
rps = []
pls = iter( route_path )
pl = next( pls, None )
while pl:
try:
rps.append( port_link( pl ))
except Exception as exc:
break
pl = next( pls, None )
trs = ( [] if pl is None else [ pl ] ) + list( pls )
if trs:
# All trailer elements are CIP paths
assert trailer_parser, "route_path unhandled: %r" % ( trs, )
try:
pth = trailer_parser( trs )
rps.extend( pth )
except Exception as exc:
raise AssertionError( "route_path invalid: %s" % ( exc ))
log.info( "Converted route_path %r to %r", route_path, rps )
route_path = rps
return route_path
def parse_connection_path( path ):
"""A Connection Path (eg. for Forward Open) consists of a route path, eg. 2/192.168.0.24/1/0
(eg. port 2, link 192.168.0.24, then port 1 (backplane), link 0. Following may optionally be
another '/' + CIP path, eg. "/@2/1" (Connection Manaager), "/@1/1/7" (CIP Identity ProductName
SSTRING), or "/TagName[0].SubName"). The only restriction is that a 'symbolic' CIP path may not
contain a '/' character.
The trailing CIP path is identified typically by the presense of a non-[0-9.:] element (ie. not
a port/link component); once the parse_route_path and port_link ceases to be able to parse
'/'-separated components, then parse_path must successfully consume the remainder.
The only ambiguity is between IPv6 addresses eg. '2001:db8::1' and Tag names; therefore, we do
not support Tag names with ':' symbols in them; if necessary, supply these as JSON-encoded
connection paths.
"""
return parse_route_path( path, trailer_parser=parse_path )
#
# EtherNet/IP CIP Object Attribute
#
class Attribute( object ):
"""A simple Attribute just has a default scalar value of 0. We'll instantiate an instance of the
supplied enip.TYPE/STRUCT class as the Attribute's .parser property. This can be used to parse
incoming data, and produce the current value in bytes form.
The value defaults to a scalar 0, but may be configured as an array by setting default to a list
of values of the desired array size.
If an error code is supplied, requests on the Attribute should fail with that code.
To interface to other types of data (eg. remote data), supply as 'default' an object that
supplies the following interface:
o.__len__() -- DOESN'T EXIST if scalar; returns number of elements if vector (a str is considered scalar)
o.__repr__() -- Some representation of the object; a few of its elements, an address
o.__getitem__(b[:e[:s]]) -- Raise TypeError if scalar; return an item/slice if a vector
o.__setitem(k,v) -- Raise TypeError if scalar, store the value(s) v at index/slice k if vector
o.__int__(), __float__() -- Scalars should directly implement conversion methods; vectors should return
objects (on [int]) or iterables of objects (on [slice]) convertible to
int/float. These will be accessed by functions such as struct.pack()
Note that it is impossible to capture assignment to a scalar value; all remote data must be
vectors, even if they only have a single element. However, for Attributes whose underlying
'default' value is a simple scalar type, we'll support simple value assignment (it will replace
the underlying 'default' value with a new instance of the same type).
Therefore, for scalar types, it is important to ensure that the original default=... value supplied is
of the correct type; eg. 'float' for REAL, 'int', for SINT/INT/DINT types, etc.
"""
MASK_GA_SNG = 1 << 0
MASK_GA_ALL = 1 << 1
def __init__( self, name, type_cls, default=0, error=0x00, mask=0 ):
self.name = name
self.default = default
self.scalar = isinstance( default, type_str_base ) or not hasattr( default, '__len__' )
self.parser = type_cls()
self.error = error # If an error code is desired on access
self.mask = mask # May be hidden from Get Attribute(s) All/Single
@property
def value( self ):
return self.default
@value.setter
def value( self, v ):
assert self.scalar, "Scalar assignment to %s not supported" % type( self.default )
self.default = type(self.default)( v )
def __str__( self ):
return "%-24s %10s%s == %s" % (
self.name, self.parser.__class__.__name__,
( ("[%4d]" % len( self )) if not self.scalar else ( " x%-4d" % len( self )) ), reprlib.repr( self.value ))
__repr__ = __str__
def __len__( self ):
"""Scalars are limited to 1 indexable element, while arrays (implemented as lists) are limited to
their length. """
return 1 if self.scalar else len( self.value )
# Indexing. This allows us to get/set individual values in the Attribute's underlying data
# repository. Simple, linear slices are supported.
def _validate_key( self, key ):
"""Support simple, linear beg:end slices within Attribute len with no truncation (accepts
slices like [:], with a slice.stop of None); even on scalars, allows [0:1]. Returns type of
index, which must be slice or int. We do not validate that the length of the assignment
equals the length of the slice! The caller must ensure this is the same, or truncation /
extension of the underlying datastore would occur.
"""
if isinstance( key, slice ):
start,stop,stride = key.indices( len( self ))
if stride == 1 and start < stop and stop <= len( self ) and key.stop in (stop,None):
return slice
raise KeyError( "%r indices %r too complex, empty, or beyond Attribute length %d" % (
key, (start,stop,stride), len( self )))
if not isinstance( key, int ) or key >= len( self ):
raise KeyError( "Attempt to access item at key %r beyond Attribute length %d" % ( key, len( self )))
return int
def __getitem__( self, key ):
if self._validate_key( key ) is slice:
# Returning slice of elements; always returns an iterable
return [ self.value ] if self.scalar else self.value[key]
# Returning single indexed element; always returns a scalar
return self.value if self.scalar else self.value[key]
def __setitem__( self, key, value ):
"""Allow setting a scalar or indexable array item. We will not confirm length of supplied value for
slices, to allow iterators/generators to be supplied."""
if log.isEnabledFor( logging.INFO ):
log.info( "Setting %s %s %s[%r] to %r", "scalar" if self.scalar else "vector", type( self.value ),
( repr if log.isEnabledFor( logging.DEBUG ) else misc.reprlib.repr )( self.value ),
key, value )
if self._validate_key( key ) is slice:
# Setting a slice of elements; always supplied an iterable; must confirm size
if self.scalar:
self.value = next( iter( value ))
else:
self.value[key] = value
return
# Setting a single indexed element; always supplied a scalar
if self.scalar:
self.value = value
else:
self.value[key] = value
def produce( self, start=0, stop=None ):
"""Output the binary rendering of the current value, using enip type_cls instance configured,
to produce the value in binary form ('produce' is normally a classmethod on the type_cls).
Both scalar and vector Attributes respond to appropriate slice indexes.
"""
if stop is None:
stop = len( self )
return b''.join( self.parser.produce( v ) for v in self[start:stop] )
class MaxInstance( Attribute ):
def __init__( self, name, type_cls, class_id=None, **kwds ):
assert class_id is not None
self.class_id = class_id
super( MaxInstance, self ).__init__( name=name, type_cls=type_cls, **kwds )
@property
def value( self ):
"""Look up any instance of the specified class_id; it has a max_instance property, which
is the maximum instance number allocated thus far. """
return lookup( self.class_id, 0 ).max_instance
def __setitem__( self, key, value ):
raise AssertionError("Cannot set value")
class NumInstances( MaxInstance ):
def __init__( self, name, type_cls, **kwds ):
super( NumInstances, self ).__init__( name=name, type_cls=type_cls, **kwds )
@property
def value( self ):
"""Count how many instances are presently in existence; use the parent class MaxInstances.value."""
return sum( lookup( class_id=self.class_id, instance_id=i_id ) is not None
for i_id in range( 1, super( NumInstances, self ).value + 1 ))
def __setitem__( self, key, value ):
raise AssertionError("Cannot set value")
#
# EtherNet/IP CIP Object
#
# Some of the standard objects (Vol 1-3.13, Table 5-1-1):
#
# Class Code Object
# ---------- ------
# 0x01 Identity
# 0x02 Message Router
# 0x03 DeviceNet
# 0x04 Assembly
# 0x05 Connection
# 0x06 Connection Manager
# 0x07 Register
#
# Figure 1-4.1 CIP Device Object Model
# +-------------+
# Unconnected -------------------------------->| Unconnected |
# Explicit Messages <--------------------------------| Message |
# | Manager |
# +-------------+
# |^
# ||
# || +-------------+
# || | Link |
# || | Specific |
# || | Objects |
# || +-------------+
# v| ^v
# +-------------+ ||
# Connection --> Explcit | Message | ||
# Based <-- Messaging <-- | Router |>-----+|
# Explicit Connection --> | |<------+
# Message Objects +-------------+
# |^
# ||
# ||
# ||
# v|
# +-------------+
# I/O --> I/O ..+ | Application |
# Messages <-- Connection v <.. | Objects |
# Objects ..+ --> | |
# +-------------+
#
#
#
class RequestUnrecognized( AssertionError ):
"""If a Request/Reply cannot be parsed"""
class Object( object ):
"""An EtherNet/IP device.Object is capable of parsing and processing a number of requests. It has
a class_id and an instance_id; an instance_id of 0 indicates the "class" instance of the
device.Object, which has different (class level) Attributes (and may respond to different
commands) than the other instance_id's. An instance_id will be dynamically allocated, if one
isn't specified.
Each Object has a single class-level parser, which is used to register all of its available
service request parsers. The next available symbol designates the type of service request,
eg. 0x01 ==> Get Attributes All. These parsers enumerate the requests that are *possible* on
the Object. Later, when the Object is requested to actually process the request, a decision can
be made about whether the request is *allowed*.
The parser knows how to parse any requests it must handle, and any replies it can generate, and
puts the results into the provided data artifact.
Assuming Obj is an instance of Object, and the source iterator produces the incoming symbols:
0x52, 0x04, 0x91, 0x05, 0x53, 0x43, 0x41, 0x44, #/* R...SCAD */
0x41, 0x00, 0x14, 0x00, 0x02, 0x00, 0x00, 0x00, #/* A....... */
then we could run the parser:
data = dotdict()
with Obj.parse as machine:
for m,w in machine.run( source=source, data=data ):
pass
and it would parse a recognized command (or reply, but that would be unexpected), and produce
the following entries (in data, under the current context):
'service': 0x52,
'path.segment': [{'symbolic': 'SCADA', 'length': 5}],
'read_frag.elements': 20,
'read_frag.offset': 2,
Then, we could process the request:
proceed = Obj.request( data )
and this would process a request, converting it into a reply (any data elements unchanged by the
reply remain):
'service': 0xd2, # changed: |= 0x80
'status': 0x00, # default if not specified
'path.segment': [{'symbolic': 'SCADA', 'length': 5}], # unchanged
'read_frag.elements': 20, # unchanged
'read_frag.offset': 2, # unchanged
'read_frag.type': 0x00c3, # produced for reply
'read_frag.data': [ # produced for response
0x104c, 0x0008,
0x0003, 0x0002, 0x0002, 0x0002,
0x000e, 0x0000, 0x0000, 0x42e6,
0x0007, 0x40c8, 0x40c8, 0x0000,
0x00e4, 0x0000, 0x0064, 0x02b2,
0x80c8
]
'input': bytearray( [ # encoded response payload
0xd2, 0x00, #/* ....,... */
0x00, 0x00, 0xc3, 0x00, 0x4c, 0x10, 0x08, 0x00, #/* ....L... */
0x03, 0x00, 0x02, 0x00, 0x02, 0x00, 0x02, 0x00, #/* ........ */
0x0e, 0x00, 0x00, 0x00, 0x00, 0x00, 0xe6, 0x42, #/* .......B */
0x07, 0x00, 0xc8, 0x40, 0xc8, 0x40, 0x00, 0x00, #/* ...@.@.. */
0xe4, 0x00, 0x00, 0x00, 0x64, 0x00, 0xb2, 0x02, #/* ....d... */
0xc8, 0x80 #/* .@ */
]
The response payload is also produced as a bytes array in data.input, encoded and ready for
transmission, or encapsulation by the next higher level of request processor (eg. a
Message_Router, encapsulating the response into an EtherNet/IP response).
If desired, invoke Object.config_loader.read method on a sequence of configuration file names
(see Python3 configparser for format), before first Object is created.
"""
max_instance = 0
# A derived class may specify its own parser and lock (and service, transit, used for logging)
# This would be necessary if the same service numbers are used for different services.
service = {} # Service number/name mappings
transit = {} # Symbol to transition to service parser on
# The parser doesn't add a layer of context; run it with a path= keyword to add a layer
lock = threading.Lock()
parser = dfa_post( service, initial=state( 'Obj svc' ),
terminal=True )
# No config, by default (use default values). Allows ${<section>:<key>} interpolation, and
# comments anywhere via the # symbol (this implies no # allowed in any value, due to the lack of
# support for escape symbol).
#
# If config files are desired, somewhere early in program initialization, add:
#
# Object.config_loader.read( ["<config-file>", ...] )
#
config_loader = configparser.ConfigParser(
comment_prefixes=('#',), inline_comment_prefixes=('#',),
allow_no_value=True, empty_lines_in_values=False,
interpolation=configparser.ExtendedInterpolation() )
@classmethod
def config_section( cls, section ):
if section and section in cls.config_loader:
log.detail( "[{section}]".format( section=section ))
return cls.config_loader[section]
log.detail( "[{section}]".format( section='DEFAULT' ))
return cls.config_loader['DEFAULT']
@classmethod
def config_override( cls, val, key, default=None, config=None, section=None ):
"""Use the provided val (or get key's value from config, toggling '_'/' ' so either
"Some Thing" or "some_thing" are acceptable config file keys), converting to type of
default.
The configparser doesn't support accessing dicts in 'k.r...' form, so we'll split on '.'
and attempt to convert the discovered config[k] to a dotdict, and index it by 'r...'.
"""
if config is None:
config = cls.config_section( section ) # if neither, uses 'DEFAULT'
if val is None:
for kr in ( key, key.replace( '_', ' ' ), key.replace( '_', ' ' )):
for k,r in ( [kr, ''], ) + ( ( kr.split( '.', 1 ), ) if '.' in kr else () ):
if r: # a key.key...;
try:
val = dotdict( config.get( k )).get( r )
log.info( " {k:>10}{r:<10} == {val!r} (config dict)".format( k=k, r=r, val=val ))
except Exception as exc:
log.info( " {k:>10}{r:<10}: {exc!r}".format( k=k, r=r, exc=exc ))
else:
val = config.get( k, None )
if val is not None:
log.info( " {k:<20} == {val!r} (config)".format( k=k, val=val ))
break
try:
if val is None:
val = default
if val: log.info( " {key:<20} == {val!r:<20} (default)".format( key=key, val=val ))
elif isinstance( default, bool) and \
isinstance( val, type_str_base):
# Python bools supplied as strings or from config files are a special case, eg. 0 or
# "False" ==> False, 1 or "True' ==> True
val = type( default )( ast.literal_eval( val.capitalize() ))
elif isinstance( default, (list, dict )) and \
isinstance( val, type_str_base):
# Other complex types eg. "[ ... ]" must be obtained by ast.literal_eval.
val = type( default )( ast.literal_eval( val ))
elif isinstance( default, (bool, int, float, type_str_base)) and \
isinstance( val, (bool, int, float, type_str_base)):
# Otherwise, any basic-typed val supplied or loaded from config file will be converted to
# type of any basic-typed default supplied. This allows conversion of values
# supplied as valid literals, or from string to numeric types.
try:
val = type( default )( val ) # eg. 123, abc
except ValueError:
if isinstance( val, type_str_base ):
val = type( default )( ast.literal_eval( val )) # eg. 0x123, "abc"
else:
raise
# else leave val as str/None
log.detail( " {key:<20} == {val!r}".format( key=key, val=val ))
except Exception as exc:
msg = "Converting {key} from {val!r} failed (default type {typ.__name__}): {exc}".format(
key=key, val=val, typ=type( default ), exc=exc )
log.warning( " %s", msg )
raise Exception( msg )
return val
@classmethod
def register_service_parser( cls, number, name, short, machine ):
"""Registers a parser with the Object. May be invoked during import; no logging. Allows a single
"default" parser w/ number == True to be defined. So, use our parser's .encode() method to
process the number, to convert True/None to state.ALL/NON
"""
enc = cls.parser.encode( number )
assert enc not in cls.service, \
"Duplicate service #%s: %r number registered for Object %s" % ( number, name, cls.__name__ )
assert name not in cls.service, \
"Duplicate service #%s: %r name registered for Object %s" % ( number, name, cls.__name__ )
cls.service[enc] = name
cls.service[name] = enc
cls.transit[enc] = ( chr( enc )
if sys.version_info[0] < 3 and enc >= 0
else enc )
cls.parser.initial[cls.transit[enc]] \
= dfa( name=short, initial=machine, terminal=True )
GA_ALL_NAM = "Get Attributes All"
GA_ALL_CTX = "get_attributes_all"
GA_ALL_REQ = 0x01
GA_ALL_RPY = GA_ALL_REQ | 0x80
SA_ALL_NAM = "Set Attributes All"
SA_ALL_CTX = "set_attributes_all"
SA_ALL_REQ = 0x02
SA_ALL_RPY = SA_ALL_REQ | 0x80
GA_LST_NAM = "Get Attribute List"
GA_LST_CTX = "get_attribute_list"
GA_LST_REQ = 0x03
GA_LST_RPY = GA_LST_REQ | 0x80
GA_SNG_NAM = "Get Attribute Single"
GA_SNG_CTX = "get_attribute_single"
GA_SNG_REQ = 0x0E
GA_SNG_RPY = GA_SNG_REQ | 0x80
SA_SNG_NAM = "Set Attribute Single"
SA_SNG_CTX = "set_attribute_single"
SA_SNG_REQ = 0x10
SA_SNG_RPY = SA_SNG_REQ | 0x80
SV_COD_NAM = "Service Code"
SV_COD_CTX = "service_code"
@property
def config( self ):
if self._config is None:
self._config = self.config_section( self.name )
return self._config
@misc.logresult( log=log, log_level=logging.DETAIL )
def config_str( self, *args, **kwds ):
return self.config.get( *args, **kwds )
@misc.logresult( log=log, log_level=logging.DETAIL )
def config_int( self, *args, **kwds ):
return self.config.getint( *args, **kwds )
@misc.logresult( log=log, log_level=logging.DETAIL )
def config_float( self, *args, **kwds ):
return self.config.getfloat( *args, **kwds )
@misc.logresult( log=log, log_level=logging.DETAIL )
def config_bool( self, *args, **kwds ):
return self.config.getboolean( *args, **kwds )
@misc.logresult( log=log, log_level=logging.DETAIL )
def config_json( self, *args, **kwds ):
return json.loads( self.config_str( *args, **kwds ))
def __init__( self, name=None, instance_id=None ):
"""Create the instance (default to the next available instance_id). An instance_id of 0 holds the
"class" attributes/commands. Any configured values for the Object are available in
self.config via its get/getint/getfloat/getboolean( <name>, <default> ) methods.
[Object Name]
a key = some value
"""
self._config = None
self.name = name or self.__class__.__name__
# Allocate and/or keep track of maximum instance ID assigned thus far.
if instance_id is None:
instance_id = self.__class__.max_instance + 1
if instance_id > self.__class__.max_instance:
self.__class__.max_instance = instance_id
self.instance_id = instance_id
( log.detail if self.instance_id else log.info )(
"%24s, Class ID 0x%04x, Instance ID %3d created",
self, self.class_id, self.instance_id )
instance = lookup( self.class_id, instance_id )
assert instance is None, \
"CIP Object class %x, instance %x already exists\n%s" % (
self.class_id, self.instance_id, ''.join( traceback.format_stack() ))
#
# directory.1.2.None == self