使用现有的ngx_slab_pool_t在Nginx中实现进程间读写锁定

如何解决使用现有的ngx_slab_pool_t在Nginx中实现进程间读写锁定

根据 src\core\ngx_cycle.c 中的 ngx_shared_memory_add 以及《Development guide - Shared memory》,我知道 ngx_shm_zone_t 是在各进程的私有内存中分配的(即它只是用来与共享内存打交道的句柄)。而根据 src\core\ngx_cycle.c 中的 ngx_init_cycle,我发现 ngx_slab_pool_t 被分配在进程间共享的内存块中,因此任何有权访问它的进程中的任何线程都可以操作它。

在 Nginx 通常使用 ngx_slab_pool_t *shpool 的代码中,我可以找到 &shpool->mutex 的许多用法/引用,但找不到 &shpool->lock 的任何用法。将 ngx_slab_pool_t 的 lock 字段与 ngx_rwlock 结合使用来实现进程间读写锁(如下面的示例所示)是否安全?如果不安全,应该如何实现?

内存结构

Nginx: master process                   shared memory (managed by slab.c)
    ngx_shm_zone_t +------------+  +-+-----------------+
                   |    data    |  | | ngx_slab_pool_t | overhead
                   +------------+  | |-----------------|
                   |  shm.addr  |--+ |                 |
                   |  shm.size  |  | |  slab slots     |
                   |    ...     |  | |                 |
                   +------------+  | |                 |
                   |            |  | |                 |
                   +------------+  | +-----------------+
Nginx: worker process              |
    ngx_shm_zone_t +------------+  |
                   |    data    |  |
                   +------------+  |
                   |  shm.addr  |--+ (copy-On-Write?)
                   |  shm.size  |  |
                   |    ...     |  |
                   +------------+  |
                   |            |  |
                   +------------+  |
Nginx: worker process              |
    ngx_shm_zone_t +------------+  |
                   |    data    |  |
                   +------------+  |
                   |  shm.addr  |--+
                   |  shm.size  |
                   |    ...     |
                   +------------+
                   |            |
                   +------------+

Nginx代码

/** src\core\ngx_cycle.h */

/* Forward typedef so the init callback type below can name the zone type. */
typedef struct ngx_shm_zone_s  ngx_shm_zone_t;
/* Zone initialisation callback; the ngx_init_cycle sketch in this excerpt
 * invokes it as init(zone, NULL) for a newly allocated zone. */
typedef ngx_int_t (*ngx_shm_zone_init_pt) (ngx_shm_zone_t *zone,void *data);
/* Descriptor of one shared memory zone. */
struct ngx_shm_zone_s {
    void                     *data;     /* opaque pointer; its meaning is not shown in this excerpt */
    ngx_shm_t                 shm;      /* address/size/name of the mapping (see ngx_shm_t) */
    ngx_shm_zone_init_pt      init;     /* initialisation callback */
    void                     *tag;      /* NOTE(review): purpose not visible in this excerpt */
    void                     *sync;     /* NOTE(review): purpose not visible in this excerpt */
    ngx_uint_t                noreuse;  /* unsigned  noreuse:1; */
};


/** src\os\unix\ngx_shmem.h */

/* Description of one shared memory mapping. */
typedef struct {
    u_char      *addr; /* Mapped shared memory address, initially NULL */
    size_t       size; /* Shared memory size */
    ngx_str_t    name; /* Shared memory name */
    ngx_log_t   *log; /* Shared memory log */
    /* Flag that indicates shared memory was inherited from the master process (Windows-specific) */
    ngx_uint_t   exists;   /* unsigned  exists:1;  */
} ngx_shm_t;
/*
 * Create an anonymous System V shared memory segment of shm->size bytes and
 * attach it, storing the mapped address in shm->addr.
 *
 * The segment is marked for removal (IPC_RMID) immediately after the attach
 * attempt, so it is destroyed once the last attached process detaches and is
 * not leaked even when shmat() fails.
 *
 * Returns NGX_OK on success, NGX_ERROR when the segment could not be created
 * or attached.
 *
 * See also named POSIX shared memory objects at https://www.man7.org/linux/man-pages/man3/shm_open.3.html
 * System V shared memory segment at https://linux.die.net/man/2/shmget
 */
