Driver adaptation layer
Related files
- frameworks/base/include/utils/IPCThreadState.h
- frameworks/base/libs/utils/IPCThreadState.cpp
- frameworks/base/include/utils/ProcessState.h
- frameworks/base/libs/utils/ProcessState.cpp
- frameworks/base/media/mediaserver/main_mediaserver.cpp
Adaptation layer model
The adaptation layer implements the communication with the driver: ProcessState opens the Binder device, and IPCThreadState handles all data transfer.
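The mediaserver example
- We use the mediaserver source code as an example of the role the adaptation layer plays in Binder communication, starting from the main() function in main_mediaserver.cpp.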
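Binder initialization phase
- ProcessState::self() initializes the ProcessState singleton. Its constructor calls open_driver(), which opens the Binder device /dev/binder; the constructor then maps about 1 MB of memory (BINDER_VM_SIZE) in preparation for inter-process communication.
- Next, defaultServiceManager() obtains the Binder reference to ServiceManager so that services can later be registered with it. (See "The driver behind the daemon".)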
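The "open the device once per process" idea can be sketched without a real Binder device. This is only an illustration: ToyProcessState, fake_open_driver() and g_openCalls are hypothetical names, the fake function counts calls instead of opening /dev/binder, and a plain mutex replaces the check-then-lock pattern of the real ProcessState::self().

```cpp
#include <cassert>
#include <mutex>

// Hypothetical stand-in for open_driver(): counts calls instead of opening /dev/binder.
static int g_openCalls = 0;
static int fake_open_driver() {
    ++g_openCalls;
    return 3;  // pretend file descriptor
}

class ToyProcessState {
public:
    // Returns the one instance for this process, creating it on first use.
    static ToyProcessState& self() {
        std::lock_guard<std::mutex> lock(sMutex);
        if (sInstance == nullptr) {
            sInstance = new ToyProcessState();  // intentionally lives for the whole process
        }
        return *sInstance;
    }
    int driverFd() const { return mDriverFd; }

private:
    // The "device" is opened exactly once, in the constructor.
    ToyProcessState() : mDriverFd(fake_open_driver()) {}

    static std::mutex sMutex;
    static ToyProcessState* sInstance;
    const int mDriverFd;
};

std::mutex ToyProcessState::sMutex;
ToyProcessState* ToyProcessState::sInstance = nullptr;
```

However many times self() is called, the constructor (and therefore the open call) runs only once.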
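Registering services through the Binder driver
- Each service is initialized by calling its own ::instantiate() method; see the implementation in CameraService.cpp.
- During initialization, each service registers its Binder object with ServiceManager by calling addService() on IServiceManager. (See "The driver behind the daemon".)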
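The registration step has the shape of a name-to-object registry. The sketch below keeps everything in one process: ToyService, ToyCameraService, ToyServiceManager and instantiateCamera() are hypothetical names, and rejecting a duplicate name is a choice made for this toy, not a statement about the real ServiceManager.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical minimal service interface; real services derive from BBinder.
struct ToyService {
    virtual ~ToyService() {}
    virtual std::string name() const = 0;
};

struct ToyCameraService : ToyService {
    std::string name() const override { return "media.camera"; }
};

// Hypothetical in-process registry standing in for ServiceManager.
class ToyServiceManager {
public:
    // Returns false if the name is already taken (toy policy).
    bool addService(const std::string& name, std::shared_ptr<ToyService> service) {
        return mServices.emplace(name, std::move(service)).second;
    }

    // Returns an empty pointer when no service is registered under the name.
    std::shared_ptr<ToyService> getService(const std::string& name) const {
        std::map<std::string, std::shared_ptr<ToyService> >::const_iterator it =
            mServices.find(name);
        if (it == mServices.end()) return std::shared_ptr<ToyService>();
        return it->second;
    }

private:
    std::map<std::string, std::shared_ptr<ToyService> > mServices;
};

// Mirrors the shape of CameraService::instantiate().
inline bool instantiateCamera(ToyServiceManager& sm) {
    return sm.addService("media.camera", std::make_shared<ToyCameraService>());
}
```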
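Binder message-handling phase
- ProcessState::startThreadPool() and ProcessState::spawnPooledThread(bool isMain) start the pool threads.
- PoolThread::threadLoop() is the entry point of each pool thread; it calls IPCThreadState::joinThreadPool(), which sets up the message-handling loop.
- When a client calls the IBinder transact() method on a BpBinder, that transact() in turn calls IPCThreadState's transact().
- transact() ends up in talkWithDriver() (through waitForResponse()), which uses the file descriptor opened when ProcessState was initialized to talk to the Binder driver. This method has two responsibilities:
  - Receive requests from the driver (that is, requests from the proxy side). Both outgoing and incoming data are placed in a binder_write_read structure, and the exchange itself is an ioctl() call.
  - Send results to the driver when necessary (replies to proxy-side requests).
- Requests are handled by executeCommand(). The BR_TRANSACTION branch reads the request data, converts the Binder address into a BBinder, and calls its transact() method, which in turn calls the onTransact() method of the BBinder subclass. Once the request has been handled, the result is written into reply and returned.
- Back in executeCommand(), if the transaction is synchronous, sendReply() is called; it first packs the result into a binder_transaction_data and then calls waitForResponse() to hand it to the driver.
- The thread keeps repeating this exchange until it leaves the loop.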
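The call path from proxy to service can be sketched in a single process. All names here (ToyParcel, ToyBBinder, ToyAdderService, ToyBpBinder, kToyAdd) are hypothetical, and the "driver" is replaced by a direct pointer, so this only illustrates how transact() on the proxy ends in onTransact() on the subclass, not the cross-process mechanics.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

typedef std::vector<int32_t> ToyParcel;  // hypothetical stand-in for Parcel

const uint32_t kToyAdd = 1;  // hypothetical transaction code

// Server side: transact() forwards to the virtual onTransact(), like BBinder.
class ToyBBinder {
public:
    virtual ~ToyBBinder() {}
    int transact(uint32_t code, const ToyParcel& data, ToyParcel* reply) {
        return onTransact(code, data, reply);
    }

protected:
    virtual int onTransact(uint32_t code, const ToyParcel& data, ToyParcel* reply) = 0;
};

// A concrete service, playing the role of a Bn*Service class.
class ToyAdderService : public ToyBBinder {
protected:
    int onTransact(uint32_t code, const ToyParcel& data, ToyParcel* reply) override {
        if (code != kToyAdd || data.size() != 2) return -1;  // unknown request
        if (reply) reply->push_back(data[0] + data[1]);      // write the result into reply
        return 0;
    }
};

// Client side: the proxy only knows its target and forwards the call.
// In real Binder the target is a handle and the call crosses processes.
class ToyBpBinder {
public:
    explicit ToyBpBinder(ToyBBinder* target) : mTarget(target) {}
    int transact(uint32_t code, const ToyParcel& data, ToyParcel* reply) {
        return mTarget->transact(code, data, reply);
    }

private:
    ToyBBinder* mTarget;
};
```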
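Key points
- ProcessState is the per-process abstraction. It guarantees that the device node /dev/binder is opened only once per process. It is a singleton obtained through ProcessState::self().
- IPCThreadState is the per-thread abstraction. Each thread gets its own instance, kept in thread-local storage and obtained through IPCThreadState::self(), so a thread's exchanges with the driver always go through that one object. It uses the ProcessState object to get the driver file descriptor.
- The IPCThreadState implementation follows the Binder protocol: it wraps transferred data in the binder_transaction_data structure. Both requests and replies use Parcel as the transfer medium. (See "Parcel as the transfer medium".)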
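The per-thread singleton can be illustrated with C++11 thread_local. Note that this swaps in a different mechanism: the real IPCThreadState::self() uses a pthread TLS key, and ToyThreadState with its members is a hypothetical name used only for this sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical per-thread state: each thread gets its own outgoing buffer.
class ToyThreadState {
public:
    // The first call on a thread constructs that thread's instance; later calls reuse it.
    static ToyThreadState& self() {
        thread_local ToyThreadState instance;
        return instance;
    }

    void queueCommand(int32_t cmd) { mOut.push_back(cmd); }
    std::size_t pendingCommands() const { return mOut.size(); }

private:
    ToyThreadState() {}
    std::vector<int32_t> mOut;  // commands waiting to be sent, private to this thread
};
```

Because the buffer belongs to one thread only, queueing a command needs no lock.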
main_mediaserver.cpp
main
int main(int argc, char** argv)
{
    // Open the Binder driver and fetch the ServiceManager reference
    sp<ProcessState> proc(ProcessState::self());
    sp<IServiceManager> sm = defaultServiceManager();
    // Register each media service with ServiceManager
    AudioFlinger::instantiate();
    MediaPlayerService::instantiate();
    CameraService::instantiate();
    AudioPolicyService::instantiate();
    // Start the thread pool, then let the main thread join it
    ProcessState::self()->startThreadPool();
    IPCThreadState::self()->joinThreadPool();
}
CameraService.cpp
CameraService::instantiate
void CameraService::instantiate() {
    defaultServiceManager()->addService(
            String16("media.camera"), new CameraService());
}
ProcessState.cpp
ProcessState::ProcessState
- This constructor opens the driver and maps the memory; its member initializer list calls open_driver().
ProcessState::ProcessState()
    : mDriverFD(open_driver())
    , mVMStart(MAP_FAILED)
    , mManagesContexts(false)
    , mBinderContextCheckFunc(NULL)
    , mBinderContextUserData(NULL)
    , mThreadPoolStarted(false)
    , mThreadPoolSeq(1)
{
    if (mDriverFD >= 0) {
        // XXX Ideally, there should be a specific define for whether we
        // have mmap (or whether we could possibly have the kernel module
        // available).
#if !defined(HAVE_WIN32_IPC)
        // mmap the binder, providing a chunk of virtual address space to receive transactions.
        mVMStart = mmap(0, BINDER_VM_SIZE, PROT_READ, MAP_PRIVATE | MAP_NORESERVE, mDriverFD, 0);
        if (mVMStart == MAP_FAILED) {
            // *sigh*
            close(mDriverFD);
            mDriverFD = -1;
        }
#else
        mDriverFD = -1;
#endif
    }
    if (mDriverFD < 0) {
        // Need to run without the driver, starting our own thread pool.
    }
}
ProcessState::self
- Singleton accessor: constructs the ProcessState object on first use and returns it.
sp<ProcessState> ProcessState::self()
{
    if (gProcess != NULL) return gProcess;

    AutoMutex _l(gProcessMutex);
    if (gProcess == NULL) gProcess = new ProcessState;
    return gProcess;
}
open_driver
- Opens the driver device and returns its file descriptor.
// The global gSingleProcess is initially false
static int open_driver()
{
    if (gSingleProcess) {
        return -1;
    }

    int fd = open("/dev/binder", O_RDWR);
    if (fd >= 0) {
        ......
    } else {
    }
    return fd;
}
startThreadPool
void ProcessState::startThreadPool()
{
    AutoMutex _l(mLock);
    if (!mThreadPoolStarted) {
        mThreadPoolStarted = true;
        spawnPooledThread(true);
    }
}
spawnPooledThread
void ProcessState::spawnPooledThread(bool isMain)
{
    if (mThreadPoolStarted) {
        int32_t s = android_atomic_add(1, &mThreadPoolSeq);
        char buf[32];
        sprintf(buf, "Binder Thread #%d", s);
        sp<Thread> t = new PoolThread(isMain);
        t->run(buf);
    }
}
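The thread-naming step on its own can be sketched with standard C++. This is an illustration under assumptions: nextPoolThreadName() is a hypothetical helper, std::atomic stands in for android_atomic_add, and snprintf is used instead of sprintf so the write is bounded by the buffer size.

```cpp
#include <atomic>
#include <cassert>
#include <cstdio>
#include <string>

// Hypothetical helper: builds the next pool-thread name from a shared counter.
// fetch_add returns the value the counter held before the increment.
inline std::string nextPoolThreadName(std::atomic<int>& seq) {
    const int s = seq.fetch_add(1);
    char buf[32];
    std::snprintf(buf, sizeof(buf), "Binder Thread #%d", s);
    return std::string(buf);
}
```

Because the increment is atomic, two threads asking for a name at the same time never receive the same sequence number.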
getContextObject (overload 1)
sp<IBinder> ProcessState::getContextObject(const sp<IBinder>& caller)
{
    if (supportsProcesses()) {
        // Look up ServiceManager through handle 0
        return getStrongProxyForHandle(0);
    } else {
        return getContextObject(String16("default"), caller);
    }
}
getContextObject (overload 2)
sp<IBinder> ProcessState::getContextObject(const String16& name, const sp<IBinder>& caller)
{
    mLock.lock();
    sp<IBinder> object(
        mContexts.indexOfKey(name) >= 0 ? mContexts.valueFor(name) : NULL);
    mLock.unlock();

    if (object != NULL) return object;

    // Don't attempt to retrieve contexts if we manage them
    if (mManagesContexts) {
        return NULL;
    }

    // Get this thread's IPC channel
    IPCThreadState* ipc = IPCThreadState::self();
    {
        // Pack the Binder request data
        Parcel data, reply;
        data.writeString16(name);
        data.writeStrongBinder(caller);
        // Send the request to ServiceManager, which is reference 0
        status_t result = ipc->transact(0 /*magic*/, 0, data, &reply, 0);
        if (result == NO_ERROR) {
            object = reply.readStrongBinder();
        }
    }

    ipc->flushCommands();

    if (object != NULL) setContextObject(object, name);
    return object;
}
getStrongProxyForHandle
sp<IBinder> ProcessState::getStrongProxyForHandle(int32_t handle)
{
    sp<IBinder> result;

    AutoMutex _l(mLock);

    handle_entry* e = lookupHandleLocked(handle);

    if (e != NULL) {
        // We need to create a new BpBinder if there isn't currently one, OR we
        // are unable to acquire a weak reference on this current one. See comment
        // in getWeakProxyForHandle() for more info about this.
        IBinder* b = e->binder;
        if (b == NULL || !e->refs->attemptIncWeak(this)) {
            b = new BpBinder(handle);
            e->binder = b;
            if (b) e->refs = b->getWeakRefs();
            result = b;
        } else {
            // This little bit of nastiness is to allow us to add a primary
            // reference to the remote proxy when this team doesn't have one
            // but another team is sending the handle to us.
            result.force_set(b);
            e->refs->decWeak(this);
        }
    }

    return result;
}
ProcessState::getWeakProxyForHandle
wp<IBinder> ProcessState::getWeakProxyForHandle(int32_t handle)
{
    wp<IBinder> result;

    AutoMutex _l(mLock);

    handle_entry* e = lookupHandleLocked(handle);

    if (e != NULL) {
        // We need to create a new BpBinder if there isn't currently one, OR we
        // are unable to acquire a weak reference on this current one. The
        // attemptIncWeak() is safe because we know the BpBinder destructor will always
        // call expungeHandle(), which acquires the same lock we are holding now.
        // We need to do this because there is a race condition between someone
        // releasing a reference on this BpBinder, and a new reference on its handle
        // arriving from the driver.
        IBinder* b = e->binder;
        if (b == NULL || !e->refs->attemptIncWeak(this)) {
            b = new BpBinder(handle);
            result = b;
            e->binder = b;
            if (b) e->refs = b->getWeakRefs();
        } else {
            result = b;
            e->refs->decWeak(this);
        }
    }

    return result;
}
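The "reuse the proxy if it is still alive, otherwise create a new one" logic of the two functions above can be approximated with std::weak_ptr. This is a loose analogy, not the Android reference-counting scheme: ToyProxy and ToyHandleTable are hypothetical names, weak_ptr::lock() plays roughly the role of the attemptIncWeak() check, and the toy does no locking.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <memory>

struct ToyProxy {  // hypothetical stand-in for BpBinder
    explicit ToyProxy(int32_t h) : handle(h) {}
    int32_t handle;
};

class ToyHandleTable {
public:
    // Returns the live proxy for a handle, or creates one if none is alive.
    std::shared_ptr<ToyProxy> getStrongProxyForHandle(int32_t handle) {
        std::weak_ptr<ToyProxy>& slot = mEntries[handle];
        std::shared_ptr<ToyProxy> proxy = slot.lock();  // empty if the old proxy is gone
        if (!proxy) {
            proxy = std::make_shared<ToyProxy>(handle);
            slot = proxy;  // remember it without keeping it alive
        }
        return proxy;
    }

private:
    std::map<int32_t, std::weak_ptr<ToyProxy> > mEntries;
};
```

The table holds only weak references, so a proxy disappears once its last user releases it and is recreated on the next lookup.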
The PoolThread class
class PoolThread : public Thread
{
public:
    PoolThread(bool isMain)
        : mIsMain(isMain)
    {
    }

protected:
    virtual bool threadLoop()
    {
        // Thread entry point: tells the driver this thread is entering the loop
        IPCThreadState::self()->joinThreadPool(mIsMain);
        return false;
    }

    const bool mIsMain;
};
IServiceManager.cpp
defaultServiceManager
- This function returns the Binder reference to ServiceManager.
sp<IServiceManager> defaultServiceManager()
{
    if (gDefaultServiceManager != NULL) return gDefaultServiceManager;

    {
        AutoMutex _l(gDefaultServiceManagerLock);
        if (gDefaultServiceManager == NULL) {
            gDefaultServiceManager = interface_cast<IServiceManager>(
                ProcessState::self()->getContextObject(NULL));
        }
    }

    return gDefaultServiceManager;
}
IPCThreadState.cpp
IPCThreadState::IPCThreadState
IPCThreadState::IPCThreadState()
    : mProcess(ProcessState::self())
{
    pthread_setspecific(gTLS, this);
    clearCaller();
    // Initialize the Parcel objects used for incoming and outgoing data
    mIn.setDataCapacity(256);
    mOut.setDataCapacity(256);
}
IPCThreadState::self
IPCThreadState* IPCThreadState::self()
{
    if (gHaveTLS) {
restart:
        const pthread_key_t k = gTLS;
        IPCThreadState* st = (IPCThreadState*)pthread_getspecific(k);
        if (st) return st;
        // Create the IPCThreadState object for this thread
        return new IPCThreadState;
    }

    if (gShutdown) return NULL;

    pthread_mutex_lock(&gTLSMutex);
    if (!gHaveTLS) {
        if (pthread_key_create(&gTLS, threadDestructor) != 0) {
            pthread_mutex_unlock(&gTLSMutex);
            return NULL;
        }
        gHaveTLS = true;
    }
    pthread_mutex_unlock(&gTLSMutex);
    goto restart;
}
joinThreadPool
void IPCThreadState::joinThreadPool(bool isMain)
{
    // Tell the driver this thread is entering the loop
    // (BC_ENTER_LOOPER for the main thread, BC_REGISTER_LOOPER otherwise)
    mOut.writeInt32(isMain ? BC_ENTER_LOOPER : BC_REGISTER_LOOPER);

    // Enter the message-receiving loop
    status_t result;
    do {
        int32_t cmd;

        // When we've cleared the incoming command queue, process any pending derefs
        if (mIn.dataPosition() >= mIn.dataSize()) {
            size_t numPending = mPendingWeakDerefs.size();
            if (numPending > 0) {
                for (size_t i = 0; i < numPending; i++) {
                    RefBase::weakref_type* refs = mPendingWeakDerefs[i];
                    refs->decWeak(mProcess.get());
                }
                mPendingWeakDerefs.clear();
            }

            numPending = mPendingStrongDerefs.size();
            if (numPending > 0) {
                for (size_t i = 0; i < numPending; i++) {
                    BBinder* obj = mPendingStrongDerefs[i];
                    obj->decStrong(mProcess.get());
                }
                mPendingStrongDerefs.clear();
            }
        }

        // Exchange data with the Binder driver
        result = talkWithDriver();
        if (result >= NO_ERROR) {
            size_t IN = mIn.dataAvail();
            if (IN < sizeof(int32_t)) continue;
            cmd = mIn.readInt32();
            // Act on the command returned by the driver
            result = executeCommand(cmd);
        }

        // After executing the command, ensure that the thread is returned to the
        // default cgroup and priority before rejoining the pool. This is a failsafe
        // in case the command implementation failed to properly restore the thread's
        // scheduling parameters upon completion.
        int my_id;
#ifdef HAVE_GETTID
        my_id = gettid();
#else
        my_id = getpid();
#endif
        if (!set_sched_policy(my_id, SP_FOREGROUND)) {
            // success; reset the priority as well
            setpriority(PRIO_PROCESS, my_id, ANDROID_PRIORITY_NORMAL);
        }

        // Let this thread exit the thread pool if it is no longer
        // needed and it is not the main process thread.
        if(result == TIMED_OUT && !isMain) {
            break;
        }
    } while (result != -ECONNREFUSED && result != -EBADF);

    mOut.writeInt32(BC_EXIT_LOOPER);
    talkWithDriver(false);
}
talkWithDriver
- This method describes the mIn and mOut buffers in a binder_write_read structure and exchanges data with the driver through ioctl(). See "The Binder communication protocol".
status_t IPCThreadState::talkWithDriver(bool doReceive)
{
    // The driver's BINDER_WRITE_READ ioctl takes a binder_write_read structure as its argument
    binder_write_read bwr;

    // Everything in mIn has been consumed, so new data may be read from the driver
    const bool needRead = mIn.dataPosition() >= mIn.dataSize();

    // We don't want to write anything if we are still reading
    // from data left in the input buffer and the caller
    // has requested to read the next data.
    const size_t outAvail = (!doReceive || needRead) ? mOut.dataSize() : 0;

    bwr.write_size = outAvail;
    bwr.write_buffer = (long unsigned int)mOut.data();

    // This is what we'll read.
    if (doReceive && needRead) {
        bwr.read_size = mIn.dataCapacity();
        bwr.read_buffer = (long unsigned int)mIn.data();
    } else {
        bwr.read_size = 0;
    }

    // Return immediately if there is nothing to do.
    if ((bwr.write_size == 0) && (bwr.read_size == 0)) return NO_ERROR;

    bwr.write_consumed = 0;
    bwr.read_consumed = 0;
    status_t err;
    do {
#if defined(HAVE_ANDROID_OS)
        // Hand the binder_write_read structure to the driver
        if (ioctl(mProcess->mDriverFD, BINDER_WRITE_READ, &bwr) >= 0)
            err = NO_ERROR;
        else
            err = -errno;
#else
        err = INVALID_OPERATION;
#endif
    } while (err == -EINTR);

    if (err >= NO_ERROR) {
        if (bwr.write_consumed > 0) {
            if (bwr.write_consumed < (ssize_t)mOut.dataSize())
                mOut.remove(0, bwr.write_consumed);
            else
                mOut.setDataSize(0);
        }
        if (bwr.read_consumed > 0) {
            mIn.setDataSize(bwr.read_consumed);
            mIn.setDataPosition(0);
        }
        return NO_ERROR;
    }

    return err;
}
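The bookkeeping around write_consumed and read_consumed can be tried out against a fake driver. Everything in this sketch is hypothetical (ToyWriteRead, fakeIoctl(), toyTalk(), the echo behaviour and the 256-byte read capacity); it only shows how the unsent tail of the outgoing buffer is kept and how the incoming buffer is trimmed to what was actually read.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical mirror of the fields talkWithDriver() fills in binder_write_read.
struct ToyWriteRead {
    std::size_t write_size, write_consumed;
    const uint8_t* write_buffer;
    std::size_t read_size, read_consumed;
    uint8_t* read_buffer;
};

// Fake driver: accepts at most `limit` written bytes and echoes them back.
inline int fakeIoctl(ToyWriteRead& bwr, std::size_t limit) {
    bwr.write_consumed = std::min(bwr.write_size, limit);
    bwr.read_consumed = std::min(bwr.write_consumed, bwr.read_size);
    std::copy(bwr.write_buffer, bwr.write_buffer + bwr.read_consumed, bwr.read_buffer);
    return 0;
}

// One exchange: drop what the driver consumed from `out`, expose what it produced in `in`.
inline int toyTalk(std::vector<uint8_t>& out, std::vector<uint8_t>& in, std::size_t limit) {
    ToyWriteRead bwr;
    bwr.write_size = out.size();
    bwr.write_buffer = out.data();
    bwr.write_consumed = 0;
    in.assign(256, 0);  // capacity offered to the driver
    bwr.read_size = in.size();
    bwr.read_buffer = in.data();
    bwr.read_consumed = 0;

    const int err = fakeIoctl(bwr, limit);
    if (err != 0) return err;

    // Keep only the bytes the driver did not consume yet.
    out.erase(out.begin(), out.begin() + static_cast<std::ptrdiff_t>(bwr.write_consumed));
    // Keep only the bytes the driver actually produced.
    in.resize(bwr.read_consumed);
    return 0;
}
```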
transact
- This method first wraps the data in a binder_transaction_data structure and then decides between synchronous and one-way (asynchronous) handling.
status_t IPCThreadState::transact(int32_t handle,
                                  uint32_t code, const Parcel& data,
                                  Parcel* reply, uint32_t flags)
{
    status_t err = data.errorCheck();

    flags |= TF_ACCEPT_FDS;

    if (err == NO_ERROR) {
        // Wrap the data in a binder_transaction_data packet
        err = writeTransactionData(BC_TRANSACTION, flags, handle, code, data, NULL);
    }

    if (err != NO_ERROR) {
        if (reply) reply->setError(err);
        return (mLastError = err);
    }

    // TF_ONE_WAY marks an asynchronous call
    if ((flags & TF_ONE_WAY) == 0) {
        // Synchronous mode: wait for the driver's reply
        if (reply) {
            err = waitForResponse(reply);
        } else {
            Parcel fakeReply;
            err = waitForResponse(&fakeReply);
        }
    } else {
        // One-way mode: pass NULL for both arguments
        err = waitForResponse(NULL, NULL);
    }

    return err;
}
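The flag handling in transact() comes down to two bit operations. In this sketch the TOY_ constants and both helper functions are hypothetical names; the numeric values mirror TF_ONE_WAY (0x01) and TF_ACCEPT_FDS (0x10) from the Binder driver header.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical names; values mirror the transaction flags in binder.h.
enum ToyTransactionFlags : uint32_t {
    TOY_TF_ONE_WAY    = 0x01,  // asynchronous call, no reply expected
    TOY_TF_ACCEPT_FDS = 0x10   // replies may carry file descriptors
};

// transact() always ORs in the accept-FDs bit before sending.
inline uint32_t effectiveFlags(uint32_t callerFlags) {
    return callerFlags | TOY_TF_ACCEPT_FDS;
}

// A call is synchronous exactly when the one-way bit is clear.
inline bool isSynchronous(uint32_t flags) {
    return (flags & TOY_TF_ONE_WAY) == 0;
}
```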
writeTransactionData
- The data is first packed into a binder_transaction_data structure, and that structure is then appended to mOut; it is written to the driver on the next exchange.
status_t IPCThreadState::writeTransactionData(int32_t cmd, uint32_t binderFlags,
    int32_t handle, uint32_t code, const Parcel& data, status_t* statusBuffer)
{
    binder_transaction_data tr;

    tr.target.handle = handle;
    tr.code = code;
    tr.flags = binderFlags;

    const status_t err = data.errorCheck();
    if (err == NO_ERROR) {
        tr.data_size = data.ipcDataSize();
        tr.data.ptr.buffer = data.ipcData();
        tr.offsets_size = data.ipcObjectsCount()*sizeof(size_t);
        tr.data.ptr.offsets = data.ipcObjects();
    } else if (statusBuffer) {
        tr.flags |= TF_STATUS_CODE;
        *statusBuffer = err;
        tr.data_size = sizeof(status_t);
        tr.data.ptr.buffer = statusBuffer;
        tr.offsets_size = 0;
        tr.data.ptr.offsets = NULL;
    } else {
        return (mLastError = err);
    }

    mOut.writeInt32(cmd);
    mOut.write(&tr, sizeof(tr));

    return NO_ERROR;
}
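The "command word followed by a fixed-size record" layout that ends up in mOut can be sketched with a byte vector. ToyTransactionData is a hypothetical, trimmed-down stand-in for binder_transaction_data, and the three helper functions are illustrative names; the point is only that queueing appends bytes and nothing is sent until the next exchange with the driver.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical, trimmed-down stand-in for binder_transaction_data.
struct ToyTransactionData {
    int32_t  handle;
    uint32_t code;
    uint32_t flags;
    uint32_t data_size;
};

// Appends raw bytes to the outgoing buffer.
inline void appendBytes(std::vector<uint8_t>& out, const void* p, std::size_t n) {
    const uint8_t* b = static_cast<const uint8_t*>(p);
    out.insert(out.end(), b, b + n);
}

// Queues "command word + fixed-size record"; nothing is sent yet.
inline void queueTransaction(std::vector<uint8_t>& out, int32_t cmd,
                             const ToyTransactionData& tr) {
    appendBytes(out, &cmd, sizeof(cmd));
    appendBytes(out, &tr, sizeof(tr));
}

// Reads one queued entry back, as the receiving side would.
inline bool readTransaction(const std::vector<uint8_t>& in, int32_t* cmd,
                            ToyTransactionData* tr) {
    if (in.size() < sizeof(*cmd) + sizeof(*tr)) return false;
    std::memcpy(cmd, in.data(), sizeof(*cmd));
    std::memcpy(tr, in.data() + sizeof(*cmd), sizeof(*tr));
    return true;
}
```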
waitForResponse
status_t IPCThreadState::waitForResponse(Parcel *reply, status_t *acquireResult)
{
    int32_t cmd;
    int32_t err;

    while (1) {
        // talkWithDriver() performs the actual exchange with the driver
        if ((err=talkWithDriver()) < NO_ERROR) break;
        err = mIn.errorCheck();
        if (err < NO_ERROR) break;
        if (mIn.dataAvail() == 0) continue;

        cmd = mIn.readInt32();

        switch (cmd) {
        case BR_TRANSACTION_COMPLETE:
            if (!reply && !acquireResult) goto finish;
            break;

        case BR_DEAD_REPLY:
            err = DEAD_OBJECT;
            goto finish;

        case BR_FAILED_REPLY:
            err = FAILED_TRANSACTION;
            goto finish;

        case BR_ACQUIRE_RESULT:
            {
                const int32_t result = mIn.readInt32();
                if (!acquireResult) continue;
                *acquireResult = result ? NO_ERROR : INVALID_OPERATION;
            }
            goto finish;

        case BR_REPLY:
            {
                binder_transaction_data tr;
                err = mIn.read(&tr, sizeof(tr));
                if (err != NO_ERROR) goto finish;

                if (reply) {
                    if ((tr.flags & TF_STATUS_CODE) == 0) {
                        reply->ipcSetDataReference(
                            reinterpret_cast<const uint8_t*>(tr.data.ptr.buffer),
                            tr.data_size,
                            reinterpret_cast<const size_t*>(tr.data.ptr.offsets),
                            tr.offsets_size/sizeof(size_t),
                            freeBuffer, this);
                    } else {
                        err = *static_cast<const status_t*>(tr.data.ptr.buffer);
                        freeBuffer(NULL,
                            reinterpret_cast<const uint8_t*>(tr.data.ptr.buffer),
                            tr.data_size,
                            reinterpret_cast<const size_t*>(tr.data.ptr.offsets),
                            tr.offsets_size/sizeof(size_t), this);
                    }
                } else {
                    freeBuffer(NULL,
                        reinterpret_cast<const uint8_t*>(tr.data.ptr.buffer),
                        tr.data_size,
                        reinterpret_cast<const size_t*>(tr.data.ptr.offsets),
                        tr.offsets_size/sizeof(size_t), this);
                    continue;
                }
            }
            goto finish;

        default:
            err = executeCommand(cmd);
            if (err != NO_ERROR) goto finish;
            break;
        }
    }

finish:
    if (err != NO_ERROR) {
        if (acquireResult) *acquireResult = err;
        if (reply) reply->setError(err);
        mLastError = err;
    }

    return err;
}
executeCommand
- This method carries out the individual Binder commands.
- Focus on the BR_TRANSACTION branch: it converts the Binder data it has read into a BBinder and calls its transact() method, which ends up calling onTransact() in the Bn*Service class derived from BBinder.
status_t IPCThreadState::executeCommand(int32_t cmd)
{
    BBinder* obj;
    RefBase::weakref_type* refs;
    status_t result = NO_ERROR;

    switch (cmd) {
    case BR_ERROR:
        result = mIn.readInt32();
        break;

    case BR_OK:
        break;

    case BR_ACQUIRE:
        refs = (RefBase::weakref_type*)mIn.readInt32();
        obj = (BBinder*)mIn.readInt32();
        obj->incStrong(mProcess.get());
        mOut.writeInt32(BC_ACQUIRE_DONE);
        mOut.writeInt32((int32_t)refs);
        mOut.writeInt32((int32_t)obj);
        break;

    case BR_RELEASE:
        refs = (RefBase::weakref_type*)mIn.readInt32();
        obj = (BBinder*)mIn.readInt32();
        mPendingStrongDerefs.push(obj);
        break;

    case BR_INCREFS:
        refs = (RefBase::weakref_type*)mIn.readInt32();
        obj = (BBinder*)mIn.readInt32();
        refs->incWeak(mProcess.get());
        mOut.writeInt32(BC_INCREFS_DONE);
        mOut.writeInt32((int32_t)refs);
        mOut.writeInt32((int32_t)obj);
        break;

    case BR_DECREFS:
        refs = (RefBase::weakref_type*)mIn.readInt32();
        obj = (BBinder*)mIn.readInt32();
        mPendingWeakDerefs.push(refs);
        break;

    case BR_ATTEMPT_ACQUIRE:
        refs = (RefBase::weakref_type*)mIn.readInt32();
        obj = (BBinder*)mIn.readInt32();
        {
            const bool success = refs->attemptIncStrong(mProcess.get());
            mOut.writeInt32(BC_ACQUIRE_RESULT);
            mOut.writeInt32((int32_t)success);
        }
        break;

    case BR_TRANSACTION:
        {
            binder_transaction_data tr;
            result = mIn.read(&tr, sizeof(tr));
            if (result != NO_ERROR) break;

            Parcel buffer;
            buffer.ipcSetDataReference(
                reinterpret_cast<const uint8_t*>(tr.data.ptr.buffer),
                tr.data_size,
                reinterpret_cast<const size_t*>(tr.data.ptr.offsets),
                tr.offsets_size/sizeof(size_t), freeBuffer, this);

            const pid_t origPid = mCallingPid;
            const uid_t origUid = mCallingUid;

            mCallingPid = tr.sender_pid;
            mCallingUid = tr.sender_euid;

            Parcel reply;
            if (tr.target.ptr) {
                // Convert the Binder data into a BBinder and call its transact();
                // the Bn* classes derive from BBinder
                sp<BBinder> b((BBinder*)tr.cookie);
                const status_t error = b->transact(tr.code, buffer, &reply, 0);
                if (error < NO_ERROR) reply.setError(error);
            } else {
                const status_t error = the_context_object->transact(tr.code, buffer, &reply, 0);
                if (error < NO_ERROR) reply.setError(error);
            }

            // In synchronous mode, send the result back
            if ((tr.flags & TF_ONE_WAY) == 0) {
                sendReply(reply, 0);
            } else {
            }

            mCallingPid = origPid;
            mCallingUid = origUid;
        }
        break;

    case BR_DEAD_BINDER:
        {
            BpBinder *proxy = (BpBinder*)mIn.readInt32();
            proxy->sendObituary();
            mOut.writeInt32(BC_DEAD_BINDER_DONE);
            mOut.writeInt32((int32_t)proxy);
        }
        break;

    case BR_CLEAR_DEATH_NOTIFICATION_DONE:
        {
            BpBinder *proxy = (BpBinder*)mIn.readInt32();
            proxy->getWeakRefs()->decWeak(proxy);
        }
        break;

    case BR_FINISHED:
        result = TIMED_OUT;
        break;

    case BR_NOOP:
        break;

    case BR_SPAWN_LOOPER:
        mProcess->spawnPooledThread(false);
        break;

    default:
        printf("*** BAD COMMAND %d received from Binder driver\n", cmd);
        result = UNKNOWN_ERROR;
        break;
    }

    if (result != NO_ERROR) {
        mLastError = result;
    }

    return result;
}
incStrongHandle
void IPCThreadState::incStrongHandle(int32_t handle)
{
    mOut.writeInt32(BC_ACQUIRE);
    mOut.writeInt32(handle);
}
decStrongHandle
void IPCThreadState::decStrongHandle(int32_t handle)
{
    mOut.writeInt32(BC_RELEASE);
    mOut.writeInt32(handle);
}
incWeakHandle
void IPCThreadState::incWeakHandle(int32_t handle)
{
    mOut.writeInt32(BC_INCREFS);
    mOut.writeInt32(handle);
}
decWeakHandle
void IPCThreadState::decWeakHandle(int32_t handle)
{
    mOut.writeInt32(BC_DECREFS);
    mOut.writeInt32(handle);
}