Much higher performance with these changes #8

Open
buybackoff opened this issue Mar 6, 2020 · 3 comments

buybackoff commented Mar 6, 2020

FYI

  • Do spin with SpinWait, especially since this queue is not lock-free per the author's description.
  • Do not use a _bufferMask field; mask with & (_buffer.Length - 1) instead. This probably removes bounds checks (already, or likely in future JIT versions); see the sketch after this list.
  • Use two fields for the buffer, both pointing to the same array. Enqueue and Dequeue each load their own field, which also prefetches their position value. Most importantly, this avoids false sharing when accessing the buffer reference.
  • Use ref var for Cell.
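
A minimal sketch of the bounds-check point (illustrative only; the class and field names are made up, not code from this repository):

internal sealed class BoundsCheckSketch
{
    private readonly object[] _buffer = new object[1024];
    private readonly int _bufferMask = 1023;

    // Masking with a separately stored field: the JIT cannot relate the masked index
    // to _buffer.Length, so the array access keeps its range check.
    public object WithMaskField(int pos) => _buffer[pos & _bufferMask];

    // Masking with (Length - 1) of the very array being indexed: for a non-empty array
    // the index is provably in range, which can let the JIT drop the range check
    // (today, or in a future JIT, as hedged above).
    public object WithLengthMinusOne(int pos) => _buffer[pos & (_buffer.Length - 1)];
}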

With these changes, performance with 12 threads (2x6 HT) is almost the same as with 1 thread, at around 39 million dequeue+enqueue operations per second. Before the changes it was much worse.

More details here.

Benchmarking code is here. I am not sure how that compares to the numbers in the readme here, which are in nanoseconds.
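
For reference, a minimal throughput sketch of what "dequeue+enqueue operations per second" measures, using the MPMCQueue1 class below (this is not the linked benchmark; thread and iteration counts are made up):

using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class ThroughputSketch
{
    public static void Main()
    {
        const int threads = 12;               // matches the 12-thread figure above
        const int opsPerThread = 10_000_000;  // arbitrary
        var queue = new MPMCQueue1(64 * 1024);
        var item = new object();

        var sw = Stopwatch.StartNew();
        var tasks = new Task[threads];
        for (var t = 0; t < threads; t++)
        {
            tasks[t] = Task.Run(() =>
            {
                for (var i = 0; i < opsPerThread; i++)
                {
                    while (!queue.TryEnqueue(item)) { }   // retry if the ring is full
                    while (!queue.TryDequeue(out _)) { }  // retry if the ring is empty
                }
            });
        }
        Task.WaitAll(tasks);
        sw.Stop();

        var pairsPerSecond = threads * (double)opsPerThread / sw.Elapsed.TotalSeconds;
        Console.WriteLine($"{pairsPerSecond / 1_000_000:F1} M dequeue+enqueue pairs/sec");
        // Rough aggregate conversion; the readme's per-operation nanoseconds may use a
        // different methodology.
        Console.WriteLine($"{1e9 / pairsPerSecond:F1} ns per pair");
    }
}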

BTW, #6 has a negative impact.

This code can be copy-pasted into a .NET Standard 2.0 project.

using System;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;

    [StructLayout(LayoutKind.Explicit, Size = 384)]
    public class MPMCQueue1
    {
        /// <summary>
        /// A 128-byte cache line already exists on some CPUs.
        /// </summary>
        /// <remarks>
        /// Also "the spatial prefetcher strives to keep pairs of cache lines in the L2 cache."
        /// https://stackoverflow.com/questions/29199779/false-sharing-and-128-byte-alignment-padding
        /// </remarks>
        internal const int SAFE_CACHE_LINE = 128;

        [FieldOffset(SAFE_CACHE_LINE)]
        private readonly Cell[] _enqueueBuffer;

        [FieldOffset(SAFE_CACHE_LINE + 8)]
        private volatile int _enqueuePos;

        // Give enqueue and dequeue their own field for the same buffer.
        // This removes false sharing, and loading the buffer reference
        // also prefetches the adjacent Pos field with [(64 - (8 + 4 + 4)) = 52]/64 probability.

        [FieldOffset(SAFE_CACHE_LINE * 2)]
        private readonly Cell[] _dequeueBuffer;

        [FieldOffset(SAFE_CACHE_LINE * 2 + 8)]
        private volatile int _dequeuePos;

        public MPMCQueue1(int bufferSize)
        {
            if (bufferSize < 2) throw new ArgumentException($"{nameof(bufferSize)} should be greater than or equal to 2");
            if ((bufferSize & (bufferSize - 1)) != 0) throw new ArgumentException($"{nameof(bufferSize)} should be a power of 2");

            _enqueueBuffer = new Cell[bufferSize];

            for (var i = 0; i < bufferSize; i++)
            {
                _enqueueBuffer[i] = new Cell(i, null);
            }

            _dequeueBuffer = _enqueueBuffer;
            _enqueuePos = 0;
            _dequeuePos = 0;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public bool TryEnqueue(object item)
        {
            var spinner = new SpinWait();
            do
            {
                var buffer = _enqueueBuffer;
                var pos = _enqueuePos;
                var index = pos & (buffer.Length - 1);
                ref var cell = ref buffer[index];
                if (cell.Sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
                {
                    cell.Element = item;
                    cell.Sequence = pos + 1;
                    return true;
                }

                if (cell.Sequence < pos)
                {
                    return false;
                }

                spinner.SpinOnce();
            } while (true);
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public bool TryDequeue(out object result)
        {
            result = null;
            var spinner = new SpinWait();
            do
            {
                var buffer = _dequeueBuffer;
                var pos = _dequeuePos;
                var index = pos & (buffer.Length - 1);
                ref var cell = ref buffer[index];
                if (cell.Sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
                {
                    result = cell.Element;
                    cell.Element = null;
                    cell.Sequence = pos + buffer.Length;
                    break;
                }

                if (cell.Sequence < pos + 1)
                {
                    break;
                }

                spinner.SpinOnce();
            } while (true);

            return result != null;
        }

        [StructLayout(LayoutKind.Explicit, Size = 16)]
        private struct Cell
        {
            [FieldOffset(0)]
            public volatile int Sequence;

            [FieldOffset(8)]
            public object Element;

            public Cell(int sequence, object element)
            {
                Sequence = sequence;
                Element = element;
            }
        }
    }
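
For reference, a minimal usage sketch (not part of the original comment); it assumes the class above is compiled into the same project:

using System;
using System.Threading.Tasks;

public static class UsageSketch
{
    public static void Main()
    {
        var queue = new MPMCQueue1(1024);

        // One producer boxes and enqueues the integers 0..99999.
        var producer = Task.Run(() =>
        {
            for (var i = 0; i < 100_000; i++)
            {
                while (!queue.TryEnqueue(i)) { }  // retry if the ring is full
            }
        });

        // One consumer drains the same number of items and sums them.
        // Note: TryDequeue signals success via a non-null result, so null must not be enqueued.
        long sum = 0;
        var consumer = Task.Run(() =>
        {
            var received = 0;
            while (received < 100_000)
            {
                if (queue.TryDequeue(out var item))
                {
                    sum += (int)item;
                    received++;
                }
            }
        });

        Task.WaitAll(producer, consumer);
        Console.WriteLine(sum);  // 4999950000
    }
}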

omariom commented Mar 6, 2020

👍 but why not a PR?

@buybackoff
Author

@omariom Because it's 100 lines of self-contained code, which should be distributed as source. For my needs I had to change the API somewhat, and I also have my own benchmark.

buybackoff added a commit to buybackoff/MPMCQueue.NET that referenced this issue Mar 6, 2020
See alexandrnikitin#8. This is helpful to see the diff.
@buybackoff
Author

See #9 for the diff.