ngx_int_t ngx_shm_alloc(ngx_shm_t *shm) {
    int id = shmget(IPC_PRIVATE, shm->size, (SHM_R|SHM_W|IPC_CREAT));
    if (id == -1) {
        ngx_log_error(NGX_LOG_ALERT, shm->log, ngx_errno,
                      "shmget(%uz) failed", shm->size);
        return NGX_ERROR;
    }

    shm->addr = shmat(id, NULL, 0);
    if (shm->addr == (void *) -1) {
        ngx_log_error(NGX_LOG_ALERT, shm->log, ngx_errno, "shmat() failed");
    }

    /* Runs on both the success and the failure path of shmat(). */
    if (shmctl(id, IPC_RMID, NULL) == -1) {
        ngx_log_error(NGX_LOG_ALERT, shm->log, ngx_errno,
                      "shmctl(IPC_RMID) failed");
    }

    return (shm->addr == (void *) -1) ? NGX_ERROR : NGX_OK;
}
/*
 * Detach the shared memory segment mapped at shm->addr.
 *
 * A failing shmdt() is reported through shm->log instead of being silently
 * discarded.
 */
void ngx_shm_free(ngx_shm_t *shm) {
    if (shmdt(shm->addr) == -1) {
        ngx_log_error(NGX_LOG_ALERT, shm->log, ngx_errno,
                      "shmdt(%p) failed", shm->addr);
    }
}


/** src\core\ngx_cycle.c */
/*
 * Outline only: the body of this function is elided in this excerpt
 * (including its return statement); the comments below summarise the
 * shared-memory part of it.
 */
ngx_cycle_t * ngx_init_cycle(ngx_cycle_t *old_cycle) {
    // ...
    /* create shared memory */
    // 1. find existing one in shm_zone in cycle->shared_memory.part->elts
    // 2. else ngx_shm_alloc(&shm_zone[i].shm)
    //      ngx_init_zone_pool(cycle,&shm_zone[i])
    //      shm_zone[i].init(&shm_zone[i],NULL)
}

/*
 * Set up the slab pool header located at the start of a zone's mapping.
 *
 * Abridged excerpt: parts of the body are elided ("//..."), and `file` is
 * not declared in the visible code.
 */
static ngx_int_t ngx_init_zone_pool(ngx_cycle_t *cycle,ngx_shm_zone_t *zn) {
    /* The pool header is overlaid on the first bytes of the mapping. */
    ngx_slab_pool_t *sp = (ngx_slab_pool_t *) zn->shm.addr;
    if (zn->shm.exists) {
        /* The header already records this very address: nothing to set up. */
        if (sp == sp->addr) {
            return NGX_OK;
        }
        //...
    }

    sp->end = zn->shm.addr + zn->shm.size;
    sp->min_shift = 3;
    sp->addr = zn->shm.addr;
    //file =...
    /* sp->lock is handed to the mutex here, together with sp->mutex itself. */
    ngx_shmtx_create(&sp->mutex,&sp->lock,file);
    ngx_slab_init(sp);
    return NGX_OK;
}


/** src\core\ngx_slab.h */
/* Slab pool header; ngx_init_zone_pool overlays it on the start of a zone's mapping. */
typedef struct {
    ngx_shmtx_sh_t    lock;        /* passed to ngx_shmtx_create() together with `mutex` */

    size_t            min_size;
    size_t            min_shift;   /* set to 3 by ngx_init_zone_pool */

    ngx_slab_page_t  *pages;
    ngx_slab_page_t  *last;
    ngx_slab_page_t   free;

    ngx_slab_stat_t  *stats;
    ngx_uint_t        pfree;

    u_char           *start;
    u_char           *end;         /* set to shm.addr + shm.size by ngx_init_zone_pool */

    ngx_shmtx_t       mutex;       /* created by ngx_shmtx_create() in ngx_init_zone_pool */

    u_char           *log_ctx;
    u_char            zero;

    unsigned          log_nomem:1;

    void             *data;
    void             *addr;        /* set to shm.addr by ngx_init_zone_pool */
} ngx_slab_pool_t;

