Programmer's Python Async - Sharing Data Pipes & Queues
Written by Mike James   
Tuesday, 18 July 2023

If you are going in for process-based asynchronous code you need to find ways to share data, as it doesn't just happen as it does with threads. Find out how to share data safely between processes in this extract from Programmer's Python: Async.

Programmer's Python:
Async
Threads, processes, asyncio & more

Is now available as a print book: Amazon


1)  A Lightning Tour of Python

Python's Origins, Basic Python, Data Structures, Control Structures – Loops, Space Matters, Conditionals and Indenting, Pattern Matching, Everything Is An Object – References, Functions, Objects and Classes, Inheritance, Main and Modules, IDEs for Python, Pythonic – The Meta Philosophy, Where Next, Summary.

2) Asynchronous Explained

A Single Thread, Processes, I/O-Bound and CPU-Bound, Threads, Locking, Deadlock, Processes with Multiple Threads, Single-Threaded Async, Events, Events or Threads, Callback Hell, More Than One CPU – Concurrency, Summary.

3) Process-Based Parallelism

Extract 1 Process Based Parallelism
The Process Class, Daemon, Waiting for Processes, Waiting for the First to Complete, Computing Pi, Fork v Spawn, Forkserver, Controlling Start Method, Summary.

4) Threads

Extract 1 -- Threads
The Thread Class, Threads and the GIL, Threading Utilities, Daemon Threads, Waiting for a Thread, Local Variables, Thread Local Storage, Computing Pi with Multiple Threads, I/O-Bound Threads, Sleep(0), Timer Object, Summary.

5) Locks and Deadlock

Race Conditions, Hardware Problem or Heisenbug, Locks, Locks and Processes, Deadlock, Context Managed Locks, Recursive Lock, Semaphore, Atomic Operations, Atomic CPython, Lock-Free Code, Computing Pi Using Locks, Summary.

6) Synchronization

Join, First To Finish, Events, Barrier, Condition Object, The Universal Condition Object, Summary.

7) Sharing Data

Extract 1 - Pipes & Queues
The Queue, Pipes, Queues for Threads, Shared Memory, Shared ctypes, Raw Shared Memory, Shared Memory, Manager, Computing Pi, Summary.

8) The Process Pool

Waiting for Pool Processes, Computing Pi using AsyncResult, Map_async, Starmap_async, Immediate Results – imap, MapReduce, Sharing and Locking, Summary.

9) Process Managers

The SyncManager, How Proxies Work, Locking, Computing Pi with a Manager, Custom Managers, A Custom Data Type, The BaseProxy, A Property Proxy, Remote Managers, A Remote Procedure Call, Final Thoughts, Summary.

10) Subprocesses

Running a program, Input/Output, Popen, Interaction, Non-Blocking Read Pipe, Using subprocess, Summary.

11) Futures

Futures, Executors, I/O-Bound Example, Waiting On Futures, Future Done Callbacks, Dealing With Exceptions, Locking and Sharing Data, Locking and Process Parameters, Using initializer to Create Shared Globals, Using a Process Manager to Share Resources, Sharing Futures and Deadlock, Computing Pi with Futures, Process Pool or Concurrent Futures, Summary.

12) Basic Asyncio

Extract 1 Basic Asyncio
Callbacks, Futures and Await, Coroutines, Await, Awaiting Sleep, Tasks, Execution Order, Tasks and Futures, Waiting On Coroutines, Sequential and Concurrent, Canceling Tasks, Dealing With Exceptions, Shared Variables and Locks, Context Variables, Queues, Summary.

13) Using asyncio

Extract 1 Asyncio Web Client
Streams, Downloading a Web Page, Server, A Web Server, SSL Server, Using Streams, Converting Blocking To Non-blocking, Running in Threads, Why Not Just Use Threads, CPU-Bound Tasks, Asyncio-Based Modules, Working With Other Event Loops – Tkinter, Subprocesses, Summary.

14) The Low-Level API

Extract 1 - Streams & Web Clients
The Event Loop, Using the Loop, Executing Tasks in Processes, Computing Pi With asyncio, Network Functions, Transports and Protocols, A UDP Server, A UDP Client, Broadcast UDP, Sockets, Event Loop Implementation, What Makes a Good Async Operation, Summary.

Appendix I Python in Visual Studio Code

 

Processes have access to the same range of synchronization primitives as threads. You can use Lock, RLock, Event, Semaphore, Barrier and Condition with processes in almost exactly the same way as with threads. What is very different, however, is that processes do not share global variables and thus there is very little to lock! Of course, for processes to work together towards some common objective they need to share some data and there are a number of different ways of doing this. There are two shared data structures, the Queue and the Pipe, which are easy to use and powerful enough for most problems. The Queue has the advantage of being usable by both processes and threads. The Pipe is closer to the operating system.

Beyond these two data structures there are some more sophisticated and flexible options. You can use a shared area of memory to transfer data directly between any number of processes. This is made easier by the ctypes module, which lets you describe the shared data in terms of C types that correspond to Python types.

Finally we have raw shared memory, which is very close to the way the hardware allows processes to share data. The only problem with this alternative is that everything is done in terms of bytes rather than data structures.

The Queue

The multiprocessing.Queue is an implementation of a First In First Out (FIFO) queue that can be shared between processes. It is implemented using a pipe, see later, and some locks. It is a shared data structure at a higher level than the Pipe. In particular, the items that you can add and remove from the queue are Python objects, not just basic data types. If you add an object to a Queue it is first pickled, that is converted to a stream of bytes that can be sent through the pipe, and automatically un-pickled when it is retrieved. This means that the process that gets the object receives a copy of it, and that only objects that can be pickled can be added to a Queue. Pickling is explained in Programmer’s Python: Everything Is Data, ISBN: 978-1871962598.
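The round trip that an object makes through a Queue can be imitated with the pickle module directly. The following is only a sketch of the idea, not the Queue's own code, and the dictionary used as the payload is made up for the illustration:

```python
import pickle

# An illustrative payload; any picklable Python object would do.
original = {"sensor": "temp", "values": [21.5, 22.0]}

data = pickle.dumps(original)   # what put does first: object -> bytes
copy = pickle.loads(data)       # what get does last: bytes -> object

print(type(data).__name__)      # bytes
print(copy == original)         # True
print(copy is original)         # False
```

The result has the same value as the original but is a separate object, so changing it after it has been retrieved has no effect on the sender's version.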

 

A FIFO queue behaves like the standard queue that we are all used to. Items join the queue at the back and leave the queue at the front. This means that the first item to join the queue is the first item out of the queue, hence the name. To create a Queue you use the constructor:

q=multiprocessing.Queue(maxsize=0)

The two basic operations are:

  • q.put(object, block=True, timeout=None)
  • object=q.get(block=True, timeout=None)
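As a minimal sketch of the two operations in use, the following passes a few numbers from a child process to its parent. The function names producer and drain, and the use of None as an end-of-data marker, are conventions invented for this example rather than part of the Queue API:

```python
import multiprocessing


def producer(q, n):
    """Put the squares of 0..n-1 on the queue, then a None marker."""
    for i in range(n):
        q.put(i * i)
    q.put(None)  # end-of-data marker: a convention chosen for this sketch


def drain(q):
    """Get items until the marker arrives; return them in arrival order."""
    items = []
    while True:
        item = q.get()  # blocks until an item is available
        if item is None:
            break
        items.append(item)
    return items


if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(q, 5))
    p.start()
    print(drain(q))  # [0, 1, 4, 9, 16]
    p.join()
```

With a single producer the items arrive in the order in which they were put. The queue is drained before join is called so that the child is never left waiting to flush data that nobody is reading.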

The put operation adds the object to the tail of the queue and the get removes and returns an object from the head of the queue. You can specify a blocking or non-blocking operation and a timeout in seconds. A non-blocking get returns at once with an object or it raises the Empty exception, and a non-blocking put stores the object at once or it raises the Full exception. Both exceptions are defined in the standard queue module. Notice that both operations can block as locks are used to protect the queue from a data race caused by overlapping access from multiple processes or threads. That is, at any given time only one process or thread can be getting or putting data from or to the queue. Another reason that the put operation can wait is if the Queue is full. The put will wait until either another process gets some data or the timeout expires, in which case a Full exception is raised.
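The blocking and non-blocking behavior can be tried out in a single process. This sketch assumes a Queue limited to two items so that the Full case is easy to reach, and the variable names are only for illustration:

```python
import multiprocessing
import queue  # the Empty and Full exceptions are defined in this module

q = multiprocessing.Queue(maxsize=2)  # room for two items only
q.put("a")
q.put("b")

# A non-blocking put on a full queue raises Full at once.
try:
    q.put("c", block=False)
    put_result = "stored"
except queue.Full:
    put_result = "full"

# A blocking get with a timeout waits up to that many seconds for an item.
first = q.get(timeout=5)
second = q.get(timeout=5)

# A non-blocking get on an empty queue raises Empty at once.
try:
    q.get(block=False)
    get_result = "got an item"
except queue.Empty:
    get_result = "empty"

print(put_result, first, second, get_result)  # full a b empty
```

The same pattern works when the puts and gets are in different processes. The only difference is that a blocked operation may then be released by the other side.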

The Queue works in a fairly sophisticated way. As long as you don’t specify a maxsize in the constructor you can store as many items in it as there is memory available. When the first item is added to the Queue a thread is started which transfers the data to the pipe, see later, that the Queue is built on. This means that the put doesn’t have to wait and there is always a free place in the Queue for new data. If you do specify a maxsize parameter then the Queue can only hold that number of items. Notice that maxsize gives the number of items, not the memory allocated. In this case it is possible for a put to have to wait until a space in the Queue becomes available. Also notice that because of the buffering provided by the thread it is possible for the state of the Queue to lag behind the put operation. For example, you can put an item to the Queue and then test to see if it is empty using empty() only to find that it is. A moment later the thread will send the item to the pipe and empty() will return False.
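The lag can be seen, and worked around, in a few lines. This is a sketch in which the result of empty() depends on timing and so may differ from run to run, whereas a get with a timeout gives the background thread a chance to deliver the item:

```python
import multiprocessing

q = multiprocessing.Queue()
q.put("item")

# Timing dependent: this can be True if the background thread has not
# yet moved the item into the pipe, so don't rely on it.
seen_empty = q.empty()

# Reliable: wait for the item to arrive, up to five seconds.
item = q.get(timeout=5)
print(item)  # item
```

For this reason it is usually safer to use a get with a timeout, or an agreed end-of-data marker, than to test empty() to decide whether there is more data to come.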



Last Updated ( Tuesday, 18 July 2023 )