add Venus Release document and scripts

This commit is contained in:
Yinyin Liu 2024-07-01 11:49:57 +02:00
parent 1f079eb29f
commit 709bbc8795
189 changed files with 30967 additions and 0 deletions

View File

@ -0,0 +1,5 @@
**/docs
**/examples
**/test
**/utils
setup.py

View File

@ -0,0 +1,8 @@
languages:
- python
exclude_paths:
- docs/*
- tests/*
- utils/*
- pika/examples/*
- pika/spec.py

View File

@ -0,0 +1,2 @@
[run]
omit = pika/spec.py

View File

@ -0,0 +1,15 @@
Thank you for using Pika.
GitHub issues are **strictly** used for actionable work and pull
requests.
Pika's maintainers do NOT use GitHub issues for questions, root cause
analysis, conversations, code reviews, etc.
Please direct all non-work issues to either the `pika-python` or
`rabbitmq-users` mailing list:
* https://groups.google.com/forum/#!forum/pika-python
* https://groups.google.com/forum/#!forum/rabbitmq-users
Thank you

View File

@ -0,0 +1,43 @@
## Proposed Changes
Please describe the big picture of your changes here to communicate to
the Pika team why we should accept this pull request. If it fixes a bug
or resolves a feature request, be sure to link to that issue.
A pull request that doesn't explain **why** the change was made has a
much lower chance of being accepted.
If English isn't your first language, don't worry about it and try to
communicate the problem you are trying to solve to the best of your
abilities. As long as we can understand the intent, it's all good.
## Types of Changes
What types of changes does your code introduce to this project?
_Put an `x` in the boxes that apply_
- [ ] Bugfix (non-breaking change which fixes issue #NNNN)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] Documentation (correction or otherwise)
- [ ] Cosmetics (whitespace, appearance)
## Checklist
_Put an `x` in the boxes that apply. You can also fill these out after
creating the PR. If you're unsure about any of them, don't hesitate to
ask on the
[`pika-python`](https://groups.google.com/forum/#!forum/pika-python)
mailing list. We're here to help! This is simply a reminder of what we
are going to look for before merging your code._
- [ ] I have read the `CONTRIBUTING.md` document
- [ ] All tests pass locally with my changes
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] I have added necessary documentation (if appropriate)
## Further Comments
If this is a relatively large or complex change, kick off the discussion
by explaining why you chose the solution you did and what alternatives
you considered, etc.

View File

@ -0,0 +1,20 @@
*.pyc
*~
.idea
.coverage
.tox
.DS_Store
.python-version
pika.iml
codegen
pika.egg-info
debug/
examples/pika
examples/blocking/pika
atlassian*xml
build
dist
docs/_build
venv*/
env/
testdata/*.conf

View File

@ -0,0 +1,103 @@
language: python
sudo: false
addons:
apt:
sources:
- sourceline: deb https://packages.erlang-solutions.com/ubuntu trusty contrib
key_url: https://packages.erlang-solutions.com/ubuntu/erlang_solutions.asc
packages:
# apt-cache show erlang-nox=1:20.3-1 | grep Depends | tr ' ' '\n' | grep erlang | grep -v erlang-base-hipe | tr -d ',' | sed 's/$/=1:20.3-1/'
- erlang-nox
env:
global:
- RABBITMQ_VERSION=3.7.8
- RABBITMQ_DOWNLOAD_URL="https://github.com/rabbitmq/rabbitmq-server/releases/download/v$RABBITMQ_VERSION/rabbitmq-server-generic-unix-$RABBITMQ_VERSION.tar.xz"
- RABBITMQ_TAR="rabbitmq-$RABBITMQ_VERSION.tar.xz"
- PATH=$HOME/.local/bin:$PATH
- AWS_DEFAULT_REGION=us-east-1
- secure: "Eghft2UgJmWuCgnqz6O+KV5F9AERzUbKIeXkcw7vsFAVdkB9z01XgqVLhQ6N+n6i8mkiRDkc0Jes6htVtO4Hi6lTTFeDhu661YCXXTFdRdsx+D9v5bgw8Q2bP41xFy0iao7otYqkzFKIo32Q2cUYzMUqXlS661Yai5DXldr3mjM="
- secure: "LjieH/Yh0ng5gwT6+Pl3rL7RMxxb/wOlogoLG7cS99XKdX6N4WRVFvWbHWwCxoVr0be2AcyQynu4VOn+0jC8iGfQjkJZ7UrJjZCDGWbNjAWrNcY0F9VdretFDy8Vn2sHfBXq8fINqszJkgTnmbQk8dZWUtj0m/RNVnOBeBcsIOU="
stages:
- test
- name: coverage
if: repo = pika/pika
- name: deploy
if: tag IS present
cache:
apt: true
directories:
- $HOME/.cache
install:
- pip install -r test-requirements.txt
- pip install awscli==1.11.18
- if [ ! -d "$HOME/.cache" ]; then mkdir "$HOME/.cache"; fi
- if [ -s "$HOME/.cache/$RABBITMQ_TAR" ]; then echo "[INFO] found cached $RABBITMQ_TAR file"; else wget -O "$HOME/.cache/$RABBITMQ_TAR" "$RABBITMQ_DOWNLOAD_URL"; fi
- tar -C "$TRAVIS_BUILD_DIR" -xvf "$HOME/.cache/$RABBITMQ_TAR"
- sed -e "s#PIKA_DIR#$TRAVIS_BUILD_DIR#g" "$TRAVIS_BUILD_DIR/testdata/rabbitmq.conf.in" > "$TRAVIS_BUILD_DIR/testdata/rabbitmq.conf"
before_script:
- pip freeze
- /bin/sh -c "RABBITMQ_PID_FILE=$TRAVIS_BUILD_DIR/rabbitmq.pid RABBITMQ_CONFIG_FILE=$TRAVIS_BUILD_DIR/testdata/rabbitmq $TRAVIS_BUILD_DIR/rabbitmq_server-$RABBITMQ_VERSION/sbin/rabbitmq-server &"
- /bin/sh "$TRAVIS_BUILD_DIR/rabbitmq_server-$RABBITMQ_VERSION/sbin/rabbitmqctl" wait "$TRAVIS_BUILD_DIR/rabbitmq.pid"
- /bin/sh "$TRAVIS_BUILD_DIR/rabbitmq_server-$RABBITMQ_VERSION/sbin/rabbitmqctl" status
script:
# See https://github.com/travis-ci/travis-ci/issues/1066 and https://github.com/pika/pika/pull/984#issuecomment-370565220
# as to why 'set -e' and 'set +e' are added here
- set -e
- nosetests
- PIKA_TEST_TLS=true nosetests
- set +e
after_success:
- aws s3 cp .coverage "s3://com-gavinroy-travis/pika/$TRAVIS_BUILD_NUMBER/.coverage.${TRAVIS_PYTHON_VERSION}"
jobs:
include:
- python: pypy3
- python: pypy
- python: 2.7
- python: 3.4
- python: 3.5
- python: 3.6
- python: 3.7
dist: xenial # required for Python 3.7 (travis-ci/travis-ci#9069)
- stage: coverage
if: fork = false OR type != pull_request
python: 3.6
services: []
install:
- pip install awscli coverage codecov
before_script: []
script:
- mkdir coverage
- aws s3 cp --recursive s3://com-gavinroy-travis/pika/$TRAVIS_BUILD_NUMBER/ coverage
- cd coverage
- coverage combine
- cd ..
- mv coverage/.coverage .
- coverage report
after_success: codecov
- stage: deploy
if: repo = pika/pika
python: 3.6
services: []
install: true
before_script: []
script: true
after_success: []
deploy:
distributions: sdist bdist_wheel
provider: pypi
user: crad
on:
tags: true
all_branches: true
password:
secure: "V/JTU/X9C6uUUVGEAWmWWbmKW7NzVVlC/JWYpo05Ha9c0YV0vX4jOfov2EUAphM0WwkD/MRhz4dq3kCU5+cjHxR3aTSb+sbiElsCpaciaPkyrns+0wT5MCMO29Lpnq2qBLc1ePR1ey5aTWC/VibgFJOL7H/3wyvukL6ZaCnktYk="

View File

@ -0,0 +1,760 @@
Version History
===============
0.13.1 2019-03-07
-----------------
`GitHub milestone <https://github.com/pika/pika/milestone/14>`_
0.13.0 2019-01-17
-----------------
`GitHub milestone <https://github.com/pika/pika/milestone/13>`_
- `AsyncioConnection`, `TornadoConnection` and `TwistedProtocolConnection` are no longer auto-imported (`PR <https://github.com/pika/pika/pull/1129>`_)
- Python `3.7` support (`Issue <https://github.com/pika/pika/issues/1107>`_)
0.12.0 2018-06-19
-----------------
`GitHub milestone <https://github.com/pika/pika/milestone/12>`_
This is an interim release prior to version `1.0.0`. It includes the following backported pull requests and commits from the `master` branch:
- `PR #908 <https://github.com/pika/pika/pull/908>`_
- `PR #910 <https://github.com/pika/pika/pull/910>`_
- `PR #918 <https://github.com/pika/pika/pull/918>`_
- `PR #920 <https://github.com/pika/pika/pull/920>`_
- `PR #924 <https://github.com/pika/pika/pull/924>`_
- `PR #937 <https://github.com/pika/pika/pull/937>`_
- `PR #938 <https://github.com/pika/pika/pull/938>`_
- `PR #933 <https://github.com/pika/pika/pull/933>`_
- `PR #940 <https://github.com/pika/pika/pull/940>`_
- `PR #932 <https://github.com/pika/pika/pull/932>`_
- `PR #928 <https://github.com/pika/pika/pull/928>`_
- `PR #934 <https://github.com/pika/pika/pull/934>`_
- `PR #915 <https://github.com/pika/pika/pull/915>`_
- `PR #946 <https://github.com/pika/pika/pull/946>`_
- `PR #947 <https://github.com/pika/pika/pull/947>`_
- `PR #952 <https://github.com/pika/pika/pull/952>`_
- `PR #956 <https://github.com/pika/pika/pull/956>`_
- `PR #966 <https://github.com/pika/pika/pull/966>`_
- `PR #975 <https://github.com/pika/pika/pull/975>`_
- `PR #978 <https://github.com/pika/pika/pull/978>`_
- `PR #981 <https://github.com/pika/pika/pull/981>`_
- `PR #994 <https://github.com/pika/pika/pull/994>`_
- `PR #1007 <https://github.com/pika/pika/pull/1007>`_
- `PR #1045 <https://github.com/pika/pika/pull/1045>`_ (manually backported)
- `PR #1011 <https://github.com/pika/pika/pull/1011>`_
Commits:
Travis CI fail fast - 3f0e739
New features:
`BlockingConnection` now supports the `add_callback_threadsafe` method which allows a function to be executed correctly on the IO loop thread. The main use-case for this is as follows:
- Application sets up a thread for `BlockingConnection` and calls `basic_consume` on it
- When a message is received, work is done on another thread
- When the work is done, the worker uses `connection.add_callback_threadsafe` to call the `basic_ack` method on the channel instance.
Please see `examples/basic_consumer_threaded.py` for an example. As always, `SelectConnection` and a fully async consumer/publisher is the preferred method of using Pika.
Heartbeats are now sent at an interval equal to 1/2 of the negotiated idle connection timeout. RabbitMQ's default timeout value is 60 seconds, so heartbeats will be sent at a 30 second interval. In addition, Pika's check for an idle connection will be done at an interval equal to the timeout value plus 5 seconds to allow for delays. This results in an interval of 65 seconds by default.
0.11.2 2017-11-30
-----------------
`GitHub milestone <https://github.com/pika/pika/milestone/11>`_
`0.11.2 <https://github.com/pika/pika/compare/0.11.1...0.11.2>`_
- Remove `+` character from platform releases string (`PR <https://github.com/pika/pika/pull/895>`_)
0.11.1 2017-11-27
-----------------
`GitHub milestone <https://github.com/pika/pika/milestone/10>`_
`0.11.1 <https://github.com/pika/pika/compare/0.11.0...0.11.1>`_
- Fix `BlockingConnection` to ensure event loop exits (`PR <https://github.com/pika/pika/pull/887>`_)
- Heartbeat timeouts will use the client value if specified (`PR <https://github.com/pika/pika/pull/874>`_)
- Allow setting some common TCP options (`PR <https://github.com/pika/pika/pull/880>`_)
- Errors when decoding Unicode are ignored (`PR <https://github.com/pika/pika/pull/890>`_)
- Fix large number encoding (`PR <https://github.com/pika/pika/pull/888>`_)
0.11.0 2017-07-29
-----------------
`GitHub milestone <https://github.com/pika/pika/milestone/9>`_
`0.11.0 <https://github.com/pika/pika/compare/0.10.0...0.11.0>`_
- Simplify Travis CI configuration for OS X.
- Add `asyncio` connection adapter for Python 3.4 and newer.
- Connection failures that occur after the socket is opened and before the
AMQP connection is ready to go are now reported by calling the connection
error callback. Previously these were not consistently reported.
- In BaseConnection.close, call _handle_ioloop_stop only if the connection is
already closed to allow the asynchronous close operation to complete
gracefully.
- Pass error information from failed socket connection to user callbacks
on_open_error_callback and on_close_callback with result_code=-1.
- ValueError is raised when a completion callback is passed to an asynchronous
(nowait) Channel operation. It's an application error to pass a non-None
completion callback with an asynchronous request, because this callback can
never be serviced in the asynchronous scenario.
- `Channel.basic_reject` fixed to allow `delivery_tag` to be of type `long`
as well as `int`. (by quantum5)
- Implemented support for blocked connection timeouts in
`pika.connection.Connection`. This feature is available to all pika adapters.
See `pika.connection.ConnectionParameters` docstring to learn more about
`blocked_connection_timeout` configuration.
- Deprecated the `heartbeat_interval` arg in `pika.ConnectionParameters` in
favor of the `heartbeat` arg for consistency with the other connection
parameters classes `pika.connection.Parameters` and `pika.URLParameters`.
- When the `port` arg is not set explicitly in `ConnectionParameters`
constructor, but the `ssl` arg is set explicitly, then set the port value to
to the default AMQP SSL port if SSL is enabled, otherwise to the default
AMQP plaintext port.
- `URLParameters` will raise ValueError if a non-empty URL scheme other than
{amqp | amqps | http | https} is specified.
- `InvalidMinimumFrameSize` and `InvalidMaximumFrameSize` exceptions are
deprecated. pika.connection.Parameters.frame_max property setter now raises
the standard `ValueError` exception when the value is out of bounds.
- Removed deprecated parameter `type` in `Channel.exchange_declare` and
`BlockingChannel.exchange_declare` in favor of the `exchange_type` arg that
doesn't overshadow the builtin `type` keyword.
- Channel.close() on OPENING channel transitions it to CLOSING instead of
raising ChannelClosed.
- Channel.close() on CLOSING channel raises `ChannelAlreadyClosing`; used to
raise `ChannelClosed`.
- Connection.channel() raises `ConnectionClosed` if connection is not in OPEN
state.
- When performing graceful close on a channel and `Channel.Close` from broker
arrives while waiting for CloseOk, don't release the channel number until
CloseOk arrives to avoid race condition that may lead to a new channel
receiving the CloseOk that was destined for the closing channel.
- The `backpressure_detection` option of `ConnectionParameters` and
`URLParameters` property is DEPRECATED in favor of `Connection.Blocked` and
`Connection.Unblocked`. See `Connection.add_on_connection_blocked_callback`.
0.10.0 2015-09-02
-----------------
`0.10.0 <https://github.com/pika/pika/compare/0.9.14...0.10.0>`_
- a9bf96d - LibevConnection: Fixed dict chgd size during iteration (Michael Laing)
- 388c55d - SelectConnection: Fixed KeyError exceptions in IOLoop timeout executions (Shinji Suzuki)
- 4780de3 - BlockingConnection: Add support to make BlockingConnection a Context Manager (@reddec)
0.10.0b2 2015-07-15
-------------------
- f72b58f - Fixed failure to purge _ConsumerCancellationEvt from BlockingChannel._pending_events during basic_cancel. (Vitaly Kruglikov)
0.10.0b1 2015-07-10
-------------------
High-level summary of notable changes:
- Change to 3-Clause BSD License
- Python 3.x support
- Over 150 commits from 19 contributors
- Refactoring of SelectConnection ioloop
- This major release contains certain non-backward-compatible API changes as
well as significant performance improvements in the `BlockingConnection`
adapter.
- Non-backward-compatible changes in `Channel.add_on_return_callback` callback's
signature.
- The `AsyncoreConnection` adapter was retired
**Details**
Python 3.x: this release introduces python 3.x support. Tested on Python 3.3
and 3.4.
`AsyncoreConnection`: Retired this legacy adapter to reduce maintenance burden;
the recommended replacement is the `SelectConnection` adapter.
`SelectConnection`: ioloop was refactored for compatibility with other ioloops.
`Channel.add_on_return_callback`: The callback is now passed the individual
parameters channel, method, properties, and body instead of a tuple of those
values for congruence with other similar callbacks.
`BlockingConnection`: This adapter underwent a makeover under the hood and
gained significant performance improvements as well as enhanced timer
resolution. It is now implemented as a client of the `SelectConnection` adapter.
Below is an overview of the `BlockingConnection` and `BlockingChannel` API
changes:
- Recursion: the new implementation eliminates callback recursion that
sometimes blew out the stack in the legacy implementation (e.g.,
publish -> consumer_callback -> publish -> consumer_callback, etc.). While
`BlockingConnection.process_data_events` and `BlockingConnection.sleep` may
still be called from the scope of the blocking adapter's callbacks in order
to process pending I/O, additional callbacks will be suppressed whenever
`BlockingConnection.process_data_events` and `BlockingConnection.sleep` are
nested in any combination; in that case, the callback information will be
bufferred and dispatched once nesting unwinds and control returns to the
level-zero dispatcher.
- `BlockingConnection.connect`: this method was removed in favor of the
constructor as the only way to establish connections; this reduces
maintenance burden, while improving reliability of the adapter.
- `BlockingConnection.process_data_events`: added the optional parameter
`time_limit`.
- `BlockingConnection.add_on_close_callback`: removed; legacy raised
`NotImplementedError`.
- `BlockingConnection.add_on_open_callback`: removed; legacy raised
`NotImplementedError`.
- `BlockingConnection.add_on_open_error_callback`: removed; legacy raised
`NotImplementedError`.
- `BlockingConnection.add_backpressure_callback`: not supported
- `BlockingConnection.set_backpressure_multiplier`: not supported
- `BlockingChannel.add_on_flow_callback`: not supported; per docstring in
channel.py: "Note that newer versions of RabbitMQ will not issue this but
instead use TCP backpressure".
- `BlockingChannel.flow`: not supported
- `BlockingChannel.force_data_events`: removed as it is no longer necessary
following redesign of the adapter.
- Removed the `nowait` parameter from `BlockingChannel` methods, forcing
`nowait=False` (former API default) in the implementation; this is more
suitable for the blocking nature of the adapter and its error-reporting
strategy; this concerns the following methods: `basic_cancel`,
`confirm_delivery`, `exchange_bind`, `exchange_declare`, `exchange_delete`,
`exchange_unbind`, `queue_bind`, `queue_declare`, `queue_delete`, and
`queue_purge`.
- `BlockingChannel.basic_cancel`: returns a sequence instead of None; for a
`no_ack=True` consumer, `basic_cancel` returns a sequence of pending
messages that arrived before broker confirmed the cancellation.
- `BlockingChannel.consume`: added new optional kwargs `arguments` and
`inactivity_timeout`. Also, raises ValueError if the consumer creation
parameters don't match those used to create the existing queue consumer
generator, if any; this happens when you break out of the consume loop, then
call `BlockingChannel.consume` again with different consumer-creation args
without first cancelling the previous queue consumer generator via
`BlockingChannel.cancel`. The legacy implementation would silently resume
consuming from the existing queue consumer generator even if the subsequent
`BlockingChannel.consume` was invoked with a different queue name, etc.
- `BlockingChannel.cancel`: returns 0; the legacy implementation tried to
return the number of requeued messages, but this number was not accurate
as it didn't include the messages returned by the Channel class; this count
is not generally useful, so returning 0 is a reasonable replacement.
- `BlockingChannel.open`: removed in favor of having a single mechanism for
creating a channel (`BlockingConnection.channel`); this reduces maintenance
burden, while improving reliability of the adapter.
- `BlockingChannel.confirm_delivery`: raises UnroutableError when unroutable
messages that were sent prior to this call are returned before we receive
Confirm.Select-ok.
- `BlockingChannel.basic_publish: always returns True when delivery
confirmation is not enabled (publisher-acks = off); the legacy implementation
returned a bool in this case if `mandatory=True` to indicate whether the
message was delivered; however, this was non-deterministic, because
Basic.Return is asynchronous and there is no way to know how long to wait
for it or its absence. The legacy implementation returned None when
publishing with publisher-acks = off and `mandatory=False`. The new
implementation always returns True when publishing while
publisher-acks = off.
- `BlockingChannel.publish`: a new alternate method (vs. `basic_publish`) for
publishing a message with more detailed error reporting via UnroutableError
and NackError exceptions.
- `BlockingChannel.start_consuming`: raises pika.exceptions.RecursionError if
called from the scope of a `BlockingConnection` or `BlockingChannel`
callback.
- `BlockingChannel.get_waiting_message_count`: new method; returns the number
of messages that may be retrieved from the current queue consumer generator
via `BasicChannel.consume` without blocking.
**Commits**
- 5aaa753 - Fixed SSL import and removed no_ack=True in favor of explicit AMQP message handling based on deferreds (skftn)
- 7f222c2 - Add checkignore for codeclimate (Gavin M. Roy)
- 4dec370 - Implemented BlockingChannel.flow; Implemented BlockingConnection.add_on_connection_blocked_callback; Implemented BlockingConnection.add_on_connection_unblocked_callback. (Vitaly Kruglikov)
- 4804200 - Implemented blocking adapter acceptance test for exchange-to-exchange binding. Added rudimentary validation of BasicProperties passthru in blocking adapter publish tests. Updated CHANGELOG. (Vitaly Kruglikov)
- 4ec07fd - Fixed sending of data in TwistedProtocolConnection (Vitaly Kruglikov)
- a747fb3 - Remove my copyright from forward_server.py test utility. (Vitaly Kruglikov)
- 94246d2 - Return True from basic_publish when pubacks is off. Implemented more blocking adapter accceptance tests. (Vitaly Kruglikov)
- 3ce013d - PIKA-609 Wait for broker to dispatch all messages to client before cancelling consumer in TestBasicCancelWithNonAckableConsumer and TestBasicCancelWithAckableConsumer (Vitaly Kruglikov)
- 293f778 - Created CHANGELOG entry for release 0.10.0. Fixed up callback documentation for basic_get, basic_consume, and add_on_return_callback. (Vitaly Kruglikov)
- 16d360a - Removed the legacy AsyncoreConnection adapter in favor of the recommended SelectConnection adapter. (Vitaly Kruglikov)
- 240a82c - Defer creation of poller's event loop interrupt socket pair until start is called, because some SelectConnection users (e.g., BlockingConnection adapter) don't use the event loop, and these sockets would just get reported as resource leaks. (Vitaly Kruglikov)
- aed5cae - Added EINTR loops in select_connection pollers. Addressed some pylint findings, including an error or two. Wrap socket.send and socket.recv calls in EINTR loops Use the correct exception for socket.error and select.error and get errno depending on python version. (Vitaly Kruglikov)
- 498f1be - Allow passing exchange, queue and routing_key as text, handle short strings as text in python3 (saarni)
- 9f7f243 - Restored basic_consume, basic_cancel, and add_on_cancel_callback (Vitaly Kruglikov)
- 18c9909 - Reintroduced BlockingConnection.process_data_events. (Vitaly Kruglikov)
- 4b25cb6 - Fixed BlockingConnection/BlockingChannel acceptance and unit tests (Vitaly Kruglikov)
- bfa932f - Facilitate proper connection state after BasicConnection._adapter_disconnect (Vitaly Kruglikov)
- 9a09268 - Fixed BlockingConnection test that was failing with ConnectionClosed error. (Vitaly Kruglikov)
- 5a36934 - Copied synchronous_connection.py from pika-synchronous branch Fixed pylint findings Integrated SynchronousConnection with the new ioloop in SelectConnection Defined dedicated message classes PolledMessage and ConsumerMessage and moved from BlockingChannel to module-global scope. Got rid of nowait args from BlockingChannel public API methods Signal unroutable messages via UnroutableError exception. Signal Nack'ed messages via NackError exception. These expose more information about the failure than legacy basic_publich API. Removed set_timeout and backpressure callback methods Restored legacy `is_open`, etc. property names (Vitaly Kruglikov)
- 6226dc0 - Remove deprecated --use-mirrors (Gavin M. Roy)
- 1a7112f - Raise ConnectionClosed when sending a frame with no connection (#439) (Gavin M. Roy)
- 9040a14 - Make delivery_tag non-optional (#498) (Gavin M. Roy)
- 86aabc2 - Bump version (Gavin M. Roy)
- 562075a - Update a few testing things (Gavin M. Roy)
- 4954d38 - use unicode_type in blocking_connection.py (Antti Haapala)
- 133d6bc - Let Travis install ordereddict for Python 2.6, and ttest 3.3, 3.4 too. (Antti Haapala)
- 0d2287d - Pika Python 3 support (Antti Haapala)
- 3125c79 - SSLWantRead is not supported before python 2.7.9 and 3.3 (Will)
- 9a9c46c - Fixed TestDisconnectDuringConnectionStart: it turns out that depending on callback order, it might get either ProbableAuthenticationError or ProbableAccessDeniedError. (Vitaly Kruglikov)
- cd8c9b0 - A fix the write starvation problem that we see with tornado and pika (Will)
- 8654fbc - SelectConnection - make interrupt socketpair non-blocking (Will)
- 4f3666d - Added copyright in forward_server.py and fixed NameError bug (Vitaly Kruglikov)
- f8ebbbc - ignore docs (Gavin M. Roy)
- a344f78 - Updated codeclimate config (Gavin M. Roy)
- 373c970 - Try and fix pathing issues in codeclimate (Gavin M. Roy)
- 228340d - Ignore codegen (Gavin M. Roy)
- 4db0740 - Add a codeclimate config (Gavin M. Roy)
- 7e989f9 - Slight code re-org, usage comment and better naming of test file. (Will)
- 287be36 - Set up _kqueue member of KQueuePoller before calling super constructor to avoid exception due to missing _kqueue member. Call `self._map_event(event)` instead of `self._map_event(event.filter)`, because `KQueuePoller._map_event()` assumes it's getting an event, not an event filter. (Vitaly Kruglikov)
- 62810fb - Fix issue #412: reset BlockingConnection._read_poller in BlockingConnection._adapter_disconnect() to guard against accidental access to old file descriptor. (Vitaly Kruglikov)
- 03400ce - Rationalise adapter acceptance tests (Will)
- 9414153 - Fix bug selecting non epoll poller (Will)
- 4f063df - Use user heartbeat setting if server proposes none (Pau Gargallo)
- 9d04d6e - Deactivate heartbeats when heartbeat_interval is 0 (Pau Gargallo)
- a52a608 - Bug fix and review comments. (Will)
- e3ebb6f - Fix incorrect x-expires argument in acceptance tests (Will)
- 294904e - Get BlockingConnection into consistent state upon loss of TCP/IP connection with broker and implement acceptance tests for those cases. (Vitaly Kruglikov)
- 7f91a68 - Make SelectConnection behave like an ioloop (Will)
- dc9db2b - Perhaps 5 seconds is too agressive for travis (Gavin M. Roy)
- c23e532 - Lower the stuck test timeout (Gavin M. Roy)
- 1053ebc - Late night bug (Gavin M. Roy)
- cd6c1bf - More BaseConnection._handle_error cleanup (Gavin M. Roy)
- a0ff21c - Fix the test to work with Python 2.6 (Gavin M. Roy)
- 748e8aa - Remove pypy for now (Gavin M. Roy)
- 1c921c1 - Socket close/shutdown cleanup (Gavin M. Roy)
- 5289125 - Formatting update from PR (Gavin M. Roy)
- d235989 - Be more specific when calling getaddrinfo (Gavin M. Roy)
- b5d1b31 - Reflect the method name change in pika.callback (Gavin M. Roy)
- df7d3b7 - Cleanup BlockingConnection in a few places (Gavin M. Roy)
- cd99e1c - Rename method due to use in BlockingConnection (Gavin M. Roy)
- 7e0d1b3 - Use google style with yapf instead of pep8 (Gavin M. Roy)
- 7dc9bab - Refactor socket writing to not use sendall #481 (Gavin M. Roy)
- 4838789 - Dont log the fd #521 (Gavin M. Roy)
- 765107d - Add Connection.Blocked callback registration methods #476 (Gavin M. Roy)
- c15b5c1 - Fix _blocking typo pointed out in #513 (Gavin M. Roy)
- 759ac2c - yapf of codegen (Gavin M. Roy)
- 9dadd77 - yapf cleanup of codegen and spec (Gavin M. Roy)
- ddba7ce - Do not reject consumers with no_ack=True #486 #530 (Gavin M. Roy)
- 4528a1a - yapf reformatting of tests (Gavin M. Roy)
- e7b6d73 - Remove catching AttributError (#531) (Gavin M. Roy)
- 41ea5ea - Update README badges [skip ci] (Gavin M. Roy)
- 6af987b - Add note on contributing (Gavin M. Roy)
- 161fc0d - yapf formatting cleanup (Gavin M. Roy)
- edcb619 - Add PYPY to travis testing (Gavin M. Roy)
- 2225771 - Change the coverage badge (Gavin M. Roy)
- 8f7d451 - Move to codecov from coveralls (Gavin M. Roy)
- b80407e - Add confirm_delivery to example (Andrew Smith)
- 6637212 - Update base_connection.py (bstemshorn)
- 1583537 - #544 get_waiting_message_count() (markcf)
- 0c9be99 - Fix #535: pass expected reply_code and reply_text from method frame to Connection._on_disconnect from Connection._on_connection_closed (Vitaly Kruglikov)
- d11e73f - Propagate ConnectionClosed exception out of BlockingChannel._send_method() and log ConnectionClosed in BlockingConnection._on_connection_closed() (Vitaly Kruglikov)
- 63d2951 - Fix #541 - make sure connection state is properly reset when BlockingConnection._check_state_on_disconnect raises ConnectionClosed. This supplements the previously-merged PR #450 by getting the connection into consistent state. (Vitaly Kruglikov)
- 71bc0eb - Remove unused self.fd attribute from BaseConnection (Vitaly Kruglikov)
- 8c08f93 - PIKA-532 Removed unnecessary params (Vitaly Kruglikov)
- 6052ecf - PIKA-532 Fix bug in BlockingConnection._handle_timeout that was preventing _on_connection_closed from being called when not closing. (Vitaly Kruglikov)
- 562aa15 - pika: callback: Display exception message when callback fails. (Stuart Longland)
- 452995c - Typo fix in connection.py (Andrew)
- 361c0ad - Added some missing yields (Robert Weidlich)
- 0ab5a60 - Added complete example for python twisted service (Robert Weidlich)
- 4429110 - Add deployment and webhooks (Gavin M. Roy)
- 7e50302 - Fix has_content style in codegen (Andrew Grigorev)
- 28c2214 - Fix the trove categorization (Gavin M. Roy)
- de8b545 - Ensure frames can not be interspersed on send (Gavin M. Roy)
- 8fe6bdd - Fix heartbeat behaviour after connection failure. (Kyösti Herrala)
- c123472 - Updating BlockingChannel.basic_get doc (it does not receive a callback like the rest of the adapters) (Roberto Decurnex)
- b5f52fb - Fix number of arguments passed to _on_return callback (Axel Eirola)
- 765139e - Lower default TIMEOUT to 0.01 (bra-fsn)
- 6cc22a5 - Fix confirmation on reconnects (bra-fsn)
- f4faf0a - asynchronous publisher and subscriber examples refactored to follow the StepDown rule (Riccardo Cirimelli)
0.9.14 - 2014-07-11
-------------------
`0.9.14 <https://github.com/pika/pika/compare/0.9.13...0.9.14>`_
- 57fe43e - fix test to generate a correct range of random ints (ml)
- 0d68dee - fix async watcher for libev_connection (ml)
- 01710ad - Use default username and password if not specified in URLParameters (Sean Dwyer)
- fae328e - documentation typo (Jeff Fein-Worton)
- afbc9e0 - libev_connection: reset_io_watcher (ml)
- 24332a2 - Fix the manifest (Gavin M. Roy)
- acdfdef - Remove useless test (Gavin M. Roy)
- 7918e1a - Skip libev tests if pyev is not installed or if they are being run in pypy (Gavin M. Roy)
- bb583bf - Remove the deprecated test (Gavin M. Roy)
- aecf3f2 - Don't reject a message if the channel is not open (Gavin M. Roy)
- e37f336 - Remove UTF-8 decoding in spec (Gavin M. Roy)
- ddc35a9 - Update the unittest to reflect removal of force binary (Gavin M. Roy)
- fea2476 - PEP8 cleanup (Gavin M. Roy)
- 9b97956 - Remove force_binary (Gavin M. Roy)
- a42dd90 - Whitespace required (Gavin M. Roy)
- 85867ea - Update the content_frame_dispatcher tests to reflect removal of auto-cast utf-8 (Gavin M. Roy)
- 5a4bd5d - Remove unicode casting (Gavin M. Roy)
- efea53d - Remove force binary and unicode casting (Gavin M. Roy)
- e918d15 - Add methods to remove deprecation warnings from asyncore (Gavin M. Roy)
- 117f62d - Add a coveragerc to ignore the auto generated pika.spec (Gavin M. Roy)
- 52f4485 - Remove pypy tests from travis for now (Gavin M. Roy)
- c3aa958 - Update README.rst (Gavin M. Roy)
- 3e2319f - Delete README.md (Gavin M. Roy)
- c12b0f1 - Move to RST (Gavin M. Roy)
- 704f5be - Badging updates (Gavin M. Roy)
- 7ae33ca - Update for coverage info (Gavin M. Roy)
- ae7ca86 - add libev_adapter_tests.py; modify .travis.yml to install libev and pyev (ml)
- f86aba5 - libev_connection: add **kwargs to _handle_event; suppress default_ioloop reuse warning (ml)
- 603f1cf - async_test_base: add necessary args to _on_cconn_closed (ml)
- 3422007 - add libev_adapter_tests.py (ml)
- 6cbab0c - removed relative imports and importing urlparse from urllib.parse for py3+ (a-tal)
- f808464 - libev_connection: add async watcher; add optional parameters to add_timeout (ml)
- c041c80 - Remove ev all together for now (Gavin M. Roy)
- 9408388 - Update the test descriptions and timeout (Gavin M. Roy)
- 1b552e0 - Increase timeout (Gavin M. Roy)
- 69a1f46 - Remove the pyev requirement for 2.6 testing (Gavin M. Roy)
- fe062d2 - Update package name (Gavin M. Roy)
- 611ad0e - Distribute the LICENSE and README.md (#350) (Gavin M. Roy)
- df5e1d8 - Ensure that the entire frame is written using socket.sendall (#349) (Gavin M. Roy)
- 69ec8cf - Move the libev install to before_install (Gavin M. Roy)
- a75f693 - Update test structure (Gavin M. Roy)
- 636b424 - Update things to ignore (Gavin M. Roy)
- b538c68 - Add tox, nose.cfg, update testing config (Gavin M. Roy)
- a0e7063 - add some tests to increase coverage of pika.connection (Charles Law)
- c76d9eb - Address issue #459 (Gavin M. Roy)
- 86ad2db - Raise exception if positional arg for parameters isn't an instance of Parameters (Gavin M. Roy)
- 14d08e1 - Fix for python 2.6 (Gavin M. Roy)
- bd388a3 - Use the first unused channel number addressing #404, #460 (Gavin M. Roy)
- e7676e6 - removing a debug that was left in last commit (James Mutton)
- 6c93b38 - Fixing connection-closed behavior to detect on attempt to publish (James Mutton)
- c3f0356 - Initialize bytes_written in _handle_write() (Jonathan Kirsch)
- 4510e95 - Fix _handle_write() may not send full frame (Jonathan Kirsch)
- 12b793f - fixed Tornado Consumer example to successfully reconnect (Yang Yang)
- f074444 - remove forgotten import of ordereddict (Pedro Abranches)
- 1ba0aea - fix last merge (Pedro Abranches)
- 10490a6 - change timeouts structure to list to maintain scheduling order (Pedro Abranches)
- 7958394 - save timeouts in ordered dict instead of dict (Pedro Abranches)
- d2746bf - URLParameters and ConnectionParameters accept unicode strings (Allard Hoeve)
- 596d145 - previous fix for AttributeError made parent and child class methods identical, remove duplication (James Mutton)
- 42940dd - UrlParameters Docs: fixed amqps scheme examples (Riccardo Cirimelli)
- 43904ff - Dont test this in PyPy due to sort order issue (Gavin M. Roy)
- d7d293e - Don't leave __repr__ sorting up to chance (Gavin M. Roy)
- 848c594 - Add integration test to travis and fix invocation (Gavin M. Roy)
- 2678275 - Add pypy to travis tests (Gavin M. Roy)
- 1877f3d - Also addresses issue #419 (Gavin M. Roy)
- 470c245 - Address issue #419 (Gavin M. Roy)
- ca3cb59 - Address issue #432 (Gavin M. Roy)
- a3ff6f2 - Default frame max should be AMQP FRAME_MAX (Gavin M. Roy)
- ff3d5cb - Remove max consumer tag test due to change in code. (Gavin M. Roy)
- 6045dda - Catch KeyError (#437) to ensure that an exception is not raised in a race condition (Gavin M. Roy)
- 0b4d53a - Address issue #441 (Gavin M. Roy)
- 180e7c4 - Update license and related files (Gavin M. Roy)
- 256ed3d - Added Jython support. (Erik Olof Gunnar Andersson)
- f73c141 - experimental work around for recursion issue. (Erik Olof Gunnar Andersson)
- a623f69 - Prevent #436 by iterating the keys and not the dict (Gavin M. Roy)
- 755fcae - Add support for authentication_failure_close, connection.blocked (Gavin M. Roy)
- c121243 - merge upstream master (Michael Laing)
- a08dc0d - add arg to channel.basic_consume (Pedro Abranches)
- 10b136d - Documentation fix (Anton Ryzhov)
- 9313307 - Fixed minor markup errors. (Jorge Puente Sarrín)
- fb3e3cf - Fix the spelling of UnsupportedAMQPFieldException (Garrett Cooper)
- 03d5da3 - connection.py: Propagate the force_channel keyword parameter to methods involved in channel creation (Michael Laing)
- 7bbcff5 - Documentation fix for basic_publish (JuhaS)
- 01dcea7 - Expose no_ack and exclusive to BlockingChannel.consume (Jeff Tang)
- d39b6aa - Fix BlockingChannel.basic_consume does not block on non-empty queues (Juhyeong Park)
- 6e1d295 - fix for issue 391 and issue 307 (Qi Fan)
- d9ffce9 - Update parameters.rst (cacovsky)
- 6afa41e - Add additional badges (Gavin M. Roy)
- a255925 - Fix return value on dns resolution issue (Laurent Eschenauer)
- 3f7466c - libev_connection: tweak docs (Michael Laing)
- 0aaed93 - libev_connection: Fix varable naming (Michael Laing)
- 0562d08 - libev_connection: Fix globals warning (Michael Laing)
- 22ada59 - libev_connection: use globals to track sigint and sigterm watchers as they are created globally within libev (Michael Laing)
- 2649b31 - Move badge [skip ci] (Gavin M. Roy)
- f70eea1 - Remove pypy and installation attempt of pyev (Gavin M. Roy)
- f32e522 - Conditionally skip external connection adapters if lib is not installed (Gavin M. Roy)
- cce97c5 - Only install pyev on python 2.7 (Gavin M. Roy)
- ff84462 - Add travis ci support (Gavin M. Roy)
- cf971da - lib_evconnection: improve signal handling; add callback (Michael Laing)
- 9adb269 - bugfix in returning a list in Py3k (Alex Chandel)
- c41d5b9 - update exception syntax for Py3k (Alex Chandel)
- c8506f1 - fix _adapter_connect (Michael Laing)
- 67cb660 - Add LibevConnection to README (Michael Laing)
- 1f9e72b - Propagate low-level connection errors to the AMQPConnectionError. (Bjorn Sandberg)
- e1da447 - Avoid race condition in _on_getok on successive basic_get() when clearing out callbacks (Jeff)
- 7a09979 - Add support for upcoming Connection.Blocked/Unblocked (Gavin M. Roy)
- 53cce88 - TwistedChannel correctly handles multi-argument deferreds. (eivanov)
- 66f8ace - Use uuid when creating unique consumer tag (Perttu Ranta-aho)
- 4ee2738 - Limit the growth of Channel._cancelled, use deque instead of list. (Perttu Ranta-aho)
- 0369aed - fix adapter references and tweak docs (Michael Laing)
- 1738c23 - retry select.select() on EINTR (Cenk Alti)
- 1e55357 - libev_connection: reset internal state on reconnect (Michael Laing)
- 708559e - libev adapter (Michael Laing)
- a6b7c8b - Prioritize EPollPoller and KQueuePoller over PollPoller and SelectPoller (Anton Ryzhov)
- 53400d3 - Handle socket errors in PollPoller and EPollPoller Correctly check 'select.poll' availability (Anton Ryzhov)
- a6dc969 - Use dict.keys & items instead of iterkeys & iteritems (Alex Chandel)
- 5c1b0d0 - Use print function syntax, in examples (Alex Chandel)
- ac9f87a - Fixed a typo in the name of the Asyncore Connection adapter (Guruprasad)
- dfbba50 - Fixed bug mentioned in Issue #357 (Erik Andersson)
- c906a2d - Drop additional flags when getting info for the hostnames, log errors (#352) (Gavin M. Roy)
- baf23dd - retry poll() on EINTR (Cenk Alti)
- 7cd8762 - Address ticket #352 catching an error when socket.getprotobyname fails (Gavin M. Roy)
- 6c3ec75 - Prep for 0.9.14 (Gavin M. Roy)
- dae7a99 - Bump to 0.9.14p0 (Gavin M. Roy)
- 620edc7 - Use default port and virtual host if omitted in URLParameters (Issue #342) (Gavin M. Roy)
- 42a8787 - Move the exception handling inside the while loop (Gavin M. Roy)
- 10e0264 - Fix connection back pressure detection issue #347 (Gavin M. Roy)
- 0bfd670 - Fixed mistake in commit 3a19d65. (Erik Andersson)
- da04bc0 - Fixed Unknown state on disconnect error message generated when closing connections. (Erik Andersson)
- 3a19d65 - Alternative solution to fix #345. (Erik Andersson)
- abf9fa8 - switch to sendall to send entire frame (Dustin Koupal)
- 9ce8ce4 - Fixed the async publisher example to work with reconnections (Raphaël De Giusti)
- 511028a - Fix typo in TwistedChannel docstring (cacovsky)
- 8b69e5a - calls self._adapter_disconnect() instead of self.disconnect() which doesn't actually exist #294 (Mark Unsworth)
- 06a5cf8 - add NullHandler to prevent logging warnings (Cenk Alti)
- f404a9a - Fix #337 cannot start ioloop after stop (Ralf Nyren)
0.9.13 - 2013-05-15
-------------------
`0.9.13 <https://github.com/pika/pika/compare/0.9.12...0.9.13>`_
**Major Changes**
- IPv6 Support with thanks to Alessandro Tagliapietra for initial prototype
- Officially remove support for <= Python 2.5 even though it was broken already
- Drop pika.simplebuffer.SimpleBuffer in favor of the Python stdlib collections.deque object
- New default object for receiving content is a "bytes" object which is a str wrapper in Python 2, but paves way for Python 3 support
- New "Raw" mode for frame decoding content frames (#334) addresses issues #331, #229 added by Garth Williamson
- Connection and Disconnection logic refactored, allowing for cleaner separation of protocol logic and socket handling logic as well as connection state management
- New "on_open_error_callback" argument in creating connection objects and new Connection.add_on_open_error_callback method
- New Connection.connect method to cleanly allow for reconnection code
- Support for all AMQP field types, using protocol specified signed/unsigned unpacking
**Backwards Incompatible Changes**
- Method signature for creating connection objects has new argument "on_open_error_callback" which is positionally before "on_close_callback"
- Internal callback variable names in connection.Connection have been renamed and constants used. If you relied on any of these callbacks outside of their internal use, make sure to check out the new constants.
- Connection._connect method, which was an internal only method is now deprecated and will raise a DeprecationWarning. If you relied on this method, your code needs to change.
- pika.simplebuffer has been removed
**Bugfixes**
- BlockingConnection consumer generator does not free buffer when exited (#328)
- Unicode body payloads in the blocking adapter raises exception (#333)
- Support "b" short-short-int AMQP data type (#318)
- Docstring type fix in adapters/select_connection (#316) fix by Rikard Hultén
- IPv6 not supported (#309)
- Stop the HeartbeatChecker when connection is closed (#307)
- Unittest fix for SelectConnection (#336) fix by Erik Andersson
- Handle condition where no connection or socket exists but SelectConnection needs a timeout for retrying a connection (#322)
- TwistedAdapter lagging behind BaseConnection changes (#321) fix by Jan Urbański
**Other**
- Refactored documentation
- Added Twisted Adapter example (#314) by nolinksoft
0.9.12 - 2013-03-18
-------------------
`0.9.12 <https://github.com/pika/pika/compare/0.9.11...0.9.12>`_
**Bugfixes**
- New timeout id hashing was not unique
0.9.11 - 2013-03-17
-------------------
`0.9.11 <https://github.com/pika/pika/compare/0.9.10...0.9.11>`_
**Bugfixes**
- Address inconsistent channel close callback documentation and add the signature
change to the TwistedChannel class (#305)
- Address a missed timeout related internal data structure name change
introduced in the SelectConnection 0.9.10 release. Update all connection
adapters to use same signature and docstring (#306).
0.9.10 - 2013-03-16
-------------------
`0.9.10 <https://github.com/pika/pika/compare/0.9.9...0.9.10>`_
**Bugfixes**
- Fix timeout in twisted adapter (Submitted by cellscape)
- Fix blocking_connection poll timer resolution to milliseconds (Submitted by cellscape)
- Fix channel._on_close() without a method frame (Submitted by Richard Boulton)
- Addressed exception on close (Issue #279 - fix by patcpsc)
- 'messages' not initialized in BlockingConnection.cancel() (Issue #289 - fix by Mik Kocikowski)
- Make queue_unbind behave like queue_bind (Issue #277)
- Address closing behavioral issues for connections and channels (Issue #275)
- Pass a Method frame to Channel._on_close in Connection._on_disconnect (Submitted by Jan Urbański)
- Fix channel closed callback signature in the Twisted adapter (Submitted by Jan Urbański)
- Don't stop the IOLoop on connection close for in the Twisted adapter (Submitted by Jan Urbański)
- Update the asynchronous examples to fix reconnecting and have it work
- Warn if the socket was closed such as if RabbitMQ dies without a Close frame
- Fix URLParameters ssl_options (Issue #296)
- Add state to BlockingConnection addressing (Issue #301)
- Encode unicode body content prior to publishing (Issue #282)
- Fix an issue with unicode keys in BasicProperties headers key (Issue #280)
- Change how timeout ids are generated (Issue #254)
- Address post close state issues in Channel (Issue #302)
** Behavior changes **
- Change core connection communication behavior to prefer outbound writes over reads, addressing a recursion issue
- Update connection on close callbacks, changing callback method signature
- Update channel on close callbacks, changing callback method signature
- Give more info in the ChannelClosed exception
- Change the constructor signature for BlockingConnection, block open/close callbacks
- Disable the use of add_on_open_callback/add_on_close_callback methods in BlockingConnection
0.9.9 - 2013-01-29
------------------
`0.9.9 <https://github.com/pika/pika/compare/0.9.8...0.9.9>`_
**Bugfixes**
- Only remove the tornado_connection.TornadoConnection file descriptor from the IOLoop if it's still open (Issue #221)
- Allow messages with no body (Issue #227)
- Allow for empty routing keys (Issue #224)
- Don't raise an exception when trying to send a frame to a closed connection (Issue #229)
- Only send a Connection.CloseOk if the connection is still open. (Issue #236 - Fix by noleaf)
- Fix timeout threshold in blocking connection - (Issue #232 - Fix by Adam Flynn)
- Fix closing connection while a channel is still open (Issue #230 - Fix by Adam Flynn)
- Fixed misleading warning and exception messages in BaseConnection (Issue #237 - Fix by Tristan Penman)
- Pluralised and altered the wording of the AMQPConnectionError exception (Issue #237 - Fix by Tristan Penman)
- Fixed _adapter_disconnect in TornadoConnection class (Issue #237 - Fix by Tristan Penman)
- Fixing hang when closing connection without any channel in BlockingConnection (Issue #244 - Fix by Ales Teska)
- Remove the process_timeouts() call in SelectConnection (Issue #239)
- Change the string validation to basestring for host connection parameters (Issue #231)
- Add a poller to the BlockingConnection to address latency issues introduced in Pika 0.9.8 (Issue #242)
- reply_code and reply_text is not set in ChannelException (Issue #250)
- Add the missing constraint parameter for Channel._on_return callback processing (Issue #257 - Fix by patcpsc)
- Channel callbacks not being removed from callback manager when channel is closed or deleted (Issue #261)
0.9.8 - 2012-11-18
------------------
`0.9.8 <https://github.com/pika/pika/compare/0.9.7...0.9.8>`_
**Bugfixes**
- Channel.queue_declare/BlockingChannel.queue_declare not setting up callbacks property for empty queue name (Issue #218)
- Channel.queue_bind/BlockingChannel.queue_bind not allowing empty routing key
- Connection._on_connection_closed calling wrong method in Channel (Issue #219)
- Fix tx_commit and tx_rollback bugs in BlockingChannel (Issue #217)
0.9.7 - 2012-11-11
------------------
`0.9.7 <https://github.com/pika/pika/compare/0.9.6...0.9.7>`_
**New features**
- generator based consumer in BlockingChannel (See :doc:`examples/blocking_consumer_generator` for example)
**Changes**
- BlockingChannel._send_method will only wait if explicitly told to
**Bugfixes**
- Added the exchange "type" parameter back but issue a DeprecationWarning
- Dont require a queue name in Channel.queue_declare()
- Fixed KeyError when processing timeouts (Issue # 215 - Fix by Raphael De Giusti)
- Don't try and close channels when the connection is closed (Issue #216 - Fix by Charles Law)
- Dont raise UnexpectedFrame exceptions, log them instead
- Handle multiple synchronous RPC calls made without waiting for the call result (Issues #192, #204, #211)
- Typo in docs (Issue #207 Fix by Luca Wehrstedt)
- Only sleep on connection failure when retry attempts are > 0 (Issue #200)
- Bypass _rpc method and just send frames for Basic.Ack, Basic.Nack, Basic.Reject (Issue #205)
0.9.6 - 2012-10-29
------------------
`0.9.6 <https://github.com/pika/pika/compare/0.9.5...0.9.6>`_
**New features**
- URLParameters
- BlockingChannel.start_consuming() and BlockingChannel.stop_consuming()
- Delivery Confirmations
- Improved unittests
**Major bugfix areas**
- Connection handling
- Blocking functionality in the BlockingConnection
- SSL
- UTF-8 Handling
**Removals**
- pika.reconnection_strategies
- pika.channel.ChannelTransport
- pika.log
- pika.template
- examples directory
0.9.5 - 2011-03-29
------------------
`0.9.5 <https://github.com/pika/pika/compare/0.9.4...0.9.5>`_
**Changelog**
- Scope changes with adapter IOLoops and CallbackManager allowing for cleaner, multi-threaded operation
- Add support for Confirm.Select with channel.Channel.confirm_delivery()
- Add examples of delivery confirmation to examples (demo_send_confirmed.py)
- Update uses of log.warn with warning.warn for TCP Back-pressure alerting
- License boilerplate updated to simplify license text in source files
- Increment the timeout in select_connection.SelectPoller reducing CPU utilization
- Bug fix in Heartbeat frame delivery addressing issue #35
- Remove abuse of pika.log.method_call through a majority of the code
- Rename of key modules: table to data, frames to frame
- Cleanup of frame module and related classes
- Restructure of tests and test runner
- Update functional tests to respect RABBITMQ_HOST, RABBITMQ_PORT environment variables
- Bug fixes to reconnection_strategies module
- Fix the scale of timeout for PollPoller to be specified in milliseconds
- Remove mutable default arguments in RPC calls
- Add data type validation to RPC calls
- Move optional credentials erasing out of connection.Connection into credentials module
- Add support to allow for additional external credential types
- Add a NullHandler to prevent the 'No handlers could be found for logger "pika"' error message when not using pika.log in a client app at all.
- Clean up all examples to make them easier to read and use
- Move documentation into its own repository https://github.com/pika/documentation
- channel.py
- Move channel.MAX_CHANNELS constant from connection.CHANNEL_MAX
- Add default value of None to ChannelTransport.rpc
- Validate callback and acceptable replies parameters in ChannelTransport.RPC
- Remove unused connection attribute from Channel
- connection.py
- Remove unused import of struct
- Remove direct import of pika.credentials.PlainCredentials
- Change to import pika.credentials
- Move CHANNEL_MAX to channel.MAX_CHANNELS
- Change ConnectionParameters initialization parameter heartbeat to boolean
- Validate all inbound parameter types in ConnectionParameters
- Remove the Connection._erase_credentials stub method in favor of letting the Credentials object deal with that itself.
- Warn if the credentials object intends on erasing the credentials and a reconnection strategy other than NullReconnectionStrategy is specified.
- Change the default types for callback and acceptable_replies in Connection._rpc
- Validate the callback and acceptable_replies data types in Connection._rpc
- adapters.blocking_connection.BlockingConnection
- Addition of _adapter_disconnect to blocking_connection.BlockingConnection
- Add timeout methods to BlockingConnection addressing issue #41
- BlockingConnection didn't allow you register more than one consumer callback because basic_consume was overridden to block immediately. New behavior allows you to do so.
- Removed overriding of base basic_consume and basic_cancel methods. Now uses underlying Channel versions of those methods.
- Added start_consuming() method to BlockingChannel to start the consumption loop.
- Updated stop_consuming() to iterate through all the registered consumers in self._consumers and issue a basic_cancel.

View File

@ -0,0 +1,68 @@
# Contributing
## Test Coverage
To contribute to Pika, please make sure that any new features or changes
to existing functionality **include test coverage**.
*Pull requests that add or change code without coverage have a much lower chance
of being accepted.*
## Prerequisites
Pika test suite has a couple of requirements:
* Dependencies from `test-dependencies.txt` are installed
* A RabbitMQ node with all defaults is running on `localhost:5672`
## Installing Dependencies
To install the dependencies needed to run Pika tests, use
pip install -r test-requirements.txt
which on Python 3 might look like this
pip3 install -r test-requirements.txt
## Running Tests
To run all test suites, use
nosetests
Note that some tests are OS-specific (e.g. epoll on Linux
or kqueue on MacOS and BSD). Those will be skipped
automatically.
If you would like to run TLS/SSL tests, use the following procedure:
* Create a `rabbitmq.conf` file:
```
sed -e "s#PIKA_DIR#$PWD#g" ./testdata/rabbitmq.conf.in > ./testdata/rabbitmq.conf
```
* Start RabbitMQ and use the configuration file you just created. An example command
that works with the `generic-unix` package is as follows:
```
$ RABBITMQ_CONFIG_FILE=/path/to/pika/testdata/rabbitmq.conf ./sbin/rabbitmq-server
```
* Run the tests indicating that TLS/SSL connections should be used:
```
PIKA_TEST_TLS=true nosetests
```
## Code Formatting
Please format your code using [yapf](http://pypi.python.org/pypi/yapf)
with ``google`` style prior to issuing your pull request. *Note: only format those
lines that you have changed in your pull request. If you format an entire file and
change code outside of the scope of your PR, it will likely be rejected.*

View File

@ -0,0 +1,25 @@
Copyright (c) 2009-2017, Tony Garnock-Jones, Gavin M. Roy, Pivotal and others.
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of the Pika project nor the names of its contributors may be used
to endorse or promote products derived from this software without specific
prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -0,0 +1,2 @@
include LICENSE
include README.rst

View File

@ -0,0 +1,157 @@
Pika
====
Pika is a RabbitMQ (AMQP-0-9-1) client library for Python.
|Version| |Python versions| |Status| |Coverage| |License| |Docs|
Introduction
-------------
Pika is a pure-Python implementation of the AMQP 0-9-1 protocol including RabbitMQ's
extensions.
- Python 2.7 and 3.4+ are supported.
- Since threads aren't appropriate to every situation, it doesn't
require threads. It takes care not to forbid them, either. The same
goes for greenlets, callbacks, continuations and generators. It is
not necessarily thread-safe however, and your mileage will vary.
- People may be using direct sockets, plain old `select()`,
or any of the wide variety of ways of getting network events to and from a
python application. Pika tries to stay compatible with all of these, and to
make adapting it to a new environment as simple as possible.
Documentation
-------------
Pika's documentation can be found at `https://pika.readthedocs.io <https://pika.readthedocs.io>`_
Example
-------
Here is the most simple example of use, sending a message with the BlockingConnection adapter:
.. code :: python
import pika
connection = pika.BlockingConnection()
channel = connection.channel()
channel.basic_publish(exchange='example',
routing_key='test',
body='Test Message')
connection.close()
And an example of writing a blocking consumer:
.. code :: python
import pika
connection = pika.BlockingConnection()
channel = connection.channel()
for method_frame, properties, body in channel.consume('test'):
# Display the message parts and ack the message
print(method_frame, properties, body)
channel.basic_ack(method_frame.delivery_tag)
# Escape out of the loop after 10 messages
if method_frame.delivery_tag == 10:
break
# Cancel the consumer and return any pending messages
requeued_messages = channel.cancel()
print('Requeued %i messages' % requeued_messages)
connection.close()
Pika provides the following adapters
------------------------------------
- AsyncioConnection - adapter for the Python3 AsyncIO event loop
- BlockingConnection - enables blocking, synchronous operation on top of library for simple uses
- SelectConnection - fast asynchronous adapter
- TornadoConnection - adapter for use with the Tornado IO Loop http://tornadoweb.org
- TwistedConnection - adapter for use with the Twisted asynchronous package http://twistedmatrix.com/
Requesting message ACKs from another thread
-------------------------------------------
The single-threaded usage constraint of an individual Pika connection adapter
instance may result in a dropped AMQP/stream connection due to AMQP heartbeat
timeout in consumers that take a long time to process an incoming message. A
common solution is to delegate processing of the incoming messages to another
thread, while the connection adapter's thread continues to service its ioloop's
message pump, permitting AMQP heartbeats and other I/O to be serviced in a
timely fashion.
Messages processed in another thread may not be ACK'ed directly from that thread,
since all accesses to the connection adapter instance must be from a single
thread - the thread that is running the adapter's ioloop. However, this may be
accomplished by requesting a callback to be executed in the adapter's ioloop
thread. For example, the callback function's implementation might look like this:
.. code :: python
def ack_message(channel, delivery_tag):
"""Note that `channel` must be the same pika channel instance via which
the message being ACKed was retrieved (AMQP protocol constraint).
"""
if channel.is_open:
channel.basic_ack(delivery_tag)
else:
# Channel is already closed, so we can't ACK this message;
# log and/or do something that makes sense for your app in this case.
pass
The code running in the other thread may request the `ack_message()` function
to be executed in the connection adapter's ioloop thread using an
adapter-specific mechanism:
- :py:class:`pika.BlockingConnection` abstracts its ioloop from the application
and thus exposes :py:meth:`pika.BlockingConnection.add_callback_threadsafe()`.
Refer to this method's docstring for additional information. For example:
.. code :: python
connection.add_callback_threadsafe(functools.partial(ack_message, channel, delivery_tag))
- When using a non-blocking connection adapter, such as
:py:class:`pika.adapters.asyncio_connection.AsyncioConnection` or
:py:class:`pika.SelectConnection`, you use the underlying asynchronous
framework's native API for requesting an ioloop-bound callback from
another thread. For example, `SelectConnection`'s `IOLoop` provides
`add_callback_threadsafe()`, `Tornado`'s `IOLoop` has
`add_callback()`, while `asyncio`'s event loop exposes
`call_soon_threadsafe()`.
This threadsafe callback request mechanism may also be used to delegate
publishing of messages, etc., from a background thread to the connection adapter's
thread.
Contributing
------------
To contribute to pika, please make sure that any new features or changes
to existing functionality **include test coverage**.
*Pull requests that add or change code without coverage will most likely be rejected.*
Additionally, please format your code using `yapf <http://pypi.python.org/pypi/yapf>`_
with ``google`` style prior to issuing your pull request. *Note: only format those
lines that you have changed in your pull request. If you format an entire file and
change code outside of the scope of your PR, it will likely be rejected.*
.. |Version| image:: https://img.shields.io/pypi/v/pika.svg?
:target: http://badge.fury.io/py/pika
.. |Python versions| image:: https://img.shields.io/pypi/pyversions/pika.svg
:target: https://pypi.python.org/pypi/pika
.. |Status| image:: https://img.shields.io/travis/pika/pika.svg?
:target: https://travis-ci.org/pika/pika
.. |Coverage| image:: https://img.shields.io/codecov/c/github/pika/pika.svg?
:target: https://codecov.io/github/pika/pika?branch=master
.. |License| image:: https://img.shields.io/pypi/l/pika.svg?
:target: https://pika.readthedocs.io
.. |Docs| image:: https://readthedocs.org/projects/pika/badge/?version=stable
:target: https://pika.readthedocs.io
:alt: Documentation Status

View File

@ -0,0 +1,107 @@
# Windows build and test of Pika
environment:
erlang_download_url: "http://erlang.org/download/otp_win64_19.3.exe"
erlang_exe_path: "C:\\Users\\appveyor\\erlang_19.3.exe"
erlang_home_dir: "C:\\Users\\appveyor\\erlang"
erlang_erts_version: "erts-8.3"
rabbitmq_version: 3.7.4
rabbitmq_installer_download_url: "https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.4/rabbitmq-server-3.7.4.exe"
rabbitmq_installer_path: "C:\\Users\\appveyor\\rabbitmq-server-3.7.4.exe"
matrix:
- PYTHON_ARCH: "32"
PYTHONHOME: "C:\\Python27"
PIKA_TEST_TLS: false
- PYTHON_ARCH: "32"
PYTHONHOME: "C:\\Python27"
PIKA_TEST_TLS: true
cache:
# RabbitMQ is a pretty big package, so caching it in hopes of expediting the
# runtime
- "%erlang_exe_path%"
- "%rabbitmq_installer_path%"
install:
- SET PYTHONPATH=%PYTHONHOME%
- SET PATH=%PYTHONHOME%\Scripts;%PYTHONHOME%;%PATH%
# For diagnostics
- ECHO %PYTHONPATH%
- ECHO %PATH%
- python --version
- ECHO Upgrading pip...
- python -m pip install --upgrade pip setuptools
- pip --version
- ECHO Installing wheel...
- pip install wheel
build_script:
- ECHO Building distributions...
- python setup.py sdist bdist bdist_wheel
- DIR /s *.whl
artifacts:
- path: 'dist\*.whl'
name: pika wheel
before_test:
# Install test requirements
- ECHO Installing pika...
- python setup.py install
- ECHO Installing pika test requirements...
- pip install -r test-requirements.txt
# List conents of C:\ to help debug caching of rabbitmq artifacts
# - DIR C:\
- ps: $webclient=New-Object System.Net.WebClient
- ECHO Downloading Erlang...
- ps: if (-Not (Test-Path "$env:erlang_exe_path")) { $webclient.DownloadFile("$env:erlang_download_url", "$env:erlang_exe_path") } else { Write-Host "Found" $env:erlang_exe_path "in cache." }
- ECHO Installing Erlang...
- start /B /WAIT %erlang_exe_path% /S /D=%erlang_home_dir%
- set ERLANG_HOME=%erlang_home_dir%
- ECHO Downloading RabbitMQ...
- ps: if (-Not (Test-Path "$env:rabbitmq_installer_path")) { $webclient.DownloadFile("$env:rabbitmq_installer_download_url", "$env:rabbitmq_installer_path") } else { Write-Host "Found" $env:rabbitmq_installer_path "in cache." }
- ECHO Creating directory %AppData%\RabbitMQ...
- ps: New-Item -ItemType Directory -ErrorAction Continue -Path "$env:AppData/RabbitMQ"
- ECHO Creating RabbitMQ configuration file in %AppData%\RabbitMQ...
- ps: Get-Content C:/Projects/pika/testdata/rabbitmq.conf.in | %{ $_ -replace 'PIKA_DIR', 'C:/projects/pika' } | Set-Content -Path "$env:AppData/RabbitMQ/rabbitmq.conf"
- ps: Get-Content "$env:AppData/RabbitMQ/rabbitmq.conf"
- ECHO Creating Erlang cookie files...
- ps: '[System.IO.File]::WriteAllText("C:\Users\appveyor\.erlang.cookie", "PIKAISTHEBEST", [System.Text.Encoding]::ASCII)'
- ps: '[System.IO.File]::WriteAllText("C:\Windows\System32\config\systemprofile\.erlang.cookie", "PIKAISTHEBEST", [System.Text.Encoding]::ASCII)'
- ECHO Installing and starting RabbitMQ with default config...
- start /B /WAIT %rabbitmq_installer_path% /S
- ps: (Get-Service -Name RabbitMQ).Status
- ECHO Waiting for epmd to report that RabbitMQ has started...
- ps: 'C:\projects\pika\testdata\wait-epmd.ps1'
- ps: 'C:\projects\pika\testdata\wait-rabbitmq.ps1'
- ECHO Getting RabbitMQ status...
- cmd /c "C:\Program Files\RabbitMQ Server\rabbitmq_server-%rabbitmq_version%\sbin\rabbitmqctl.bat" status
test_script:
- nosetests
# Since Pika is source-only there's no need to deploy from Windows
deploy: false

View File

@ -0,0 +1,153 @@
# Makefile for Sphinx documentation
#
# You can set these variables from the command line.
SPHINXOPTS =
SPHINXBUILD = sphinx-build
PAPER =
BUILDDIR = _build
# Internal variables.
PAPEROPT_a4 = -D latex_paper_size=a4
PAPEROPT_letter = -D latex_paper_size=letter
ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .
# the i18n builder cannot share the environment and doctrees with the others
I18NSPHINXOPTS = $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .
.PHONY: help clean html dirhtml singlehtml pickle json htmlhelp qthelp devhelp epub latex latexpdf text man changes linkcheck doctest gettext
help:
@echo "Please use \`make <target>' where <target> is one of"
@echo " html to make standalone HTML files"
@echo " dirhtml to make HTML files named index.html in directories"
@echo " singlehtml to make a single large HTML file"
@echo " pickle to make pickle files"
@echo " json to make JSON files"
@echo " htmlhelp to make HTML files and a HTML help project"
@echo " qthelp to make HTML files and a qthelp project"
@echo " devhelp to make HTML files and a Devhelp project"
@echo " epub to make an epub"
@echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter"
@echo " latexpdf to make LaTeX files and run them through pdflatex"
@echo " text to make text files"
@echo " man to make manual pages"
@echo " texinfo to make Texinfo files"
@echo " info to make Texinfo files and run them through makeinfo"
@echo " gettext to make PO message catalogs"
@echo " changes to make an overview of all changed/added/deprecated items"
@echo " linkcheck to check all external links for integrity"
@echo " doctest to run all doctests embedded in the documentation (if enabled)"
clean:
-rm -rf $(BUILDDIR)/*
html:
$(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/html."
dirhtml:
$(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/dirhtml."
singlehtml:
$(SPHINXBUILD) -b singlehtml $(ALLSPHINXOPTS) $(BUILDDIR)/singlehtml
@echo
@echo "Build finished. The HTML page is in $(BUILDDIR)/singlehtml."
pickle:
$(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) $(BUILDDIR)/pickle
@echo
@echo "Build finished; now you can process the pickle files."
json:
$(SPHINXBUILD) -b json $(ALLSPHINXOPTS) $(BUILDDIR)/json
@echo
@echo "Build finished; now you can process the JSON files."
htmlhelp:
$(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) $(BUILDDIR)/htmlhelp
@echo
@echo "Build finished; now you can run HTML Help Workshop with the" \
".hhp project file in $(BUILDDIR)/htmlhelp."
qthelp:
$(SPHINXBUILD) -b qthelp $(ALLSPHINXOPTS) $(BUILDDIR)/qthelp
@echo
@echo "Build finished; now you can run "qcollectiongenerator" with the" \
".qhcp project file in $(BUILDDIR)/qthelp, like this:"
@echo "# qcollectiongenerator $(BUILDDIR)/qthelp/pika.qhcp"
@echo "To view the help file:"
@echo "# assistant -collectionFile $(BUILDDIR)/qthelp/pika.qhc"
devhelp:
$(SPHINXBUILD) -b devhelp $(ALLSPHINXOPTS) $(BUILDDIR)/devhelp
@echo
@echo "Build finished."
@echo "To view the help file:"
@echo "# mkdir -p $$HOME/.local/share/devhelp/pika"
@echo "# ln -s $(BUILDDIR)/devhelp $$HOME/.local/share/devhelp/pika"
@echo "# devhelp"
epub:
$(SPHINXBUILD) -b epub $(ALLSPHINXOPTS) $(BUILDDIR)/epub
@echo
@echo "Build finished. The epub file is in $(BUILDDIR)/epub."
latex:
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
@echo
@echo "Build finished; the LaTeX files are in $(BUILDDIR)/latex."
@echo "Run \`make' in that directory to run these through (pdf)latex" \
"(use \`make latexpdf' here to do that automatically)."
latexpdf:
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
@echo "Running LaTeX files through pdflatex..."
$(MAKE) -C $(BUILDDIR)/latex all-pdf
@echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex."
text:
$(SPHINXBUILD) -b text $(ALLSPHINXOPTS) $(BUILDDIR)/text
@echo
@echo "Build finished. The text files are in $(BUILDDIR)/text."
man:
$(SPHINXBUILD) -b man $(ALLSPHINXOPTS) $(BUILDDIR)/man
@echo
@echo "Build finished. The manual pages are in $(BUILDDIR)/man."
texinfo:
$(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo
@echo
@echo "Build finished. The Texinfo files are in $(BUILDDIR)/texinfo."
@echo "Run \`make' in that directory to run these through makeinfo" \
"(use \`make info' here to do that automatically)."
info:
$(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo
@echo "Running Texinfo files through makeinfo..."
make -C $(BUILDDIR)/texinfo info
@echo "makeinfo finished; the Info files are in $(BUILDDIR)/texinfo."
gettext:
$(SPHINXBUILD) -b gettext $(I18NSPHINXOPTS) $(BUILDDIR)/locale
@echo
@echo "Build finished. The message catalogs are in $(BUILDDIR)/locale."
changes:
$(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) $(BUILDDIR)/changes
@echo
@echo "The overview file is in $(BUILDDIR)/changes."
linkcheck:
$(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) $(BUILDDIR)/linkcheck
@echo
@echo "Link check complete; look for any errors in the above output " \
"or in $(BUILDDIR)/linkcheck/output.txt."
doctest:
$(SPHINXBUILD) -b doctest $(ALLSPHINXOPTS) $(BUILDDIR)/doctest
@echo "Testing of doctests in the sources finished, look at the " \
"results in $(BUILDDIR)/doctest/output.txt."

View File

@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
import sys
sys.path.insert(0, '../')
#needs_sphinx = '1.0'
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.viewcode',
'sphinx.ext.intersphinx']
intersphinx_mapping = {'python': ('https://docs.python.org/3/',
'https://docs.python.org/3/objects.inv'),
'tornado': ('http://www.tornadoweb.org/en/stable/',
'http://www.tornadoweb.org/en/stable/objects.inv')}
templates_path = ['_templates']
source_suffix = '.rst'
master_doc = 'index'
project = 'pika'
copyright = '2009-2017, Tony Garnock-Jones, Gavin M. Roy, Pivotal Software, Inc and contributors.'
import pika
release = pika.__version__
version = '.'.join(release.split('.')[0:1])
exclude_patterns = ['_build']
add_function_parentheses = True
add_module_names = True
show_authors = True
pygments_style = 'sphinx'
modindex_common_prefix = ['pika']
html_theme = 'default'
html_static_path = ['_static']
htmlhelp_basename = 'pikadoc'

View File

@ -0,0 +1,104 @@
Contributors
============
The following people have directly contributes code by way of new features and/or bug fixes to Pika:
- Gavin M. Roy
- Tony Garnock-Jones
- Vitaly Kruglikov
- Michael Laing
- Marek Majkowski
- Jan Urbański
- Brian K. Jones
- Ask Solem
- ml
- Will
- atatsu
- Fredrik Svensson
- Pedro Abranches
- Kyösti Herrala
- Erik Andersson
- Charles Law
- Alex Chandel
- Tristan Penman
- Raphaël De Giusti
- Jozef Van Eenbergen
- Josh Braegger
- Jason J. W. Williams
- James Mutton
- Cenk Alti
- Asko Soukka
- Antti Haapala
- Anton Ryzhov
- cellscape
- cacovsky
- bra-fsn
- ateska
- Roey Berman
- Robert Weidlich
- Riccardo Cirimelli
- Perttu Ranta-aho
- Pau Gargallo
- Kane
- Kamil Kisiel
- Jonty Wareing
- Jonathan Kirsch
- Jacek 'Forger' Całusiński
- Garth Williamson
- Erik Olof Gunnar Andersson
- David Strauss
- Anton V. Yanchenko
- Alexey Myasnikov
- Alessandro Tagliapietra
- Adam Flynn
- skftn
- saarni
- pavlobaron
- nonleaf
- markcf
- george y
- eivanov
- bstemshorn
- a-tal
- Yang Yang
- Stuart Longland
- Sigurd Høgsbro
- Sean Dwyer
- Samuel Stauffer
- Roberto Decurnex
- Rikard Hultén
- Richard Boulton
- Ralf Nyren
- Qi Fan
- Peter Magnusson
- Pankrat
- Olivier Le Thanh Duong
- Njal Karevoll
- Milan Skuhra
- Mik Kocikowski
- Michael Kenney
- Mark Unsworth
- Luca Wehrstedt
- Laurent Eschenauer
- Lars van de Kerkhof
- Kyösti Herrala
- Juhyeong Park
- JuhaS
- Josh Hansen
- Jorge Puente Sarrín
- Jeff Tang
- Jeff Fein-Worton
- Jeff
- Hunter Morris
- Guruprasad
- Garrett Cooper
- Frank Slaughter
- Dustin Koupal
- Bjorn Sandberg
- Axel Eirola
- Andrew Smith
- Andrew Grigorev
- Andrew
- Allard Hoeve
- A.Shaposhnikov
*Contributors listed by commit count.*

View File

@ -0,0 +1,23 @@
Usage Examples
==============
Pika has various methods of use, between the synchronous BlockingConnection adapter and the various asynchronous connection adapter. The following examples illustrate the various ways that you can use Pika in your projects.
.. toctree::
:glob:
:maxdepth: 1
examples/using_urlparameters
examples/connecting_async
examples/blocking_basic_get
examples/blocking_consume
examples/blocking_consumer_generator
examples/comparing_publishing_sync_async
examples/blocking_delivery_confirmations
examples/blocking_publish_mandatory
examples/asynchronous_consumer_example
examples/asynchronous_publisher_example
examples/twisted_example
examples/tornado_consumer
examples/tls_mutual_authentication
examples/tls_server_authentication

View File

@ -0,0 +1,357 @@
Asynchronous consumer example
=============================
The following example implements a consumer that will respond to RPC commands sent from RabbitMQ. For example, it will reconnect if RabbitMQ closes the connection and will shutdown if RabbitMQ cancels the consumer or closes the channel. While it may look intimidating, each method is very short and represents a individual actions that a consumer can do.
consumer.py::
# -*- coding: utf-8 -*-
import logging
import pika
LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
'-35s %(lineno) -5d: %(message)s')
LOGGER = logging.getLogger(__name__)
class ExampleConsumer(object):
"""This is an example consumer that will handle unexpected interactions
with RabbitMQ such as channel and connection closures.
If RabbitMQ closes the connection, it will reopen it. You should
look at the output, as there are limited reasons why the connection may
be closed, which usually are tied to permission related issues or
socket timeouts.
If the channel is closed, it will indicate a problem with one of the
commands that were issued and that should surface in the output as well.
"""
EXCHANGE = 'message'
EXCHANGE_TYPE = 'topic'
QUEUE = 'text'
ROUTING_KEY = 'example.text'
def __init__(self, amqp_url):
"""Create a new instance of the consumer class, passing in the AMQP
URL used to connect to RabbitMQ.
:param str amqp_url: The AMQP url to connect with
"""
self._connection = None
self._channel = None
self._closing = False
self._consumer_tag = None
self._url = amqp_url
def connect(self):
"""This method connects to RabbitMQ, returning the connection handle.
When the connection is established, the on_connection_open method
will be invoked by pika.
:rtype: pika.SelectConnection
"""
LOGGER.info('Connecting to %s', self._url)
return pika.SelectConnection(pika.URLParameters(self._url),
self.on_connection_open,
stop_ioloop_on_close=False)
def on_connection_open(self, unused_connection):
"""This method is called by pika once the connection to RabbitMQ has
been established. It passes the handle to the connection object in
case we need it, but in this case, we'll just mark it unused.
:type unused_connection: pika.SelectConnection
"""
LOGGER.info('Connection opened')
self.add_on_connection_close_callback()
self.open_channel()
def add_on_connection_close_callback(self):
"""This method adds an on close callback that will be invoked by pika
when RabbitMQ closes the connection to the publisher unexpectedly.
"""
LOGGER.info('Adding connection close callback')
self._connection.add_on_close_callback(self.on_connection_closed)
def on_connection_closed(self, connection, reply_code, reply_text):
"""This method is invoked by pika when the connection to RabbitMQ is
closed unexpectedly. Since it is unexpected, we will reconnect to
RabbitMQ if it disconnects.
:param pika.connection.Connection connection: The closed connection obj
:param int reply_code: The server provided reply_code if given
:param str reply_text: The server provided reply_text if given
"""
self._channel = None
if self._closing:
self._connection.ioloop.stop()
else:
LOGGER.warning('Connection closed, reopening in 5 seconds: (%s) %s',
reply_code, reply_text)
self._connection.add_timeout(5, self.reconnect)
def reconnect(self):
"""Will be invoked by the IOLoop timer if the connection is
closed. See the on_connection_closed method.
"""
# This is the old connection IOLoop instance, stop its ioloop
self._connection.ioloop.stop()
if not self._closing:
# Create a new connection
self._connection = self.connect()
# There is now a new connection, needs a new ioloop to run
self._connection.ioloop.start()
def open_channel(self):
"""Open a new channel with RabbitMQ by issuing the Channel.Open RPC
command. When RabbitMQ responds that the channel is open, the
on_channel_open callback will be invoked by pika.
"""
LOGGER.info('Creating a new channel')
self._connection.channel(on_open_callback=self.on_channel_open)
def on_channel_open(self, channel):
"""This method is invoked by pika when the channel has been opened.
The channel object is passed in so we can make use of it.
Since the channel is now open, we'll declare the exchange to use.
:param pika.channel.Channel channel: The channel object
"""
LOGGER.info('Channel opened')
self._channel = channel
self.add_on_channel_close_callback()
self.setup_exchange(self.EXCHANGE)
def add_on_channel_close_callback(self):
"""This method tells pika to call the on_channel_closed method if
RabbitMQ unexpectedly closes the channel.
"""
LOGGER.info('Adding channel close callback')
self._channel.add_on_close_callback(self.on_channel_closed)
def on_channel_closed(self, channel, reply_code, reply_text):
"""Invoked by pika when RabbitMQ unexpectedly closes the channel.
Channels are usually closed if you attempt to do something that
violates the protocol, such as re-declare an exchange or queue with
different parameters. In this case, we'll close the connection
to shutdown the object.
:param pika.channel.Channel: The closed channel
:param int reply_code: The numeric reason the channel was closed
:param str reply_text: The text reason the channel was closed
"""
LOGGER.warning('Channel %i was closed: (%s) %s',
channel, reply_code, reply_text)
self._connection.close()
def setup_exchange(self, exchange_name):
"""Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
command. When it is complete, the on_exchange_declareok method will
be invoked by pika.
:param str|unicode exchange_name: The name of the exchange to declare
"""
LOGGER.info('Declaring exchange %s', exchange_name)
self._channel.exchange_declare(self.on_exchange_declareok,
exchange_name,
self.EXCHANGE_TYPE)
def on_exchange_declareok(self, unused_frame):
"""Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
command.
:param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame
"""
LOGGER.info('Exchange declared')
self.setup_queue(self.QUEUE)
def setup_queue(self, queue_name):
"""Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
command. When it is complete, the on_queue_declareok method will
be invoked by pika.
:param str|unicode queue_name: The name of the queue to declare.
"""
LOGGER.info('Declaring queue %s', queue_name)
self._channel.queue_declare(self.on_queue_declareok, queue_name)
def on_queue_declareok(self, method_frame):
"""Method invoked by pika when the Queue.Declare RPC call made in
setup_queue has completed. In this method we will bind the queue
and exchange together with the routing key by issuing the Queue.Bind
RPC command. When this command is complete, the on_bindok method will
be invoked by pika.
:param pika.frame.Method method_frame: The Queue.DeclareOk frame
"""
LOGGER.info('Binding %s to %s with %s',
self.EXCHANGE, self.QUEUE, self.ROUTING_KEY)
self._channel.queue_bind(self.on_bindok, self.QUEUE,
self.EXCHANGE, self.ROUTING_KEY)
def on_bindok(self, unused_frame):
"""Invoked by pika when the Queue.Bind method has completed. At this
point we will start consuming messages by calling start_consuming
which will invoke the needed RPC commands to start the process.
:param pika.frame.Method unused_frame: The Queue.BindOk response frame
"""
LOGGER.info('Queue bound')
self.start_consuming()
def start_consuming(self):
"""This method sets up the consumer by first calling
add_on_cancel_callback so that the object is notified if RabbitMQ
cancels the consumer. It then issues the Basic.Consume RPC command
which returns the consumer tag that is used to uniquely identify the
consumer with RabbitMQ. We keep the value to use it when we want to
cancel consuming. The on_message method is passed in as a callback pika
will invoke when a message is fully received.
"""
LOGGER.info('Issuing consumer related RPC commands')
self.add_on_cancel_callback()
self._consumer_tag = self._channel.basic_consume(self.on_message,
self.QUEUE)
def add_on_cancel_callback(self):
"""Add a callback that will be invoked if RabbitMQ cancels the consumer
for some reason. If RabbitMQ does cancel the consumer,
on_consumer_cancelled will be invoked by pika.
"""
LOGGER.info('Adding consumer cancellation callback')
self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
def on_consumer_cancelled(self, method_frame):
"""Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer
receiving messages.
:param pika.frame.Method method_frame: The Basic.Cancel frame
"""
LOGGER.info('Consumer was cancelled remotely, shutting down: %r',
method_frame)
if self._channel:
self._channel.close()
def on_message(self, unused_channel, basic_deliver, properties, body):
"""Invoked by pika when a message is delivered from RabbitMQ. The
channel is passed for your convenience. The basic_deliver object that
is passed in carries the exchange, routing key, delivery tag and
a redelivered flag for the message. The properties passed in is an
instance of BasicProperties with the message properties and the body
is the message that was sent.
:param pika.channel.Channel unused_channel: The channel object
:param pika.Spec.Basic.Deliver: basic_deliver method
:param pika.Spec.BasicProperties: properties
:param str|unicode body: The message body
"""
LOGGER.info('Received message # %s from %s: %s',
basic_deliver.delivery_tag, properties.app_id, body)
self.acknowledge_message(basic_deliver.delivery_tag)
def acknowledge_message(self, delivery_tag):
"""Acknowledge the message delivery from RabbitMQ by sending a
Basic.Ack RPC method for the delivery tag.
:param int delivery_tag: The delivery tag from the Basic.Deliver frame
"""
LOGGER.info('Acknowledging message %s', delivery_tag)
self._channel.basic_ack(delivery_tag)
def stop_consuming(self):
"""Tell RabbitMQ that you would like to stop consuming by sending the
Basic.Cancel RPC command.
"""
if self._channel:
LOGGER.info('Sending a Basic.Cancel RPC command to RabbitMQ')
self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)
def on_cancelok(self, unused_frame):
"""This method is invoked by pika when RabbitMQ acknowledges the
cancellation of a consumer. At this point we will close the channel.
This will invoke the on_channel_closed method once the channel has been
closed, which will in-turn close the connection.
:param pika.frame.Method unused_frame: The Basic.CancelOk frame
"""
LOGGER.info('RabbitMQ acknowledged the cancellation of the consumer')
self.close_channel()
def close_channel(self):
"""Call to close the channel with RabbitMQ cleanly by issuing the
Channel.Close RPC command.
"""
LOGGER.info('Closing the channel')
self._channel.close()
def run(self):
"""Run the example consumer by connecting to RabbitMQ and then
starting the IOLoop to block and allow the SelectConnection to operate.
"""
self._connection = self.connect()
self._connection.ioloop.start()
def stop(self):
"""Cleanly shutdown the connection to RabbitMQ by stopping the consumer
with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok
will be invoked by pika, which will then closing the channel and
connection. The IOLoop is started again because this method is invoked
when CTRL-C is pressed raising a KeyboardInterrupt exception. This
exception stops the IOLoop which needs to be running for pika to
communicate with RabbitMQ. All of the commands issued prior to starting
the IOLoop will be buffered but not processed.
"""
LOGGER.info('Stopping')
self._closing = True
self.stop_consuming()
self._connection.ioloop.start()
LOGGER.info('Stopped')
def close_connection(self):
"""This method closes the connection to RabbitMQ."""
LOGGER.info('Closing connection')
self._connection.close()
def main():
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
example = ExampleConsumer('amqp://guest:guest@localhost:5672/%2F')
try:
example.run()
except KeyboardInterrupt:
example.stop()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,359 @@
Asynchronous publisher example
==============================
The following example implements a publisher that will respond to RPC commands sent from RabbitMQ and uses delivery confirmations. It will reconnect if RabbitMQ closes the connection and will shutdown if RabbitMQ closes the channel. While it may look intimidating, each method is very short and represents a individual actions that a publisher can do.
publisher.py::
# -*- coding: utf-8 -*-
import logging
import pika
import json
LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
'-35s %(lineno) -5d: %(message)s')
LOGGER = logging.getLogger(__name__)
class ExamplePublisher(object):
"""This is an example publisher that will handle unexpected interactions
with RabbitMQ such as channel and connection closures.
If RabbitMQ closes the connection, it will reopen it. You should
look at the output, as there are limited reasons why the connection may
be closed, which usually are tied to permission related issues or
socket timeouts.
It uses delivery confirmations and illustrates one way to keep track of
messages that have been sent and if they've been confirmed by RabbitMQ.
"""
EXCHANGE = 'message'
EXCHANGE_TYPE = 'topic'
PUBLISH_INTERVAL = 1
QUEUE = 'text'
ROUTING_KEY = 'example.text'
def __init__(self, amqp_url):
"""Setup the example publisher object, passing in the URL we will use
to connect to RabbitMQ.
:param str amqp_url: The URL for connecting to RabbitMQ
"""
self._connection = None
self._channel = None
self._deliveries = None
self._acked = None
self._nacked = None
self._message_number = None
self._stopping = False
self._url = amqp_url
def connect(self):
"""This method connects to RabbitMQ, returning the connection handle.
When the connection is established, the on_connection_open method
will be invoked by pika. If you want the reconnection to work, make
sure you set stop_ioloop_on_close to False, which is not the default
behavior of this adapter.
:rtype: pika.SelectConnection
"""
LOGGER.info('Connecting to %s', self._url)
return pika.SelectConnection(pika.URLParameters(self._url),
on_open_callback=self.on_connection_open,
on_close_callback=self.on_connection_closed,
stop_ioloop_on_close=False)
def on_connection_open(self, unused_connection):
"""This method is called by pika once the connection to RabbitMQ has
been established. It passes the handle to the connection object in
case we need it, but in this case, we'll just mark it unused.
:type unused_connection: pika.SelectConnection
"""
LOGGER.info('Connection opened')
self.open_channel()
def on_connection_closed(self, connection, reply_code, reply_text):
"""This method is invoked by pika when the connection to RabbitMQ is
closed unexpectedly. Since it is unexpected, we will reconnect to
RabbitMQ if it disconnects.
:param pika.connection.Connection connection: The closed connection obj
:param int reply_code: The server provided reply_code if given
:param str reply_text: The server provided reply_text if given
"""
self._channel = None
if self._stopping:
self._connection.ioloop.stop()
else:
LOGGER.warning('Connection closed, reopening in 5 seconds: (%s) %s',
reply_code, reply_text)
self._connection.add_timeout(5, self._connection.ioloop.stop)
def open_channel(self):
"""This method will open a new channel with RabbitMQ by issuing the
Channel.Open RPC command. When RabbitMQ confirms the channel is open
by sending the Channel.OpenOK RPC reply, the on_channel_open method
will be invoked.
"""
LOGGER.info('Creating a new channel')
self._connection.channel(on_open_callback=self.on_channel_open)
def on_channel_open(self, channel):
"""This method is invoked by pika when the channel has been opened.
The channel object is passed in so we can make use of it.
Since the channel is now open, we'll declare the exchange to use.
:param pika.channel.Channel channel: The channel object
"""
LOGGER.info('Channel opened')
self._channel = channel
self.add_on_channel_close_callback()
self.setup_exchange(self.EXCHANGE)
def add_on_channel_close_callback(self):
"""This method tells pika to call the on_channel_closed method if
RabbitMQ unexpectedly closes the channel.
"""
LOGGER.info('Adding channel close callback')
self._channel.add_on_close_callback(self.on_channel_closed)
def on_channel_closed(self, channel, reply_code, reply_text):
"""Invoked by pika when RabbitMQ unexpectedly closes the channel.
Channels are usually closed if you attempt to do something that
violates the protocol, such as re-declare an exchange or queue with
different parameters. In this case, we'll close the connection
to shutdown the object.
:param pika.channel.Channel channel: The closed channel
:param int reply_code: The numeric reason the channel was closed
:param str reply_text: The text reason the channel was closed
"""
LOGGER.warning('Channel was closed: (%s) %s', reply_code, reply_text)
self._channel = None
if not self._stopping:
self._connection.close()
def setup_exchange(self, exchange_name):
"""Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
command. When it is complete, the on_exchange_declareok method will
be invoked by pika.
:param str|unicode exchange_name: The name of the exchange to declare
"""
LOGGER.info('Declaring exchange %s', exchange_name)
self._channel.exchange_declare(self.on_exchange_declareok,
exchange_name,
self.EXCHANGE_TYPE)
def on_exchange_declareok(self, unused_frame):
"""Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
command.
:param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame
"""
LOGGER.info('Exchange declared')
self.setup_queue(self.QUEUE)
def setup_queue(self, queue_name):
"""Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
command. When it is complete, the on_queue_declareok method will
be invoked by pika.
:param str|unicode queue_name: The name of the queue to declare.
"""
LOGGER.info('Declaring queue %s', queue_name)
self._channel.queue_declare(self.on_queue_declareok, queue_name)
def on_queue_declareok(self, method_frame):
"""Method invoked by pika when the Queue.Declare RPC call made in
setup_queue has completed. In this method we will bind the queue
and exchange together with the routing key by issuing the Queue.Bind
RPC command. When this command is complete, the on_bindok method will
be invoked by pika.
:param pika.frame.Method method_frame: The Queue.DeclareOk frame
"""
LOGGER.info('Binding %s to %s with %s',
self.EXCHANGE, self.QUEUE, self.ROUTING_KEY)
self._channel.queue_bind(self.on_bindok, self.QUEUE,
self.EXCHANGE, self.ROUTING_KEY)
def on_bindok(self, unused_frame):
"""This method is invoked by pika when it receives the Queue.BindOk
response from RabbitMQ. Since we know we're now setup and bound, it's
time to start publishing."""
LOGGER.info('Queue bound')
self.start_publishing()
def start_publishing(self):
"""This method will enable delivery confirmations and schedule the
first message to be sent to RabbitMQ
"""
LOGGER.info('Issuing consumer related RPC commands')
self.enable_delivery_confirmations()
self.schedule_next_message()
def enable_delivery_confirmations(self):
"""Send the Confirm.Select RPC method to RabbitMQ to enable delivery
confirmations on the channel. The only way to turn this off is to close
the channel and create a new one.
When the message is confirmed from RabbitMQ, the
on_delivery_confirmation method will be invoked passing in a Basic.Ack
or Basic.Nack method from RabbitMQ that will indicate which messages it
is confirming or rejecting.
"""
LOGGER.info('Issuing Confirm.Select RPC command')
self._channel.confirm_delivery(self.on_delivery_confirmation)
def on_delivery_confirmation(self, method_frame):
"""Invoked by pika when RabbitMQ responds to a Basic.Publish RPC
command, passing in either a Basic.Ack or Basic.Nack frame with
the delivery tag of the message that was published. The delivery tag
is an integer counter indicating the message number that was sent
on the channel via Basic.Publish. Here we're just doing house keeping
to keep track of stats and remove message numbers that we expect
a delivery confirmation of from the list used to keep track of messages
that are pending confirmation.
:param pika.frame.Method method_frame: Basic.Ack or Basic.Nack frame
"""
confirmation_type = method_frame.method.NAME.split('.')[1].lower()
LOGGER.info('Received %s for delivery tag: %i',
confirmation_type,
method_frame.method.delivery_tag)
if confirmation_type == 'ack':
self._acked += 1
elif confirmation_type == 'nack':
self._nacked += 1
self._deliveries.remove(method_frame.method.delivery_tag)
LOGGER.info('Published %i messages, %i have yet to be confirmed, '
'%i were acked and %i were nacked',
self._message_number, len(self._deliveries),
self._acked, self._nacked)
def schedule_next_message(self):
"""If we are not closing our connection to RabbitMQ, schedule another
message to be delivered in PUBLISH_INTERVAL seconds.
"""
LOGGER.info('Scheduling next message for %0.1f seconds',
self.PUBLISH_INTERVAL)
self._connection.add_timeout(self.PUBLISH_INTERVAL,
self.publish_message)
def publish_message(self):
"""If the class is not stopping, publish a message to RabbitMQ,
appending a list of deliveries with the message number that was sent.
This list will be used to check for delivery confirmations in the
on_delivery_confirmations method.
Once the message has been sent, schedule another message to be sent.
The main reason I put scheduling in was just so you can get a good idea
of how the process is flowing by slowing down and speeding up the
delivery intervals by changing the PUBLISH_INTERVAL constant in the
class.
"""
if self._channel is None or not self._channel.is_open:
return
hdrs = {u'مفتاح': u' قيمة',
u'键': u'值',
u'キー': u'値'}
properties = pika.BasicProperties(app_id='example-publisher',
content_type='application/json',
headers=hdrs)
message = u'مفتاح قيمة 键 值 キー 値'
self._channel.basic_publish(self.EXCHANGE, self.ROUTING_KEY,
json.dumps(message, ensure_ascii=False),
properties)
self._message_number += 1
self._deliveries.append(self._message_number)
LOGGER.info('Published message # %i', self._message_number)
self.schedule_next_message()
def run(self):
"""Run the example code by connecting and then starting the IOLoop.
"""
while not self._stopping:
self._connection = None
self._deliveries = []
self._acked = 0
self._nacked = 0
self._message_number = 0
try:
self._connection = self.connect()
self._connection.ioloop.start()
except KeyboardInterrupt:
self.stop()
if (self._connection is not None and
not self._connection.is_closed):
# Finish closing
self._connection.ioloop.start()
LOGGER.info('Stopped')
def stop(self):
"""Stop the example by closing the channel and connection. We
set a flag here so that we stop scheduling new messages to be
published. The IOLoop is started because this method is
invoked by the Try/Catch below when KeyboardInterrupt is caught.
Starting the IOLoop again will allow the publisher to cleanly
disconnect from RabbitMQ.
"""
LOGGER.info('Stopping')
self._stopping = True
self.close_channel()
self.close_connection()
def close_channel(self):
"""Invoke this command to close the channel with RabbitMQ by sending
the Channel.Close RPC command.
"""
if self._channel is not None:
LOGGER.info('Closing the channel')
self._channel.close()
def close_connection(self):
"""This method closes the connection to RabbitMQ."""
if self._connection is not None:
LOGGER.info('Closing connection')
self._connection.close()
def main():
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)
# Connect to localhost:5672 as guest with the password guest and virtual host "/" (%2F)
example = ExamplePublisher('amqp://guest:guest@localhost:5672/%2F?connection_attempts=3&heartbeat_interval=3600')
example.run()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,355 @@
Asyncio Consumer
================
The following example implements a consumer using the
:class:`Asyncio adapter <pika.adapters.asyncio_connection.AsyncioConnection>` for the
`Asyncio library <https://docs.python.org/3/library/asyncio.html>`_ that will respond to RPC commands sent
from RabbitMQ. For example, it will reconnect if RabbitMQ closes the connection and will shutdown if
RabbitMQ cancels the consumer or closes the channel. While it may look intimidating, each method is
very short and represents a individual actions that a consumer can do.
consumer.py::
from pika import adapters
import pika
import logging
LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
'-35s %(lineno) -5d: %(message)s')
LOGGER = logging.getLogger(__name__)
class ExampleConsumer(object):
"""This is an example consumer that will handle unexpected interactions
with RabbitMQ such as channel and connection closures.
If RabbitMQ closes the connection, it will reopen it. You should
look at the output, as there are limited reasons why the connection may
be closed, which usually are tied to permission related issues or
socket timeouts.
If the channel is closed, it will indicate a problem with one of the
commands that were issued and that should surface in the output as well.
"""
EXCHANGE = 'message'
EXCHANGE_TYPE = 'topic'
QUEUE = 'text'
ROUTING_KEY = 'example.text'
def __init__(self, amqp_url):
"""Create a new instance of the consumer class, passing in the AMQP
URL used to connect to RabbitMQ.
:param str amqp_url: The AMQP url to connect with
"""
self._connection = None
self._channel = None
self._closing = False
self._consumer_tag = None
self._url = amqp_url
def connect(self):
"""This method connects to RabbitMQ, returning the connection handle.
When the connection is established, the on_connection_open method
will be invoked by pika.
:rtype: pika.SelectConnection
"""
LOGGER.info('Connecting to %s', self._url)
return adapters.asyncio_connection.AsyncioConnection(pika.URLParameters(self._url),
self.on_connection_open)
def close_connection(self):
"""This method closes the connection to RabbitMQ."""
LOGGER.info('Closing connection')
self._connection.close()
def add_on_connection_close_callback(self):
"""This method adds an on close callback that will be invoked by pika
when RabbitMQ closes the connection to the publisher unexpectedly.
"""
LOGGER.info('Adding connection close callback')
self._connection.add_on_close_callback(self.on_connection_closed)
def on_connection_closed(self, connection, reply_code, reply_text):
"""This method is invoked by pika when the connection to RabbitMQ is
closed unexpectedly. Since it is unexpected, we will reconnect to
RabbitMQ if it disconnects.
:param pika.connection.Connection connection: The closed connection obj
:param int reply_code: The server provided reply_code if given
:param str reply_text: The server provided reply_text if given
"""
self._channel = None
if self._closing:
self._connection.ioloop.stop()
else:
LOGGER.warning('Connection closed, reopening in 5 seconds: (%s) %s',
reply_code, reply_text)
self._connection.add_timeout(5, self.reconnect)
def on_connection_open(self, unused_connection):
"""This method is called by pika once the connection to RabbitMQ has
been established. It passes the handle to the connection object in
case we need it, but in this case, we'll just mark it unused.
:type unused_connection: pika.SelectConnection
"""
LOGGER.info('Connection opened')
self.add_on_connection_close_callback()
self.open_channel()
def reconnect(self):
"""Will be invoked by the IOLoop timer if the connection is
closed. See the on_connection_closed method.
"""
if not self._closing:
# Create a new connection
self._connection = self.connect()
def add_on_channel_close_callback(self):
"""This method tells pika to call the on_channel_closed method if
RabbitMQ unexpectedly closes the channel.
"""
LOGGER.info('Adding channel close callback')
self._channel.add_on_close_callback(self.on_channel_closed)
def on_channel_closed(self, channel, reply_code, reply_text):
"""Invoked by pika when RabbitMQ unexpectedly closes the channel.
Channels are usually closed if you attempt to do something that
violates the protocol, such as re-declare an exchange or queue with
different parameters. In this case, we'll close the connection
to shutdown the object.
:param pika.channel.Channel: The closed channel
:param int reply_code: The numeric reason the channel was closed
:param str reply_text: The text reason the channel was closed
"""
LOGGER.warning('Channel %i was closed: (%s) %s',
channel, reply_code, reply_text)
self._connection.close()
def on_channel_open(self, channel):
"""This method is invoked by pika when the channel has been opened.
The channel object is passed in so we can make use of it.
Since the channel is now open, we'll declare the exchange to use.
:param pika.channel.Channel channel: The channel object
"""
LOGGER.info('Channel opened')
self._channel = channel
self.add_on_channel_close_callback()
self.setup_exchange(self.EXCHANGE)
def setup_exchange(self, exchange_name):
"""Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
command. When it is complete, the on_exchange_declareok method will
be invoked by pika.
:param str|unicode exchange_name: The name of the exchange to declare
"""
LOGGER.info('Declaring exchange %s', exchange_name)
self._channel.exchange_declare(self.on_exchange_declareok,
exchange_name,
self.EXCHANGE_TYPE)
def on_exchange_declareok(self, unused_frame):
"""Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
command.
:param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame
"""
LOGGER.info('Exchange declared')
self.setup_queue(self.QUEUE)
def setup_queue(self, queue_name):
"""Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
command. When it is complete, the on_queue_declareok method will
be invoked by pika.
:param str|unicode queue_name: The name of the queue to declare.
"""
LOGGER.info('Declaring queue %s', queue_name)
self._channel.queue_declare(self.on_queue_declareok, queue_name)
def on_queue_declareok(self, method_frame):
"""Method invoked by pika when the Queue.Declare RPC call made in
setup_queue has completed. In this method we will bind the queue
and exchange together with the routing key by issuing the Queue.Bind
RPC command. When this command is complete, the on_bindok method will
be invoked by pika.
:param pika.frame.Method method_frame: The Queue.DeclareOk frame
"""
LOGGER.info('Binding %s to %s with %s',
self.EXCHANGE, self.QUEUE, self.ROUTING_KEY)
self._channel.queue_bind(self.on_bindok, self.QUEUE,
self.EXCHANGE, self.ROUTING_KEY)
def add_on_cancel_callback(self):
"""Add a callback that will be invoked if RabbitMQ cancels the consumer
for some reason. If RabbitMQ does cancel the consumer,
on_consumer_cancelled will be invoked by pika.
"""
LOGGER.info('Adding consumer cancellation callback')
self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
def on_consumer_cancelled(self, method_frame):
"""Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer
receiving messages.
:param pika.frame.Method method_frame: The Basic.Cancel frame
"""
LOGGER.info('Consumer was cancelled remotely, shutting down: %r',
method_frame)
if self._channel:
self._channel.close()
def acknowledge_message(self, delivery_tag):
"""Acknowledge the message delivery from RabbitMQ by sending a
Basic.Ack RPC method for the delivery tag.
:param int delivery_tag: The delivery tag from the Basic.Deliver frame
"""
LOGGER.info('Acknowledging message %s', delivery_tag)
self._channel.basic_ack(delivery_tag)
def on_message(self, unused_channel, basic_deliver, properties, body):
"""Invoked by pika when a message is delivered from RabbitMQ. The
channel is passed for your convenience. The basic_deliver object that
is passed in carries the exchange, routing key, delivery tag and
a redelivered flag for the message. The properties passed in is an
instance of BasicProperties with the message properties and the body
is the message that was sent.
:param pika.channel.Channel unused_channel: The channel object
:param pika.Spec.Basic.Deliver: basic_deliver method
:param pika.Spec.BasicProperties: properties
:param str|unicode body: The message body
"""
LOGGER.info('Received message # %s from %s: %s',
basic_deliver.delivery_tag, properties.app_id, body)
self.acknowledge_message(basic_deliver.delivery_tag)
def on_cancelok(self, unused_frame):
"""This method is invoked by pika when RabbitMQ acknowledges the
cancellation of a consumer. At this point we will close the channel.
This will invoke the on_channel_closed method once the channel has been
closed, which will in-turn close the connection.
:param pika.frame.Method unused_frame: The Basic.CancelOk frame
"""
LOGGER.info('RabbitMQ acknowledged the cancellation of the consumer')
self.close_channel()
def stop_consuming(self):
"""Tell RabbitMQ that you would like to stop consuming by sending the
Basic.Cancel RPC command.
"""
if self._channel:
LOGGER.info('Sending a Basic.Cancel RPC command to RabbitMQ')
self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)
def start_consuming(self):
"""This method sets up the consumer by first calling
add_on_cancel_callback so that the object is notified if RabbitMQ
cancels the consumer. It then issues the Basic.Consume RPC command
which returns the consumer tag that is used to uniquely identify the
consumer with RabbitMQ. We keep the value to use it when we want to
cancel consuming. The on_message method is passed in as a callback pika
will invoke when a message is fully received.
"""
LOGGER.info('Issuing consumer related RPC commands')
self.add_on_cancel_callback()
self._consumer_tag = self._channel.basic_consume(self.on_message,
self.QUEUE)
def on_bindok(self, unused_frame):
"""Invoked by pika when the Queue.Bind method has completed. At this
point we will start consuming messages by calling start_consuming
which will invoke the needed RPC commands to start the process.
:param pika.frame.Method unused_frame: The Queue.BindOk response frame
"""
LOGGER.info('Queue bound')
self.start_consuming()
def close_channel(self):
"""Call to close the channel with RabbitMQ cleanly by issuing the
Channel.Close RPC command.
"""
LOGGER.info('Closing the channel')
self._channel.close()
def open_channel(self):
"""Open a new channel with RabbitMQ by issuing the Channel.Open RPC
command. When RabbitMQ responds that the channel is open, the
on_channel_open callback will be invoked by pika.
"""
LOGGER.info('Creating a new channel')
self._connection.channel(on_open_callback=self.on_channel_open)
def run(self):
"""Run the example consumer by connecting to RabbitMQ and then
starting the IOLoop to block and allow the SelectConnection to operate.
"""
self._connection = self.connect()
self._connection.ioloop.start()
def stop(self):
"""Cleanly shutdown the connection to RabbitMQ by stopping the consumer
with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok
will be invoked by pika, which will then closing the channel and
connection. The IOLoop is started again because this method is invoked
when CTRL-C is pressed raising a KeyboardInterrupt exception. This
exception stops the IOLoop which needs to be running for pika to
communicate with RabbitMQ. All of the commands issued prior to starting
the IOLoop will be buffered but not processed.
"""
LOGGER.info('Stopping')
self._closing = True
self.stop_consuming()
self._connection.ioloop.start()
LOGGER.info('Stopped')
def main():
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
example = ExampleConsumer('amqp://guest:guest@localhost:5672/%2F')
try:
example.run()
except KeyboardInterrupt:
example.stop()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,23 @@
Using the Blocking Connection to get a message from RabbitMQ
============================================================
.. _example_blocking_basic_get:
The :py:meth:`BlockingChannel.basic_get <pika.adapters.blocking_connection.BlockingChannel.basic_get>` method will return a tuple with the members.
If the server returns a message, the first item in the tuple will be a :class:`pika.spec.Basic.GetOk` object with the current message count, the redelivered flag, the routing key that was used to put the message in the queue, and the exchange the message was published to. The second item will be a :py:class:`~pika.spec.BasicProperties` object and the third will be the message body.
If the server did not return a message a tuple of None, None, None will be returned.
Example of getting a message and acknowledging it::
import pika
connection = pika.BlockingConnection()
channel = connection.channel()
method_frame, header_frame, body = channel.basic_get('test')
if method_frame:
print(method_frame, header_frame, body)
channel.basic_ack(method_frame.delivery_tag)
else:
print('No message returned')

View File

@ -0,0 +1,29 @@
Using the Blocking Connection to consume messages from RabbitMQ
===============================================================
.. _example_blocking_basic_consume:
The :py:meth:`BlockingChannel.basic_consume <pika.adapters.blocking_connection.BlockingChannel.basic_consume>` method assign a callback method to be called every time that RabbitMQ delivers messages to your consuming application.
When pika calls your method, it will pass in the channel, a :py:class:`pika.spec.Basic.Deliver` object with the delivery tag, the redelivered flag, the routing key that was used to put the message in the queue, and the exchange the message was published to. The third argument will be a :py:class:`pika.spec.BasicProperties` object and the last will be the message body.
Example of consuming messages and acknowledging them::
import pika
def on_message(channel, method_frame, header_frame, body):
print(method_frame.delivery_tag)
print(body)
print()
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
connection = pika.BlockingConnection()
channel = connection.channel()
channel.basic_consume(on_message, 'test')
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
connection.close()

View File

@ -0,0 +1,73 @@
Using the BlockingChannel.consume generator to consume messages
===============================================================
.. _example_blocking_basic_get:
The :py:meth:`BlockingChannel.consume <pika.adapters.blocking_connection.BlockingChannel.consume>` method is a generator that will return a tuple of method, properties and body.
When you escape out of the loop, be sure to call consumer.cancel() to return any unprocessed messages.
Example of consuming messages and acknowledging them::
import pika
connection = pika.BlockingConnection()
channel = connection.channel()
# Get ten messages and break out
for method_frame, properties, body in channel.consume('test'):
# Display the message parts
print(method_frame)
print(properties)
print(body)
# Acknowledge the message
channel.basic_ack(method_frame.delivery_tag)
# Escape out of the loop after 10 messages
if method_frame.delivery_tag == 10:
break
# Cancel the consumer and return any pending messages
requeued_messages = channel.cancel()
print('Requeued %i messages' % requeued_messages)
# Close the channel and the connection
channel.close()
connection.close()
If you have pending messages in the test queue, your output should look something like::
(pika)gmr-0x02:pika gmr$ python blocking_nack.py
<Basic.Deliver(['consumer_tag=ctag1.0', 'redelivered=True', 'routing_key=test', 'delivery_tag=1', 'exchange=test'])>
<BasicProperties(['delivery_mode=1', 'content_type=text/plain'])>
Hello World!
<Basic.Deliver(['consumer_tag=ctag1.0', 'redelivered=True', 'routing_key=test', 'delivery_tag=2', 'exchange=test'])>
<BasicProperties(['delivery_mode=1', 'content_type=text/plain'])>
Hello World!
<Basic.Deliver(['consumer_tag=ctag1.0', 'redelivered=True', 'routing_key=test', 'delivery_tag=3', 'exchange=test'])>
<BasicProperties(['delivery_mode=1', 'content_type=text/plain'])>
Hello World!
<Basic.Deliver(['consumer_tag=ctag1.0', 'redelivered=True', 'routing_key=test', 'delivery_tag=4', 'exchange=test'])>
<BasicProperties(['delivery_mode=1', 'content_type=text/plain'])>
Hello World!
<Basic.Deliver(['consumer_tag=ctag1.0', 'redelivered=True', 'routing_key=test', 'delivery_tag=5', 'exchange=test'])>
<BasicProperties(['delivery_mode=1', 'content_type=text/plain'])>
Hello World!
<Basic.Deliver(['consumer_tag=ctag1.0', 'redelivered=True', 'routing_key=test', 'delivery_tag=6', 'exchange=test'])>
<BasicProperties(['delivery_mode=1', 'content_type=text/plain'])>
Hello World!
<Basic.Deliver(['consumer_tag=ctag1.0', 'redelivered=True', 'routing_key=test', 'delivery_tag=7', 'exchange=test'])>
<BasicProperties(['delivery_mode=1', 'content_type=text/plain'])>
Hello World!
<Basic.Deliver(['consumer_tag=ctag1.0', 'redelivered=True', 'routing_key=test', 'delivery_tag=8', 'exchange=test'])>
<BasicProperties(['delivery_mode=1', 'content_type=text/plain'])>
Hello World!
<Basic.Deliver(['consumer_tag=ctag1.0', 'redelivered=True', 'routing_key=test', 'delivery_tag=9', 'exchange=test'])>
<BasicProperties(['delivery_mode=1', 'content_type=text/plain'])>
Hello World!
<Basic.Deliver(['consumer_tag=ctag1.0', 'redelivered=True', 'routing_key=test', 'delivery_tag=10', 'exchange=test'])>
<BasicProperties(['delivery_mode=1', 'content_type=text/plain'])>
Hello World!
Requeued 1894 messages

View File

@ -0,0 +1,28 @@
Using Delivery Confirmations with the BlockingConnection
========================================================
The following code demonstrates how to turn on delivery confirmations with the BlockingConnection and how to check for confirmation from RabbitMQ::
import pika
# Open a connection to RabbitMQ on localhost using all default parameters
connection = pika.BlockingConnection()
# Open the channel
channel = connection.channel()
# Declare the queue
channel.queue_declare(queue="test", durable=True, exclusive=False, auto_delete=False)
# Turn on delivery confirmations
channel.confirm_delivery()
# Send a message
if channel.basic_publish(exchange='test',
routing_key='test',
body='Hello World!',
properties=pika.BasicProperties(content_type='text/plain',
delivery_mode=1)):
print('Message publish was confirmed')
else:
print('Message could not be confirmed')

View File

@ -0,0 +1,29 @@
Ensuring message delivery with the mandatory flag
=================================================
The following example demonstrates how to check if a message is delivered by setting the mandatory flag and checking the return result when using the BlockingConnection::
import pika
# Open a connection to RabbitMQ on localhost using all default parameters
connection = pika.BlockingConnection()
# Open the channel
channel = connection.channel()
# Declare the queue
channel.queue_declare(queue="test", durable=True, exclusive=False, auto_delete=False)
# Enabled delivery confirmations
channel.confirm_delivery()
# Send a message
if channel.basic_publish(exchange='test',
routing_key='test',
body='Hello World!',
properties=pika.BasicProperties(content_type='text/plain',
delivery_mode=1),
mandatory=True):
print('Message was published')
else:
print('Message was returned')

View File

@ -0,0 +1,64 @@
Comparing Message Publishing with BlockingConnection and SelectConnection
=========================================================================
For those doing simple, non-asynchronous programming, :py:meth:`pika.adapters.blocking_connection.BlockingConnection` proves to be the easiest way to get up and running with Pika to publish messages.
In the following example, a connection is made to RabbitMQ listening to port *5672* on *localhost* using the username *guest* and password *guest* and virtual host */*. Once connected, a channel is opened and a message is published to the *test_exchange* exchange using the *test_routing_key* routing key. The BasicProperties value passed in sets the message to delivery mode *1* (non-persisted) with a content-type of *text/plain*. Once the message is published, the connection is closed::
import pika
parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.basic_publish('test_exchange',
'test_routing_key',
'message body value',
pika.BasicProperties(content_type='text/plain',
delivery_mode=1))
connection.close()
In contrast, using :py:meth:`pika.adapters.select_connection.SelectConnection` and the other asynchronous adapters is more complicated and less pythonic, but when used with other asynchronous services can have tremendous performance improvements. In the following code example, all of the same parameters and values are used as were used in the previous example::
import pika
# Step #3
def on_open(connection):
connection.channel(on_channel_open)
# Step #4
def on_channel_open(channel):
channel.basic_publish('test_exchange',
'test_routing_key',
'message body value',
pika.BasicProperties(content_type='text/plain',
delivery_mode=1))
connection.close()
# Step #1: Connect to RabbitMQ
parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
connection = pika.SelectConnection(parameters=parameters,
on_open_callback=on_open)
try:
# Step #2 - Block on the IOLoop
connection.ioloop.start()
# Catch a Keyboard Interrupt to make sure that the connection is closed cleanly
except KeyboardInterrupt:
# Gracefully close the connection
connection.close()
# Start the IOLoop again so Pika can communicate, it will stop on its own when the connection is closed
connection.ioloop.start()