/* Stub: the body is elided in this excerpt; ngx_init_zone_pool calls it last. */
void ngx_slab_init(ngx_slab_pool_t *pool) {
    //set up slots
    //slots = ngx_slab_slots(pool);
}


/** src\core\ngx_shmtx.h */

/* Atomic words backing a shared mutex; this is the type of ngx_slab_pool_t.lock. */
typedef struct {
    ngx_atomic_t   lock;
#if (NGX_HAVE_POSIX_SEM)
    ngx_atomic_t   wait;   /* only present when POSIX semaphores are available */
#endif
} ngx_shmtx_sh_t;

/*
 * Mutex handle. The layout depends on the build: atomic-based (optionally
 * with a POSIX semaphore) or file-descriptor based.
 */
typedef struct {
#if (NGX_HAVE_ATOMIC_OPS)
    ngx_atomic_t  *lock;       /* presumably points at ngx_shmtx_sh_t.lock - TODO confirm */
#if (NGX_HAVE_POSIX_SEM)
    ngx_atomic_t  *wait;       /* presumably points at ngx_shmtx_sh_t.wait - TODO confirm */
    ngx_uint_t     semaphore;
    sem_t          sem;
#endif
#else
    ngx_fd_t       fd;
    u_char        *name;
#endif
    ngx_uint_t     spin;
} ngx_shmtx_t;

读/写锁定示例

/*
cc -pipe -O -W -Wall -Wpointer-arith -Wno-unused-parameter -Werror -g -O0 -g -o rwlock rwlock.c -ldl -lpthread -lrt -Wl,-E
*/

/* Feature-test macro: has to be defined before the first system header is included. */
#define _GNU_SOURCE // using glibc
/* Only program_invocation_short_name is referenced by the code in this file. */
extern char *program_invocation_name;
extern char *program_invocation_short_name;
//char* currentprocname = getprogname();
extern char *__progname;

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>

#include <unistd.h>
#include <fcntl.h> /* For O_* constants */
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/prctl.h>
#include <sys/stat.h> /* For mode constants */

#include <pthread.h>

/* ANSI SGR escape sequences for coloured terminal output; none of them is used by the visible code. */
#define ANSI_COLOR_RED "\x1b[31m" //"\e[31m" "\33[0:31m\\]"
#define ANSI_COLOR_GREEN "\x1b[32m"
#define ANSI_COLOR_YELLOW "\x1b[33m"
#define ANSI_COLOR_BLUE "\x1b[34m"
#define ANSI_COLOR_magenta "\x1b[35m"
#define ANSI_COLOR_CYAN "\x1b[36m"
#define ANSI_COLOR_RESET "\x1b[0m" /* resets all attributes */

/*
 * Compile-time check usable inside an expression: evaluates to (size_t)0 when
 * `e` is false and fails to compile (negative array size) when `e` is true.
 */
#ifndef BUILD_BUG_ON_ZERO
#define BUILD_BUG_ON_ZERO(e) (sizeof(char[1 - 2 * !!(e)]) - 1)
#endif

/*
 * &a[0] degrades to a pointer: a different type from an array.
 * __typeof__ is used instead of typeof so the macro also builds in strict
 * ISO modes such as -std=c11.
 */
#define __must_be_array(a) BUILD_BUG_ON_ZERO(__builtin_types_compatible_p(__typeof__(a),__typeof__(&(a)[0])))

/* Element count of a real array; passing a pointer is rejected at compile time. */
#define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]) + __must_be_array(arr))

/* When defined, the rwlock attribute object is stored inside the resource itself. */
#define INTERPROC_RWLOCK_HOLD_ATTR

