驱动适配层
相关文件
frameworks/base/include/utils/IPCThreadState.h frameworks/base/libs/utils/IPCThreadState.cpp frameworks/base/include/utils/ProcessState.h frameworks/base/libs/utils/ProcessState.cpp frameworks/base/media/mediaserver/main_mediaserver.cpp
适配层模型
适配层本质上实现了如何与驱动通信的方法,ProcessState用来打开Binder,所有数据传递操作放在IPCThreadState处理。
mediaserver的范例
- 我们以mediaserver的源码作为例子,讲述适配层在Binder通信中的作用,首先从mediaserver.cpp的main方法开始。
Binder初始化阶段
-
ProcessState::self()用来初始化ProcessState的单例对象,其构造方法会调用
open_driver()方法负责打开Binder即/dev/binder设备,映射1M的内存,为进程间通信做准备。 -
接下来
defaultServiceManager()会获取ServiceManager的Binder引用,为以后在ServiceManager中注册服务做准备。(参见守护进程的驱动者)
向Binder驱动注册服务
-
为每个服务做初始化,分别是调用各自的
::instantiate(),可参考CameraService.cpp的实现。 -
在初始化过程中会向Binder服务注册各自服务的Binder对象,调用
addService()。使用IServiceManager来实现,参见守护进程的驱动者
Binder消息处理阶段
-
通过
ProcessState::startThreadPool()方法和ProcessState::spawnPooledThread(bool isMain)方法开启线程。 -
PoolThread的threadLoop()方法是线程执行的入口方法,它会调用IPCThreadState::self()->joinThreadPool()方法建立消息处理循环。 -
当client呼叫
BpBinder的IBinder的transact()方法,该transact()则呼叫IPCThreadState的transact()方法。 -
transact()会调用
talkWithDriver(),该方法会用到ProcessState初始化时打开的文件描述符和Binder驱动交互,该方法有两个职责:
- 接收来自驱动的请求(即代理端请求)。发送和接收的数据都被写入binder_write_read数据结构,最终通过ioctl()方法调用与驱动交互。
- 必要时发送结果给驱动(回复代理端请求)。
-
处理请求的方法是
executeCommand(),BR_TRANSACTION响应会读取请求数据,将Binder地址换成BBinder,并调用它的transact()方法处理请求,同时BBinder的继承类会调用它的onTransact()方法。当处理请求后会将结果写入reply并返回。 -
回到executeCommand()方法,如果当前处于同步模式,则会调用
sendReply()方法,它先将结果打包进binder_transaction_data,再调用waitForResponse()提交给驱动。 - 线程不断循环重复以上数据交换的流程直到退出循环。
要点总结
- ProcessState是对进程的抽象类,该类保证了在一个进程中对于设备节点/dev/binder只会被打开一次,它使用单例模式,对象通过ProcessState::self()获取。
- IPCThreadState是对处理线程的抽象类,每个线程通过线程局部存储(TLS)各自持有一个实例,该线程与驱动交互的行为都经由这个实例执行,对象通过IPCThreadState::self()获取。该类会使用ProcessState对象来取得驱动设备描述符。
- IPCThreadState的实现遵循了Binder协议,它将传输数据封装成binder_transaction_data结构。请求和应答都使用Parcel作为传输介质。(参见Parcel传输介质)
mediaserver.cpp
main
// Entry point of the mediaserver process: prepares Binder for this process,
// instantiates the media services, then turns the calling thread into a
// Binder pool thread.
int main(int argc, char** argv)
{
// Obtain the per-process ProcessState object (see ProcessState::self()).
sp<ProcessState> proc(ProcessState::self());
// Obtain the IServiceManager reference (see defaultServiceManager()).
sp<IServiceManager> sm = defaultServiceManager();
// Instantiate each service. CameraService::instantiate() registers itself
// through addService(); presumably the others do the same -- confirm in
// their own sources.
AudioFlinger::instantiate();
MediaPlayerService::instantiate();
CameraService::instantiate();
AudioPolicyService::instantiate();
// Start the Binder thread pool, then make this thread join it as well.
ProcessState::self()->startThreadPool();
IPCThreadState::self()->joinThreadPool();
}
CameraService.cpp
CameraService::instantiate
// Creates a CameraService and registers it with the service manager under
// the name "media.camera".
void CameraService::instantiate() {
defaultServiceManager()->addService(
String16("media.camera"), new CameraService());
}
ProcessState.cpp
ProcessState::ProcessState
-
该构造方法做打开驱动的初始化,并映射内存,初始化时调用
open_driver方法。
// Constructs the per-process Binder state: opens the driver through
// open_driver() and, when that succeeds, maps BINDER_VM_SIZE bytes of the
// driver read-only into this process. If the mapping fails (or on
// HAVE_WIN32_IPC builds) mDriverFD is reset to -1.
ProcessState::ProcessState()
: mDriverFD(open_driver())
, mVMStart(MAP_FAILED)
, mManagesContexts(false)
, mBinderContextCheckFunc(NULL)
, mBinderContextUserData(NULL)
, mThreadPoolStarted(false)
, mThreadPoolSeq(1)
{
if (mDriverFD >= 0) {
// XXX Ideally, there should be a specific define for whether we
// have mmap (or whether we could possibly have the kernel module
// available).
#if !defined(HAVE_WIN32_IPC)
// mmap the binder, providing a chunk of virtual address space to receive transactions.
mVMStart = mmap(0, BINDER_VM_SIZE, PROT_READ, MAP_PRIVATE | MAP_NORESERVE, mDriverFD, 0);
if (mVMStart == MAP_FAILED) {
// *sigh*
// Mapping failed: give up on the driver and mark the descriptor invalid.
close(mDriverFD);
mDriverFD = -1;
}
#else
// No mmap support on this build: behave as if the driver is unavailable.
mDriverFD = -1;
#endif
}
if (mDriverFD < 0) {
// Need to run without the driver, starting our own thread pool.
// NOTE(review): this branch is empty in the excerpt; nothing is started here.
}
}
ProcessState::self
- 单例模式的类,用来构造ProcessState对象。
/**
 * Returns the process-wide ProcessState object, creating it on first use.
 *
 * gProcess is checked once without the lock; only when it is still unset is
 * gProcessMutex taken and the check repeated before constructing the object.
 */