View File

@ -0,0 +1,49 @@
Connecting to RabbitMQ with Callback-Passing Style
==================================================
When you connect to RabbitMQ with an asynchronous adapter, you are writing event
oriented code. The connection adapter will block on the IOLoop that is watching
to see when pika should read data from and write data to RabbitMQ. Because you're
now blocking on the IOLoop, you will receive callback notifications when specific
events happen.
Example Code
------------
In the example, there are three steps that take place:
1. Setup the connection to RabbitMQ
2. Start the IOLoop
3. Once connected, the on_open method will be called by Pika with a handle to
the connection. In this method, a new channel will be opened on the connection.
4. Once the channel is opened, you can do your other actions, whether they be
publishing messages, consuming messages or other RabbitMQ related activities.::
import pika
# Step #3
def on_open(connection):
connection.channel(on_channel_open)
# Step #4
def on_channel_open(channel):
channel.basic_publish('exchange_name',
'routing_key',
'Test Message',
pika.BasicProperties(content_type='text/plain',
type='example'))
# Step #1: Connect to RabbitMQ
connection = pika.SelectConnection(on_open_callback=on_open)
try:
# Step #2 - Block on the IOLoop
connection.ioloop.start()
# Catch a Keyboard Interrupt to make sure that the connection is closed cleanly
except KeyboardInterrupt:
# Gracefully close the connection
connection.close()
# Start the IOLoop again so Pika can communicate, it will stop on its own when the connection is closed
connection.ioloop.start()