/* A counter protected by a pthread read-write lock. */
typedef struct rwlock_resource_s
{
    pthread_rwlock_t lock;      /* taken for writing around every update of num */
#ifdef INTERPROC_RWLOCK_HOLD_ATTR
    pthread_rwlockattr_t attr;  /* attributes used to initialise lock */
#endif
    int num;                    /* the protected counter */
} rwlock_resource_t;

/* A resource protected by a mutex */
/* A resource protected by a mutex; not referenced by the code visible in this file. */
typedef struct mutex_resource_s
{
    int resources;
    pthread_mutex_t mutex;
    pthread_mutexattr_t mutexattr;
} mutex_resource_t;

/*
 * Exercise a read-write lock from several processes.
 *
 * Forks CHILD_COUNT children. Each child increments resource->num ten times
 * under the write lock (one increment per second) and exits; the parent adds
 * 2 ten times under the same lock, then reaps all children.
 *
 * resource: lock + counter. For the updates to be visible across processes
 *           it must live in memory shared by parent and children, and the
 *           lock must already be initialised by the caller.
 */
void test_interprocess_access(rwlock_resource_t *resource)
{
#define CHILD_COUNT 3

    struct worker_s
    {
        pid_t pid;
        // any other data - what a child has to do
    } works[CHILD_COUNT];
    pid_t pid = -1;
    int idx;
    int rc;

    for (idx = 0; idx < CHILD_COUNT; ++idx)
    {
        /* Flush first so buffered output is not emitted again by the child. */
        fflush(stdout);

        pid = fork();
        if (-1 == pid)
        {
            printf("PID=%d: error %d in fork()\n",(int)getpid(),errno);
            return;
        }
        if (0 == pid)
        {
            char name[1024];
#if defined(__APPLE__) || defined(__FreeBSD__)
            const char *appname = getprogname();
#elif defined(_GNU_SOURCE)
            const char *appname = program_invocation_short_name;
#else
            const char *appname = "?";
#endif
            snprintf(name,sizeof(name),"%s_idx=%d",appname,idx);
            if (-1 == prctl(PR_SET_NAME,name,NULL))
                printf("child PID=%d: prctl Failed %d\n",(int)getpid(),errno);

            // Prevent children from continuing forking child's child,i.e.
            // 0   1         2         3
            // p-+-p_idx=0-+-p_idx=1---p_idx=2
            //   |         `-p_idx=2
            //   |-p_idx=1---p_idx=2
            //   `-p_idx=2
            // As a result,there are level 1 children only. (pstree -a $(pgrep -f 'Nginx: m'))
            break;
        }
        printf("My %d-th child pid is: %d\n",idx,(int)pid);
        works[idx].pid = pid;
    }

    if (0 == pid) // in a child
    {
        for (idx = 0; idx < 10; ++idx)
        {
            rc = pthread_rwlock_wrlock(&resource->lock);
            if (0 != rc)
            {
                /* Never touch the counter without holding the lock. */
                printf("child PID=%d: pthread_rwlock_wrlock failed %d\n",(int)getpid(),rc);
                exit(EXIT_FAILURE);
            }
            ++resource->num;
            /* "(%d-%d)" is reported as parent pid - own pid. */
            printf("[ child]++num  => %02d (%d-%d)\n",resource->num,(int)getppid(),(int)getpid());
            pthread_rwlock_unlock(&resource->lock);
            sleep(1);
        }
        exit(EXIT_SUCCESS);
    }
    else
    {
        pid_t childpid;
        for (idx = 0; idx < 10; idx++)
        {
            sleep(1);
            rc = pthread_rwlock_wrlock(&resource->lock);
            if (0 != rc)
            {
                printf("PID=%d: pthread_rwlock_wrlock failed %d\n",(int)getpid(),rc);
                break;
            }
            resource->num += 2;
            /* "(%d-%d)" is reported as parent pid - own pid. */
            printf("[parent]num+=2 => %02d (%d-%d)\n",resource->num,(int)getppid(),(int)getpid());
            pthread_rwlock_unlock(&resource->lock);
        }
        /* wait() returns -1 once there are no more children to reap. */
        while ((childpid = wait(NULL)) > 0)
        {
            // a child terminated - find out which one it was
            for (idx = 0; idx < CHILD_COUNT; ++idx)
            {
                if (works[idx].pid == childpid)
                {
                    // use the idx-th child results here
                    printf("[parent]works[%d] (PID=%d) terminated\n",idx,(int)childpid);
                }
            }
        }
        // no more children to wait for
        printf("after all child processes end\n");
    }
}