sp<ProcessState> ProcessState::self()
{
    if (gProcess == NULL) {
        AutoMutex _l(gProcessMutex);
        // Re-check under the lock before constructing the instance.
        if (gProcess == NULL) {
            gProcess = new ProcessState;
        }
    }
    return gProcess;
}
open_driver
- 打开驱动设备,返回描述符。
// The global gSingleProcess is initially false.
// Opens the Binder device node "/dev/binder" for read/write and returns its
// file descriptor. Returns -1 without opening anything when gSingleProcess
// is set; otherwise returns whatever open() returned (negative on failure).
static int open_driver()
{
if (gSingleProcess) {
return -1;
}
int fd = open("/dev/binder", O_RDWR);
if (fd >= 0) {
// (Success-path setup is elided in this excerpt.)
......
} else {
// Open failed: nothing is done here in this excerpt; the negative fd is returned below.
}
return fd;
}
startThreadPool
/**
 * Starts the Binder thread pool once per process.
 *
 * Under mLock, the first call marks the pool as started and spawns the main
 * pooled thread; later calls return without doing anything.
 */
void ProcessState::startThreadPool()
{
    AutoMutex _l(mLock);

    // Already running: nothing to do.
    if (mThreadPoolStarted) {
        return;
    }

    mThreadPoolStarted = true;
    spawnPooledThread(true);
}
spawnPooledThread
void ProcessState::spawnPooledThread(bool isMain)
{
if (mThreadPoolStarted) {
int32_t s = android_atomic_add(1, &mThreadPoolSeq);
char buf[32];
sprintf(buf, "Binder Thread #%d", s);
sp<Thread> t = new PoolThread(isMain);
t->run(buf);
}
}
getContextObject 方法1
/**
 * Returns the context object (the ServiceManager) for this process.
 *
 * When supportsProcesses() is true, the proxy for handle 0 is returned;
 * otherwise the lookup falls back to the named context "default".
 *
 * @param caller forwarded to the named lookup in the fallback path.
 */
sp<IBinder> ProcessState::getContextObject(const sp<IBinder>& caller)
{
    if (!supportsProcesses()) {
        return getContextObject(String16("default"), caller);
    }

    // Handle 0 is used to look up the ServiceManager.
    return getStrongProxyForHandle(0);
}
getContextObject 方法2
/**
 * Looks up a named context object.
 *
 * The local mContexts table is consulted first (under mLock). On a miss, and
 * only if this process does not manage contexts itself, a transaction is
 * sent to handle 0 and the binder read from the reply is stored with
 * setContextObject() before being returned.
 *
 * @param name   context name to resolve.
 * @param caller binder written into the request after the name.
 * @return the resolved binder, or NULL when it cannot be obtained.
 */