View File

@ -0,0 +1,81 @@
Direct reply-to example
==============================
The following example demonstrates the use of the RabbitMQ "Direct reply-to" feature via `pika.BlockingConnection`. See https://www.rabbitmq.com/direct-reply-to.html for more info about this feature.
direct_reply_to.py::
# -*- coding: utf-8 -*-
"""
This example demonstrates the RabbitMQ "Direct reply-to" usage via
`pika.BlockingConnection`. See https://www.rabbitmq.com/direct-reply-to.html
for more info about this feature.
"""
import pika
SERVER_QUEUE = 'rpc.server.queue'
def main():
""" Here, Client sends "Marco" to RPC Server, and RPC Server replies with
"Polo".
NOTE Normally, the server would be running separately from the client, but
in this very simple example both are running in the same thread and sharing
connection and channel.
"""
with pika.BlockingConnection() as conn:
channel = conn.channel()
# Set up server
channel.queue_declare(queue=SERVER_QUEUE,
exclusive=True,
auto_delete=True)
channel.basic_consume(on_server_rx_rpc_request, queue=SERVER_QUEUE)
# Set up client
# NOTE Client must create its consumer and publish RPC requests on the
# same channel to enable the RabbitMQ broker to make the necessary
# associations.
#
# Also, client must create the consumer *before* starting to publish the
# RPC requests.
#
# Client must create its consumer with no_ack=True, because the reply-to
# queue isn't real.
channel.basic_consume(on_client_rx_reply_from_server,
queue='amq.rabbitmq.reply-to',
no_ack=True)
channel.basic_publish(
exchange='',
routing_key=SERVER_QUEUE,
body='Marco',
properties=pika.BasicProperties(reply_to='amq.rabbitmq.reply-to'))
channel.start_consuming()
def on_server_rx_rpc_request(ch, method_frame, properties, body):
print 'RPC Server got request:', body
ch.basic_publish('', routing_key=properties.reply_to, body='Polo')
ch.basic_ack(delivery_tag=method_frame.delivery_tag)
print 'RPC Server says good bye'
def on_client_rx_reply_from_server(ch, method_frame, properties, body):
print 'RPC Client got reply:', body
# NOTE A real client might want to make additional RPC requests, but in this
# simple example we're closing the channel after getting our first reply
# to force control to return from channel.start_consuming()
print 'RPC Client says bye'
ch.close()