/*
 * Initialise resource->lock with the requested process-shared setting, run
 * test_pfn(resource), then destroy the lock and its attribute object.
 *
 * resource:       lock + counter to operate on.
 * process_shared: PTHREAD_PROCESS_SHARED or PTHREAD_PROCESS_PRIVATE.
 * test_pfn:       test body, run while the lock is initialised.
 *
 * The attribute object lives inside the resource when
 * INTERPROC_RWLOCK_HOLD_ATTR is defined and on the stack otherwise; both
 * configurations follow the same code path. If a setup step fails, the
 * error is printed and the test body is not run.
 */
void test_rwlock(rwlock_resource_t *resource,int process_shared,void (*test_pfn)(rwlock_resource_t *resource))
{
    int retval;
    int attrval = 0;
#ifdef INTERPROC_RWLOCK_HOLD_ATTR
    pthread_rwlockattr_t *attr = &resource->attr;
#else
    pthread_rwlockattr_t local_attr;
    pthread_rwlockattr_t *attr = &local_attr;
#endif

    retval = pthread_rwlockattr_init(attr);
    if (0 != retval)
    {
        printf("pthread_rwlockattr_init failed with %d,%s\n",retval,strerror(retval));
        return;
    }

    retval = pthread_rwlockattr_setpshared(attr,process_shared);
    if (0 != retval)
    {
        printf("pthread_rwlockattr_setpshared failed with %d,%s\n",retval,strerror(retval));
        goto destroy_attr;
    }

    retval = pthread_rwlock_init(&resource->lock,attr);
    if (0 != retval)
    {
        printf("pthread_rwlock_init failed with %d,%s\n",retval,strerror(retval));
        goto destroy_attr;
    }

    retval = pthread_rwlockattr_getpshared(attr,&attrval);
    printf("pthread_rwlockattr_getpshared=%d: process-shared attribtue value is %d\n",retval,attrval);

    test_pfn(resource);

    retval = pthread_rwlock_destroy(&resource->lock);
    if (0 != retval)
        printf("pthread_rwlock_destroy failed with %d,%s\n",retval,strerror(retval));

destroy_attr:
    retval = pthread_rwlockattr_destroy(attr);
    if (0 != retval)
        printf("pthread_rwlockattr_destroy failed with %d,%s\n",retval,strerror(retval));
}

/* Link with -lrt
https://www.man7.org/linux/man-pages/man3/shm_open.3.html

kekkou@MONITO:~$ lsof /dev/shm && ls -l /dev/shm/ && ipcs -lm && df -k /dev/shm
COMMAND   PID   USER   FD   TYPE DEVICE SIZE              NODE NAME
rwlock    169 kekkou    3u   REG    0,9   72 31243722414920824 /run/shm/rwlock_shm
rwlock_id 173 kekkou    3u   REG    0,9   72 31243722414920824 /run/shm/rwlock_shm
rwlock_id 174 kekkou    3u   REG    0,9   72 31243722414920824 /run/shm/rwlock_shm
rwlock_id 175 kekkou    3u   REG    0,9   72 31243722414920824 /run/shm/rwlock_shm
total 0
-rw------- 1 kekkou kekkou 72 Aug 31 16:08 rwlock_shm

------ Shared Memory Limits --------
max number of segments = 4096
max seg size (kbytes) = 18014398509481983
max total shared memory (kbytes) = 18014398509481980
min seg size (bytes) = 1

Filesystem     1K-blocks      Used Available Use% Mounted on
none           975923528 629672464 346251064  65% /run/shm
*/
void test_rwlock_named_shared_memory(const char *shmpath) {
    /* Create shared memory object and set its size to the size of our structure by ftruncate */
    int fd = shm_open(shmpath,O_CREAT | O_EXCL | O_RDWR,S_IRUSR | S_IWUSR);
    if (-1 == fd) {
        // EEXIST Both O_CREAT and O_EXCL were specified to shm_open() and the shared memory object specified by name already exists.
        printf("shm_open faield with %d,%s\n",errno,strerror(errno));
        return;
    }
    /* Truncate shared memory segment so it would contain rwlock_resource_t. */
    if (0 != ftruncate(fd,sizeof(rwlock_resource_t))) {
        perror("ftruncate");
        goto clean_up_shm;
    }
    /* Map the object into the caller's address space */
    rwlock_resource_t *shmp = mmap(NULL,sizeof(rwlock_resource_t),PROT_READ | PROT_WRITE,MAP_SHARED,fd,0);
    if (MAP_Failed == shmp)
        perror("mmap");

    test_rwlock(shmp,PTHREAD_PROCESS_SHARED,test_interprocess_access);

clean_up_shm:
    shm_unlink(shmpath);
}