sp<IBinder> ProcessState::getContextObject(const String16& name, const sp<IBinder>& caller)
{
    sp<IBinder> context;

    mLock.lock();
    if (mContexts.indexOfKey(name) >= 0) {
        context = mContexts.valueFor(name);
    }
    mLock.unlock();

    if (context != NULL) {
        return context;
    }

    // Don't attempt to retrieve contexts if we manage them
    if (mManagesContexts) {
        return NULL;
    }

    // Obtain this thread's IPC channel.
    IPCThreadState* const thread = IPCThreadState::self();
    {
        // Package the request; the Parcels are destroyed before flushCommands().
        Parcel request, response;
        request.writeString16(name);
        request.writeStrongBinder(caller);

        // Send the request to the ServiceManager at handle 0.
        const status_t status = thread->transact(0 /*magic*/, 0, request, &response, 0);
        if (status == NO_ERROR) {
            context = response.readStrongBinder();
        }
    }

    thread->flushCommands();

    if (context != NULL) {
        setContextObject(context, name);
    }
    return context;
}
getStrongProxyForHandle
// Returns a strong reference to the proxy object for `handle`.
// Under mLock, the handle's table entry is looked up; a new BpBinder is
// created when the entry has no binder yet or a weak reference cannot be
// taken on the existing one. The result stays empty when
// lookupHandleLocked() returns NULL.
sp<IBinder> ProcessState::getStrongProxyForHandle(int32_t handle)
{
sp<IBinder> result;
AutoMutex _l(mLock);
handle_entry* e = lookupHandleLocked(handle);
if (e != NULL) {
// We need to create a new BpBinder if there isn't currently one, OR we
// are unable to acquire a weak reference on this current one. See comment
// in getWeakProxyForHandle() for more info about this.
IBinder* b = e->binder;
if (b == NULL || !e->refs->attemptIncWeak(this)) {
// Create a fresh proxy and record it (and its weak-ref block) in the entry.
b = new BpBinder(handle);
e->binder = b;
if (b) e->refs = b->getWeakRefs();
result = b;
} else {
// This little bit of nastiness is to allow us to add a primary
// reference to the remote proxy when this team doesn't have one
// but another team is sending the handle to us.
result.force_set(b);
// Drop the temporary weak reference taken by attemptIncWeak() above.
e->refs->decWeak(this);
}
}
return result;
}
ProcessState::getWeakProxyForHandle
// Returns a weak reference to the proxy object for `handle`.
// Mirrors getStrongProxyForHandle(): under mLock, reuses the entry's binder
// when a weak reference can be taken on it, otherwise creates a new BpBinder
// and records it in the entry. The result stays empty when
// lookupHandleLocked() returns NULL.
wp<IBinder> ProcessState::getWeakProxyForHandle(int32_t handle)
{
wp<IBinder> result;
AutoMutex _l(mLock);
handle_entry* e = lookupHandleLocked(handle);
if (e != NULL) {
// We need to create a new BpBinder if there isn't currently one, OR we
// are unable to acquire a weak reference on this current one. The
// attemptIncWeak() is safe because we know the BpBinder destructor will always
// call expungeHandle(), which acquires the same lock we are holding now.
// We need to do this because there is a race condition between someone
// releasing a reference on this BpBinder, and a new reference on its handle
// arriving from the driver.
IBinder* b = e->binder;
if (b == NULL || !e->refs->attemptIncWeak(this)) {
// Create a fresh proxy; the weak result is taken before the entry is updated.
b = new BpBinder(handle);
result = b;
e->binder = b;
if (b) e->refs = b->getWeakRefs();
} else {
result = b;
// Drop the temporary weak reference taken by attemptIncWeak() above.
e->refs->decWeak(this);
}
}
return result;
}
PoolThread类
// Thread subclass used for Binder pool threads: its threadLoop() hands the
// thread over to IPCThreadState::joinThreadPool().
class PoolThread : public Thread
{
public:
// isMain is stored and later forwarded to joinThreadPool().
PoolThread(bool isMain)
: mIsMain(isMain)
{
}
protected:
// Thread entry function.
virtual bool threadLoop()
{
// Join the Binder thread pool; this is where the thread enters its command loop.
IPCThreadState::self()->joinThreadPool(mIsMain);
// Returns false once joinThreadPool() comes back (presumably so the
// Thread base class does not call threadLoop() again -- confirm).
return false;
}
// Value passed to joinThreadPool(); fixed at construction.
const bool mIsMain;
};
IServiceManager.cpp
defaultServiceManager
- 该方法返回ServiceManager的Binder引用。
if (gDefaultServiceManager != NULL) return gDefaultServiceManager;
{
AutoMutex _l(gDefaultServiceManagerLock);
if (gDefaultServiceManager == NULL) {
gDefaultServiceManager = interface_cast<IServiceManager>(
ProcessState::self()->getContextObject(NULL));
}
}
return gDefaultServiceManager;
IPCThreadState.cpp
IPCThreadState::IPCThreadState
// Constructs the IPC state for the calling thread: binds it to the process's
// ProcessState, stores `this` in the thread-specific slot gTLS, resets the
// caller identity and pre-sizes the I/O Parcels.
IPCThreadState::IPCThreadState()
: mProcess(ProcessState::self())
{
// Publish this object as the calling thread's value for the gTLS key, so
// that IPCThreadState::self() can find it via pthread_getspecific().
pthread_setspecific(gTLS, this);
clearCaller();
// Set the initial capacity of the incoming (mIn) and outgoing (mOut) Parcels.
mIn.setDataCapacity(256);
mOut.setDataCapacity(256);
}
IPCThreadState::self
/**
 * Returns the calling thread's IPCThreadState, creating it on first use.
 *
 * If the thread-specific key has not been created yet, it is created under
 * gTLSMutex first. Returns NULL when the key is missing and either gShutdown
 * is set or pthread_key_create() fails.
 */
