aboutsummaryrefslogtreecommitdiffstats
path: root/awlsim/common/monotonic.py
blob: 09005476363bcb9f88ad28c73bbb1f72b2de3f72 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# -*- coding: utf-8 -*-
#
# AWL simulator - Monotonic timer
#
# Copyright 2018 Michael Buesch <m@bues.ch>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#

from __future__ import division, absolute_import, print_function, unicode_literals
#from awlsim.common.cython_support cimport * #@cy
from awlsim.common.compat import *

from awlsim.common.util import *

import time


__all__ = [
	"monotonic_time",
]


class _MONOTONIC_RAW_factory(object): #+cdef
	"""CLOCK_MONOTONIC_RAW wrapper base class.
	"""

	def probe(self):
		"""Probe this timer.
		Returns True, if this timer works correctly.
		"""
		raise NotImplementedError

	def monotonic_raw(self):
		"""Returns the timer as float.
		"""
		raise NotImplementedError

	def _sanityCheck(self):
		"""Check if CLOCK_MONOTONIC_RAW works correctly.
		"""
		try:
			a = self.monotonic_raw()
			time.sleep(1e-3)
			b = self.monotonic_raw()
			if b - a <= 0.0 or b - a > 1.0:
				raise RuntimeError
		except Exception as e: #@nocov
			return False
		return True

class _MONOTONIC_RAW_timemodule_factory(_MONOTONIC_RAW_factory): #+cdef
	"""CLOCK_MONOTONIC_RAW time module wrapper.
	"""

	def probe(self):
		if not hasattr(time, "clock_gettime") or\
		   not hasattr(time, "CLOCK_MONOTONIC_RAW"):
			return False #@nocov

		self.__clock_gettime = time.clock_gettime
		self.__id_CLOCK_MONOTONIC_RAW = time.CLOCK_MONOTONIC_RAW

		if not self._sanityCheck(): #@nocov
			printWarning("CLOCK_MONOTONIC_RAW (time module) does not work "
				     "correctly on this system. Falling "
				     "back to an alternative.")
			return False
		return True

	def monotonic_raw(self):
		return self.__clock_gettime(self.__id_CLOCK_MONOTONIC_RAW)

class _MONOTONIC_RAW_CFFI_factory(_MONOTONIC_RAW_factory): #+cdef
	"""CLOCK_MONOTONIC_RAW CFFI wrapper.
	"""

	def probe(self):
		if not osIsLinux:
			printInfo("CLOCK_MONOTONIC_RAW is only available on Linux.")
			return False

		try:
			from cffi import FFI
		except ImportError as e:
			printWarning("Failed to import CFFI: %s\n"
				     "Cannot use CLOCK_MONOTONIC_RAW via CFFI." % (
				     str(e)))
			return False

		self.__id_CLOCK_MONOTONIC_RAW = 4

		self.__ffi = FFI()
		getattr(self.__ffi, "cdef")("""
			typedef int clockid_t;
			struct timespec {
				long tv_sec;
				long tv_nsec;
			};
			int clock_gettime(clockid_t clk_id, struct timespec *tp);
		""")
		self.__c = self.__ffi.dlopen(None)
		self.__ts = self.__ffi.new("struct timespec *")

		if not self._sanityCheck():
			printWarning("CLOCK_MONOTONIC_RAW (CFFI) does not work "
				     "correctly on this system. Falling "
				     "back to an alternative.")
			return False
		return True

	def monotonic_raw(self):
		ts = self.__ts
		if self.__c.clock_gettime(self.__id_CLOCK_MONOTONIC_RAW, ts):
			raise OSError("CLOCK_MONOTONIC_RAW failed.")
		return float(ts.tv_sec) + (float(ts.tv_nsec) / 1e9)

def _monotonic_time_init():
	"""Initialize _monotonicTimeHandler.
	This will be called on the first call to monotonic_time().
	"""
	global _monotonicTimeHandler

	# Probe time.clock_gettime(CLOCK_MONOTONIC_RAW)
	factory = _MONOTONIC_RAW_timemodule_factory()
	if factory.probe():
		printVerbose("Using CLOCK_MONOTONIC_RAW (time module) as monotonic timer.")
		_monotonicTimeHandler = factory.monotonic_raw
		return _monotonicTimeHandler()

	# Probe CFFI clock_gettime(CLOCK_MONOTONIC_RAW)
	factory = _MONOTONIC_RAW_CFFI_factory()
	if factory.probe():
		printVerbose("Using CLOCK_MONOTONIC_RAW (CFFI) as monotonic timer.")
		_monotonicTimeHandler = factory.monotonic_raw
		return _monotonicTimeHandler()

	# Probe time.perf_counter
	_monotonicTimeHandler = getattr(time, "perf_counter", None)
	if _monotonicTimeHandler:
		printInfo("Using time.perf_counter() as monotonic timer.")
		return _monotonicTimeHandler()

	# Probe time.monotonic
	_monotonicTimeHandler = getattr(time, "monotonic", None)
	if _monotonicTimeHandler:
		printInfo("Using time.monotonic() as monotonic timer.")
		return _monotonicTimeHandler()

	# Out of luck. Use non-monotonic time.time
	printWarning("Falling back to non-monotonic time.time() timer.")
	_monotonicTimeHandler = time.time
	return _monotonicTimeHandler()

#cdef object _monotonicTimeHandler #@cy
_monotonicTimeHandler = _monotonic_time_init

#cdef double _prevMonotonicTime #@cy
_prevMonotonicTime = 0.0

# Get a monotonic time count, as float second count.
# The reference is undefined, so this can only be used for relative times.
def monotonic_time(): #@nocy
#cdef double monotonic_time(): #@cy
#@cy	cdef double t
	global _monotonicTimeHandler
	global _prevMonotonicTime

	try:
		t = _prevMonotonicTime = _monotonicTimeHandler()
	except Exception as e: #@nocov
		t = _prevMonotonicTime
	return t
bues.ch cgit interface