1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
|
# -*- coding: utf-8 -*-
#
# AWL simulator - Environment variables
#
# Copyright 2017-2019 Michael Buesch <m@bues.ch>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
from __future__ import division, absolute_import, print_function, unicode_literals
#from awlsim.common.cython_support cimport * #@cy
from awlsim.common.compat import *
from awlsim.common.util import *
from awlsim.common.exceptions import *
import os
import gc
__all__ = [
"AwlSimEnv",
]
class AwlSimEnv(object):
"""Central environment variable handler.
"""
@classmethod
def getEnv(cls):
"""Get a copy of the environment dict.
"""
return dict(os.environ)
@classmethod
def clearLang(cls, env, lang="C"):
"""Reset the language settings of an environment dict
to some expected value and return the result.
"""
env = dict(env)
env["LANG"] = lang
for i in {"LANGUAGE", "LC_CTYPE", "LC_NUMERIC",
"LC_TIME", "LC_COLLATE", "LC_MONETARY",
"LC_MESSAGES", "LC_PAPER", "LC_NAME",
"LC_ADDRESS", "LC_TELEPHONE", "LC_MEASUREMENT",
"LC_IDENTIFICATION",}:
env.pop(i, None)
return env
@classmethod
def __getVar(cls, name, default=None):
return cls.getEnv().get("AWLSIM_" + name, default)
@classmethod
def getProfileLevel(cls):
"""Get AWLSIM_PROFILE.
"""
profileLevel = cls.__getVar("PROFILE", "0")
try:
profileLevel = int(profileLevel)
except ValueError as e:
profileLevel = 0
return clamp(profileLevel, 0, 2)
@classmethod
def getGuiFramework(cls):
"""Get AWLSIM_GUI.
"""
return cls.__getVar("GUI", "auto").lower()
@classmethod
def getAffinity(cls):
"""Get AWLSIM_AFFINITY.
Returns a list of host CPU indices or an empty list,
if all host CPUs are allowed.
"""
affinityStr = cls.__getVar("AFFINITY", "")
affinity = []
try:
for cpuIndex in affinityStr.split(","):
cpuIndex = int(cpuIndex)
if cpuIndex < 0:
try:
import multiprocessing
cpuIndex = multiprocessing.cpu_count() + cpuIndex
except ImportError as e:
pass
if cpuIndex < 0:
cpuIndex = 0
affinity.append(cpuIndex)
except ValueError as e:
affinity = []
return sorted(set(affinity))
SCHED_DEFAULT = "default" # Do not change the scheduling policy
SCHED_NORMAL = "normal" # Use non-realtime scheduling policy
SCHED_FIFO = "fifo" # Use SCHED_FIFO realtime scheduling policy
SCHED_RR = "rr" # Use SCHED_RR realtime scheduling policy
SCHED_DEADLINE = "deadline" # Use SCHED_DEADLINE realtime scheduling policy
@classmethod
def getSched(cls):
"""Get AWLSIM_SCHED.
Returns one of the SCHED_... constants.
Returns None, if AWLSIM_SCHED has an invalid value.
"""
schedStr = cls.__getVar("SCHED", "").lower().strip()
if schedStr == cls.SCHED_DEFAULT:
return cls.SCHED_DEFAULT
if schedStr == cls.SCHED_NORMAL or schedStr == "other":
return cls.SCHED_NORMAL
if schedStr == cls.SCHED_FIFO or schedStr == "realtime":
return cls.SCHED_FIFO
if schedStr == cls.SCHED_RR:
return cls.SCHED_RR
if schedStr == cls.SCHED_DEADLINE:
return cls.SCHED_DEADLINE
return None
@classmethod
def getPrio(cls):
"""Get AWLSIM_PRIO.
Returns the scheduling priority as an integer or None.
"""
prioStr = cls.__getVar("PRIO", "").lower().strip()
if prioStr != "default":
try:
return int(prioStr)
except ValueError as e:
pass
return None
GCMODE_RT = "realtime" # Manual GC, if realtime scheduling
GCMODE_AUTO = "auto" # Automatic GC
GCMODE_MANUAL = "manual" # Manual GC
@classmethod
def getGcMode(cls):
"""Get AWLSIM_GCMODE.
Returns one of the GCMODE_... constants.
"""
gcModeStr = cls.__getVar("GCMODE", "").lower().strip()
if gcModeStr == cls.GCMODE_RT:
return cls.GCMODE_RT
if gcModeStr == cls.GCMODE_AUTO:
return cls.GCMODE_AUTO
if gcModeStr == cls.GCMODE_MANUAL:
return cls.GCMODE_MANUAL
return cls.GCMODE_RT
@classmethod
def getGcThreshold(cls, generation):
"""Get AWLSIM_GCTHRES.
AWLSIM_GCTHRES is a comma separated string with up to 3 integers.
Each integer corresponding to the generation 0 to 2 thresholds.
Returns the garbage collector threshold for the selected generation.
"""
thresStr = cls.__getVar("GCTHRES", "")
thres = thresStr.split(",")
assert(generation in (0, 1, 2))
try:
return clamp(int(thres[generation]),
0, 0x7FFFFFFF)
except (ValueError, IndexError) as e:
if generation == 0:
gc_get_threshold = getattr(gc, "get_threshold", None)
if gc_get_threshold:
return gc_get_threshold()[0]
return 700
return 1
@classmethod
def getGcCycle(cls):
"""Get AWLSIM_GCCYCLE.
AWLSIM_GCCYCLE is the number of OB1 cycles it takes to trigger
a manual garbage collection.
Returns an integer.
"""
cycStr = cls.__getVar("GCCYCLE", "")
try:
return clamp(int(cycStr), 1, 0xFFFF)
except ValueError as e:
return 64
|