View File

@ -0,0 +1,37 @@
Ensuring well-behaved connection with heartbeat and blocked-connection timeouts
===============================================================================
This example demonstrates explicit setting of heartbeat and blocked connection timeouts.
Starting with RabbitMQ 3.5.5, the broker's default heartbeat timeout decreased from 580 seconds to 60 seconds. As a result, applications that perform lengthy processing in the same thread that also runs their Pika connection may experience unexpected dropped connections due to heartbeat timeout. Here, we specify an explicit lower bound for heartbeat timeout.
When RabbitMQ broker is running out of certain resources, such as memory and disk space, it may block connections that are performing resource-consuming operations, such as publishing messages. Once a connection is blocked, RabbitMQ stops reading from that connection's socket, so no commands from the client will get through to the broker on that connection until the broker unblocks it. A blocked connection may last for an indefinite period of time, stalling the connection and possibly resulting in a hang (e.g., in BlockingConnection) until the connection is unblocked. Blocked Connection Timeout is intended to interrupt (i.e., drop) a connection that has been blocked longer than the given timeout value.
Example of configuring hertbeat and blocked-connection timeouts::
import pika
def main():
# NOTE: These parameters work with all Pika connection types
params = pika.ConnectionParameters(heartbeat_interval=600,
blocked_connection_timeout=300)
conn = pika.BlockingConnection(params)
chan = conn.channel()
chan.basic_publish('', 'my-alphabet-queue', "abc")
# If publish causes the connection to become blocked, then this conn.close()
# would hang until the connection is unblocked, if ever. However, the
# blocked_connection_timeout connection parameter would interrupt the wait,
# resulting in ConnectionClosed exception from BlockingConnection (or the
# on_connection_closed callback call in an asynchronous adapter)
conn.close()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,61 @@
TLS parameters example
=============================
This examples demonstrates a TLS session with RabbitMQ using mutual authentication.
It was tested against RabbitMQ 3.6.10, using Python 3.6.1 and pre-release Pika `0.11.0`
Note the use of `ssl_version=ssl.PROTOCOL_TLSv1`. The recent verions of RabbitMQ disable older versions of
SSL due to security vulnerabilities.
See https://www.rabbitmq.com/ssl.html for certificate creation and rabbitmq SSL configuration instructions.
tls_example.py::
import ssl
import pika
import logging
logging.basicConfig(level=logging.INFO)
cp = pika.ConnectionParameters(
ssl=True,
ssl_options=dict(
ssl_version=ssl.PROTOCOL_TLSv1,
ca_certs="/Users/me/tls-gen/basic/testca/cacert.pem",
keyfile="/Users/me/tls-gen/basic/client/key.pem",
certfile="/Users/me/tls-gen/basic/client/cert.pem",
cert_reqs=ssl.CERT_REQUIRED))
conn = pika.BlockingConnection(cp)
ch = conn.channel()
print(ch.queue_declare("sslq"))
ch.publish("", "sslq", "abc")
print(ch.basic_get("sslq"))
rabbitmq.config::
%% Both the client and rabbitmq server were running on the same machine, a MacBookPro laptop.
%%
%% rabbitmq.config was created in its default location for OS X: /usr/local/etc/rabbitmq/rabbitmq.config.
%%
%% The contents of the example rabbitmq.config are for demonstration purposes only. See https://www.rabbitmq.com/ssl.html for instructions about creating the test certificates and the contents of rabbitmq.config.
[
{rabbit,
[
{ssl_listeners, [{"127.0.0.1", 5671}]},
%% Configuring SSL.
%% See http://www.rabbitmq.com/ssl.html for full documentation.
%%
{ssl_options, [{cacertfile, "/Users/me/tls-gen/basic/testca/cacert.pem"},
{certfile, "/Users/me/tls-gen/basic/server/cert.pem"},
{keyfile, "/Users/me/tls-gen/basic/server/key.pem"},
{verify, verify_peer},
{fail_if_no_peer_cert, true}]}
]
}
].