/*
 * Run the inter-process rwlock test twice: first on an anonymous shared
 * mapping, then on a named POSIX shared memory object.
 *
 * argv[1] (optional): name of the shared memory object; defaults to
 *                     "rwlock_shm".
 *
 * Returns EXIT_SUCCESS, or EXIT_FAILURE when the anonymous mapping cannot be
 * created or released.
 */
int main(int argc,char *argv[])
{
    rwlock_resource_t *process_shared_resource; //rwlock_resource_t resource,*process_shared_resource;

    /* MAP_SHARED | MAP_ANON: the mapping stays shared with children created by fork(). */
    process_shared_resource = mmap(NULL,sizeof(*process_shared_resource),PROT_READ | PROT_WRITE,MAP_SHARED | MAP_ANON,-1,0);
    if (MAP_FAILED == process_shared_resource)
    {
        perror("mmap");
        return EXIT_FAILURE;
    }
    printf("process_shared_resource@%p,size=%d\n",(void *)process_shared_resource,(int)sizeof(*process_shared_resource));
    test_rwlock(process_shared_resource,PTHREAD_PROCESS_SHARED,test_interprocess_access);

    //test_rwlock(&resource,PTHREAD_PROCESS_PRIVATE,test_interthread_access);

    test_rwlock_named_shared_memory(2 == argc ? argv[1] : "rwlock_shm");

    printf("\033[22;34mHello,world!\033[0m\n"); // shows a blue hello world

    if (0 != munmap(process_shared_resource,sizeof(*process_shared_resource)))
    {
        perror("munmap");
        return EXIT_FAILURE;
    }
    return EXIT_SUCCESS;
}

已更新:

ngx_rwlock 所使用的 volatile unsigned long,配合 GCC 的原子内存访问内建函数(Built-in functions for atomic memory access),似乎不能用作多个进程之间的读/写锁(在我的情况下需要的是读者优先的锁)。

/* Signed and unsigned machine-word integers used for atomic operations. */
typedef long                        ngx_atomic_int_t;
typedef unsigned long               ngx_atomic_uint_t;

/* Atomic word. */
typedef volatile ngx_atomic_uint_t  ngx_atomic_t;


/*
 * Atomically: if *lock == old, store `set` into *lock and yield non-zero;
 * otherwise leave *lock untouched and yield zero.
 * All three arguments must be forwarded to the builtin.
 */
#define ngx_atomic_cmp_set(lock,old,set)                                    \
    __sync_bool_compare_and_swap(lock,old,set)

/* Atomically add `add` to *value and yield the value held before the addition. */
#define ngx_atomic_fetch_add(value,add)                                      \
    __sync_fetch_and_add(value,add)

/* Full memory barrier. */
#define ngx_memory_barrier()        __sync_synchronize()

/* Spin-wait hint: the x86 "pause" instruction, a no-op on other targets. */
#if ( __i386__ || __i386 || __amd64__ || __amd64 )
#define ngx_cpu_pause()             __asm__ ("pause")
#else
#define ngx_cpu_pause()
#endif

参考

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?