This document describes the current stable version of Kombu (5.3). For development docs, go here.

Source code for kombu.utils.eventio

"""Selector Utilities."""

from __future__ import annotations

import errno
import math
import select as __select__
import sys
from numbers import Integral

from . import fileno
from .compat import detect_environment

__all__ = ('poll',)

_selectf = __select__.select
_selecterr = __select__.error
xpoll = getattr(__select__, 'poll', None)
epoll = getattr(__select__, 'epoll', None)
kqueue = getattr(__select__, 'kqueue', None)
kevent = getattr(__select__, 'kevent', None)
KQ_EV_ADD = getattr(__select__, 'KQ_EV_ADD', 1)
KQ_EV_DELETE = getattr(__select__, 'KQ_EV_DELETE', 2)
KQ_EV_ENABLE = getattr(__select__, 'KQ_EV_ENABLE', 4)
KQ_EV_CLEAR = getattr(__select__, 'KQ_EV_CLEAR', 32)
KQ_EV_ERROR = getattr(__select__, 'KQ_EV_ERROR', 16384)
KQ_EV_EOF = getattr(__select__, 'KQ_EV_EOF', 32768)
KQ_FILTER_READ = getattr(__select__, 'KQ_FILTER_READ', -1)
KQ_FILTER_WRITE = getattr(__select__, 'KQ_FILTER_WRITE', -2)
KQ_FILTER_AIO = getattr(__select__, 'KQ_FILTER_AIO', -3)
KQ_FILTER_VNODE = getattr(__select__, 'KQ_FILTER_VNODE', -4)
KQ_FILTER_PROC = getattr(__select__, 'KQ_FILTER_PROC', -5)
KQ_FILTER_SIGNAL = getattr(__select__, 'KQ_FILTER_SIGNAL', -6)
KQ_FILTER_TIMER = getattr(__select__, 'KQ_FILTER_TIMER', -7)
KQ_NOTE_LOWAT = getattr(__select__, 'KQ_NOTE_LOWAT', 1)
KQ_NOTE_DELETE = getattr(__select__, 'KQ_NOTE_DELETE', 1)
KQ_NOTE_WRITE = getattr(__select__, 'KQ_NOTE_WRITE', 2)
KQ_NOTE_EXTEND = getattr(__select__, 'KQ_NOTE_EXTEND', 4)
KQ_NOTE_ATTRIB = getattr(__select__, 'KQ_NOTE_ATTRIB', 8)
KQ_NOTE_LINK = getattr(__select__, 'KQ_NOTE_LINK', 16)
KQ_NOTE_RENAME = getattr(__select__, 'KQ_NOTE_RENAME', 32)
KQ_NOTE_REVOKE = getattr(__select__, 'KQ_NOTE_REVOKE', 64)
POLLIN = getattr(__select__, 'POLLIN', 1)
POLLOUT = getattr(__select__, 'POLLOUT', 4)
POLLERR = getattr(__select__, 'POLLERR', 8)
POLLHUP = getattr(__select__, 'POLLHUP', 16)
POLLNVAL = getattr(__select__, 'POLLNVAL', 32)

READ = POLL_READ = 0x001
WRITE = POLL_WRITE = 0x004
ERR = POLL_ERR = 0x008 | 0x010

try:
    SELECT_BAD_FD = {errno.EBADF, errno.WSAENOTSOCK}
except AttributeError:
    SELECT_BAD_FD = {errno.EBADF}


class _epoll:

    def __init__(self):
        self._epoll = epoll()

    def register(self, fd, events):
        try:
            self._epoll.register(fd, events)
        except Exception as exc:
            if getattr(exc, 'errno', None) != errno.EEXIST:
                raise
        return fd

    def unregister(self, fd):
        try:
            self._epoll.unregister(fd)
        except (OSError, ValueError, KeyError, TypeError):
            pass
        except OSError as exc:
            if getattr(exc, 'errno', None) not in (errno.ENOENT, errno.EPERM):
                raise

    def poll(self, timeout):
        try:
            return self._epoll.poll(timeout if timeout is not None else -1)
        except Exception as exc:
            if getattr(exc, 'errno', None) != errno.EINTR:
                raise

    def close(self):
        self._epoll.close()