View File

@ -0,0 +1,60 @@
TLS parameters example
=============================
This examples demonstrates a TLS session with RabbitMQ using server authentication.
It was tested against RabbitMQ 3.6.10, using Python 3.6.1 and pre-release Pika `0.11.0`
Note the use of `ssl_version=ssl.PROTOCOL_TLSv1`. The recent versions of RabbitMQ disable older versions of
SSL due to security vulnerabilities.
See https://www.rabbitmq.com/ssl.html for certificate creation and rabbitmq SSL configuration instructions.
tls_example.py::
import ssl
import pika
import logging
logging.basicConfig(level=logging.INFO)
cp = pika.ConnectionParameters(
ssl=True,
ssl_options=dict(
ssl_version=ssl.PROTOCOL_TLSv1,
ca_certs="/Users/me/tls-gen/basic/testca/cacert.pem",
cert_reqs=ssl.CERT_REQUIRED))
conn = pika.BlockingConnection(cp)
ch = conn.channel()
print(ch.queue_declare("sslq"))
ch.publish("", "sslq", "abc")
print(ch.basic_get("sslq"))
rabbitmq.config::
%% Both the client and rabbitmq server were running on the same machine, a MacBookPro laptop.
%%
%% rabbitmq.config was created in its default location for OS X: /usr/local/etc/rabbitmq/rabbitmq.config.
%%
%% The contents of the example rabbitmq.config are for demonstration purposes only. See https://www.rabbitmq.com/ssl.html for instructions about creating the test certificates and the contents of rabbitmq.config.
%%
%% Note that the {fail_if_no_peer_cert,false} option, states that RabbitMQ should accept clients that don't have a certificate to send to the broker, but through the {verify,verify_peer} option, we state that if the client does send a certificate to the broker, the broker must be able to establish a chain of trust to it.
[
{rabbit,
[
{ssl_listeners, [{"127.0.0.1", 5671}]},
%% Configuring SSL.
%% See http://www.rabbitmq.com/ssl.html for full documentation.
%%
{ssl_options, [{cacertfile, "/Users/me/tls-gen/basic/testca/cacert.pem"},
{certfile, "/Users/me/tls-gen/basic/server/cert.pem"},
{keyfile, "/Users/me/tls-gen/basic/server/key.pem"},
{verify, verify_peer},
{fail_if_no_peer_cert, false}]}
]
}
].

