创建线程和结束线程
pthread_create
#include <pthread.h>
extern int pthread_create (pthread_t *__restrict __newthread,
const pthread_attr_t *__restrict __attr,
void *(*__start_routine) (void *),
void *__restrict __arg) __THROWNL __nonnull ((1, 3));
_newthread是新线程的标识符,后续pthread*函数通过它来引用新线程。其类型pthread_t的定义如下:
#include <bits/pthreadtypes.h>
typedef unsigned long int pthread_t;
attr参数用于设置新线程的属性。给它传递NULL表示使用默认线程属性。start_routine 和arg参数分别指定新线程将运行的丽数及其参数。
pthread_create 成功时返回0,失败时返回错误码。一个用户可以打开的线程数量不能超过RLIMIT NPROC软资源限制。此外,系统上所有用户能创建的线程总数也不得超过/proc/sys/kermnel/threads-max 内核参数所定义的值。
pthread_exit
线程一旦被创建好,内核就可以调度内核线程来执行start_routine函数数指针所指向的函数了。线程函数在结束时最好调用如下函数,以确保安全、干净地退出:
extern void pthread_exit (void *__retval) __attribute__ ((__noreturn__));
pthread_exit 函数通过retval参数向线程的回收者传递其退出信息。它执行完之后不会返回到调用者,而且永远不会失败。
pthread_join
一个进程中的所有线程都可以调用pthread_join 函数来回收其他线程( 前提是目标线程是可回收的,见后文),即等待其他线程结束,这类似于回收进程的wait和waitpid系统调用。pthread_join 的定义如下:
extern int pthread_join (pthread_t __th, void **__thread_return);
__th参数是目标线程的标识符,__thread_return 参数则是目标线程返回的退出信息。该函数会一直阻塞,直到被回收的线程结束为止。该函数成功时返回0,失败则返回错误码。可能的错误码如表所示。
pthread_cancel
有时候我们希望异常终止-一个线程,即取消线程,它是通过如下函数实现的:
extern int pthread_cancel (pthread_t __th);
__th参数是目标线程的标识符。该函数成功时返回0,失败则返回错误码。不过,接收到取消请求的目标线程可以决定是否允许被取消以及如何取消,这分别由如下两个函数完成:
extern int pthread_setcancelstate (int __state, int *__oldstate);
extern int pthread_setcanceltype (int __type, int *__oldtype);
这两个函数的第一个参数分别用于设置线程的取消状态(是否允许取消)和取消类型(如何取消),第二个参数则分别记录线程原来的取消状态和取消类型。state 参数有两个可选值:
- PTHREAD_CANCEL_ENABLE,允许线程被取消。它是线程被创建时的默认取消状态。
- PTHREAD_CANCEL_DISABLE,禁止线程被取消。这种情况下,如果一个线程收到取消请求,则它会将请求挂起,直到该线程允许被取消。
type参数也有两个可选值:
- PTHREAD_CANCEL_ASYNCHRONOUS,线程随时都可以被取消。它将使得接收到取消请求的目标线程立即采取行动。
- PTHREAD_CANCEL_DEFERRED,允许目标线程推迟行动,直到它调用了下面几个所谓的取消点函数中的一个: pthread_join、 pthread_testcancel、 pthread_cond_wait、pthread_cond_timedwait、sem_wait 和sigwait。根据POSIX标准,其他可能阻塞的系统调用,比如read、wait, 也可以成为取消点。不过为了安全起见,我们最好在可能会被取消的代码中调用pthread_testcancel 函数以设置取消点。
pthread_setcancelstate和pthread_setcanceltype 成功时返回0,失败则返回错误码。
线程属性
# define __SIZEOF_PTHREAD_ATTR_T 56
union pthread_attr_t
{
char __size[__SIZEOF_PTHREAD_ATTR_T];
long int __align;
};
#ifndef __have_pthread_attr_t
typedef union pthread_attr_t pthread_attr_t;
# define __have_pthread_attr_t 1
#endif
extern int pthread_attr_init (pthread_attr_t *__attr) __THROW __nonnull ((1));
extern int pthread_attr_destroy (pthread_attr_t *__attr)
__THROW __nonnull ((1));
extern int pthread_attr_getdetachstate (const pthread_attr_t *__attr,
int *__detachstate)
__THROW __nonnull ((1, 2));
extern int pthread_attr_setdetachstate (pthread_attr_t *__attr,
int __detachstate)
__THROW __nonnull ((1));
extern int pthread_attr_getguardsize (const pthread_attr_t *__attr,
size_t *__guardsize)
__THROW __nonnull ((1, 2));
extern int pthread_attr_setguardsize (pthread_attr_t *__attr,
size_t __guardsize)
__THROW __nonnull ((1));
extern int pthread_attr_getschedparam (const pthread_attr_t *__restrict __attr,
struct sched_param *__restrict __param)
__THROW __nonnull ((1, 2));
extern int pthread_attr_setschedparam (pthread_attr_t *__restrict __attr,
const struct sched_param *__restrict
__param) __THROW __nonnull ((1, 2));
extern int pthread_attr_getschedpolicy (const pthread_attr_t *__restrict
__attr, int *__restrict __policy)
__THROW __nonnull ((1, 2));
extern int pthread_attr_setschedpolicy (pthread_attr_t *__attr, int __policy)
__THROW __nonnull ((1));
extern int pthread_attr_getinheritsched (const pthread_attr_t *__restrict
__attr, int *__restrict __inherit)
__THROW __nonnull ((1, 2));
extern int pthread_attr_setinheritsched (pthread_attr_t *__attr,
int __inherit)
__THROW __nonnull ((1));
extern int pthread_attr_getscope (const pthread_attr_t *__restrict __attr,
int *__restrict __scope)
__THROW __nonnull ((1, 2));
extern int pthread_attr_setscope (pthread_attr_t *__attr, int __scope)
__THROW __nonnull ((1));
extern int pthread_attr_getstackaddr (const pthread_attr_t *__restrict
__attr, void **__restrict __stackaddr)
__THROW __nonnull ((1, 2)) __attribute_deprecated__;
extern int pthread_attr_setstackaddr (pthread_attr_t *__attr,
void *__stackaddr)
__THROW __nonnull ((1)) __attribute_deprecated__;
extern int pthread_attr_getstacksize (const pthread_attr_t *__restrict
__attr, size_t *__restrict __stacksize)
__THROW __nonnull ((1, 2));
extern int pthread_attr_setstacksize (pthread_attr_t *__attr,
size_t __stacksize)
__THROW __nonnull ((1));
POSIX信号量
#include <semaphore.h>
extern int sem_init (sem_t *__sem, int __pshared, unsigned int __value)
__THROW __nonnull ((1));
extern int sem_destroy (sem_t *__sem) __THROW __nonnull ((1));
extern int sem_wait (sem_t *__sem) __nonnull ((1));
extern int sem_trywait (sem_t *__sem) __THROWNL __nonnull ((1));
extern int sem_post (sem_t *__sem) __THROWNL __nonnull ((1));
这些函数的第一个参数sem指向被操作的信号量。
- sem_init 函数用于初始化一个未命名的信号量(POSIX 信号量API支持命名信号量)。pshared 参数指定信号量的类型。如果其值为0,就表示这个信号量是当前进程的局部信号量,否则该信号量就可以在多个进程之间共享。value 参数指定信号量的初始值。此外,初始化一个已经被初始化的信号量将导致不可预期的结果。
- sem_destroy 函数用于销毁信号量,以释放其占用的内核资源。如果销毁-一个正被其他线程等待的信号量,则将导致不可预期的结果。
- sem_wait 函数以原子操作的方式将信号量的值减1。如果信号量的值为0,则sem_wait将被阻塞,直到这个信号量具有非0值。
- sem_trywait 与sem_wait 函数相似,不过它始终立即返回,而不论被操作的信号量是否具有非0值,相当于sem_wait 的非阻塞版本。当信号量的值非0时,sem_trywait 对信号量执行减1操作。当信号量的值为0时,它将返回-1并设置errno为EAGAIN。
- sem_post函数以原子操作的方式将信号量的值加1。当信号量的值大于0时,其他正在调用sem_wait 等待信号量的线程将被唤醒。
上面这些函数成功时返回0,失败则返回-1并设置errno。
互斥锁
互斥锁(也称互斥量)可以用于保护关键代码段,以确保其独占式的访问,这有点像一个二进制信号量。当进入关键代码段时,我们需要获得互斥锁并将其加锁,这等价于二进制信号量的P操作;当离开关键代码段时,我们需要对互斥锁解锁,以唤醒其他等待该互斥锁的线程,这等价于二进制信号量的V操作。
extern int pthread_mutex_init (pthread_mutex_t *__mutex,
const pthread_mutexattr_t *__mutexattr)
__THROW __nonnull ((1));
extern int pthread_mutex_destroy (pthread_mutex_t *__mutex)
__THROW __nonnull ((1));
extern int pthread_mutex_trylock (pthread_mutex_t *__mutex)
__THROWNL __nonnull ((1));
extern int pthread_mutex_lock (pthread_mutex_t *__mutex)
__THROWNL __nonnull ((1));
这些函数的第一个参数mutex指向要操作的目标互斥锁,互斥锁的类型是pthread_mutex_t结构体。
- pthread_pthread_mutex_init 函数用于初始化互斥锁。mutexattr 参数指定互斥锁的属性。如果将它设置为NULL,则表示使用默认属性。除了这个函数外,我们还可以使用如下方式来初始化一个互斥锁:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 宏PTHREAD_MUTEX_INITIALIZER 实际上只是把互斥锁的各个字段都初始化为0。 - pthread_mutex_destroy 函数用于销毁互斥锁,以释放其占用的内核资源。销毁一个已经加锁的互斥锁将导致不可预期的后果。
- pthread_mutex_lock 函数以原子操作的方式给一个互斥锁加锁。如果目标互斥锁已经被锁上,则pthread_mutex_lock 调用将阻塞,直到该互斥锁的占有者将其解锁。
- pthread_mutex_trylock 与pthread_mutex_lock 函数类似,不过它始终立即返回,而不论被操作的互斥锁是否已经被加锁,相当于pthread_mutex_lock 的非阻塞版本。当目标互斥锁未被加锁时pthread_mutex_trylock对互斥锁执行加锁操作。当互斥锁已经被加锁时,pthread_mutex_trylock 将返回错误码EBUSY。需要注意的是,这里讨论的pthread_mutex_lock和pthread_mutex_trylock 的行为是针对普通锁而言的。后面我们将看到,对于其他类型的锁而言,这两个加锁函数会有不同的行为。
- pthread_mutex_unlock 函数以原子操作的方式给一个互斥锁解锁。如果此时有其他线程正在等待这个互斥锁,则这些线程中的某一个将获得它。
互斥锁属性
typedef union
{
char __size[__SIZEOF_PTHREAD_MUTEXATTR_T];
int __align;
} pthread_mutexattr_t;
extern int pthread_mutexattr_init (pthread_mutexattr_t *__attr)
__THROW __nonnull ((1));
extern int pthread_mutexattr_destroy (pthread_mutexattr_t *__attr)
__THROW __nonnull ((1));
extern int pthread_mutexattr_getpshared (const pthread_mutexattr_t *
__restrict __attr,
int *__restrict __pshared)
__THROW __nonnull ((1, 2));
extern int pthread_mutexattr_setpshared (pthread_mutexattr_t *__attr,
int __pshared)
__THROW __nonnull ((1));
#if defined __USE_UNIX98 || defined __USE_XOPEN2K8
extern int pthread_mutexattr_gettype (const pthread_mutexattr_t *__restrict
__attr, int *__restrict __kind)
__THROW __nonnull ((1, 2));
extern int pthread_mutexattr_settype (pthread_mutexattr_t *__attr, int __kind)
__THROW __nonnull ((1));
#endif
互斥锁的两种常用属性: pshared 和type。互斥锁属性pshared指定是否允许跨进程共享互斥锁,其可选值有两个:
- PTHREAD_PROCESS_SHARED。互斥锁可以被跨进程共享。
- PTHREAD_PROCESS_PRIVATE。 互斥锁只能被和锁的初始化线程隶属于同一个进程的线程共享。
互斥锁属性type指定互斥锁的类型。Linux 支持如下4种类型的互斥锁:
- PTHREAD_MUTEX_NORMAL,普通锁。这是互斥锁默认的类型。当一个线程对一个普通锁加锁以后,其余请求该锁的线程将形成一个等待队列,并在该锁解锁后按优先级获得它。这种锁类型保证了资源分配的公平性。但这种锁也很容易引发问题:一个线程如果对一个已经加锁的普通锁再次加锁,将引发死锁;对一个已经被其他线程加锁的普通锁解锁,或者对一个已经解锁的普通锁再次解锁,将导致不可预期的后果。
- PTHREAD_MUTEX_ERRORCHECK,检错锁。一个线程如果对一个已经加锁的检错锁再次加锁,则加锁操作返回EDEADLK。对一个已经被其他线程加锁的检错锁解锁,或者对一个已经解锁的检错锁再次解锁,则解锁操作返回EPERM。
- PTHREAD_MUTEX_RECURSIVE,嵌套锁。这种锁允许一个线程在释放锁之前多次对它加锁而不发生死锁。不过其他线程如果要获得这个锁,则当前锁的拥有者必须执行相应次数的解锁操作。对一个已经被其他线程加锁的嵌套锁解锁,或者对一个已经解锁的嵌套锁再次解锁,则解锁操作返回EPERM。
- PTHREAD_MUTEX_DEFAULT,默认锁。一个线程如果对一个已经加锁的默认锁再次加锁,或者对一个已 经被其他线程加锁的默认锁解锁,或者对一个已经解锁的默认锁再次解锁,将导致不可预期的后果。这种锁在实现的时候可能被映射为上面三种锁之一。
条件变量
如果说互斥锁是用于同步线程对共享数据的访问的话,那么条件变量则是用于在线程之间同步共享数据的值。条件变量提供了一种线程间的通知机制:当某个共享数据达到某个值的时候,唤醒等待这个共享数据的线程。条件变量的相关函数主要有如下5个:
typedef union
{
struct __pthread_cond_s __data;
char __size[__SIZEOF_PTHREAD_COND_T];
__extension__ long long int __align;
} pthread_cond_t;
extern int pthread_cond_init (pthread_cond_t *__restrict __cond,
const pthread_condattr_t *__restrict __cond_attr)
__THROW __nonnull ((1));
extern int pthread_cond_destroy (pthread_cond_t *__cond)
__THROW __nonnull ((1));
extern int pthread_cond_signal (pthread_cond_t *__cond)
__THROWNL __nonnull ((1));
extern int pthread_cond_broadcast (pthread_cond_t *__cond)
__THROWNL __nonnull ((1));
extern int pthread_cond_wait (pthread_cond_t *__restrict __cond,
pthread_mutex_t *__restrict __mutex)
__nonnull ((1, 2));
这些函数的第一个参数cond指向要操作的目标条件变量,条件变量的类型是pthread_cond_t结构体。
- pthread_cond_init 函数用于初始化条件变量。cond_attr 参数指定条件变量的属性。如果将它设置为NULL,则表示使用默认属性。条件变量的属性不多,而且和互斥锁的属性类型相似,所以我们不再赘述。除了pthread_cond_init 丽数外,我们还可以使用如下方式来初始化一个条件变量:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; 宏PTHREAD_COND_INITIALIZER 实际上只是把条件变量的各个字段都初始化为0。 - pthread_cond_destroy 函数用于销毁条件变量,以释放其占用的内核资源。销毁一个正在被等待的条件变量将失败并返回EBUSY。
- pthread_cond_broadcast 函数以广播的方式唤醒所有等待目标条件变量的线程。pthread_cond_signal 函数用于唤醒一个等待目标条件变量的线程。至于哪个线程将被唤醒,则取决于线程的优先级和调度策略。有时候我们可能想唤醒一个指定的线程,但pthread没有对该需求提供解决方法。不过我们可以间接地实现该需求:定义一个能够唯一表示目标线程的全局变量,在唤醒等待条件变量的线程前先设置该变量为目标线程,然后采用广播方式唤醒所有等待条件变量的线程,这些线程被唤醒后都检查该变量以判断被唤醒的是否是自己,如果是就开始执行后续代码,如果不是则返回继续等待。
- pthread_cond_wait 函数用于等待目标条件变量。mutex 参数是用于保护条件变量的互斥锁,以确保pthread_cond_wait 操作的原子性。在调用pthread_cond_wait 前,必须确保互斥锁mutex已经加锁,否则将导致不可预期的结果。pthread_cond_wait 函数执行时,首先把调用线程放入条件变量的等待队列中,然后将互斥锁mutex解锁。可见,从pthread_cond_wait开始执行到其调用线程被放入条件变量的等待队列之间的这段时间内, pthread_cond_signal和pthread_cond_broadcast 等函数不会修改条件变量。换言之,pthread_cond_wait 丽数不会错过目标条件变量的任何变化。当pthread_cond_wait 丽数成功返回时,互斥锁mutex将再次被锁上。
可重入函数
如果一个丽数能被多个线程同时调用且不发生竞态条件,则我们称它是线程安全的(thread safe),或者说它是可重人函数。Linux 库函数只有一小部分是不可重人的,比如inet_ntoa 函数,以及getservbyname和getservbyport函数。这些库函数之所以不可重人,主要是因为其内部使用了静态变量。不过Linux对很多不可重人的库函数提供了对应的可重入版本,这些可重人版本的函数名是在原函数名尾部加上_r。 比如,函数localtime对应的可重入函数是localtime_ r。在多线程程序中调用库函数,一定要使用其可重人版本,否则可能导致预想不到的结果。
线程和进程
思考这样一个问题:如果一个多线程程序的某个线程调用了fork 函数,那么新创建的子进程是否将自动创建和父进程相同数量的线程呢?答案是“否”,正如我们期望的那样。子进程只拥有一个执行线程,它是调用fork的那个线程的完整复制。并且子进程将自动继承父进程中互斥锁(条件变量与之类似)的状态。也就是说,父进程中已经被加锁的互斥锁在子进程中也是被锁住的。这就引起了一个问题:子进程可能不清楚从父进程继承而来的互斥锁的具体状态(是加锁状态还是解锁状态)。这个互斥锁可能被加锁了,但并不是由调用fork函数的那个线程锁住的,而是由其他线程锁住的。如果是这种情况,则子进程若再次对该互斥锁执行加锁操作就会导致死锁。
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <wait.h>
pthread_mutex_t mutex;
void *another(void *arg)
{
printf("in child thread, lock the mutex\n");
pthread_mutex_lock(&mutex);
sleep(5);
pthread_mutex_unlock(&mutex);
}
void prepare()
{
pthread_mutex_lock(&mutex);
}
void infork()
{
pthread_mutex_unlock(&mutex);
}
int main()
{
pthread_mutex_init(&mutex, NULL);
pthread_t id;
pthread_create(&id, NULL, another, NULL);
sleep(1);
int pid = fork();
if (pid < 0)
{
pthread_join(id, NULL);
pthread_mutex_destroy(&mutex);
return 1;
}
else if (pid == 0)
{
printf("I anm in the child, want to get the lock\n");
pthread_mutex_lock(&mutex);
printf("I can not run to here, oop...\n");
pthread_mutex_unlock(&mutex);
exit(0);
}
else
{
pthread_mutex_unlock(&mutex);
wait(NULL);
}
pthread_join(id, NULL);
pthread_mutex_destroy(&mutex);
return 0;
}
不过,pthread 提供了一个专门的函数pthread_ atfork, 以确保fork 调用后父进程和子进程都拥有一个清楚的锁状态。该函数的定义如下:
extern int pthread_atfork (void (*__prepare) (void),
void (*__parent) (void),
void (*__child) (void)) __THROW;
该函数将建立3个fork句柄来帮助我们清理互斥锁的状态。prepare句柄将在fork调用创建出子进程之前被执行。它可以用来锁住所有父进程中的互斥锁。parent 句柄则是fork调用创建出子进程之后,而fork返回之前,在父进程中被执行。它的作用是释放所有在prepare句柄中被锁住的互斥锁。child 句柄是fork返回之前,在子进程中被执行。和parent句柄一样,child 句柄也是用于释放所有在prepare句柄中被锁住的互斥锁。
线程和信号
extern int pthread_sigmask (int __how,
const __sigset_t *__restrict __newmask,
__sigset_t *__restrict __oldmask)__THROW;
由于进程中的所有线程共享该进程的信号,所以线程库将根据线程掩码决定把信号发送给哪个具体的线程。因此,如果我们在每个子线程中都单独设置信号掩码,就很容易导致逻辑错误。此外,所有线程共享信号处理函数。也就是说,当我们在一个线 程中设置了某个信号的信号处理函数后,它将覆盖其他线程为同-一个信号设置的信号处理函数。这两点都说明,我们应该定义一个专门的线程来处理所有的信号。这可以通过如下两个步骤来实现:
- 在主线程创建出其他子线程之前就调用pthread_sigmask来设置好信号掩码,所有新创建的子线程都将自动继承这个信号掩码。这样做之后,实际上所有线程都不会响应被屏蔽的信号了。
- 在某个线程中调用如下函数来等待信号并处理之:
# ifdef __USE_POSIX199506
extern int sigwait (const sigset_t *__restrict __set, int *__restrict __sig)
__nonnull ((1, 2));
# endif
set参数指定需要等待的信号的集合。我们可以简单地将其指定为在第1步中创建的信号掩码,表示在该线程中等待所有被屏蔽的信号。参数sig指向的整数用于存储该函数返回的信号值。sigwait 成功时返回0,失败则返回错误码。一旦sigwait正确返回,我们就可以对接收到的信号做处理了。很显然,如果我们使用了sigwait, 就不应该再为信号设置信号处理函数了。这是因为当程序接收到信号时,二者中只能有一个起作用。
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <errno.h>
#define handle_error_en(en, msg) \
do \
{ \
errno = en; \
perror(msg); \
exit(EXIT_FAILURE); \
} while (0)
static void *
sig_thread(void *arg)
{
printf("yyyyy, thread id is: %ld\n", pthread_self());
sigset_t aset;
int s, sig;
sigemptyset(&aset);
sigaddset(&aset, SIGQUIT);
sigaddset(&aset, SIGUSR1);
sigset_t *set = (sigset_t *)arg;
for (;;)
{
s = sigwait(set, &sig);
if (s != 0)
handle_error_en(s, "sigwait");
printf("Signal handling thread got signal %d\n", sig);
}
}
static void handler(int arg)
{
printf("xxxxx, thread id is: %ld\n", pthread_self());
}
int main(int argc, char *argv[])
{
pthread_t thread;
sigset_t set;
int s;
signal(SIGQUIT, handler);
s = pthread_create(&thread, NULL, &sig_thread, (void *)&set);
sigemptyset(&set);
sigaddset(&set, SIGQUIT);
sigaddset(&set, SIGUSR1);
if (s != 0)
handle_error_en(s, "pthread_create");
printf("sub thread with id: %ld\n", thread);
pause();
}
最后, pthread还提供了下面的方法,使得我们可以明确地将一个信号发送给指定的线程:
extern int pthread_kill (pthread_t __threadid, int __signo) __THROW;
其中,thread 参数指定目标线程,sig 参数指定待发送的信号。如果sig为0,则pthread_kill不发送信号,但它任然会执行错误检查。我们可以利用这种方式来检测目标线程是否存在。pthread_kill 成功时返回0,失败则返回错误码。
|