IPCThreadState* IPCThreadState::self()
{
    // One-time setup of the thread-specific key.
    if (!gHaveTLS) {
        if (gShutdown) return NULL;

        pthread_mutex_lock(&gTLSMutex);
        if (!gHaveTLS) {
            if (pthread_key_create(&gTLS, threadDestructor) != 0) {
                pthread_mutex_unlock(&gTLSMutex);
                return NULL;
            }
            gHaveTLS = true;
        }
        pthread_mutex_unlock(&gTLSMutex);
    }

    // Look up this thread's instance; the constructor registers new ones.
    const pthread_key_t key = gTLS;
    IPCThreadState* const existing =
        static_cast<IPCThreadState*>(pthread_getspecific(key));
    if (existing) {
        return existing;
    }

    // No instance for this thread yet: create one.
    return new IPCThreadState;
}
joinThreadPool
// Runs the calling thread's Binder command loop.
// isMain selects the first command queued for the driver: BC_ENTER_LOOPER
// when true, BC_REGISTER_LOOPER otherwise. The loop ends when the result is
// -ECONNREFUSED or -EBADF, or when a non-main thread gets TIMED_OUT; the
// thread then queues BC_EXIT_LOOPER and flushes it without reading.
void IPCThreadState::joinThreadPool(bool isMain)
{
// Tell the driver this thread is entering the looper.
mOut.writeInt32(isMain ? BC_ENTER_LOOPER : BC_REGISTER_LOOPER);
// Enter the message-receiving loop.
status_t result;
do {
int32_t cmd;
// When we've cleared the incoming command queue, process any pending derefs
if (mIn.dataPosition() >= mIn.dataSize()) {
size_t numPending = mPendingWeakDerefs.size();
if (numPending > 0) {
for (size_t i = 0; i < numPending; i++) {
RefBase::weakref_type* refs = mPendingWeakDerefs[i];
refs->decWeak(mProcess.get());
}
mPendingWeakDerefs.clear();
}
numPending = mPendingStrongDerefs.size();
if (numPending > 0) {
for (size_t i = 0; i < numPending; i++) {
BBinder* obj = mPendingStrongDerefs[i];
obj->decStrong(mProcess.get());
}
mPendingStrongDerefs.clear();
}
}
// Exchange data with the Binder driver.
result = talkWithDriver();
if (result >= NO_ERROR) {
size_t IN = mIn.dataAvail();
// Not enough data for a command word; `continue` jumps to the loop condition.
if (IN < sizeof(int32_t)) continue;
cmd = mIn.readInt32();
// Act on the command returned by the driver.
result = executeCommand(cmd);
}
// After executing the command, ensure that the thread is returned to the
// default cgroup and priority before rejoining the pool. This is a failsafe
// in case the command implementation failed to properly restore the thread's
// scheduling parameters upon completion.
int my_id;
#ifdef HAVE_GETTID
my_id = gettid();
#else
my_id = getpid();
#endif
if (!set_sched_policy(my_id, SP_FOREGROUND)) {
// success; reset the priority as well
setpriority(PRIO_PROCESS, my_id, ANDROID_PRIORITY_NORMAL);
}
// Let this thread exit the thread pool if it is no longer
// needed and it is not the main process thread.
if(result == TIMED_OUT && !isMain) {
break;
}
} while (result != -ECONNREFUSED && result != -EBADF);
// Tell the driver this thread is leaving the looper; write only, no read.
mOut.writeInt32(BC_EXIT_LOOPER);
talkWithDriver(false);
}
talkWithDriver
- 该方法将mIn、mOut数据写入binder_write_read结构,并通过ioctl与驱动交互。参见Binder通信协议。
/**
 * Exchanges data with the Binder driver through ioctl(BINDER_WRITE_READ).
 *
 * Pending output in mOut is offered to the driver, and, when doReceive is
 * set and mIn has been fully consumed, mIn is offered as the read buffer.
 * Afterwards the consumed part of mOut is removed and mIn is resized to the
 * number of bytes the driver produced.
 *
 * @param doReceive whether the caller wants to read from the driver.
 * @return NO_ERROR on success (including when there was nothing to do),
 *         -errno from ioctl() on failure, or INVALID_OPERATION on builds
 *         without HAVE_ANDROID_OS.
 */