View File

@ -0,0 +1,349 @@
Tornado Consumer
================
The following example implements a consumer using the :class:`Tornado adapter <pika.adapters.tornado_connection.TornadoConnection>` for the `Tornado framework <http://tornadoweb.org>`_ that will respond to RPC commands sent from RabbitMQ. For example, it will reconnect if RabbitMQ closes the connection and will shutdown if RabbitMQ cancels the consumer or closes the channel. While it may look intimidating, each method is very short and represents a individual actions that a consumer can do.
consumer.py::
from pika import adapters
import pika
import logging
LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
'-35s %(lineno) -5d: %(message)s')
LOGGER = logging.getLogger(__name__)
class ExampleConsumer(object):
"""This is an example consumer that will handle unexpected interactions
with RabbitMQ such as channel and connection closures.
If RabbitMQ closes the connection, it will reopen it. You should
look at the output, as there are limited reasons why the connection may
be closed, which usually are tied to permission related issues or
socket timeouts.
If the channel is closed, it will indicate a problem with one of the
commands that were issued and that should surface in the output as well.
"""
EXCHANGE = 'message'
EXCHANGE_TYPE = 'topic'
QUEUE = 'text'
ROUTING_KEY = 'example.text'
def __init__(self, amqp_url):
"""Create a new instance of the consumer class, passing in the AMQP
URL used to connect to RabbitMQ.
:param str amqp_url: The AMQP url to connect with
"""
self._connection = None
self._channel = None
self._closing = False
self._consumer_tag = None
self._url = amqp_url
def connect(self):
"""This method connects to RabbitMQ, returning the connection handle.
When the connection is established, the on_connection_open method
will be invoked by pika.
:rtype: pika.SelectConnection
"""
LOGGER.info('Connecting to %s', self._url)
return adapters.tornado_connection.TornadoConnection(pika.URLParameters(self._url),
self.on_connection_open)
def close_connection(self):
"""This method closes the connection to RabbitMQ."""
LOGGER.info('Closing connection')
self._connection.close()
def add_on_connection_close_callback(self):
"""This method adds an on close callback that will be invoked by pika
when RabbitMQ closes the connection to the publisher unexpectedly.
"""
LOGGER.info('Adding connection close callback')
self._connection.add_on_close_callback(self.on_connection_closed)
def on_connection_closed(self, connection, reply_code, reply_text):
"""This method is invoked by pika when the connection to RabbitMQ is
closed unexpectedly. Since it is unexpected, we will reconnect to
RabbitMQ if it disconnects.
:param pika.connection.Connection connection: The closed connection obj
:param int reply_code: The server provided reply_code if given
:param str reply_text: The server provided reply_text if given
"""
self._channel = None
if self._closing:
self._connection.ioloop.stop()
else:
LOGGER.warning('Connection closed, reopening in 5 seconds: (%s) %s',
reply_code, reply_text)
self._connection.add_timeout(5, self.reconnect)
def on_connection_open(self, unused_connection):
"""This method is called by pika once the connection to RabbitMQ has
been established. It passes the handle to the connection object in
case we need it, but in this case, we'll just mark it unused.
:type unused_connection: pika.SelectConnection
"""
LOGGER.info('Connection opened')
self.add_on_connection_close_callback()
self.open_channel()
def reconnect(self):
"""Will be invoked by the IOLoop timer if the connection is
closed. See the on_connection_closed method.
"""
if not self._closing:
# Create a new connection
self._connection = self.connect()
def add_on_channel_close_callback(self):
"""This method tells pika to call the on_channel_closed method if
RabbitMQ unexpectedly closes the channel.
"""
LOGGER.info('Adding channel close callback')
self._channel.add_on_close_callback(self.on_channel_closed)
def on_channel_closed(self, channel, reply_code, reply_text):
"""Invoked by pika when RabbitMQ unexpectedly closes the channel.
Channels are usually closed if you attempt to do something that
violates the protocol, such as re-declare an exchange or queue with
different parameters. In this case, we'll close the connection
to shutdown the object.
:param pika.channel.Channel: The closed channel
:param int reply_code: The numeric reason the channel was closed
:param str reply_text: The text reason the channel was closed
"""
LOGGER.warning('Channel %i was closed: (%s) %s',
channel, reply_code, reply_text)
self._connection.close()
def on_channel_open(self, channel):
"""This method is invoked by pika when the channel has been opened.
The channel object is passed in so we can make use of it.
Since the channel is now open, we'll declare the exchange to use.
:param pika.channel.Channel channel: The channel object
"""
LOGGER.info('Channel opened')
self._channel = channel
self.add_on_channel_close_callback()
self.setup_exchange(self.EXCHANGE)
def setup_exchange(self, exchange_name):
"""Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
command. When it is complete, the on_exchange_declareok method will
be invoked by pika.
:param str|unicode exchange_name: The name of the exchange to declare
"""
LOGGER.info('Declaring exchange %s', exchange_name)
self._channel.exchange_declare(self.on_exchange_declareok,
exchange_name,
self.EXCHANGE_TYPE)
def on_exchange_declareok(self, unused_frame):
"""Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
command.
:param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame
"""
LOGGER.info('Exchange declared')
self.setup_queue(self.QUEUE)
def setup_queue(self, queue_name):
"""Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
command. When it is complete, the on_queue_declareok method will
be invoked by pika.
:param str|unicode queue_name: The name of the queue to declare.
"""
LOGGER.info('Declaring queue %s', queue_name)
self._channel.queue_declare(self.on_queue_declareok, queue_name)
def on_queue_declareok(self, method_frame):
"""Method invoked by pika when the Queue.Declare RPC call made in
setup_queue has completed. In this method we will bind the queue
and exchange together with the routing key by issuing the Queue.Bind
RPC command. When this command is complete, the on_bindok method will
be invoked by pika.
:param pika.frame.Method method_frame: The Queue.DeclareOk frame
"""
LOGGER.info('Binding %s to %s with %s',
self.EXCHANGE, self.QUEUE, self.ROUTING_KEY)
self._channel.queue_bind(self.on_bindok, self.QUEUE,
self.EXCHANGE, self.ROUTING_KEY)
def add_on_cancel_callback(self):
"""Add a callback that will be invoked if RabbitMQ cancels the consumer
for some reason. If RabbitMQ does cancel the consumer,
on_consumer_cancelled will be invoked by pika.
"""
LOGGER.info('Adding consumer cancellation callback')
self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
def on_consumer_cancelled(self, method_frame):
"""Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer
receiving messages.
:param pika.frame.Method method_frame: The Basic.Cancel frame
"""
LOGGER.info('Consumer was cancelled remotely, shutting down: %r',
method_frame)
if self._channel:
self._channel.close()
def acknowledge_message(self, delivery_tag):
"""Acknowledge the message delivery from RabbitMQ by sending a
Basic.Ack RPC method for the delivery tag.
:param int delivery_tag: The delivery tag from the Basic.Deliver frame
"""
LOGGER.info('Acknowledging message %s', delivery_tag)
self._channel.basic_ack(delivery_tag)
def on_message(self, unused_channel, basic_deliver, properties, body):
"""Invoked by pika when a message is delivered from RabbitMQ. The
channel is passed for your convenience. The basic_deliver object that
is passed in carries the exchange, routing key, delivery tag and
a redelivered flag for the message. The properties passed in is an
instance of BasicProperties with the message properties and the body
is the message that was sent.
:param pika.channel.Channel unused_channel: The channel object
:param pika.Spec.Basic.Deliver: basic_deliver method
:param pika.Spec.BasicProperties: properties
:param str|unicode body: The message body
"""
LOGGER.info('Received message # %s from %s: %s',
basic_deliver.delivery_tag, properties.app_id, body)
self.acknowledge_message(basic_deliver.delivery_tag)
def on_cancelok(self, unused_frame):
"""This method is invoked by pika when RabbitMQ acknowledges the
cancellation of a consumer. At this point we will close the channel.
This will invoke the on_channel_closed method once the channel has been
closed, which will in-turn close the connection.
:param pika.frame.Method unused_frame: The Basic.CancelOk frame
"""
LOGGER.info('RabbitMQ acknowledged the cancellation of the consumer')
self.close_channel()
def stop_consuming(self):
"""Tell RabbitMQ that you would like to stop consuming by sending the
Basic.Cancel RPC command.
"""
if self._channel:
LOGGER.info('Sending a Basic.Cancel RPC command to RabbitMQ')
self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)
def start_consuming(self):
"""This method sets up the consumer by first calling
add_on_cancel_callback so that the object is notified if RabbitMQ
cancels the consumer. It then issues the Basic.Consume RPC command
which returns the consumer tag that is used to uniquely identify the
consumer with RabbitMQ. We keep the value to use it when we want to
cancel consuming. The on_message method is passed in as a callback pika
will invoke when a message is fully received.
"""
LOGGER.info('Issuing consumer related RPC commands')
self.add_on_cancel_callback()
self._consumer_tag = self._channel.basic_consume(self.on_message,
self.QUEUE)
def on_bindok(self, unused_frame):
"""Invoked by pika when the Queue.Bind method has completed. At this
point we will start consuming messages by calling start_consuming
which will invoke the needed RPC commands to start the process.
:param pika.frame.Method unused_frame: The Queue.BindOk response frame
"""
LOGGER.info('Queue bound')
self.start_consuming()
def close_channel(self):
"""Call to close the channel with RabbitMQ cleanly by issuing the
Channel.Close RPC command.
"""
LOGGER.info('Closing the channel')
self._channel.close()
def open_channel(self):
"""Open a new channel with RabbitMQ by issuing the Channel.Open RPC
command. When RabbitMQ responds that the channel is open, the
on_channel_open callback will be invoked by pika.
"""
LOGGER.info('Creating a new channel')
self._connection.channel(on_open_callback=self.on_channel_open)
def run(self):
"""Run the example consumer by connecting to RabbitMQ and then
starting the IOLoop to block and allow the SelectConnection to operate.
"""
self._connection = self.connect()
self._connection.ioloop.start()
def stop(self):
"""Cleanly shutdown the connection to RabbitMQ by stopping the consumer
with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok
will be invoked by pika, which will then closing the channel and
connection. The IOLoop is started again because this method is invoked
when CTRL-C is pressed raising a KeyboardInterrupt exception. This
exception stops the IOLoop which needs to be running for pika to
communicate with RabbitMQ. All of the commands issued prior to starting
the IOLoop will be buffered but not processed.
"""
LOGGER.info('Stopping')
self._closing = True
self.stop_consuming()
self._connection.ioloop.start()
LOGGER.info('Stopped')
def main():
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
example = ExampleConsumer('amqp://guest:guest@localhost:5672/%2F')
try:
example.run()
except KeyboardInterrupt:
example.stop()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,49 @@
Twisted Consumer Example
========================
Example of writing a consumer using the :py:class:`Twisted connection adapter <pika.adapters.twisted_connection.TwistedConnection>`::
# -*- coding:utf-8 -*-
import pika
from pika import exceptions
from pika.adapters import twisted_connection
from twisted.internet import defer, reactor, protocol,task
@defer.inlineCallbacks
def run(connection):
channel = yield connection.channel()
exchange = yield channel.exchange_declare(exchange='topic_link', exchange_type='topic')
queue = yield channel.queue_declare(queue='hello', auto_delete=False, exclusive=False)
yield channel.queue_bind(exchange='topic_link',queue='hello',routing_key='hello.world')
yield channel.basic_qos(prefetch_count=1)
queue_object, consumer_tag = yield channel.basic_consume(queue='hello',no_ack=False)
l = task.LoopingCall(read, queue_object)
l.start(0.01)
@defer.inlineCallbacks
def read(queue_object):
ch,method,properties,body = yield queue_object.get()
if body:
print(body)
yield ch.basic_ack(delivery_tag=method.delivery_tag)
parameters = pika.ConnectionParameters()
cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters)
d = cc.connectTCP('hostname', 5672)
d.addCallback(lambda protocol: protocol.ready)
d.addCallback(run)
reactor.run()

