Skip to content
This repository was archived by the owner on Apr 14, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions async/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@ def _init_atexit():
"""Setup an at-exit job to be sure our workers are shutdown correctly before
the interpreter quits"""
import atexit
import thread
from . import thread
atexit.register(thread.do_terminate_threads)

def _init_signals():
"""Assure we shutdown our threads correctly when being interrupted"""
import signal
import thread
from . import thread
import sys

prev_handler = signal.getsignal(signal.SIGINT)
def thread_interrupt_handler(signum, frame):
thread.do_terminate_threads()
if callable(prev_handler):
if isinstance(prev_handler, collections.Callable):
prev_handler(signum, frame)
raise KeyboardInterrupt()
# END call previous handler
Expand All @@ -30,7 +30,7 @@ def thread_interrupt_handler(signum, frame):
signal.signal(signal.SIGINT, thread_interrupt_handler)
except ValueError:
# happens if we don't try it from the main thread
print >> sys.stderr, "Failed to setup thread-interrupt handler. This is usually not critical"
print("Failed to setup thread-interrupt handler. This is usually not critical", file=sys.stderr)
# END exception handling


Expand All @@ -41,6 +41,7 @@ def thread_interrupt_handler(signum, frame):


# initial imports
from task import *
from pool import *
from channel import *
from .task import *
from .pool import *
from .channel import *
import collections
14 changes: 7 additions & 7 deletions async/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
# This module is part of async and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Contains a queue based channel implementation"""
from Queue import (
from queue import (
Empty,
Full
)

from util import (
from .util import (
AsyncQueue,
SyncQueue,
ReadOnly
Expand Down Expand Up @@ -154,7 +154,7 @@ def __init__(self, device):
def __iter__(self):
return self

def next(self):
def __next__(self):
"""Implements the iterator protocol, iterating individual items"""
items = self.read(1)
if items:
Expand Down Expand Up @@ -220,7 +220,7 @@ def read(self, count=0, block=True, timeout=None):
out.append(queue.get(False))
# END for each item
else:
for i in xrange(count):
for i in range(count):
out.append(queue.get(False))
# END for each item
# END handle count
Expand All @@ -230,7 +230,7 @@ def read(self, count=0, block=True, timeout=None):
else:
# to get everything into one loop, we set the count accordingly
if count == 0:
count = sys.maxint
count = sys.maxsize
# END handle count

i = 0
Expand Down Expand Up @@ -353,9 +353,9 @@ def read(self, count=0, block=True, timeout=None):
else:
out = list()
it = self._iter
for i in xrange(count):
for i in range(count):
try:
out.append(it.next())
out.append(next(it))
except StopIteration:
self._empty = True
break
Expand Down
67 changes: 66 additions & 1 deletion async/mod/zlibmodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,21 @@

/* Windows users: read Python's PCbuild\readme.txt */


#include "Python.h"
#include "zlib.h"

#if PY_MAJOR_VERSION >= 3
#define PyInt_FromLong PyLong_FromLong
#define PyString_FromString PyUnicode_FromString
#define PyString_FromStringAndSize PyUnicode_FromStringAndSize
#define PyString_AS_STRING PyUnicode_AS_UNICODE
#define _PyString_Resize PyUnicode_Resize
#define PyText_AS_UTF8 _PyUnicode_AsString
#define PyText_Check PyUnicode_Check
#endif



#ifdef WITH_THREAD
#include "pythread.h"

Expand Down Expand Up @@ -897,6 +908,23 @@ static PyMethodDef Decomp_methods[] =
{NULL, NULL}
};


/*
Py_FindMethod gone in Python 3, so Comp_getattr and Decomp_getattr
have to be rewritten. I googled the following tip somewhere:

"The same functionality can be achieved with the tp_getattro slot:
implement your special dynamic attributes there, and then call
PyObject_GenericGetAttr for the default behavior. You may have a
look at the implementation of the pyexpat module:
Modules/pyexpat.c, function xmlparse_getattro."

Looking at xmlparse_getattro [1] it seems it could be readily adopted
here.

[1] http://svn.python.org/projects/python/branches/py3k/Modules/pyexpat.c
*/

static PyObject *
Comp_getattr(compobject *self, char *name)
{
Expand Down Expand Up @@ -1052,17 +1080,50 @@ PyDoc_STRVAR(zlib_module_documentation,
"Compressor objects support compress() and flush() methods; decompressor\n"
"objects support decompress() and flush().");

#if PY_MAJOR_VERSION >= 3
/* See http://python3porting.com/cextensions.html */
static struct PyModuleDef zlib_moddef = {
PyModuleDef_HEAD_INIT,
"zlib",
zlib_module_documentation,
-1,
zlib_methods,
NULL, NULL, NULL, NULL
};
#endif



PyMODINIT_FUNC
PyInit_zlib(void)
{
PyObject *m, *ver;

#if PY_MAJOR_VERSION >= 3
/* Use this version check as a replacement for Py_InitModule4 */
ver = PySys_GetObject("version");
if (ver == NULL || !PyText_Check(ver) ||
strncmp(PyText_AS_UTF8(ver), PY_VERSION, 3) != 0) {
PyErr_Format(PyExc_ImportError,
"this module was compiled for Python %c%c%c",
PY_VERSION[0], PY_VERSION[1], PY_VERSION[2]);
return NULL;
}
#endif

Py_TYPE(&Comptype) = &PyType_Type;
Py_TYPE(&Decomptype) = &PyType_Type;
#if PY_MAJOR_VERSION >= 3
m = PyModule_Create(&zlib_moddef);
if (m == NULL)
return m;
#else
m = Py_InitModule4("zlib", zlib_methods,
zlib_module_documentation,
(PyObject*)NULL,PYTHON_API_VERSION);
if (m == NULL)
return;
#endif

ZlibError = PyErr_NewException("zlib.error", NULL, NULL);
if (ZlibError != NULL) {
Expand Down Expand Up @@ -1101,4 +1162,8 @@ PyInit_zlib(void)
PyModule_AddObject(m, "ZLIB_VERSION", ver);

PyModule_AddStringConstant(m, "__version__", "1.0");

#if PY_MAJOR_VERSION >= 3
return m;
#endif
}
15 changes: 8 additions & 7 deletions async/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,24 @@
# This module is part of async and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Implementation of a thread-pool working with channels"""
from thread import (
from .thread import (
WorkerThread,
StopProcessing,
)
from threading import Lock

from util import (
from .util import (
AsyncQueue,
DummyLock
)

from Queue import (
from queue import (
Queue,
Empty
)

from graph import Graph
from channel import (
from .graph import Graph
from .channel import (
mkchannel,
ChannelWriter,
Channel,
Expand All @@ -31,6 +31,7 @@
import sys
import weakref
from time import sleep
from functools import reduce


__all__ = ('PoolReader', 'Pool', 'ThreadPool')
Expand Down Expand Up @@ -284,7 +285,7 @@ def _prepare_channel_read(self, task, count):
# to process too much. This can be defined per task
qput = self._queue.put
if numchunks > 1:
for i in xrange(numchunks):
for i in range(numchunks):
qput((task.process, chunksize))
# END for each chunk to put
else:
Expand All @@ -297,7 +298,7 @@ def _prepare_channel_read(self, task, count):
else:
# no workers, so we have to do the work ourselves
if numchunks > 1:
for i in xrange(numchunks):
for i in range(numchunks):
task.process(chunksize)
# END for each chunk to put
else:
Expand Down
10 changes: 5 additions & 5 deletions async/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
#
# This module is part of async and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
from graph import Node
from util import ReadOnly
from channel import IteratorReader
from .graph import Node
from .util import ReadOnly
from .channel import IteratorReader

import threading
import weakref
Expand Down Expand Up @@ -128,7 +128,7 @@ def process(self, count=0):
self._num_writers -= 1
self._wlock.release()
# END handle writer count
except Exception, e:
except Exception as e:
# be sure our task is not scheduled again
self.set_done()

Expand Down Expand Up @@ -226,7 +226,7 @@ def reader(self):
""":return: input channel from which we read"""
# the instance is bound in its instance method - lets use this to keep
# the refcount at one ( per consumer )
return self._read.im_self
return self._read.__self__

def set_read(self, read):
"""Adjust the read method to the given one"""
Expand Down
6 changes: 3 additions & 3 deletions async/test/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def _assert(self, pc, fc, check_scheduled=False):
:return: self"""
self.lock.acquire()
if self.item_count != fc:
print self.item_count, fc
print(self.item_count, fc)
assert self.item_count == fc
self.lock.release()

Expand Down Expand Up @@ -166,7 +166,7 @@ def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_of
tasks = [feeder]

inrc = frc
for tc in xrange(count):
for tc in range(count):
t = transformercls(inrc, tc+id_offset, None)

t.fun = make_proxy_method(t)
Expand Down Expand Up @@ -198,7 +198,7 @@ def make_iterator_task(ni, taskcls=TestThreadTask, **kwargs):
""":return: task which yields ni items
:param taskcls: the actual iterator type to use
:param kwargs: additional kwargs to be passed to the task"""
t = taskcls(iter(range(ni)), 'iterator', None, **kwargs)
t = taskcls(iter(list(range(ni))), 'iterator', None, **kwargs)
if isinstance(t, _TestTaskBase):
t.fun = make_proxy_method(t)
return t
Expand Down
6 changes: 3 additions & 3 deletions async/test/test_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# This module is part of async and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Channel testing"""
from lib import *
from .lib import *
from async.channel import *

import time
Expand Down Expand Up @@ -83,7 +83,7 @@ def post_read(items):


# ITERATOR READER
reader = IteratorReader(iter(range(10)))
reader = IteratorReader(iter(list(range(10))))
assert len(reader.read(2)) == 2
assert len(reader.read(0)) == 8
# its empty now
Expand All @@ -95,7 +95,7 @@ def post_read(items):


# test general read-iteration - its supported by all readers
reader = IteratorReader(iter(range(10)))
reader = IteratorReader(iter(list(range(10))))
assert len(list(reader)) == 10

# NOTE: its thread-safety is tested by the pool
Expand Down
6 changes: 3 additions & 3 deletions async/test/test_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# This module is part of async and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Module containing examples from the documentaiton"""
from lib import *
from .lib import *

from async.pool import *
from async.task import *
Expand All @@ -25,7 +25,7 @@ def test_usage(self):
assert p.size() == 1

# A task performing processing on items from an iterator
t = IteratorThreadTask(iter(range(10)), "power", lambda i: i*i)
t = IteratorThreadTask(iter(list(range(10))), "power", lambda i: i*i)
reader = p.add_task(t)

# read all items - they where procesed by worker 1
Expand All @@ -34,7 +34,7 @@ def test_usage(self):


# chaining
t = IteratorThreadTask(iter(range(10)), "power", lambda i: i*i)
t = IteratorThreadTask(iter(list(range(10))), "power", lambda i: i*i)
reader = p.add_task(t)

# chain both by linking their readers
Expand Down
2 changes: 1 addition & 1 deletion async/test/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# This module is part of async and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Channel testing"""
from lib import *
from .lib import *
from async.graph import *

import time
Expand Down
Loading