status_t IPCThreadState::talkWithDriver(bool doReceive)
{
    // The driver's ioctl takes a binder_write_read structure as its argument.
    binder_write_read bwr;

    // A read is only needed once everything already in mIn has been consumed.
    const bool needRead = mIn.dataPosition() >= mIn.dataSize();

    // We don't want to write anything if we are still reading
    // from data left in the input buffer and the caller
    // has requested to read the next data.
    const size_t outAvail = (!doReceive || needRead) ? mOut.dataSize() : 0;

    bwr.write_size = outAvail;
    bwr.write_buffer = (long unsigned int)mOut.data();

    // This is what we'll read.
    if (doReceive && needRead) {
        bwr.read_size = mIn.dataCapacity();
        bwr.read_buffer = (long unsigned int)mIn.data();
    } else {
        bwr.read_size = 0;
        // Do not hand an uninitialized stack value to the driver.
        bwr.read_buffer = 0;
    }

    // Return immediately if there is nothing to do.
    if ((bwr.write_size == 0) && (bwr.read_size == 0)) return NO_ERROR;

    bwr.write_consumed = 0;
    bwr.read_consumed = 0;
    status_t err;
    do {
#if defined(HAVE_ANDROID_OS)
        // Hand the binder_write_read structure to the driver.
        if (ioctl(mProcess->mDriverFD, BINDER_WRITE_READ, &bwr) >= 0)
            err = NO_ERROR;
        else
            err = -errno;
#else
        err = INVALID_OPERATION;
#endif
    } while (err == -EINTR);  // retry when interrupted by a signal

    if (err >= NO_ERROR) {
        if (bwr.write_consumed > 0) {
            // Drop what the driver consumed from the front of mOut.
            if (bwr.write_consumed < (ssize_t)mOut.dataSize())
                mOut.remove(0, bwr.write_consumed);
            else
                mOut.setDataSize(0);
        }
        if (bwr.read_consumed > 0) {
            // Expose the freshly read bytes and rewind for parsing.
            mIn.setDataSize(bwr.read_consumed);
            mIn.setDataPosition(0);
        }
        return NO_ERROR;
    }

    return err;
}
transact
- 该方法首先将数据封装成binder_transaction_data结构,再判断异步或同步数据处理。
/**
 * Sends a transaction to the object behind `handle`.
 *
 * The request is queued as a BC_TRANSACTION via writeTransactionData(), then
 * waitForResponse() is called: with the caller's reply Parcel (or a
 * throwaway one) for two-way calls, or with NULL arguments when TF_ONE_WAY
 * is set.
 *
 * @param handle target handle.
 * @param code   transaction code.
 * @param data   request payload.
 * @param reply  receives the reply; may be NULL.
 * @param flags  transaction flags; TF_ACCEPT_FDS is always added.
 * @return NO_ERROR or the error from packaging / waiting.
 */
