>seankerr.dev

These are my experiences in life and the past 2 decades of software development.

Aug 26, 2024

Serializing Celery tasks with orjson

I came across this Celery GitHub issue today. The OP was wondering if it’s currently possible to configure Celery with support for the orjson serializer.

As it turns out, it is not possible out of the box. And the reasoning provided is to prevent yet another dependency from being added to the already growing list of Celery dependencies.

The OP gave two reasons for wanting orjson support: performance, and missing support for types such as datetime and UUID.

Both of these issues have affected my Celery implementations as well. Less of an issue due to performance, because last I checked I am already using Python. 😂

But more so due to the missing support for datetime and UUID. These are quite common considerations in just about every Python codebase these days.
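To make the datetime and UUID problem concrete, here is a minimal sketch using only the standard library's json module. The payload field names are made up for illustration; the point is simply that the stock encoder refuses both types.

```python
import json
import uuid
from datetime import datetime

# Hypothetical task payload; the field names are illustrative only
payload = {
    "order_id": uuid.uuid4(),
    "created_at": datetime(2024, 8, 26, 12, 0, 0),
}


def try_dumps(value):
    """Return the JSON string, or the TypeError message if encoding fails."""
    try:
        return json.dumps(value)
    except TypeError as exc:
        return f"TypeError: {exc}"


# The standard encoder has no built-in handling for UUID or datetime
result = try_dumps(payload)
print(result)
```

With the standard library you would need to pass a custom default function or pre-convert these values to strings, which is the chore orjson takes off your hands.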

All jokes aside, I thought I’d share a working example of orjson serialization in Celery. Cheers!

# system imports
from typing import Any

# dependency imports
from celery import Celery
from kombu import serialization

import orjson

def initialize_orjson_serializer(celery: Celery) -> None:
    def deserialize(data: bytes | str) -> tuple[Any, Any, Any]:
        return orjson.loads(data)

    def serialize(data: Any) -> str:
        return orjson.dumps(data).decode("utf-8")

    # accept tasks serialized with the following content types
    celery.conf.accept_content = ["json"]

    # accept results serialized with the following content types
    celery.conf.result_accept_content = ["json"]

    # celery will serialize outgoing tasks with this content type
    celery.conf.task_serializer = "json"

    # register json as a content type
    serialization.register(
        "json",
        serialize,
        deserialize,
        "json",
    )
Aug 26, 2024

Using Celery with an SQS broker

If you know me, you know that I’ve pretty much married Celery and RabbitMQ together ever since I discovered them. They work well in unison for large projects where the cost of RabbitMQ is justified. Sometimes it’s easy to forget that Celery supports other brokers such as Redis and SQS, both of which are excellent choices.

Reliability

Most people know what Redis is, but I feel that SQS isn’t so widely known in the Celery community. It stands for Simple Queue Service, and it’s AWS’ undeniably reliable message queue. And when I say reliable, I’m doing the SQS name a disservice. Here are their words, not mine:

Amazon SQS stores all message queues and messages within a single, highly-available AWS region with multiple redundant Availability Zones (AZs), so that no single computer, network, or AZ failure can make messages inaccessible.

Why use SQS over RabbitMQ or Redis?

All that aside, SQS is a great choice as a Celery broker because there are no servers to maintain, no upfront costs, and no reason to ever consider scaling issues because it’s practically infinite. You can store unlimited SQS messages (tasks in our case) for 14 days at no cost.

Maybe there is a cost…

The only cost that comes to mind is that you can’t run Flower. Without Flower, you won’t be able to see your currently running Celery instances or get task insights. These are mission critical requirements for large applications, but are negotiable for smaller ones.

Let’s get down to the nitty gritty…

ACKs late

My recommendation is to enable the acks_late Celery configuration setting. By default, with acks_late disabled, Celery deletes the message from SQS as soon as the task is received, which renders other important features such as visibility timeout, dead letter queues, and redrive policies useless.

Visibility timeout

SQS has the concept of a visibility timeout. When a message is delivered to a consumer, it is hidden from all other consumers for as long as the visibility timeout is in place. If the consumer doesn’t delete the message within the visibility timeout, the message becomes visible again and will inevitably be delivered to another consumer.

In Celery terms, this means you need to do one of two things when you process the task:

This functionality differs from a typical broker installation, wherein a task executes and regardless of the ACK status, the same task will not be delivered to another consumer. ACKing late in that scenario only protects you from losing your task in the event of a worker failure.

In my recent implementation, I found 600 seconds (10 minutes) to be a good visibility timeout. This is enough time to run the task and factors in enough time for all retries as well as potential HTTP timeouts for outbound tasks. If my task fails all retries, Celery will log the exception. Shortly after, the visibility timeout in SQS will expire and the same exact task will run again. This will repeat until the redrive policy kicks in.
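As a rough sketch, the settings discussed so far could be collected in a Celery configuration module along these lines. The region is an assumption for illustration, and the bare sqs:// URL assumes AWS credentials are picked up from the environment; adjust both to your own setup.

```python
# Sketch of a Celery configuration module for an SQS broker.
# The region below is a placeholder, not a recommendation.

# Use SQS as the broker; credentials are assumed to come from the environment
broker_url = "sqs://"

# Only delete the message from SQS after the task has finished
task_acks_late = True

broker_transport_options = {
    # assumed region, replace with your own
    "region": "us-east-1",
    # seconds a received message stays hidden from other consumers
    "visibility_timeout": 600,
}
```

A module like this can be loaded with Celery's config_from_object, or the same keys can be set directly on the app's conf object.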

Redrive policy and dead letter queues

When you configure the redrive policy for an SQS queue, you specify the maxReceiveCount setting which tells SQS how many times a message can be consumed before it is sent to the policy’s dead letter queue.

If you want SQS to function the same way RabbitMQ and Redis do, you can set the maxReceiveCount setting to 1. If your task completes successfully before the visibility timeout expires, it’ll be deleted from the queue. But if the task does not complete successfully, SQS will move it to the dead letter queue when the visibility timeout expires. From there you can examine the cause of failure, and possibly move messages back into a working queue once you’ve corrected the code causing the failure.
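For illustration, a redrive policy is set as a JSON-encoded queue attribute. The sketch below only builds that attribute; the queue ARN is a made-up placeholder, and the resulting dictionary is the kind of value you would pass as the Attributes argument of boto3's SQS set_queue_attributes call.

```python
import json

# Hypothetical ARN of an existing dead letter queue
DEAD_LETTER_QUEUE_ARN = "arn:aws:sqs:us-east-1:123456789012:celery-dead-letter"


def build_redrive_attributes(
    dead_letter_arn: str, max_receive_count: int = 1
) -> dict[str, str]:
    """Build the SQS queue attributes that describe a redrive policy."""
    policy = {
        "deadLetterTargetArn": dead_letter_arn,
        "maxReceiveCount": max_receive_count,
    }

    # SQS expects the policy itself as a JSON string
    return {"RedrivePolicy": json.dumps(policy)}


attributes = build_redrive_attributes(DEAD_LETTER_QUEUE_ARN)
print(attributes)
```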

There are various reasons why you would want to raise the maxReceiveCount, but they are for you to decide.

Jan 11, 2024

The OAuth Flows of HighLevel

This guest post first appeared in cbnsndwch.io

HighLevel’s API v2 uses the OAuth 2.0 Authorization Code flow for authentication. This means that in order for your app or integration to make API calls, you’ll need to implement an OAuth client that can receive Authorization Codes, exchange them for a pair of Access Token + Refresh Token, and save them both to persistent storage for continued use.

See the Authorization page in HighLevel’s API docs for a getting-started guide and a list of all the scopes available.

Understanding the OAuth Flows for Marketplace Apps

The OAuth Authorization Code flow as implemented by HighLevel presents two variants or flavors, depending on whether you’re building a Public App or a Private App.

Public Marketplace Apps

For verified, approved, public Marketplace Apps, the HighLevel web app takes care of most of the heavy lifting. It provides discovery and a UI for installing apps into agencies and/or locations. Here’s what the flow looks like:

Public Marketplace Apps Diagram

Private Marketplace Apps

For private Marketplace Apps — that is, apps that have not been reviewed by HighLevel and will therefore not be listed inside the HighLevel web app — you are responsible for initiating the OAuth flow on your end. You will need to prepare an Authorization URL containing your app’s Client ID and any scopes your app needs, and send the user to it. Here’s what the flow looks like:

Private Marketplace Apps Diagram
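A minimal sketch of preparing such an Authorization URL follows. The endpoint is a placeholder rather than HighLevel's real URL, and the client ID, redirect URI, and scope names are illustrative; take the actual values from the Authorization page of the API docs. The query parameters themselves are the standard OAuth 2.0 Authorization Code ones.

```python
from urllib.parse import urlencode

# Placeholder endpoint; substitute the one given in HighLevel's API docs
AUTHORIZE_ENDPOINT = "https://example.com/oauth/authorize"


def build_authorization_url(
    client_id: str, redirect_uri: str, scopes: list[str]
) -> str:
    """Build the URL the user is sent to in order to start the OAuth flow."""
    query = urlencode(
        {
            "response_type": "code",
            "client_id": client_id,
            "redirect_uri": redirect_uri,
            # OAuth 2.0 scopes are sent as a space-delimited string
            "scope": " ".join(scopes),
        }
    )

    return f"{AUTHORIZE_ENDPOINT}?{query}"


# Illustrative values only
url = build_authorization_url(
    client_id="my-client-id",
    redirect_uri="https://myapp.example.com/oauth/callback",
    scopes=["contacts.readonly", "contacts.write"],
)
print(url)
```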

OAuth 2.0 FAQs

HighLevel lists a few Frequently Asked Questions (FAQs) on their API docs. I’ve clarified the most critical ones below. I will add more as I answer questions in the Dev Council. Feel free to tag me in there and I’ll do my best to get you an answer.

How long are Access Tokens valid for?

Access Tokens are valid for a day. After that, you can use the Refresh Token to get a new Access Token which will be valid for another day.

How long are Refresh Tokens valid for?

Refresh Tokens are valid for a year or until they are used once, whichever comes first. When you call the /token endpoint with a Refresh Token instead of an Authorization Code, that Refresh Token will become invalid and you’ll get a new one in the response. Save the new token in your database or storage service in place of the original one.

Have more questions?

Reach out to me on the Dev Council Slack Channel or on Twitter @cbnsndwch. I’m happy to help!

Nov 17, 2023

Decrypting HighLevel SSO sessions using Python

HighLevel recently rolled out support for single-sign-on access, which allows Agencies and Accounts to connect seamlessly to an installed marketplace application. Because it’s such a new feature, there is no documentation available at this time. Thankfully, I stumbled across this video by Sergio Leon which explains the entire SSO process from the application developer perspective. Most importantly, it shows how to retrieve the encrypted SSO session from GHL. It also explains how to decrypt it using JavaScript (CryptoJS library).

This is where my luck ran out, because I’m not using JavaScript on my backend, I’m using Python.

The first thing I did was gather the encryption details from the video and I realized the session was encrypted using AES. That’s all I had to go with. I installed Pycryptodome and away I went. I could safely assume that the session data was base64 encoded, so I decoded it and saw the data in its raw format. This gave away the clue I needed. The raw data was prefixed with Salted__. When working with OpenSSL, this is standard practice. In fact, when you google “Encryption Salted__” you will see endless results talking about decrypting OpenSSL data. It’s also standard practice to use PBKDF2 as the key derivation function. PBKDF2 repeatedly hashes the password together with a random salt (SHA-256 in my case), and the key and initialization vector are derived from the result. Sadly, that is why I wasted a day trying to finish the task.

I made the worst assumption from the get-go. I saw the Salted__ prefix and assumed the OpenSSL format with PBKDF2 key derivation. I had to take a step back and find documentation (which doesn’t exist) or find somebody who has firsthand experience with this. That person is Sergio Leon, from the video above. He pointed me to a Stack Overflow post regarding how CryptoJS encrypts AES data, and so I took a look through the CryptoJS source code and noticed something I wasn’t expecting…

var key = EvpKDF.create({ ... }).compute(password, salt);

Ahhh! There it was. That doesn’t look like PBKDF2. It’s EVPKDF! That was the turning point where I realized I was using the wrong key derivation from the beginning. I switched to EVP and instantly my problem was solved.
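To show why the choice of key derivation matters, here is a small standard-library sketch that derives a key and IV from the same password and salt in both ways. The password, the salt, and the PBKDF2 iteration count are arbitrary values picked for the comparison, and the EvpKDF function assumes MD5 with a single iteration, which is what the decryption code in this post relies on.

```python
import hashlib

KEY_SIZE = 32
IV_SIZE = 16


def evp_kdf_md5(password: bytes, salt: bytes) -> tuple[bytes, bytes]:
    """Chain MD5 digests until there is enough material for key and IV."""
    derived = b""
    block = b""

    while len(derived) < KEY_SIZE + IV_SIZE:
        # each round hashes the previous digest + password + salt
        block = hashlib.md5(block + password + salt).digest()
        derived += block

    return derived[:KEY_SIZE], derived[KEY_SIZE : KEY_SIZE + IV_SIZE]


def pbkdf2_sha256(
    password: bytes, salt: bytes, iterations: int = 10000
) -> tuple[bytes, bytes]:
    """Derive key and IV with PBKDF2; the iteration count is arbitrary here."""
    derived = hashlib.pbkdf2_hmac(
        "sha256", password, salt, iterations, dklen=KEY_SIZE + IV_SIZE
    )

    return derived[:KEY_SIZE], derived[KEY_SIZE:]


# arbitrary example inputs
password = b"example password"
salt = bytes(range(8))

evp_key, evp_iv = evp_kdf_md5(password, salt)
pbkdf2_key, pbkdf2_iv = pbkdf2_sha256(password, salt)

# same inputs, different keys: the wrong KDF can never decrypt the session
print(evp_key != pbkdf2_key)
```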

The crux of the problem is two parts:

These two factors will invariably lead others to make the same assumption I did, which is why I’m writing this post.

And without further ado, here is a working example of decrypting a HighLevel SSO session in Python.

# system imports
from base64 import b64decode
from typing import Tuple

# pycryptodome imports
from Crypto.Cipher import AES
from Crypto.Hash import MD5
from Crypto.Util.Padding import unpad

# encryption details
BLOCK_SIZE = AES.block_size
KEY_SIZE = 32
IV_SIZE = 16
SALT_SIZE = 8

# working data
PASSWORD = "TOPSY KRETT PASSWORD"
ENCRYPTED_DATA = "ENCRYPTED SSO SESSION"

def derive_key_and_iv(password: bytes, salt: bytes) -> Tuple[bytes, bytes]:
    # EvpKDF-style derivation: chain MD5 digests until there is enough
    # material for both the key and the IV
    result = bytes()
    block = bytes()

    while len(result) < KEY_SIZE + IV_SIZE:
        hasher = MD5.new()
        # each round hashes the previous digest + password + salt
        hasher.update(block + password + salt)
        block = hasher.digest()
        result += block

    return result[:KEY_SIZE], result[KEY_SIZE : KEY_SIZE + IV_SIZE]

# get the raw encrypted data from the base64 encoded string
raw_encrypted_data = b64decode(ENCRYPTED_DATA)

# the first block is "Salted__" followed by the 8-byte salt, so we extract the salt
salt = raw_encrypted_data[SALT_SIZE:BLOCK_SIZE]

# beginning at the second block is the cipher text
cipher_text = raw_encrypted_data[BLOCK_SIZE:]

# let's do some work
key, iv = derive_key_and_iv(PASSWORD.encode("utf-8"), salt)
cipher = AES.new(key, AES.MODE_CBC, iv)
decrypted = cipher.decrypt(cipher_text)
unpadded = unpad(decrypted, BLOCK_SIZE)

print(unpadded.decode("utf-8"))
Oct 4, 2023

Late to the Tailwind game

I know… I know! I’m late to the Tailwind CSS game. Today’s the day I put it into action on this site, and I’m loving it so far. The support for arbitrary styles where a preset isn’t provided is critical. My hat’s off to you, Tailwind. I’ll continue to use Bootstrap 5 under certain circumstances, but for new projects, hell to the naw.

Did I mention that I’m simultaneously redoing the static generation in Astro too?

Sep 7, 2023

Released celery-sqlalchemy

When it comes to distributed events in Python, Celery shines. It works flawlessly with RabbitMQ, allowing you to run a limitless number of simultaneous tasks across any number of servers.

There is one drawback when using it with SQLAlchemy models though, and that is that Celery doesn’t serialize them out-of-the-box. And even if it did, the data would not be deserialized back into a model on the other end.

I created celery-sqlalchemy to solve this very problem so you can focus on writing code and stop worrying whether or not your data will be correct on the other end.

If you’re using Celery with SQLAlchemy and need to dispatch model tasks then please give celery-sqlalchemy a try. I would appreciate any feedback.

May 31, 2023

Released postgresql-lock

I developed this software specifically for CartHero’s requirements, but wanted to release it to the public since we found it critical when working at high scale.

The use case was simple: we are firing hundreds and sometimes thousands of events per second and need to prevent conflicts when writing new records and updating existing ones. There are Redis libraries available, but I’m strictly talking about PostgreSQL records. Keeping my locking mechanism as close to the database as possible, with a guaranteed unlock on database connection exit, was absolutely critical to our scenario.

If you’re using Python & PostgreSQL and our use-case sounds similar, consider giving postgresql-lock a try.

Mar 31, 2023

Out with the old, in with the new

AppTrends had a great run, but like many great things, it was time to move on. I started AppTrends in 2017 as a private Shopify app. At the time all “private app” meant was that I couldn’t offer my app in the Shopify app store.

Shopify’s demands changed over time until eventually they deprecated private apps in favor of their custom app offering. This gave me the opportunity to fix any shortcomings by developing a new platform with proper Shopify support. Today marks the completion of our user migration to CartHero. There are a few remaining customers who ignored the flurry of notifications about migrating, but we’ll cross that bridge when we get there.

Apr 5, 2022

CartHero development begins

I partnered with Trent Fikes, a long-time AppTrends user and heavy hitter in the e-commerce apparel space, to bring our version of the sales funnel platform to the world. Today marks the day we break ground!

Our plan is to design a platform that natively supports customer orders as they should be, which means from start to finish as a single order. This is exactly where other funnel sales platforms fall short, if not for a slew of other reasons.

On top of that, send your orders directly to ShipStation/ShipHero, or if needed Shopify. The point though is that tethering users to any single fulfillment platform (I’m looking at you Shopify) is not the way.

Nov 2, 2017

AppTrends launch!

I’m happy to announce, after 5 months of development, my first SaaS project. I teamed up with longtime friend Michael Velasquez to put together the best software available in seamless Shopify fulfillment.

Taking orders outside of Shopify isn’t terribly difficult, but batching multiple customer orders and providing post-processing on those orders is, which is where AppTrends steps in. We provide the perfect conduit between your customers and Shopify!