View File

@ -0,0 +1,68 @@
Using URLParameters
===================
Pika has two methods of encapsulating the data that lets it know how to connect
to RabbitMQ, :py:class:`pika.connection.ConnectionParameters` and :py:class:`pika.connection.URLParameters`.
.. note::
If you're connecting to RabbitMQ on localhost on port 5672, with the default virtual host of */* and the default username and password of *guest* and *guest*, you do not need to specify connection parameters when connecting.
Using :py:class:`pika.connection.URLParameters` is an easy way to minimize the
variables required to connect to RabbitMQ and supports all of the directives
that :py:class:`pika.connection.ConnectionParameters` supports.
The following is the format for the URLParameters connection value::
scheme://username:password@host:port/virtual_host?key=value&key=value
As you can see, by default, the scheme (amqp, amqps), username, password, host, port and virtual host make up the core of the URL and any other parameter is passed in as query string values.
Example Connection URLS
-----------------------
The default connection URL connects to the / virtual host as guest using the guest password on localhost port 5672. Note the forwardslash in the URL is encoded to %2F::
amqp://guest:guest@localhost:5672/%2F
Connect to a host *rabbit1* as the user *www-data* using the password *rabbit_pwd* on the virtual host *web_messages*::
amqp://www-data:rabbit_pwd@rabbit1/web_messages
Connecting via SSL is pretty easy too. To connect via SSL for the previous example, simply change the scheme to *amqps*. If you do not specify a port, Pika will use the default SSL port of 5671::
amqps://www-data:rabbit_pwd@rabbit1/web_messages
If you're looking to tweak other parameters, such as enabling heartbeats, simply add the key/value pair as a query string value. The following builds upon the SSL connection, enabling heartbeats every 30 seconds::
amqps://www-data:rabbit_pwd@rabbit1/web_messages?heartbeat=30
Options that are available as query string values:
- backpressure_detection: Pass in a value of *t* to enable backpressure detection, it is disabled by default.
- channel_max: Alter the default channel maximum by passing in a 32-bit integer value here.
- connection_attempts: Alter the default of 1 connection attempt by passing in an integer value here.
- frame_max: Alter the default frame maximum size value by passing in a long integer value [#f1]_.
- heartbeat: Pass a value greater than zero to enable heartbeats between the server and your application. The integer value you pass here will be the number of seconds between heartbeats.
- locale: Set the locale of the client using underscore delimited posix Locale code in ll_CC format (en_US, pt_BR, de_DE).
- retry_delay: The number of seconds to wait before attempting to reconnect on a failed connection, if connection_attempts is > 0.
- socket_timeout: Change the default socket timeout duration from 0.25 seconds to another integer or float value. Adjust with caution.
- ssl_options: A url encoded dict of values for the SSL connection. The available keys are:
- ca_certs
- cert_reqs
- certfile
- keyfile
- ssl_version
For an information on what the ssl_options can be set to reference the `official Python documentation <http://docs.python.org/2/library/ssl.html>`_. Here is an example of setting the client certificate and key::
amqp://www-data:rabbit_pwd@rabbit1/web_messages?heartbeat=30&ssl_options=%7B%27keyfile%27%3A+%27%2Fetc%2Fssl%2Fmykey.pem%27%2C+%27certfile%27%3A+%27%2Fetc%2Fssl%2Fmycert.pem%27%7D
The following example demonstrates how to generate the ssl_options string with `Python's urllib <http://docs.python.org/2/library/urllib.html>`_::
import urllib
urllib.urlencode({'ssl_options': {'certfile': '/etc/ssl/mycert.pem', 'keyfile': '/etc/ssl/mykey.pem'}})
.. rubric:: Footnotes
.. [#f1] The AMQP specification states that a server can reject a request for a frame size larger than the value it passes during content negotiation.

View File

@ -0,0 +1,18 @@
Frequently Asked Questions
--------------------------
- Is Pika thread safe?
Pika does not have any notion of threading in the code. If you want to use Pika with threading, make sure you have a Pika connection per thread, created in that thread. It is not safe to share one Pika connection across threads, with one exception: you may call the connection method `add_callback_threadsafe` from another thread to schedule a callback within an active pika connection.
- How do I report a bug with Pika?
The `main Pika repository <https://github.com/pika/pika>`_ is hosted on `Github <https://github.com>`_ and we use the Issue tracker at `https://github.com/pika/pika/issues <https://github.com/pika/pika/issues>`_.
- Is there a mailing list for Pika?
Yes, Pika's mailing list is available `on Google Groups <https://groups.google.com/forum/?fromgroups#!forum/pika-python>`_ and the email address is pika-python@googlegroups.com, though traditionally questions about Pika have been asked on the `RabbitMQ-Discuss mailing list <http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss>`_.
- How can I contribute to Pika?
You can `fork the project on Github <http://help.github.com/forking/>`_ and issue `Pull Requests <http://help.github.com/pull-requests/>`_ when you believe you have something solid to be added to the main repository.

View File

@ -0,0 +1,37 @@
Introduction to Pika
====================
Pika is a pure-Python implementation of the AMQP 0-9-1 protocol that tries to stay fairly independent of the underlying network support library.
If you have not developed with Pika or RabbitMQ before, the :doc:`intro` documentation is a good place to get started.
Installing Pika
---------------
Pika is available for download via PyPI and may be installed using easy_install or pip::
pip install pika
or::
easy_install pika
To install from source, run "python setup.py install" in the root source directory.
Using Pika
----------
.. toctree::
:glob:
:maxdepth: 1
intro
modules/index
examples
faq
contributors
version_history
Indices and tables
------------------
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`

View File

@ -0,0 +1,125 @@
Introduction to Pika
====================
IO and Event Looping
--------------------
As AMQP is a two-way RPC protocol where the client can send requests to the server and the server can send requests to a client, Pika implements or extends IO loops in each of its asynchronous connection adapters. These IO loops are blocking methods which loop and listen for events. Each asynchronous adapter follows the same standard for invoking the IO loop. The IO loop is created when the connection adapter is created. To start an IO loop for any given adapter, call the ``connection.ioloop.start()`` method.
If you are using an external IO loop such as Tornado's :class:`~tornado.ioloop.IOLoop` you invoke it normally and then add the Pika Tornado adapter to it.
Example::
import pika
def on_open(connection):
# Invoked when the connection is open
pass
# Create our connection object, passing in the on_open method
connection = pika.SelectConnection(on_open_callback=on_open)
try:
# Loop so we can communicate with RabbitMQ
connection.ioloop.start()
except KeyboardInterrupt:
# Gracefully close the connection
connection.close()
# Loop until we're fully closed, will stop on its own
connection.ioloop.start()
.. _intro_to_cps:
Continuation-Passing Style
--------------------------
Interfacing with Pika asynchronously is done by passing in callback methods you would like to have invoked when a certain event completes. For example, if you are going to declare a queue, you pass in a method that will be called when the RabbitMQ server returns a `Queue.DeclareOk <http://www.rabbitmq.com/amqp-0-9-1-quickref.html#queue.declare>`_ response.
In our example below we use the following five easy steps:
#. We start by creating our connection object, then starting our event loop.
#. When we are connected, the *on_connected* method is called. In that method we create a channel.
#. When the channel is created, the *on_channel_open* method is called. In that method we declare a queue.
#. When the queue is declared successfully, *on_queue_declared* is called. In that method we call :py:meth:`channel.basic_consume <channel.Channel.basic_consume>` telling it to call the handle_delivery for each message RabbitMQ delivers to us.
#. When RabbitMQ has a message to send us, it calls the handle_delivery method passing the AMQP Method frame, Header frame, and Body.
.. NOTE::
Step #1 is on line #28 and Step #2 is on line #6. This is so that Python knows about the functions we'll call in Steps #2 through #5.
.. _cps_example:
Example::
import pika
# Create a global channel variable to hold our channel object in
channel = None
# Step #2
def on_connected(connection):
"""Called when we are fully connected to RabbitMQ"""
# Open a channel
connection.channel(on_channel_open)
# Step #3
def on_channel_open(new_channel):
"""Called when our channel has opened"""
global channel
channel = new_channel
channel.queue_declare(queue="test", durable=True, exclusive=False, auto_delete=False, callback=on_queue_declared)
# Step #4
def on_queue_declared(frame):
"""Called when RabbitMQ has told us our Queue has been declared, frame is the response from RabbitMQ"""
channel.basic_consume(handle_delivery, queue='test')
# Step #5
def handle_delivery(channel, method, header, body):
"""Called when we receive a message from RabbitMQ"""
print(body)
# Step #1: Connect to RabbitMQ using the default parameters
parameters = pika.ConnectionParameters()
connection = pika.SelectConnection(parameters, on_connected)
try:
# Loop so we can communicate with RabbitMQ
connection.ioloop.start()
except KeyboardInterrupt:
# Gracefully close the connection
connection.close()
# Loop until we're fully closed, will stop on its own
connection.ioloop.start()
Credentials
-----------
The :mod:`pika.credentials` module provides the mechanism by which you pass the username and password to the :py:class:`ConnectionParameters <pika.connection.ConnectionParameters>` class when it is created.
Example::
import pika
credentials = pika.PlainCredentials('username', 'password')
parameters = pika.ConnectionParameters(credentials=credentials)
.. _connection_parameters:
Connection Parameters
---------------------
There are two types of connection parameter classes in Pika to allow you to pass the connection information into a connection adapter, :class:`ConnectionParameters <pika.connection.ConnectionParameters>` and :class:`URLParameters <pika.connection.URLParameters>`. Both classes share the same default connection values.
.. _intro_to_backpressure:
TCP Backpressure
----------------
As of RabbitMQ 2.0, client side `Channel.Flow <http://www.rabbitmq.com/amqp-0-9-1-quickref.html#channel.flow>`_ has been removed [#f1]_. Instead, the RabbitMQ broker uses TCP Backpressure to slow your client if it is delivering messages too fast. If you pass in backpressure_detection into your connection parameters, Pika attempts to help you handle this situation by providing a mechanism by which you may be notified if Pika has noticed too many frames have yet to be delivered. By registering a callback function with the :py:meth:`add_backpressure_callback <pika.connection.Connection.add_backpressure_callback>` method of any connection adapter, your function will be called when Pika sees that a backlog of 10 times the average frame size you have been sending has been exceeded. You may tweak the notification multiplier value by calling the :py:meth:`set_backpressure_multiplier <pika.connection.Connection.set_backpressure_multiplier>` method passing any integer value.
Example::
import pika
parameters = pika.URLParameters('amqp://guest:guest@rabbit-server1:5672/%2F?backpressure_detection=t')
.. rubric:: Footnotes
.. [#f1] "more effective flow control mechanism that does not require cooperation from clients and reacts quickly to prevent the broker from exhausting memory - see http://www.rabbitmq.com/extensions.html#memsup" from http://lists.rabbitmq.com/pipermail/rabbitmq-announce/attachments/20100825/2c672695/attachment.txt

View File

@ -0,0 +1,9 @@
asyncio Connection Adapter
==========================
.. automodule:: pika.adapters.asyncio_connection
Be sure to check out the :doc:`asynchronous examples </examples>` including the asyncio specific :doc:`consumer </examples/asyncio_consumer>` example.
.. autoclass:: pika.adapters.asyncio_connection.AsyncioConnection
:members:
:inherited-members:

View File

@ -0,0 +1,13 @@
BlockingConnection
------------------
.. automodule:: pika.adapters.blocking_connection
Be sure to check out examples in :doc:`/examples`.
.. autoclass:: pika.adapters.blocking_connection.BlockingConnection
:members:
:inherited-members:
.. autoclass:: pika.adapters.blocking_connection.BlockingChannel
:members:
:inherited-members:

View File

@ -0,0 +1,15 @@
Connection Adapters
===================
Pika uses connection adapters to provide a flexible method for adapting pika's
core communication to different IOLoop implementations. In addition to asynchronous adapters, there is the :class:`BlockingConnection <pika.adapters.blocking_connection.BlockingConnection>` adapter that provides a more idiomatic procedural approach to using Pika.
Adapters
--------
.. toctree::
:glob:
:maxdepth: 1
blocking
select
tornado
twisted

Some files were not shown because too many files have changed in this diff Show More