status_t IPCThreadState::transact(int32_t handle,
                                  uint32_t code, const Parcel& data,
                                  Parcel* reply, uint32_t flags)
{
    status_t err = data.errorCheck();

    flags |= TF_ACCEPT_FDS;

    if (err == NO_ERROR) {
        // Package the request as a binder_transaction_data record.
        err = writeTransactionData(BC_TRANSACTION, flags, handle, code, data, NULL);
    }

    if (err != NO_ERROR) {
        if (reply) reply->setError(err);
        return (mLastError = err);
    }

    // TF_ONE_WAY marks an asynchronous call: no reply is awaited.
    const bool oneWay = (flags & TF_ONE_WAY) != 0;
    if (oneWay) {
        return waitForResponse(NULL, NULL);
    }

    // Synchronous call: wait for the driver's answer.
    if (reply) {
        return waitForResponse(reply);
    }

    Parcel fakeReply;
    return waitForResponse(&fakeReply);
}
writeTransactionData
- 首先将数据封装进binder_transaction_data结构中,再将该结构的数据写入mOut,下次与驱动交互时再将该数据写入驱动。
/**
 * Packages a transaction into a binder_transaction_data record and appends
 * `cmd` followed by that record to mOut; it is handed to the driver on the
 * next talkWithDriver().
 *
 * If `data` carries an error and statusBuffer is provided, the error code
 * itself is sent as the payload with TF_STATUS_CODE set.
 *
 * @param cmd          command word written before the record.
 * @param binderFlags  transaction flags.
 * @param handle       target handle.
 * @param code         transaction code.
 * @param data         payload Parcel.
 * @param statusBuffer storage for an error payload; may be NULL.
 * @return NO_ERROR, or data's error when it cannot be forwarded.
 */