class _kqueue:
    w_fflags = (KQ_NOTE_WRITE | KQ_NOTE_EXTEND |
                KQ_NOTE_ATTRIB | KQ_NOTE_DELETE)

    def __init__(self):
        self._kqueue = kqueue()
        self._active = {}
        self.on_file_change = None
        self._kcontrol = self._kqueue.control

    def register(self, fd, events):
        self._control(fd, events, KQ_EV_ADD)
        self._active[fd] = events
        return fd

    def unregister(self, fd):
        events = self._active.pop(fd, None)
        if events:
            try:
                self._control(fd, events, KQ_EV_DELETE)
            except OSError:
                pass

    def watch_file(self, fd):
        ev = kevent(fd,
                    filter=KQ_FILTER_VNODE,
                    flags=KQ_EV_ADD | KQ_EV_ENABLE | KQ_EV_CLEAR,
                    fflags=self.w_fflags)
        self._kcontrol([ev], 0)

    def unwatch_file(self, fd):
        ev = kevent(fd,
                    filter=KQ_FILTER_VNODE,
                    flags=KQ_EV_DELETE,
                    fflags=self.w_fflags)
        self._kcontrol([ev], 0)

    def _control(self, fd, events, flags):
        if not events:
            return
        kevents = []
        if events & WRITE:
            kevents.append(kevent(fd,
                                  filter=KQ_FILTER_WRITE,
                                  flags=flags))
        if not kevents or events & READ:
            kevents.append(
                kevent(fd, filter=KQ_FILTER_READ, flags=flags),
            )
        control = self._kcontrol
        for e in kevents:
            try:
                control([e], 0)
            except ValueError:
                pass

    def poll(self, timeout):
        try:
            kevents = self._kcontrol(None, 1000, timeout)
        except Exception as exc:
            if getattr(exc, 'errno', None) == errno.EINTR:
                return
            raise
        events, file_changes = {}, []
        for k in kevents:
            fd = k.ident
            if k.filter == KQ_FILTER_READ:
                events[fd] = events.get(fd, 0) | READ
            elif k.filter == KQ_FILTER_WRITE:
                if k.flags & KQ_EV_EOF:
                    events[fd] = ERR
                else:
                    events[fd] = events.get(fd, 0) | WRITE
            elif k.filter == KQ_EV_ERROR:
                events[fd] = events.get(fd, 0) | ERR
            elif k.filter == KQ_FILTER_VNODE:
                if k.fflags & KQ_NOTE_DELETE:
                    self.unregister(fd)
                file_changes.append(k)
        if file_changes:
            self.on_file_change(file_changes)
        return list(events.items())

    def close(self):
        self._kqueue.close()


class _poll:

    def __init__(self):
        self._poller = xpoll()
        self._quick_poll = self._poller.poll
        self._quick_register = self._poller.register
        self._quick_unregister = self._poller.unregister

    def register(self, fd, events):
        fd = fileno(fd)
        poll_flags = 0
        if events & ERR:
            poll_flags |= POLLERR
        if events & WRITE:
            poll_flags |= POLLOUT
        if events & READ:
            poll_flags |= POLLIN
        self._quick_register(fd, poll_flags)
        return fd

    def unregister(self, fd):
        try:
            fd = fileno(fd)
        except OSError as exc:
            # we don't know the previous fd of this object
            # but it will be removed by the next poll iteration.
            if getattr(exc, 'errno', None) in SELECT_BAD_FD:
                return fd
            raise
        self._quick_unregister(fd)
        return fd

    def poll(self, timeout, round=math.ceil,
             POLLIN=POLLIN, POLLOUT=POLLOUT, POLLERR=POLLERR,
             READ=READ, WRITE=WRITE, ERR=ERR, Integral=Integral):
        timeout = 0 if timeout and timeout < 0 else round((timeout or 0) * 1e3)
        try:
            event_list = self._quick_poll(timeout)
        except (_selecterr, OSError) as exc:
            if getattr(exc, 'errno', None) == errno.EINTR:
                return
            raise

        ready = []
        for fd, event in event_list:
            events = 0
            if event & POLLIN:
                events |= READ
            if event & POLLOUT:
                events |= WRITE
            if event & POLLERR or event & POLLNVAL or event & POLLHUP:
                events |= ERR
            assert events
            if not isinstance(fd, Integral):
                fd = fd.fileno()
            ready.append((fd, events))
        return ready

    def close(self):
        self._poller = None


class _select:

    def __init__(self):
        self._all = (self._rfd,
                     self._wfd,
                     self._efd) = set(), set(), set()

    def register(self, fd, events):
        fd = fileno(fd)
        if events & ERR:
            self._efd.add(fd)
        if events & WRITE:
            self._wfd.add(fd)
        if events & READ:
            self._rfd.add(fd)
        return fd

    def _remove_bad(self):
        for fd in self._rfd | self._wfd | self._efd:
            try:
                _selectf([fd], [], [], 0)
            except (_selecterr, OSError) as exc:
                if getattr(exc, 'errno', None) in SELECT_BAD_FD:
                    self.unregister(fd)

    def unregister(self, fd):
        try:
            fd = fileno(fd)
        except OSError as exc:
            # we don't know the previous fd of this object
            # but it will be removed by the next poll iteration.
            if getattr(exc, 'errno', None) in SELECT_BAD_FD:
                return
            raise
        self._rfd.discard(fd)
        self._wfd.discard(fd)
        self._efd.discard(fd)

    def poll(self, timeout):
        try:
            read, write, error = _selectf(
                self._rfd, self._wfd, self._efd, timeout,
            )
        except (_selecterr, OSError) as exc:
            if getattr(exc, 'errno', None) == errno.EINTR:
                return
            elif getattr(exc, 'errno', None) in SELECT_BAD_FD:
                return self._remove_bad()
            raise

        events = {}
        for fd in read:
            if not isinstance(fd, Integral):
                fd = fd.fileno()
            events[fd] = events.get(fd, 0) | READ
        for fd in write:
            if not isinstance(fd, Integral):
                fd = fd.fileno()
            events[fd] = events.get(fd, 0) | WRITE
        for fd in error:
            if not isinstance(fd, Integral):
                fd = fd.fileno()
            events[fd] = events.get(fd, 0) | ERR
        return list(events.items())

    def close(self):
        self._rfd.clear()
        self._wfd.clear()
        self._efd.clear()


def _get_poller():
    if detect_environment() != 'default':
        # greenlet
        return _select
    elif epoll:
        # Py2.6+ Linux
        return _epoll
    elif kqueue and 'netbsd' in sys.platform:
        return _kqueue
    elif xpoll:
        return _poll
    else:
        return _select


[docs] def poll(*args, **kwargs): """Create new poller instance.""" return _get_poller()(*args, **kwargs)