status_t IPCThreadState::writeTransactionData(int32_t cmd, uint32_t binderFlags,
    int32_t handle, uint32_t code, const Parcel& data, status_t* statusBuffer)
{
    binder_transaction_data tr;

    // The whole struct is copied into mOut below, so every field must be
    // initialized; otherwise stale stack bytes would be sent to the driver.
    // target.ptr presumably overlays target.handle (union in the binder
    // header) and may be wider, so clear it before storing the handle.
    tr.target.ptr = 0;
    tr.target.handle = handle;
    tr.code = code;
    tr.flags = binderFlags;
    tr.cookie = 0;
    tr.sender_pid = 0;
    tr.sender_euid = 0;

    const status_t err = data.errorCheck();
    if (err == NO_ERROR) {
        // Normal case: reference the Parcel's data and object-offset arrays.
        tr.data_size = data.ipcDataSize();
        tr.data.ptr.buffer = data.ipcData();
        tr.offsets_size = data.ipcObjectsCount()*sizeof(size_t);
        tr.data.ptr.offsets = data.ipcObjects();
    } else if (statusBuffer) {
        // Error case with a buffer: send the status code as the payload.
        tr.flags |= TF_STATUS_CODE;
        *statusBuffer = err;
        tr.data_size = sizeof(status_t);
        tr.data.ptr.buffer = statusBuffer;
        tr.offsets_size = 0;
        tr.data.ptr.offsets = NULL;
    } else {
        return (mLastError = err);
    }

    mOut.writeInt32(cmd);
    mOut.write(&tr, sizeof(tr));

    return NO_ERROR;
}
waitForResponse
// Talks to the driver and processes the returned commands until the pending
// transaction is finished or an error occurs.
//   reply         - receives the BR_REPLY payload; may be NULL.
//   acquireResult - receives the BR_ACQUIRE_RESULT outcome; may be NULL.
// Returns the final status. On error it is also stored in *acquireResult,
// in reply (via setError) and in mLastError.
status_t IPCThreadState::waitForResponse(Parcel *reply, status_t *acquireResult)
{
int32_t cmd;
int32_t err;
while (1) {
// Exchange data with the driver via talkWithDriver().
if ((err=talkWithDriver()) < NO_ERROR) break;
err = mIn.errorCheck();
if (err < NO_ERROR) break;
// Nothing to parse yet: go back and talk to the driver again.
if (mIn.dataAvail() == 0) continue;
cmd = mIn.readInt32();
switch (cmd) {
case BR_TRANSACTION_COMPLETE:
// Finished only when the caller expects neither a reply nor an acquire result.
if (!reply && !acquireResult) goto finish;
break;
case BR_DEAD_REPLY:
err = DEAD_OBJECT;
goto finish;
case BR_FAILED_REPLY:
err = FAILED_TRANSACTION;
goto finish;
case BR_ACQUIRE_RESULT:
{
const int32_t result = mIn.readInt32();
// Caller is not interested in the acquire result: keep waiting.
if (!acquireResult) continue;
*acquireResult = result ? NO_ERROR : INVALID_OPERATION;
}
goto finish;
case BR_REPLY:
{
binder_transaction_data tr;
err = mIn.read(&tr, sizeof(tr));
if (err != NO_ERROR) goto finish;
if (reply) {
if ((tr.flags & TF_STATUS_CODE) == 0) {
// Normal reply: let the Parcel reference the received buffer;
// freeBuffer is registered as its release callback.
reply->ipcSetDataReference(
reinterpret_cast<const uint8_t*>(tr.data.ptr.buffer),
tr.data_size,
reinterpret_cast<const size_t*>(tr.data.ptr.offsets),
tr.offsets_size/sizeof(size_t),
freeBuffer, this);
} else {
// Status-code reply: the payload is a status_t; read it, then free the buffer.
err = *static_cast<const status_t*>(tr.data.ptr.buffer);
freeBuffer(NULL,
reinterpret_cast<const uint8_t*>(tr.data.ptr.buffer),
tr.data_size,
reinterpret_cast<const size_t*>(tr.data.ptr.offsets),
tr.offsets_size/sizeof(size_t), this);
}
} else {
// No reply Parcel supplied: free the buffer and keep waiting.
freeBuffer(NULL,
reinterpret_cast<const uint8_t*>(tr.data.ptr.buffer),
tr.data_size,
reinterpret_cast<const size_t*>(tr.data.ptr.offsets),
tr.offsets_size/sizeof(size_t), this);
continue;
}
}
goto finish;
default:
// Any other command is handled by the general dispatcher.
err = executeCommand(cmd);
if (err != NO_ERROR) goto finish;
break;
}
}
finish:
if (err != NO_ERROR) {
// Propagate the error to every output the caller supplied.
if (acquireResult) *acquireResult = err;
if (reply) reply->setError(err);
mLastError = err;
}
return err;
}
executeCommand
- 该方法执行具体的Binder相关命令操作。
-
重点看BR_TRANSACTION部分,该分支将读出的Binder数据转换成
BBinder,并调用它的transact()方法。最终将调用BBinder的继承类Bn*Service的onTransact()方法。
// Executes one command word received from the Binder driver.
// Operands are read from mIn and any acknowledgement is queued in mOut.
// Returns NO_ERROR, the status read for BR_ERROR, TIMED_OUT for BR_FINISHED,
// a read error for BR_TRANSACTION, or UNKNOWN_ERROR for an unrecognized
// command. Any non-NO_ERROR result is also stored in mLastError.
status_t IPCThreadState::executeCommand(int32_t cmd)
{
BBinder* obj;
RefBase::weakref_type* refs;
status_t result = NO_ERROR;
switch (cmd) {
case BR_ERROR:
result = mIn.readInt32();
break;
case BR_OK:
break;
case BR_ACQUIRE:
// Take a strong reference on the object and acknowledge with BC_ACQUIRE_DONE.
refs = (RefBase::weakref_type*)mIn.readInt32();
obj = (BBinder*)mIn.readInt32();
obj->incStrong(mProcess.get());
mOut.writeInt32(BC_ACQUIRE_DONE);
mOut.writeInt32((int32_t)refs);
mOut.writeInt32((int32_t)obj);
break;
case BR_RELEASE:
// Defer the strong decrement; joinThreadPool() drains mPendingStrongDerefs.
refs = (RefBase::weakref_type*)mIn.readInt32();
obj = (BBinder*)mIn.readInt32();
mPendingStrongDerefs.push(obj);
break;
case BR_INCREFS:
// Take a weak reference and acknowledge with BC_INCREFS_DONE.
refs = (RefBase::weakref_type*)mIn.readInt32();
obj = (BBinder*)mIn.readInt32();
refs->incWeak(mProcess.get());
mOut.writeInt32(BC_INCREFS_DONE);
mOut.writeInt32((int32_t)refs);
mOut.writeInt32((int32_t)obj);
break;
case BR_DECREFS:
// Defer the weak decrement; joinThreadPool() drains mPendingWeakDerefs.
refs = (RefBase::weakref_type*)mIn.readInt32();
obj = (BBinder*)mIn.readInt32();
mPendingWeakDerefs.push(refs);
break;
case BR_ATTEMPT_ACQUIRE:
// Try to take a strong reference and report the outcome to the driver.
refs = (RefBase::weakref_type*)mIn.readInt32();
obj = (BBinder*)mIn.readInt32();
{
const bool success = refs->attemptIncStrong(mProcess.get());
mOut.writeInt32(BC_ACQUIRE_RESULT);
mOut.writeInt32((int32_t)success);
}
break;
case BR_TRANSACTION:
{
binder_transaction_data tr;
result = mIn.read(&tr, sizeof(tr));
if (result != NO_ERROR) break;
// Wrap the received buffer in a Parcel; freeBuffer is its release callback.
Parcel buffer;
buffer.ipcSetDataReference(
reinterpret_cast<const uint8_t*>(tr.data.ptr.buffer),
tr.data_size,
reinterpret_cast<const size_t*>(tr.data.ptr.offsets),
tr.offsets_size/sizeof(size_t), freeBuffer, this);
// Remember the current caller identity and switch to the sender's for this call.
const pid_t origPid = mCallingPid;
const uid_t origUid = mCallingUid;
mCallingPid = tr.sender_pid;
mCallingUid = tr.sender_euid;
Parcel reply;
if (tr.target.ptr) {
// Treat the cookie as the target BBinder and call its transact(); the Bn* classes derive from BBinder.
sp<BBinder> b((BBinder*)tr.cookie);
const status_t error = b->transact(tr.code, buffer, &reply, 0);
if (error < NO_ERROR) reply.setError(error);
} else {
// No target pointer: dispatch to the_context_object instead.
const status_t error = the_context_object->transact(tr.code, buffer, &reply, 0);
if (error < NO_ERROR) reply.setError(error);
}
// For a synchronous (non one-way) call, send the reply back.
if ((tr.flags & TF_ONE_WAY) == 0) {
sendReply(reply, 0);
} else {
// One-way call: no reply is sent.
}
// Restore the caller identity saved above.
mCallingPid = origPid;
mCallingUid = origUid;
}
break;
case BR_DEAD_BINDER:
{
// Notify the proxy that its remote died, then acknowledge to the driver.
BpBinder *proxy = (BpBinder*)mIn.readInt32();
proxy->sendObituary();
mOut.writeInt32(BC_DEAD_BINDER_DONE);
mOut.writeInt32((int32_t)proxy);
} break;
case BR_CLEAR_DEATH_NOTIFICATION_DONE:
{
// Drop one weak reference on the proxy.
BpBinder *proxy = (BpBinder*)mIn.readInt32();
proxy->getWeakRefs()->decWeak(proxy);
} break;
case BR_FINISHED:
// Reported as TIMED_OUT; joinThreadPool() lets non-main threads exit on it.
result = TIMED_OUT;
break;
case BR_NOOP:
break;
case BR_SPAWN_LOOPER:
// Start an additional (non-main) pool thread.
mProcess->spawnPooledThread(false);
break;
default:
printf("*** BAD COMMAND %d received from Binder driver\n", cmd);
result = UNKNOWN_ERROR;
break;
}
if (result != NO_ERROR) {
mLastError = result;
}
return result;
}
incStrongHandle
// Queues a BC_ACQUIRE command for `handle` in mOut. Nothing is sent here;
// mOut is handed to the driver by talkWithDriver().
void IPCThreadState::incStrongHandle(int32_t handle)
{
mOut.writeInt32(BC_ACQUIRE);
mOut.writeInt32(handle);
}
decStrongHandle
// Queues a BC_RELEASE command for `handle` in mOut. Nothing is sent here;
// mOut is handed to the driver by talkWithDriver().
void IPCThreadState::decStrongHandle(int32_t handle)
{
mOut.writeInt32(BC_RELEASE);
mOut.writeInt32(handle);
}
incWeakHandle
// Queues a BC_INCREFS command for `handle` in mOut. Nothing is sent here;
// mOut is handed to the driver by talkWithDriver().
void IPCThreadState::incWeakHandle(int32_t handle)
{
mOut.writeInt32(BC_INCREFS);
mOut.writeInt32(handle);
}
decWeakHandle
// Queues a BC_DECREFS command for `handle` in mOut. Nothing is sent here;
// mOut is handed to the driver by talkWithDriver().
void IPCThreadState::decWeakHandle(int32_t handle)
{
mOut.writeInt32(BC_DECREFS);
mOut.writeInt32(handle);
}